"""SecureCheck task handlers.

Every public handler takes an ``ExecutionContext`` and the ``TaskDefinition``
being run, and returns a ``TaskResult`` built through ``context.make_result``.

NOTE(review): the embedded configuration/script templates below are laid out
one directive per line; confirm them against the files actually deployed.
"""

from __future__ import annotations

import json
import os
import re
import stat
import tempfile
from datetime import datetime
from pathlib import Path

from .assets import asset_text
from .executor import CommandResult, ExecutionContext, SecureCheckError
from .models import TaskDefinition, TaskResult

P10K_REMOTE_URL = "https://git.h3campus.fr/Johnny/Install_zsh/raw/branch/main/.p10k.zsh"
P10K_THEME_GIT_URL = "https://github.com/romkatv/powerlevel10k.git"

AIDE_DEFAULT_CONF = """database=file:/var/lib/aide/aide.db
gzip_dbout=yes
group=default
dbinfo=file:/var/lib/aide/aide.db
dbinfo=file:/var/lib/aide/aide.db.gz
verbose=5
report_url=file:/var/log/aide/aide.log
checksum=sha512
file = p+u+g+s+m+acl+selinux+xattrs+sha512
"""

# Terminal colour/style sequences, removed before pattern matching.
_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
_LYNIS_SCORE_RE = re.compile(r"final score\s*:\s*(\d+)", re.IGNORECASE)
# "score" is optional so that "Hardening index : 67" is matched as well as
# "Hardening index score: 67".
_LYNIS_HARDENING_RE = re.compile(r"hardening index(?:\s+score)?\s*:\s*(\d+)", re.IGNORECASE)
_LYNIS_ISSUE_KEYWORDS = ("warning", "suggest", "recommend", "failed", "error")


def _result(
    context: ExecutionContext,
    task: TaskDefinition,
    started_at: datetime,
    *,
    changed: bool,
    details: list[str] | None = None,
) -> TaskResult:
    """Build a successful ``TaskResult`` (``success=True``) for *task*."""
    return context.make_result(
        task,
        success=True,
        changed=changed,
        started_at=started_at,
        details=details or [],
    )


def _write_report(context: ExecutionContext, name: str, content: str) -> Path:
    """Write *content* to ``<report_dir>/<timestamp>-<name>.log`` and return the path."""
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    report_path = context.paths.report_dir / f"{timestamp}-{name}.log"
    context.runner.write_text_file(report_path, content, mode=0o640, requires_root=False)
    return report_path


def _append_package_details(context: ExecutionContext, details: list[str], report) -> bool:
    """Append package-installation lines to *details* and return ``report.changed``.

    *report* is the object returned by ``runner.ensure_packages_report``; only
    its ``changed``, ``already_present`` and ``installed`` attributes are read.
    """
    added_label = "Seraient ajoutés" if context.dry_run else "Ajoutés"
    if report.already_present:
        details.append(f"Déjà présents: {', '.join(report.already_present)}")
    if report.installed:
        details.append(f"{added_label}: {', '.join(report.installed)}")
    return report.changed


