Files
SecureCheck/securecheck/tasks.py
2026-04-05 18:56:26 +02:00

450 lines
18 KiB
Python

from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from .assets import asset_text
from .executor import ExecutionContext, SecureCheckError
from .models import TaskDefinition, TaskResult
# Remote copy of the Powerlevel10k prompt configuration. zsh_setup downloads it
# and falls back to the bundled "p10k.zsh" asset when the download raises or
# returns an empty body.
P10K_REMOTE_URL = "https://git.h3campus.fr/Johnny/Install_zsh/raw/branch/main/.p10k.zsh"
# Git repository of the Powerlevel10k theme. zsh_setup shallow-clones it into
# the target user's home when no system-wide theme file is found.
P10K_THEME_GIT_URL = "https://github.com/romkatv/powerlevel10k.git"
def _result(
    context: ExecutionContext,
    task: TaskDefinition,
    started_at: datetime,
    *,
    changed: bool,
    details: list[str] | None = None,
) -> TaskResult:
    """Build a successful ``TaskResult`` for *task*.

    Thin wrapper around ``context.make_result`` that always passes
    ``success=True``.

    Args:
        context: Execution context whose ``make_result`` builds the result.
        task: The task the result belongs to.
        started_at: Moment the task handler started.
        changed: Whether the task modified the system.
        details: Human-readable detail lines; ``None`` or an empty list is
            forwarded as a fresh empty list.
    """
    detail_lines = details if details else []
    return context.make_result(
        task,
        success=True,
        changed=changed,
        started_at=started_at,
        details=detail_lines,
    )
def _write_report(context: ExecutionContext, name: str, content: str) -> Path:
    """Write *content* to a timestamped ``.log`` file in the report directory.

    The file is named ``<YYYYmmdd-HHMMSS>-<name>.log`` under
    ``context.paths.report_dir`` and is written through the runner with mode
    ``0o640`` and ``requires_root=False``.

    Returns:
        The path of the report file.
    """
    stamp = f"{datetime.now():%Y%m%d-%H%M%S}"
    destination = context.paths.report_dir / f"{stamp}-{name}.log"
    context.runner.write_text_file(
        destination,
        content,
        mode=0o640,
        requires_root=False,
    )
    return destination
def _append_package_details(context: ExecutionContext, details: list[str], report) -> bool:
    """Append a summary of a package report to *details*.

    One line is added for the packages that were already present and one for
    the packages that were installed (worded as "would be added" when
    ``context.dry_run`` is truthy). Empty groups produce no line.

    Args:
        context: Execution context; only ``dry_run`` is read.
        details: Detail list, extended in place.
        report: Object exposing ``changed``, ``already_present`` and
            ``installed``.

    Returns:
        The report's ``changed`` flag.
    """
    was_changed = report.changed
    install_label = "Seraient ajoutés" if context.dry_run else "Ajoutés"
    groups = (
        ("Déjà présents", report.already_present),
        (install_label, report.installed),
    )
    for label, names in groups:
        if names:
            details.append(f"{label}: {', '.join(names)}")
    return was_changed