def system_update(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Refresh the package index and upgrade the system; always reports a change."""
    started_at = datetime.now()
    context.runner.update_package_index()
    context.runner.upgrade_system()
    details = ["Index des paquets rafraîchi", "Mises à jour système appliquées"]
    return _result(context, task, started_at, changed=True, details=details)


def automatic_updates(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install and configure unattended updates for apt-get or dnf/yum systems.

    Raises:
        SecureCheckError: if the package manager is neither apt-get, dnf nor yum.
    """
    started_at = datetime.now()
    manager = context.system.package_manager
    changed = False
    details: list[str] = []

    if manager == "apt-get":
        pkg_report = context.runner.ensure_packages_report(["unattended-upgrades", "apt-listchanges"])
        changed |= _append_package_details(context, details, pkg_report)
        content_20 = """APT::Periodic::Update-Package-Lists "1";
APT::Periodic::Download-Upgradeable-Packages "1";
APT::Periodic::AutocleanInterval "7";
APT::Periodic::Unattended-Upgrade "1";
"""
        content_52 = """Unattended-Upgrade::Automatic-Reboot "false";
Unattended-Upgrade::Automatic-Reboot-Time "03:30";
Unattended-Upgrade::Remove-Unused-Dependencies "true";
Unattended-Upgrade::Remove-New-Unused-Dependencies "true";
"""
        changed |= context.runner.write_text_file(
            Path("/etc/apt/apt.conf.d/20auto-upgrades"),
            content_20,
            requires_root=True,
        )
        changed |= context.runner.write_text_file(
            Path("/etc/apt/apt.conf.d/52securecheck-unattended-upgrades"),
            content_52,
            requires_root=True,
        )
        context.runner.enable_service("unattended-upgrades.service")
        details.append("Mises à jour automatiques APT configurées")
    elif manager in {"dnf", "yum"}:
        pkg_report = context.runner.ensure_packages_report(["dnf-automatic"])
        changed |= _append_package_details(context, details, pkg_report)
        changed |= context.runner.write_text_file(
            Path("/etc/dnf/automatic.conf"),
            """[commands]
apply_updates = yes
upgrade_type = default

[emitters]
emit_via = stdio
system_name = securecheck
""",
            requires_root=True,
        )
        context.runner.enable_service("dnf-automatic.timer")
        details.append("Mises à jour automatiques DNF configurées")
    else:
        raise SecureCheckError("Les mises à jour automatiques ne sont pas prises en charge sur ce système")

    return _result(context, task, started_at, changed=changed, details=details)


def _parse_lynis_result(result: CommandResult) -> tuple[int | None, int | None, list[str]]:
    """Extract the score, hardening index and issue lines from Lynis stdout.

    Returns:
        ``(score, hardening, issues)`` where the first two are ``None`` when no
        matching line was found and *issues* is the sorted, de-duplicated list
        of lines containing one of the issue keywords (lines starting with
        "Tip" are ignored).
    """
    score: int | None = None
    hardening: int | None = None
    issues: set[str] = set()

    for line in (result.stdout or "").splitlines():
        # Colour codes would otherwise sit between the label and the number.
        stripped = _ANSI_ESCAPE_RE.sub("", line).strip()
        if not stripped:
            continue
        if match := _LYNIS_SCORE_RE.search(stripped):
            score = int(match.group(1))
        elif match := _LYNIS_HARDENING_RE.search(stripped):
            hardening = int(match.group(1))

        if stripped.startswith("Tip"):
            continue
        lowered = stripped.lower()
        if any(keyword in lowered for keyword in _LYNIS_ISSUE_KEYWORDS):
            issues.add(stripped)

    return score, hardening, sorted(issues)


def lynis_audit(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Run ``lynis audit system --quick``, store a report and summarise findings.

    The task succeeds only when Lynis exits with 0 and no issue line was found.
    """
    started_at = datetime.now()
    details: list[str] = []
    pkg_report = context.runner.ensure_packages_report(["lynis"])
    changed = _append_package_details(context, details, pkg_report)

    result = context.runner.run(["lynis", "audit", "system", "--quick"], requires_root=True, check=False)
    report_body = "\n".join(
        [
            "=== SecureCheck / Lynis ===",
            f"Return code: {result.returncode}",
            "",
            result.stdout,
            result.stderr,
        ]
    ).strip() + "\n"
    report_path = _write_report(context, "lynis", report_body)
    details.append(f"Rapport Lynis: {report_path}")

    score, hardening, issues = _parse_lynis_result(result)
    if score is not None:
        details.append(f"Score Lynis: {score}")
    if hardening is not None:
        details.append(f"Hardening index: {hardening}")
    if issues:
        details.append("Modifications recommandées Lynis :")
        # Only the first ten issues are listed; the full output is in the report.
        details.extend(f" • {issue}" for issue in issues[:10])

    success = result.returncode == 0 and not issues
    if success:
        error = None
    elif issues:
        error = "Lynis a détecté des recommandations"
    else:
        # Non-zero exit without any parsed issue: report the exit code instead
        # of claiming that recommendations were found.
        error = f"Lynis s'est terminé avec le code {result.returncode}"

    return context.make_result(
        task,
        success=success,
        changed=changed,
        started_at=started_at,
        details=details,
        error=error,
    )


def rootkit_check(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Run rkhunter and chkrootkit and store their raw output as a JSON report.

    Success means both tools ran; detections are reported in *details* and in
    the report but do not make the task fail.
    """
    started_at = datetime.now()
    details: list[str] = []
    pkg_report = context.runner.ensure_packages_report(["rkhunter", "chkrootkit"])
    changed = _append_package_details(context, details, pkg_report)

    update_result = context.runner.run(["rkhunter", "--update"], requires_root=True, check=False)
    details.append(f"rkhunter update rc={update_result.returncode}")
    # NOTE(review): --propupd runs before every --check; presumably this
    # refreshes the file-property baseline right before it is compared, which
    # could hide changes made since the previous run. Confirm this is intended.
    propupd_result = context.runner.run(["rkhunter", "--propupd"], requires_root=True, check=False)
    details.append(f"rkhunter propupd rc={propupd_result.returncode}")

    rkhunter_result = context.runner.run(
        ["rkhunter", "--check", "--skip-keypress", "--report-warnings-only"],
        requires_root=True,
        check=False,
    )
    chkrootkit_result = context.runner.run(["chkrootkit", "-q"], requires_root=True, check=False)

    report_payload = {
        "rkhunter_check_returncode": rkhunter_result.returncode,
        "chkrootkit_returncode": chkrootkit_result.returncode,
        "rkhunter_stdout": rkhunter_result.stdout,
        "rkhunter_stderr": rkhunter_result.stderr,
        "chkrootkit_stdout": chkrootkit_result.stdout,
        "chkrootkit_stderr": chkrootkit_result.stderr,
    }
    report_path = context.paths.report_dir / f"{datetime.now().strftime('%Y%m%d-%H%M%S')}-rootkit-report.json"
    context.runner.write_text_file(
        report_path,
        json.dumps(report_payload, indent=2) + "\n",
        mode=0o640,
        requires_root=False,
    )
    details.append(f"Rapport rootkits: {report_path}")

    # rkhunter: rc=0 clean, rc=1 warnings found, rc=2+ critical error (the tool
    # could not run). Anything else (e.g. a negative code) is not a clean run.
    rkhunter_ran = rkhunter_result.returncode in (0, 1)
    rkhunter_warnings = [line for line in rkhunter_result.stdout.splitlines() if line.startswith("Warning:")]
    if rkhunter_warnings:
        details.append(f"rkhunter: {len(rkhunter_warnings)} warning(s) à vérifier dans le rapport")

    # chkrootkit: rc=0 means the tool ran, not that nothing is infected; the
    # INFECTED markers are in stdout.
    chkrootkit_ran = chkrootkit_result.returncode == 0
    infected_lines = [line for line in chkrootkit_result.stdout.splitlines() if "INFECTED" in line]
    if infected_lines:
        details.append(f"chkrootkit: {len(infected_lines)} détection(s) à vérifier dans le rapport")

    success = rkhunter_ran and chkrootkit_ran
    error = None if success else "Les outils de vérification n'ont pas pu s'exécuter correctement"
    return context.make_result(
        task,
        success=success,
        changed=changed,
        started_at=started_at,
        details=details,
        error=error,
    )


def log_rotation(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install logrotate and write ``/etc/logrotate.d/securecheck``."""
    started_at = datetime.now()
    details: list[str] = []
    pkg_report = context.runner.ensure_packages_report(["logrotate"])
    changed = _append_package_details(context, details, pkg_report)

    log_target = "/var/log/securecheck/*.log"
    report_target = "/var/log/securecheck/reports/*"
    content = f"""{log_target} {report_target} {{
    rotate 7
    daily
    missingok
    notifempty
    compress
    delaycompress
    copytruncate
    create 0640 root adm
}}
"""
    changed |= context.runner.write_text_file(Path("/etc/logrotate.d/securecheck"), content, requires_root=True)
    details.append("Rotation des logs SecureCheck configurée")
    return _result(context, task, started_at, changed=changed, details=details)


def zsh_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install zsh, the powerlevel10k theme and a generated ``.zshrc`` for the target user."""
    started_at = datetime.now()
    manager = context.system.package_manager
    details: list[str] = []

    packages = ["zsh", "git", "curl"]
    if manager == "apt-get":
        packages += ["zsh-autosuggestions", "zsh-syntax-highlighting"]
        optional_packages = ("zsh-theme-powerlevel10k", "fonts-powerline")
    elif manager == "pacman":
        optional_packages = ("zsh-autosuggestions", "zsh-syntax-highlighting", "zsh-theme-powerlevel10k")
    else:
        optional_packages = ()
    packages += [name for name in optional_packages if context.runner.package_available(name)]

    pkg_report = context.runner.ensure_packages_report(packages)
    changed = _append_package_details(context, details, pkg_report)

    # Best effort: any download failure (or an empty body) falls back to the
    # embedded asset.
    try:
        p10k_content = context.runner.download_text(P10K_REMOTE_URL)
    except Exception:  # noqa: BLE001
        p10k_content = ""
    if p10k_content:
        p10k_source = f"source distante: {P10K_REMOTE_URL}"
    else:
        p10k_content = asset_text("p10k.zsh")
        p10k_source = "source locale embarquée: assets/p10k.zsh"

    home = context.system.target_home
    zshrc_path = home / ".zshrc"
    p10k_path = home / ".p10k.zsh"
    theme_repo_path = home / ".powerlevel10k"
    theme_system_paths = [
        Path("/usr/share/powerlevel10k/powerlevel10k.zsh-theme"),
        Path("/usr/share/zsh-theme-powerlevel10k/powerlevel10k.zsh-theme"),
    ]
    # Clone the theme into the user's home only when no system-wide copy exists.
    if not any(path.exists() for path in theme_system_paths):
        if not theme_repo_path.exists():
            context.runner.run(
                ["git", "clone", "--depth=1", P10K_THEME_GIT_URL, str(theme_repo_path)],
                run_as_user=context.system.target_user,
            )
            changed = True
            details.append(f"Theme powerlevel10k cloné dans {theme_repo_path}")
        else:
            details.append(f"Theme powerlevel10k déjà présent dans {theme_repo_path}")

    zshrc_content = """# Fichier généré par SecureCheck
if [[ -r "${XDG_CACHE_HOME:-$HOME/.cache}/p10k-instant-prompt-${(%):-%n}.zsh" ]]; then
  source "${XDG_CACHE_HOME:-$HOME/.cache}/p10k-instant-prompt-${(%):-%n}.zsh"
fi

export HISTFILE="$HOME/.zsh_history"
export HISTSIZE=10000
export SAVEHIST=10000
setopt appendhistory
setopt histignoredups
setopt sharehistory
setopt autocd

autoload -Uz compinit
compinit

if [ -f /usr/share/powerlevel10k/powerlevel10k.zsh-theme ]; then
  source /usr/share/powerlevel10k/powerlevel10k.zsh-theme
elif [ -f /usr/share/zsh-theme-powerlevel10k/powerlevel10k.zsh-theme ]; then
  source /usr/share/zsh-theme-powerlevel10k/powerlevel10k.zsh-theme
elif [ -f "$HOME/.powerlevel10k/powerlevel10k.zsh-theme" ]; then
  source "$HOME/.powerlevel10k/powerlevel10k.zsh-theme"
fi

if [ -f /usr/share/zsh-autosuggestions/zsh-autosuggestions.zsh ]; then
  source /usr/share/zsh-autosuggestions/zsh-autosuggestions.zsh
fi

if [ -f /usr/share/zsh-syntax-highlighting/zsh-syntax-highlighting.zsh ]; then
  source /usr/share/zsh-syntax-highlighting/zsh-syntax-highlighting.zsh
fi

bindkey '^[[A' history-search-backward
bindkey '^[[B' history-search-forward

alias ll='ls -alF'
alias la='ls -A'
alias l='ls -CF'
alias update-system='sudo apt-get update && sudo apt-get dist-upgrade -y'

[[ -f "$HOME/.p10k.zsh" ]] && source "$HOME/.p10k.zsh"
"""

    changed |= context.runner.write_text_file(
        p10k_path,
        p10k_content,
        mode=0o644,
        owner_uid=context.system.target_uid,
        owner_gid=context.system.target_gid,
    )
    changed |= context.runner.write_text_file(
        zshrc_path,
        zshrc_content,
        mode=0o644,
        owner_uid=context.system.target_uid,
        owner_gid=context.system.target_gid,
    )

    zsh_path = "/usr/bin/zsh" if Path("/usr/bin/zsh").exists() else "/bin/zsh"
    if Path(zsh_path).exists():
        changed |= context.runner.ensure_user_shell(zsh_path)

    details.append(f"Configuration zsh appliquée pour {context.system.target_user}")
    details.append(f"Fichier p10k copié vers {p10k_path}")
    details.append(p10k_source)
    return _result(context, task, started_at, changed=changed, details=details)


def utilities_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install system/security utilities and enable the related services.

    Service activation is skipped when ``systemctl`` is not available.
    """
    started_at = datetime.now()
    manager = context.system.package_manager

    # NOTE(review): libpam-tmpdir, apparmor and aide-common look like Debian
    # package names, yet they are listed for dnf/yum and the fallback rather
    # than for apt-get. Confirm the lists are not swapped.
    if manager == "apt-get":
        packages = [
            "ncdu",
            "needrestart",
            "git",
            "curl",
            "fail2ban",
            "htop",
            "nmon",
            "duf",
            "net-tools",
            "tmux",
            "tree",
            "vim",
            "ca-certificates",
        ]
    elif manager in {"dnf", "yum"}:
        packages = [
            "ncdu",
            "git",
            "curl",
            "fail2ban",
            "htop",
            "nmon",
            "duf",
            "net-tools",
            "tmux",
            "tree",
            "vim-enhanced",
            "libpam-tmpdir",
            "clamav",
            "apparmor",
            "wazuh-agent",
            "aide",
            "aide-common",
        ]
    else:
        packages = [
            "ncdu",
            "git",
            "curl",
            "htop",
            "nmon",
            "duf",
            "net-tools",
            "tmux",
            "tree",
            "vim",
            "libpam-tmpdir",
            "clamav",
            "apparmor",
            "wazuh-agent",
            "aide",
            "aide-common",
        ]

    details: list[str] = []
    pkg_report = context.runner.ensure_packages_report(packages)
    changed = _append_package_details(context, details, pkg_report)

    runner = context.runner
    # Looked up once, after package installation, and reused for every service.
    has_systemctl = runner.command_exists("systemctl")

    if has_systemctl and runner.command_exists("fail2ban-client"):
        runner.enable_service("fail2ban.service")

    if has_systemctl and runner.command_exists("avahi-daemon"):
        runner.run(["systemctl", "disable", "--now", "avahi-daemon"], requires_root=True, check=False)
        details.append("Service avahi-daemon stoppé/désactivé")

    if has_systemctl and (runner.package_available("apparmor") or runner.command_exists("apparmor_status")):
        runner.run(["systemctl", "enable", "--now", "apparmor"], requires_root=True, check=False)
        details.append("AppArmor activé")

    if has_systemctl and (runner.package_available("clamav") or runner.command_exists("clamd")):
        runner.run(["systemctl", "enable", "--now", "clamav-freshclam"], requires_root=True, check=False)
        runner.run(["systemctl", "enable", "--now", "clamav-daemon"], requires_root=True, check=False)
        details.append("ClamAV (daemon + freshclam) activé")

    if runner.package_available("aide") or runner.package_available("aide-common"):
        aide_conf_path = Path("/etc/aide/aide.conf")
        if not aide_conf_path.exists() or aide_conf_path.read_text(encoding="utf-8") != AIDE_DEFAULT_CONF:
            changed |= runner.write_text_file(aide_conf_path, AIDE_DEFAULT_CONF, mode=0o644, requires_root=True)
            details.append("Configuration AIDE appliquée")

        default_env = Path("/etc/default/aide")
        mailto_line = 'MAILTO=""\n'
        if not default_env.exists() or default_env.read_text(encoding="utf-8") != mailto_line:
            changed |= runner.write_text_file(default_env, mailto_line, mode=0o644, requires_root=True)
            details.append("MAILTO AIDE désactivé")

        aide_db_new = Path("/var/lib/aide/aide.db.new")
        if not aide_db_new.exists():
            runner.run(["aideinit"], requires_root=True, check=False)
            details.append("AIDE initialisé (aideinit)")
        else:
            details.append("AIDE déjà initialisé")

        # Re-checked on purpose: aideinit may just have created the file.
        if aide_db_new.exists():
            existing_db = Path("/var/lib/aide/aide.db")
            if not existing_db.exists() or aide_db_new.read_bytes() != existing_db.read_bytes():
                runner.run(
                    ["cp", "-f", str(aide_db_new), "/var/lib/aide/aide.db"],
                    requires_root=True,
                    check=False,
                )
                details.append("Base AIDE mise à jour")

        if has_systemctl:
            runner.run(["systemctl", "enable", "--now", "dailyaidecheck.timer"], requires_root=True, check=False)
            details.append("Timers AIDE activés")

    if has_systemctl:
        runner.run(["systemctl", "enable", "--now", "wazuh-agent"], requires_root=True, check=False)
        details.append("Wazuh agent activé (configuration server sur 192.168.1.219 à gérer manuellement)")

    details.append("Utilitaires système et sécurité installés / vérifiés")
    return _result(context, task, started_at, changed=changed, details=details)


def zram_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install a systemd unit that creates a zram swap device.

    The device size is half of the memory reported by
    ``runner.read_memory_mb()``, with a floor of 512.
    """
    started_at = datetime.now()
    changed = False
    details: list[str] = []

    ram_mb = context.runner.read_memory_mb()
    zram_mb = max(512, ram_mb // 2)

    defaults_content = f"""# Géré par SecureCheck
ALGO=zstd
PERCENT=50
PRIORITY=100
ZRAM_SIZE={zram_mb}
"""
    # ZRAM_SIZE is expanded by the shell at run time; the unit below loads it
    # through EnvironmentFile.
    start_script = """#!/bin/sh
set -eu
modprobe zram || true
sleep 1
echo zstd > /sys/block/zram0/comp_algorithm
echo ${ZRAM_SIZE}M > /sys/block/zram0/disksize
mkswap /dev/zram0
swapon -p 100 /dev/zram0
"""
    stop_script = """#!/bin/sh
swapoff /dev/zram0 2>/dev/null || true
echo 1 > /sys/block/zram0/reset 2>/dev/null || true
rmmod zram 2>/dev/null || true
"""
    service_content = """[Unit]
Description=SecureCheck zram swap
After=local-fs.target
Before=swap.target

[Service]
Type=oneshot
RemainAfterExit=yes
EnvironmentFile=/etc/default/securecheck-zram
ExecStart=/usr/local/bin/securecheck-zram-start
ExecStop=/usr/local/bin/securecheck-zram-stop

[Install]
WantedBy=multi-user.target
"""

    changed |= context.runner.write_text_file(
        Path("/etc/default/securecheck-zram"),
        defaults_content,
        requires_root=True,
    )
    changed |= context.runner.write_text_file(
        Path("/usr/local/bin/securecheck-zram-start"),
        start_script,
        mode=0o755,
        requires_root=True,
    )
    changed |= context.runner.write_text_file(
        Path("/usr/local/bin/securecheck-zram-stop"),
        stop_script,
        mode=0o755,
        requires_root=True,
    )
    changed |= context.runner.write_text_file(
        Path("/etc/systemd/system/securecheck-zram.service"),
        service_content,
        requires_root=True,
    )
    context.runner.run(["systemctl", "daemon-reload"], requires_root=True)
    context.runner.enable_service("securecheck-zram.service")
    details.append(f"zram configuré à {zram_mb} Mo")
    return _result(context, task, started_at, changed=changed, details=details)


def firewall_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Enable UFW (apt-get) or firewalld (dnf/yum) while keeping SSH reachable.

    Raises:
        SecureCheckError: if the package manager is neither apt-get, dnf nor yum.
    """
    started_at = datetime.now()
    manager = context.system.package_manager
    changed = False
    details: list[str] = []

    if manager == "apt-get":
        pkg_report = context.runner.ensure_packages_report(["ufw"])
        changed |= _append_package_details(context, details, pkg_report)
        context.runner.run(["ufw", "default", "deny", "incoming"], requires_root=True)
        context.runner.run(["ufw", "default", "allow", "outgoing"], requires_root=True)
        # Allow SSH before enabling the firewall, unless a rule is already listed.
        ssh_rule = context.runner.run(["ufw", "status"], requires_root=True, check=False)
        if "22/tcp" not in ssh_rule.stdout and "OpenSSH" not in ssh_rule.stdout:
            context.runner.run(["ufw", "allow", "22/tcp"], requires_root=True)
            changed = True
        context.runner.run(["ufw", "--force", "enable"], requires_root=True)
        details.append("Pare-feu UFW activé")
    elif manager in {"dnf", "yum"}:
        pkg_report = context.runner.ensure_packages_report(["firewalld"])
        changed |= _append_package_details(context, details, pkg_report)
        context.runner.enable_service("firewalld.service")
        context.runner.run(["firewall-cmd", "--permanent", "--add-service=ssh"], requires_root=True)
        context.runner.run(["firewall-cmd", "--reload"], requires_root=True)
        details.append("Pare-feu firewalld activé")
    else:
        raise SecureCheckError("Pare-feu automatique non pris en charge sur ce système")

    return _result(context, task, started_at, changed=changed, details=details)


def docker_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Install Docker, write ``/etc/docker/daemon.json`` and add the target user to ``docker``.

    Raises:
        SecureCheckError: if the package manager is not apt-get, dnf, yum or pacman.
    """
    started_at = datetime.now()
    manager = context.system.package_manager
    changed = False
    details: list[str] = []

    if manager == "apt-get":
        docker_packages = ["docker.io", "docker-compose-v2"]
    elif manager in {"dnf", "yum", "pacman"}:
        docker_packages = ["docker"]
    else:
        raise SecureCheckError("Docker n'est pas pris en charge sur ce système")
    pkg_report = context.runner.ensure_packages_report(docker_packages)
    changed |= _append_package_details(context, details, pkg_report)

    daemon_payload = {
        "log-driver": "json-file",
        "log-opts": {
            "max-size": "10m",
            "max-file": "3",
        },
    }
    changed |= context.runner.write_json_file(Path("/etc/docker/daemon.json"), daemon_payload, requires_root=True)
    context.runner.enable_service("docker.service")
    context.runner.run(
        ["usermod", "-aG", "docker", context.system.target_user],
        requires_root=True,
        check=False,
    )
    version_result = context.runner.run(["docker", "--version"], requires_root=False, check=False)
    details.append(version_result.stdout.strip() or "Docker installé / vérifié")
    return _result(context, task, started_at, changed=changed, details=details)


def system_hardening(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
    """Run the embedded ``system_hardening.sh`` script unattended and summarise its output.

    The script is written to a temporary file in ``context.paths.state_dir``
    and removed afterwards, whatever the outcome. The task succeeds when the
    script exits with 0 and printed no ``[ERR]`` line.
    """
    started_at = datetime.now()
    details: list[str] = []
    script_content = asset_text("system_hardening.sh")

    tmp_dir = context.paths.state_dir
    tmp_dir.mkdir(parents=True, exist_ok=True)

    script_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".sh", delete=False, dir=tmp_dir, encoding="utf-8"
        ) as fh:
            fh.write(script_content)
            script_path = Path(fh.name)

        # Add the execute bits on top of whatever mode the file was created with.
        current_mode = stat.S_IMODE(script_path.stat().st_mode)
        script_path.chmod(current_mode | 0o111)

        env = os.environ.copy()
        env.update(
            {
                "AUTO_YES": "yes",
                "AUTO_SSH_PORT": "22",
                "AUTO_CHANGE_ROOT_PWD": "no",
                "AUTO_DISABLE_ROOT_LOGIN": "no",
                "AUTO_SKIP_LYNIS": "no",
                "AUTO_ENABLE_FAIL2BAN": "yes",
                "AUTO_ENABLE_UFW": "yes",
                "AUTO_ENABLE_AIDE": "yes",
                "AUTO_ENABLE_CLAMAV": "yes",
                "AUTO_SKIP_PORTS_DETECTION": "no",
                "DEBIAN_FRONTEND": "noninteractive",
            }
        )

        result = context.runner.run(
            ["bash", str(script_path), "--unattended"],
            requires_root=True,
            check=False,
            capture_output=True,
            env=env,
        )

        output_lines = result.stdout.splitlines()
        ok_steps = [line for line in output_lines if "[OK]" in line]
        warn_steps = [line for line in output_lines if "[WARN]" in line]
        err_steps = [line for line in output_lines if "[ERR]" in line]

        details.append(f"{len(ok_steps)} étape(s) réussie(s)")
        if warn_steps:
            details.append(f"{len(warn_steps)} avertissement(s)")
        if err_steps:
            details.append(f"{len(err_steps)} erreur(s)")
            # Only the first five error lines are shown.
            details.extend(f"  {line.strip()}" for line in err_steps[:5])

        score_line = next(
            (line for line in output_lines if "Score:" in line and "Lynis" in line),
            None,
        )
        if score_line:
            details.append(score_line.strip())

        log_path = Path("/var/log/system_hardening.log")
        if log_path.exists():
            details.append(f"Log complet: {log_path}")

        root_home = Path("/root")
        backup_path = next(root_home.glob("backup_hardening_*"), None) if root_home.exists() else None
        if backup_path:
            details.append(f"Sauvegardes: {backup_path}")

        success = result.returncode == 0 and not err_steps
        error = None if success else f"Le script s'est terminé avec le code {result.returncode}"
        return context.make_result(
            task,
            success=success,
            changed=True,
            started_at=started_at,
            details=details,
            error=error,
        )
    finally:
        if script_path is not None:
            script_path.unlink(missing_ok=True)


def bind(task: TaskDefinition, func) -> TaskDefinition:
    """Return a copy of *task* whose handler calls ``func(context, task)``."""
    return TaskDefinition(
        key=task.key,
        label=task.label,
        description=task.description,
        category=task.category,
        requires_root=task.requires_root,
        default_selected=task.default_selected,
        # Defaults bind task/func at definition time rather than at call time.
        handler=lambda context, _task=task, _func=func: _func(context, _task),
    )