def system_update(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Refresh the package index, then upgrade the system.

    The result is always reported as changed.
    """
    started_at = datetime.now()
    runner = context.runner
    runner.update_package_index()
    runner.upgrade_system()
    return _result(
        context,
        task,
        started_at,
        changed=True,
        details=[
            "Index des paquets rafraîchi",
            "Mises à jour système appliquées",
        ],
    )
def automatic_updates(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install and configure unattended package updates.

    * ``apt-get``: ensures ``unattended-upgrades`` and ``apt-listchanges``,
      writes ``20auto-upgrades`` and ``52securecheck-unattended-upgrades``
      under ``/etc/apt/apt.conf.d`` and enables ``unattended-upgrades.service``.
    * ``dnf`` / ``yum``: ensures ``dnf-automatic``, writes
      ``/etc/dnf/automatic.conf`` and enables ``dnf-automatic.timer``.

    Raises:
        SecureCheckError: If the package manager is none of the above.
    """
    started_at = datetime.now()
    manager = context.system.package_manager
    uses_apt = manager == "apt-get"
    # Reject unsupported systems up front, before anything is touched.
    if not uses_apt and manager not in {"dnf", "yum"}:
        raise SecureCheckError("Les mises à jour automatiques ne sont pas prises en charge sur ce système")

    runner = context.runner
    details: list[str] = []
    changed = False

    if uses_apt:
        report = runner.ensure_packages_report(["unattended-upgrades", "apt-listchanges"])
        changed |= _append_package_details(context, details, report)
        periodic_conf = (
            'APT::Periodic::Update-Package-Lists "1";\n'
            'APT::Periodic::Download-Upgradeable-Packages "1";\n'
            'APT::Periodic::AutocleanInterval "7";\n'
            'APT::Periodic::Unattended-Upgrade "1";\n'
        )
        unattended_conf = (
            'Unattended-Upgrade::Automatic-Reboot "false";\n'
            'Unattended-Upgrade::Automatic-Reboot-Time "03:30";\n'
            'Unattended-Upgrade::Remove-Unused-Dependencies "true";\n'
            'Unattended-Upgrade::Remove-New-Unused-Dependencies "true";\n'
        )
        apt_conf_dir = Path("/etc/apt/apt.conf.d")
        changed |= runner.write_text_file(
            apt_conf_dir / "20auto-upgrades",
            periodic_conf,
            requires_root=True,
        )
        changed |= runner.write_text_file(
            apt_conf_dir / "52securecheck-unattended-upgrades",
            unattended_conf,
            requires_root=True,
        )
        runner.enable_service("unattended-upgrades.service")
        details.append("Mises à jour automatiques APT configurées")
    else:
        report = runner.ensure_packages_report(["dnf-automatic"])
        changed |= _append_package_details(context, details, report)
        dnf_conf = (
            "[commands]\n"
            "apply_updates = yes\n"
            "upgrade_type = default\n"
            "[emitters]\n"
            "emit_via = stdio\n"
            "system_name = securecheck\n"
        )
        changed |= runner.write_text_file(
            Path("/etc/dnf/automatic.conf"),
            dnf_conf,
            requires_root=True,
        )
        runner.enable_service("dnf-automatic.timer")
        details.append("Mises à jour automatiques DNF configurées")

    return _result(context, task, started_at, changed=changed, details=details)
def lynis_audit(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Run a quick Lynis system audit and store its output as a report.

    Ensures the ``lynis`` package, runs ``lynis audit system --quick`` as root
    without raising on a non-zero exit, and writes the return code, stdout and
    stderr to a timestamped report file. The task succeeds only when Lynis
    exits with code 0.
    """
    started_at = datetime.now()
    details: list[str] = []
    runner = context.runner

    changed = _append_package_details(
        context,
        details,
        runner.ensure_packages_report(["lynis"]),
    )

    audit = runner.run(
        ["lynis", "audit", "system", "--quick"],
        requires_root=True,
        check=False,
    )
    report_lines = [
        "=== SecureCheck / Lynis ===",
        f"Return code: {audit.returncode}",
        "",
        audit.stdout,
        audit.stderr,
    ]
    report_body = "\n".join(report_lines).strip() + "\n"
    details.append(f"Rapport Lynis: {_write_report(context, 'lynis', report_body)}")

    if audit.returncode == 0:
        success, error = True, None
    else:
        success, error = False, "Lynis a remonté une erreur"
    return context.make_result(
        task,
        success=success,
        changed=changed,
        started_at=started_at,
        details=details,
        error=error,
    )
def rootkit_check(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Run rkhunter and chkrootkit and store their output as a JSON report.

    Ensures both packages, runs ``rkhunter --update`` and ``rkhunter --propupd``
    (their return codes are only recorded in the details), then runs the
    rkhunter check and ``chkrootkit -q``. All commands run as root with
    ``check=False``. The task succeeds only when both scanners exit with 0.
    """
    started_at = datetime.now()
    details: list[str] = []
    runner = context.runner

    changed = _append_package_details(
        context,
        details,
        runner.ensure_packages_report(["rkhunter", "chkrootkit"]),
    )

    # NOTE(review): --propupd runs immediately before every --check. Per the
    # rkhunter documentation this refreshes the stored file-property baseline,
    # which may hide modifications made since the previous run. Confirm that
    # this is intended.
    for label in ("update", "propupd"):
        prep = runner.run(["rkhunter", f"--{label}"], requires_root=True, check=False)
        details.append(f"rkhunter {label} rc={prep.returncode}")

    rkhunter_scan = runner.run(
        ["rkhunter", "--check", "--skip-keypress", "--report-warnings-only"],
        requires_root=True,
        check=False,
    )
    chkrootkit_scan = runner.run(["chkrootkit", "-q"], requires_root=True, check=False)

    # Key order is kept stable because it determines the layout of the report.
    payload = {
        "rkhunter_check_returncode": rkhunter_scan.returncode,
        "chkrootkit_returncode": chkrootkit_scan.returncode,
        "rkhunter_stdout": rkhunter_scan.stdout,
        "rkhunter_stderr": rkhunter_scan.stderr,
        "chkrootkit_stdout": chkrootkit_scan.stdout,
        "chkrootkit_stderr": chkrootkit_scan.stderr,
    }
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    report_path = context.paths.report_dir / f"{stamp}-rootkit-report.json"
    runner.write_text_file(
        report_path,
        json.dumps(payload, indent=2) + "\n",
        mode=0o640,
        requires_root=False,
    )
    details.append(f"Rapport rootkits: {report_path}")

    success = all(scan.returncode == 0 for scan in (rkhunter_scan, chkrootkit_scan))
    error = None if success else "Vérification rootkit incomplète ou avec alertes"
    return context.make_result(
        task,
        success=success,
        changed=changed,
        started_at=started_at,
        details=details,
        error=error,
    )
def log_rotation(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install logrotate and write the SecureCheck rotation policy.

    Ensures the ``logrotate`` package and writes ``/etc/logrotate.d/securecheck``
    with a daily, 7-generation, compressed rotation for the SecureCheck logs
    and reports.

    Returns:
        A successful result; ``changed`` reflects package installation and
        whether the configuration file was rewritten.
    """
    started_at = datetime.now()
    details: list[str] = []
    pkg_report = context.runner.ensure_packages_report(["logrotate"])
    changed = _append_package_details(context, details, pkg_report)

    # Use explicit suffixes for the reports directory. Rotated files
    # ("x.log.1", "x.log.2.gz") stay next to the originals, so a bare "*"
    # would match them on the next run and rotate them again. ".log" and
    # ".json" are the two report suffixes this module writes.
    targets = " ".join(
        (
            "/var/log/securecheck/*.log",
            "/var/log/securecheck/reports/*.log",
            "/var/log/securecheck/reports/*.json",
        )
    )
    content = f"""{targets} {{
rotate 7
daily
missingok
notifempty
compress
delaycompress
copytruncate
create 0640 root adm
}}
"""
    changed |= context.runner.write_text_file(Path("/etc/logrotate.d/securecheck"), content, requires_root=True)
    details.append("Rotation des logs SecureCheck configurée")
    return _result(context, task, started_at, changed=changed, details=details)
def zsh_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install zsh with a Powerlevel10k prompt for the target user.

    Steps, in order:

    1. Ensure ``zsh``, ``git`` and ``curl`` plus the plugin and theme packages
       that apply to the package manager (optional ones only when
       ``package_available`` reports them).
    2. Fetch the p10k configuration from ``P10K_REMOTE_URL``; fall back to the
       bundled ``p10k.zsh`` asset if the download raises or is empty.
    3. Clone the theme into ``~/.powerlevel10k`` when no system-wide theme
       file exists and the clone target is not already there.
    4. Write ``~/.p10k.zsh`` and a generated ``~/.zshrc`` owned by the target
       user.
    5. Call ``ensure_user_shell`` with the first zsh binary found.
    """
    started_at = datetime.now()
    manager = context.system.package_manager
    details: list[str] = []
    runner = context.runner
    home = context.system.target_home

    # --- packages -----------------------------------------------------------
    packages = ["zsh", "git", "curl"]
    if manager == "apt-get":
        packages.extend(["zsh-autosuggestions", "zsh-syntax-highlighting"])
        optional_candidates = ("zsh-theme-powerlevel10k", "fonts-powerline")
    elif manager == "pacman":
        optional_candidates = (
            "zsh-autosuggestions",
            "zsh-syntax-highlighting",
            "zsh-theme-powerlevel10k",
        )
    else:
        optional_candidates = ()
    packages.extend(name for name in optional_candidates if runner.package_available(name))
    changed = _append_package_details(context, details, runner.ensure_packages_report(packages))

    # --- p10k configuration -------------------------------------------------
    embedded_label = "source locale embarquée: assets/p10k.zsh"
    try:
        p10k_content = runner.download_text(P10K_REMOTE_URL)
    except Exception:  # noqa: BLE001
        # Best effort: any download failure falls back to the bundled copy.
        p10k_content = asset_text("p10k.zsh")
        p10k_source = embedded_label
    else:
        p10k_source = f"source distante: {P10K_REMOTE_URL}"
    if not p10k_content:
        p10k_content = asset_text("p10k.zsh")
        p10k_source = embedded_label

    zshrc_path = home / ".zshrc"
    p10k_path = home / ".p10k.zsh"
    theme_repo_path = home / ".powerlevel10k"

    # --- theme --------------------------------------------------------------
    system_theme_files = (
        Path("/usr/share/powerlevel10k/powerlevel10k.zsh-theme"),
        Path("/usr/share/zsh-theme-powerlevel10k/powerlevel10k.zsh-theme"),
    )
    has_system_theme = any(candidate.exists() for candidate in system_theme_files)
    if not has_system_theme:
        if theme_repo_path.exists():
            details.append(f"Theme powerlevel10k déjà présent dans {theme_repo_path}")
        else:
            runner.run(
                ["git", "clone", "--depth=1", P10K_THEME_GIT_URL, str(theme_repo_path)],
                run_as_user=context.system.target_user,
            )
            changed = True
            details.append(f"Theme powerlevel10k cloné dans {theme_repo_path}")

    # --- dotfiles -----------------------------------------------------------
    zshrc_content = """# Fichier généré par SecureCheck
if [[ -r "${XDG_CACHE_HOME:-$HOME/.cache}/p10k-instant-prompt-${(%):-%n}.zsh" ]]; then
source "${XDG_CACHE_HOME:-$HOME/.cache}/p10k-instant-prompt-${(%):-%n}.zsh"
fi
export HISTFILE="$HOME/.zsh_history"
export HISTSIZE=10000
export SAVEHIST=10000
setopt appendhistory
setopt histignoredups
setopt sharehistory
setopt autocd
autoload -Uz compinit
compinit
if [ -f /usr/share/powerlevel10k/powerlevel10k.zsh-theme ]; then
source /usr/share/powerlevel10k/powerlevel10k.zsh-theme
elif [ -f /usr/share/zsh-theme-powerlevel10k/powerlevel10k.zsh-theme ]; then
source /usr/share/zsh-theme-powerlevel10k/powerlevel10k.zsh-theme
elif [ -f "$HOME/.powerlevel10k/powerlevel10k.zsh-theme" ]; then
source "$HOME/.powerlevel10k/powerlevel10k.zsh-theme"
fi
if [ -f /usr/share/zsh-autosuggestions/zsh-autosuggestions.zsh ]; then
source /usr/share/zsh-autosuggestions/zsh-autosuggestions.zsh
fi
if [ -f /usr/share/zsh-syntax-highlighting/zsh-syntax-highlighting.zsh ]; then
source /usr/share/zsh-syntax-highlighting/zsh-syntax-highlighting.zsh
fi
bindkey '^[[A' history-search-backward
bindkey '^[[B' history-search-forward
alias ll='ls -alF'
alias la='ls -A'
alias l='ls -CF'
alias update-system='sudo apt-get update && sudo apt-get dist-upgrade -y'
[[ -f "$HOME/.p10k.zsh" ]] && source "$HOME/.p10k.zsh"
"""
    # The p10k file is written first, then .zshrc, both owned by the target user.
    for destination, body in ((p10k_path, p10k_content), (zshrc_path, zshrc_content)):
        changed |= runner.write_text_file(
            destination,
            body,
            mode=0o644,
            owner_uid=context.system.target_uid,
            owner_gid=context.system.target_gid,
        )

    # --- login shell --------------------------------------------------------
    zsh_binary = next(
        (candidate for candidate in ("/usr/bin/zsh", "/bin/zsh") if Path(candidate).exists()),
        None,
    )
    if zsh_binary is not None:
        changed |= runner.ensure_user_shell(zsh_binary)

    details.extend(
        [
            f"Configuration zsh appliquée pour {context.system.target_user}",
            f"Fichier p10k copié vers {p10k_path}",
            p10k_source,
        ]
    )
    return _result(context, task, started_at, changed=changed, details=details)
def utilities_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install the standard set of admin and security utilities.

    The package list depends on the package manager (``apt-get``,
    ``dnf``/``yum``, or a generic fallback without fail2ban). When both
    ``systemctl`` and ``fail2ban-client`` are available afterwards,
    ``fail2ban.service`` is enabled.
    """
    started_at = datetime.now()
    manager = context.system.package_manager
    runner = context.runner

    apt_packages = [
        "ncdu",
        "needrestart",
        "git",
        "curl",
        "fail2ban",
        "htop",
        "nmon",
        "duf",
        "net-tools",
        "tmux",
        "tree",
        "vim",
        "ca-certificates",
    ]
    rpm_packages = [
        "ncdu",
        "git",
        "curl",
        "fail2ban",
        "htop",
        "nmon",
        "duf",
        "net-tools",
        "tmux",
        "tree",
        "vim-enhanced",
    ]
    fallback_packages = [
        "ncdu",
        "git",
        "curl",
        "htop",
        "nmon",
        "duf",
        "net-tools",
        "tmux",
        "tree",
        "vim",
    ]
    packages_by_manager = {
        "apt-get": apt_packages,
        "dnf": rpm_packages,
        "yum": rpm_packages,
    }
    packages = packages_by_manager.get(manager, fallback_packages)

    details: list[str] = []
    changed = _append_package_details(context, details, runner.ensure_packages_report(packages))

    fail2ban_manageable = runner.command_exists("systemctl") and runner.command_exists("fail2ban-client")
    if fail2ban_manageable:
        runner.enable_service("fail2ban.service")

    details.append("Utilitaires système et sécurité installés / vérifiés")
    return _result(context, task, started_at, changed=changed, details=details)
def zram_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install a systemd-managed zram swap device.

    The device size is half of the value returned by ``read_memory_mb``, with
    a floor of 512. Four files are written as root: the defaults file, the
    start and stop scripts (mode ``0o755``) and the systemd unit. Then systemd
    is reloaded and ``securecheck-zram.service`` is enabled.
    """
    started_at = datetime.now()
    details: list[str] = []
    runner = context.runner

    zram_mb = max(512, runner.read_memory_mb() // 2)

    # NOTE(review): the start script below only reads ZRAM_SIZE from this file.
    # ALGO, PERCENT and PRIORITY are written here but the script hard-codes
    # "zstd" and "-p 100".
    defaults_content = (
        "# Géré par SecureCheck\n"
        "ALGO=zstd\n"
        "PERCENT=50\n"
        "PRIORITY=100\n"
        f"ZRAM_SIZE={zram_mb}\n"
    )
    start_script = """#!/bin/sh
set -eu
modprobe zram || true
sleep 1
echo zstd > /sys/block/zram0/comp_algorithm
echo ${ZRAM_SIZE}M > /sys/block/zram0/disksize
mkswap /dev/zram0
swapon -p 100 /dev/zram0
"""
    stop_script = """#!/bin/sh
swapoff /dev/zram0 2>/dev/null || true
echo 1 > /sys/block/zram0/reset 2>/dev/null || true
rmmod zram 2>/dev/null || true
"""
    service_content = """[Unit]
Description=SecureCheck zram swap
After=local-fs.target
Before=swap.target
[Service]
Type=oneshot
RemainAfterExit=yes
EnvironmentFile=/etc/default/securecheck-zram
ExecStart=/usr/local/bin/securecheck-zram-start
ExecStop=/usr/local/bin/securecheck-zram-stop
[Install]
WantedBy=multi-user.target
"""

    # (destination, content, extra keyword arguments), written in this order.
    managed_files = (
        (Path("/etc/default/securecheck-zram"), defaults_content, {}),
        (Path("/usr/local/bin/securecheck-zram-start"), start_script, {"mode": 0o755}),
        (Path("/usr/local/bin/securecheck-zram-stop"), stop_script, {"mode": 0o755}),
        (Path("/etc/systemd/system/securecheck-zram.service"), service_content, {}),
    )
    changed = False
    for destination, body, extra in managed_files:
        changed |= runner.write_text_file(destination, body, requires_root=True, **extra)

    # Reload so systemd sees the unit file before it is enabled.
    runner.run(["systemctl", "daemon-reload"], requires_root=True)
    runner.enable_service("securecheck-zram.service")

    details.append(f"zram configuré à {zram_mb} Mo")
    return _result(context, task, started_at, changed=changed, details=details)
def firewall_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install and enable a host firewall that keeps SSH reachable.

    * ``apt-get``: ensures ``ufw``, sets the defaults to deny incoming and
      allow outgoing, adds ``22/tcp`` unless ``ufw status`` already mentions
      ``22/tcp`` or ``OpenSSH``, then force-enables ufw.
    * ``dnf`` / ``yum``: ensures ``firewalld``, enables its service,
      permanently adds the ``ssh`` service and reloads.

    Raises:
        SecureCheckError: If the package manager is none of the above.
    """
    started_at = datetime.now()
    manager = context.system.package_manager
    uses_ufw = manager == "apt-get"
    if not uses_ufw and manager not in {"dnf", "yum"}:
        raise SecureCheckError("Pare-feu automatique non pris en charge sur ce système")

    runner = context.runner
    details: list[str] = []
    changed = False

    if uses_ufw:
        changed |= _append_package_details(context, details, runner.ensure_packages_report(["ufw"]))
        for policy, direction in (("deny", "incoming"), ("allow", "outgoing")):
            runner.run(["ufw", "default", policy, direction], requires_root=True)

        status = runner.run(["ufw", "status"], requires_root=True, check=False)
        ssh_already_allowed = any(marker in status.stdout for marker in ("22/tcp", "OpenSSH"))
        if not ssh_already_allowed:
            runner.run(["ufw", "allow", "22/tcp"], requires_root=True)
            changed = True

        runner.run(["ufw", "--force", "enable"], requires_root=True)
        details.append("Pare-feu UFW activé")
    else:
        changed |= _append_package_details(context, details, runner.ensure_packages_report(["firewalld"]))
        runner.enable_service("firewalld.service")
        for arguments in (("--permanent", "--add-service=ssh"), ("--reload",)):
            runner.run(["firewall-cmd", *arguments], requires_root=True)
        details.append("Pare-feu firewalld activé")

    return _result(context, task, started_at, changed=changed, details=details)
def docker_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install Docker, configure log rotation and enable the daemon.

    * ``apt-get``: ensures ``docker.io``; ``docker-compose-v2`` is added only
      when the runner reports it as available, because that package is not in
      every apt repository and requiring it would fail the whole task.
    * ``dnf`` / ``yum`` / ``pacman``: ensures ``docker``.

    Afterwards ``/etc/docker/daemon.json`` is written with a ``json-file`` log
    driver limited to 3 files of 10m, ``docker.service`` is enabled, and the
    target user is added to the ``docker`` group (best effort, failures are
    ignored).

    Raises:
        SecureCheckError: If the package manager is not supported.
    """
    started_at = datetime.now()
    manager = context.system.package_manager
    changed = False
    details: list[str] = []

    if manager == "apt-get":
        packages = ["docker.io"]
        # Same optional-package probing as zsh_setup uses for its extras.
        if context.runner.package_available("docker-compose-v2"):
            packages.append("docker-compose-v2")
        else:
            details.append("Paquet optionnel indisponible: docker-compose-v2")
    elif manager in {"dnf", "yum", "pacman"}:
        packages = ["docker"]
    else:
        raise SecureCheckError("Docker n'est pas pris en charge sur ce système")

    pkg_report = context.runner.ensure_packages_report(packages)
    changed |= _append_package_details(context, details, pkg_report)

    daemon_payload = {
        "log-driver": "json-file",
        "log-opts": {
            "max-size": "10m",
            "max-file": "3",
        },
    }
    changed |= context.runner.write_json_file(Path("/etc/docker/daemon.json"), daemon_payload, requires_root=True)
    context.runner.enable_service("docker.service")
    # Best effort: a failing usermod must not fail the task.
    context.runner.run(["usermod", "-aG", "docker", context.system.target_user], requires_root=True, check=False)
    version_result = context.runner.run(["docker", "--version"], requires_root=False, check=False)
    details.append(version_result.stdout.strip() or "Docker installé / vérifié")
    return _result(context, task, started_at, changed=changed, details=details)
def bind(task: TaskDefinition, func) -> TaskDefinition:
    """Return a copy of *task* whose handler calls ``func(context, task)``.

    The fields ``key``, ``label``, ``description``, ``category``,
    ``requires_root`` and ``default_selected`` are copied from *task*.

    Args:
        task: Task definition to copy the metadata from.
        func: Callable taking ``(context, task)``.
    """

    def handler(context, _task=task, _func=func):
        # Defaults bind the task and function at definition time.
        return _func(context, _task)

    copied_fields = {
        name: getattr(task, name)
        for name in (
            "key",
            "label",
            "description",
            "category",
            "requires_root",
            "default_selected",
        )
    }
    return TaskDefinition(handler=handler, **copied_fields)