Files
SecureCheck/securecheck/tasks.py
2026-04-06 08:37:54 +02:00

665 lines
26 KiB
Python

from __future__ import annotations
import json
import os
import re
import stat
import tempfile
from datetime import datetime
from pathlib import Path
from .assets import asset_text
from .executor import CommandResult, ExecutionContext, SecureCheckError
from .models import TaskDefinition, TaskResult
P10K_REMOTE_URL = "https://git.h3campus.fr/Johnny/Install_zsh/raw/branch/main/.p10k.zsh"
P10K_THEME_GIT_URL = "https://github.com/romkatv/powerlevel10k.git"
AIDE_DEFAULT_CONF = """database=file:/var/lib/aide/aide.db
gzip_dbout=yes
group=default
dbinfo=file:/var/lib/aide/aide.db
dbinfo=file:/var/lib/aide/aide.db.gz
verbose=5
report_url=file:/var/log/aide/aide.log
checksum=sha512
file = p+u+g+s+m+acl+selinux+xattrs+sha512
"""
def _result(
context: ExecutionContext,
task: TaskDefinition,
started_at: datetime,
*,
changed: bool,
details: list[str] | None = None,
) -> TaskResult:
return context.make_result(task, success=True, changed=changed, started_at=started_at, details=details or [])
def _write_report(context: ExecutionContext, name: str, content: str) -> Path:
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
report_path = context.paths.report_dir / f"{timestamp}-{name}.log"
context.runner.write_text_file(report_path, content, mode=0o640, requires_root=False)
return report_path
def _append_package_details(context: ExecutionContext, details: list[str], report) -> bool:
changed = report.changed
added_label = "Seraient ajoutés" if context.dry_run else "Ajoutés"
if report.already_present:
details.append(f"Déjà présents: {', '.join(report.already_present)}")
if report.installed:
details.append(f"{added_label}: {', '.join(report.installed)}")
return changed
def system_update(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
context.runner.update_package_index()
context.runner.upgrade_system()
details = ["Index des paquets rafraîchi", "Mises à jour système appliquées"]
return _result(context, task, started_at, changed=True, details=details)
def automatic_updates(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
manager = context.system.package_manager
changed = False
details: list[str] = []
if manager == "apt-get":
pkg_report = context.runner.ensure_packages_report(["unattended-upgrades", "apt-listchanges"])
changed |= _append_package_details(context, details, pkg_report)
content_20 = """APT::Periodic::Update-Package-Lists "1";
APT::Periodic::Download-Upgradeable-Packages "1";
APT::Periodic::AutocleanInterval "7";
APT::Periodic::Unattended-Upgrade "1";
"""
content_52 = """Unattended-Upgrade::Automatic-Reboot "false";
Unattended-Upgrade::Automatic-Reboot-Time "03:30";
Unattended-Upgrade::Remove-Unused-Dependencies "true";
Unattended-Upgrade::Remove-New-Unused-Dependencies "true";
"""
changed |= context.runner.write_text_file(Path("/etc/apt/apt.conf.d/20auto-upgrades"), content_20, requires_root=True)
changed |= context.runner.write_text_file(
Path("/etc/apt/apt.conf.d/52securecheck-unattended-upgrades"),
content_52,
requires_root=True,
)
context.runner.enable_service("unattended-upgrades.service")
details.append("Mises à jour automatiques APT configurées")
elif manager in {"dnf", "yum"}:
pkg_report = context.runner.ensure_packages_report(["dnf-automatic"])
changed |= _append_package_details(context, details, pkg_report)
changed |= context.runner.write_text_file(
Path("/etc/dnf/automatic.conf"),
"""[commands]
apply_updates = yes
upgrade_type = default
[emitters]
emit_via = stdio
system_name = securecheck
""",
requires_root=True,
)
context.runner.enable_service("dnf-automatic.timer")
details.append("Mises à jour automatiques DNF configurées")
else:
raise SecureCheckError("Les mises à jour automatiques ne sont pas prises en charge sur ce système")
return _result(context, task, started_at, changed=changed, details=details)
def _parse_lynis_result(result: CommandResult) -> tuple[int | None, int | None, list[str]]:
score = None
hardening = None
issues: list[str] = []
for line in result.stdout.splitlines():
stripped = line.strip()
lowered = stripped.lower()
if match := re.search(r"final score\s*:\s*(\d+)", stripped, re.IGNORECASE):
score = int(match.group(1))
elif match := re.search(r"hardening index score\s*:\s*(\d+)", stripped, re.IGNORECASE):
hardening = int(match.group(1))
if any(keyword in lowered for keyword in ("warning", "suggest", "recommend", "failed", "error")):
if stripped and not stripped.startswith("Tip"):
issues.append(stripped)
return score, hardening, sorted(set(issues))
def lynis_audit(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
details: list[str] = []
pkg_report = context.runner.ensure_packages_report(["lynis"])
changed = _append_package_details(context, details, pkg_report)
result = context.runner.run(["lynis", "audit", "system", "--quick"], requires_root=True, check=False)
report_body = "\n".join(
[
"=== SecureCheck / Lynis ===",
f"Return code: {result.returncode}",
"",
result.stdout,
result.stderr,
]
).strip() + "\n"
report_path = _write_report(context, "lynis", report_body)
details.append(f"Rapport Lynis: {report_path}")
score, hardening, issues = _parse_lynis_result(result)
if score is not None:
details.append(f"Score Lynis: {score}")
if hardening is not None:
details.append(f"Hardening index: {hardening}")
if issues:
details.append("Modifications recommandées Lynis :")
details.extend(f"{issue}" for issue in issues[:10])
success = result.returncode == 0 and not issues
return context.make_result(
task,
success=success,
changed=changed,
started_at=started_at,
details=details,
error=None if success else "Lynis a détecté des recommandations",
)
def rootkit_check(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
details: list[str] = []
pkg_report = context.runner.ensure_packages_report(["rkhunter", "chkrootkit"])
changed = _append_package_details(context, details, pkg_report)
update_result = context.runner.run(["rkhunter", "--update"], requires_root=True, check=False)
details.append(f"rkhunter update rc={update_result.returncode}")
propupd_result = context.runner.run(["rkhunter", "--propupd"], requires_root=True, check=False)
details.append(f"rkhunter propupd rc={propupd_result.returncode}")
rkhunter_result = context.runner.run(
["rkhunter", "--check", "--skip-keypress", "--report-warnings-only"],
requires_root=True,
check=False,
)
chkrootkit_result = context.runner.run(["chkrootkit", "-q"], requires_root=True, check=False)
report_payload = {
"rkhunter_check_returncode": rkhunter_result.returncode,
"chkrootkit_returncode": chkrootkit_result.returncode,
"rkhunter_stdout": rkhunter_result.stdout,
"rkhunter_stderr": rkhunter_result.stderr,
"chkrootkit_stdout": chkrootkit_result.stdout,
"chkrootkit_stderr": chkrootkit_result.stderr,
}
report_path = context.paths.report_dir / f"{datetime.now().strftime('%Y%m%d-%H%M%S')}-rootkit-report.json"
context.runner.write_text_file(report_path, json.dumps(report_payload, indent=2) + "\n", mode=0o640, requires_root=False)
details.append(f"Rapport rootkits: {report_path}")
# rkhunter: rc=0 clean, rc=1 warnings trouvés, rc=2+ erreur critique (outil n'a pas pu tourner)
rkhunter_ran = rkhunter_result.returncode <= 1
rkhunter_warnings = [line for line in rkhunter_result.stdout.splitlines() if line.startswith("Warning:")]
if rkhunter_warnings:
details.append(f"rkhunter: {len(rkhunter_warnings)} warning(s) à vérifier dans le rapport")
# chkrootkit: rc=0 signifie que l'outil a tourné (pas qu'il n'y a pas d'infection — les INFECTED sont dans stdout)
chkrootkit_ran = chkrootkit_result.returncode == 0
infected_lines = [line for line in chkrootkit_result.stdout.splitlines() if "INFECTED" in line]
if infected_lines:
details.append(f"chkrootkit: {len(infected_lines)} détection(s) à vérifier dans le rapport")
success = rkhunter_ran and chkrootkit_ran
error = None if success else "Les outils de vérification n'ont pas pu s'exécuter correctement"
return context.make_result(task, success=success, changed=changed, started_at=started_at, details=details, error=error)
def log_rotation(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
details: list[str] = []
pkg_report = context.runner.ensure_packages_report(["logrotate"])
changed = _append_package_details(context, details, pkg_report)
log_target = "/var/log/securecheck/*.log"
report_target = "/var/log/securecheck/reports/*"
content = f"""{log_target} {report_target} {{
rotate 7
daily
missingok
notifempty
compress
delaycompress
copytruncate
create 0640 root adm
}}
"""
changed |= context.runner.write_text_file(Path("/etc/logrotate.d/securecheck"), content, requires_root=True)
details.append("Rotation des logs SecureCheck configurée")
return _result(context, task, started_at, changed=changed, details=details)
def zsh_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
manager = context.system.package_manager
details: list[str] = []
packages = ["zsh", "git", "curl"]
if manager == "apt-get":
packages += ["zsh-autosuggestions", "zsh-syntax-highlighting"]
for optional in ("zsh-theme-powerlevel10k", "fonts-powerline"):
if context.runner.package_available(optional):
packages.append(optional)
elif manager == "pacman":
for optional in ("zsh-autosuggestions", "zsh-syntax-highlighting", "zsh-theme-powerlevel10k"):
if context.runner.package_available(optional):
packages.append(optional)
pkg_report = context.runner.ensure_packages_report(packages)
changed = _append_package_details(context, details, pkg_report)
try:
p10k_content = context.runner.download_text(P10K_REMOTE_URL)
p10k_source = f"source distante: {P10K_REMOTE_URL}"
except Exception: # noqa: BLE001
p10k_content = asset_text("p10k.zsh")
p10k_source = "source locale embarquée: assets/p10k.zsh"
if not p10k_content:
p10k_content = asset_text("p10k.zsh")
p10k_source = "source locale embarquée: assets/p10k.zsh"
zshrc_path = context.system.target_home / ".zshrc"
p10k_path = context.system.target_home / ".p10k.zsh"
theme_repo_path = context.system.target_home / ".powerlevel10k"
theme_system_paths = [
Path("/usr/share/powerlevel10k/powerlevel10k.zsh-theme"),
Path("/usr/share/zsh-theme-powerlevel10k/powerlevel10k.zsh-theme"),
]
if not any(path.exists() for path in theme_system_paths):
if not theme_repo_path.exists():
context.runner.run(
["git", "clone", "--depth=1", P10K_THEME_GIT_URL, str(theme_repo_path)],
run_as_user=context.system.target_user,
)
changed = True
details.append(f"Theme powerlevel10k cloné dans {theme_repo_path}")
else:
details.append(f"Theme powerlevel10k déjà présent dans {theme_repo_path}")
zshrc_content = """# Fichier généré par SecureCheck
if [[ -r "${XDG_CACHE_HOME:-$HOME/.cache}/p10k-instant-prompt-${(%):-%n}.zsh" ]]; then
source "${XDG_CACHE_HOME:-$HOME/.cache}/p10k-instant-prompt-${(%):-%n}.zsh"
fi
export HISTFILE="$HOME/.zsh_history"
export HISTSIZE=10000
export SAVEHIST=10000
setopt appendhistory
setopt histignoredups
setopt sharehistory
setopt autocd
autoload -Uz compinit
compinit
if [ -f /usr/share/powerlevel10k/powerlevel10k.zsh-theme ]; then
source /usr/share/powerlevel10k/powerlevel10k.zsh-theme
elif [ -f /usr/share/zsh-theme-powerlevel10k/powerlevel10k.zsh-theme ]; then
source /usr/share/zsh-theme-powerlevel10k/powerlevel10k.zsh-theme
elif [ -f "$HOME/.powerlevel10k/powerlevel10k.zsh-theme" ]; then
source "$HOME/.powerlevel10k/powerlevel10k.zsh-theme"
fi
if [ -f /usr/share/zsh-autosuggestions/zsh-autosuggestions.zsh ]; then
source /usr/share/zsh-autosuggestions/zsh-autosuggestions.zsh
fi
if [ -f /usr/share/zsh-syntax-highlighting/zsh-syntax-highlighting.zsh ]; then
source /usr/share/zsh-syntax-highlighting/zsh-syntax-highlighting.zsh
fi
bindkey '^[[A' history-search-backward
bindkey '^[[B' history-search-forward
alias ll='ls -alF'
alias la='ls -A'
alias l='ls -CF'
alias update-system='sudo apt-get update && sudo apt-get dist-upgrade -y'
[[ -f "$HOME/.p10k.zsh" ]] && source "$HOME/.p10k.zsh"
"""
changed |= context.runner.write_text_file(
p10k_path,
p10k_content,
mode=0o644,
owner_uid=context.system.target_uid,
owner_gid=context.system.target_gid,
)
changed |= context.runner.write_text_file(
zshrc_path,
zshrc_content,
mode=0o644,
owner_uid=context.system.target_uid,
owner_gid=context.system.target_gid,
)
zsh_path = "/usr/bin/zsh" if Path("/usr/bin/zsh").exists() else "/bin/zsh"
if Path(zsh_path).exists():
changed |= context.runner.ensure_user_shell(zsh_path)
details.append(f"Configuration zsh appliquée pour {context.system.target_user}")
details.append(f"Fichier p10k copié vers {p10k_path}")
details.append(p10k_source)
return _result(context, task, started_at, changed=changed, details=details)
def utilities_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
manager = context.system.package_manager
if manager == "apt-get":
packages = [
"ncdu",
"needrestart",
"git",
"curl",
"fail2ban",
"htop",
"nmon",
"duf",
"net-tools",
"tmux",
"tree",
"vim",
"ca-certificates",
]
elif manager in {"dnf", "yum"}:
packages = [
"ncdu",
"git",
"curl",
"fail2ban",
"htop",
"nmon",
"duf",
"net-tools",
"tmux",
"tree",
"vim-enhanced",
"libpam-tmpdir",
"clamav",
"apparmor",
"wazuh-agent",
"aide",
"aide-common",
]
else:
packages = [
"ncdu",
"git",
"curl",
"htop",
"nmon",
"duf",
"net-tools",
"tmux",
"tree",
"vim",
"libpam-tmpdir",
"clamav",
"apparmor",
"wazuh-agent",
"aide",
"aide-common",
]
details: list[str] = []
pkg_report = context.runner.ensure_packages_report(packages)
changed = _append_package_details(context, details, pkg_report)
if context.runner.command_exists("systemctl") and context.runner.command_exists("fail2ban-client"):
context.runner.enable_service("fail2ban.service")
if context.runner.command_exists("systemctl") and context.runner.command_exists("avahi-daemon"):
context.runner.run(["systemctl", "disable", "--now", "avahi-daemon"], requires_root=True, check=False)
details.append("Service avahi-daemon stoppé/désactivé")
if context.runner.package_available("apparmor") or context.runner.command_exists("apparmor_status"):
context.runner.run(["systemctl", "enable", "--now", "apparmor"], requires_root=True, check=False)
details.append("AppArmor activé")
if context.runner.package_available("clamav") or context.runner.command_exists("clamd"):
context.runner.run(["systemctl", "enable", "--now", "clamav-freshclam"], requires_root=True, check=False)
context.runner.run(["systemctl", "enable", "--now", "clamav-daemon"], requires_root=True, check=False)
details.append("ClamAV (daemon + freshclam) activé")
if context.runner.package_available("aide") or context.runner.package_available("aide-common"):
aide_conf_path = Path("/etc/aide/aide.conf")
if not aide_conf_path.exists() or aide_conf_path.read_text(encoding="utf-8") != AIDE_DEFAULT_CONF:
context.runner.write_text_file(aide_conf_path, AIDE_DEFAULT_CONF, mode=0o644, requires_root=True)
details.append("Configuration AIDE appliquée")
default_env = Path("/etc/default/aide")
if not default_env.exists() or default_env.read_text(encoding="utf-8") != 'MAILTO=""\n':
context.runner.write_text_file(default_env, 'MAILTO=""\n', mode=0o644, requires_root=True)
details.append("MAILTO AIDE désactivé")
aide_db_new = Path("/var/lib/aide/aide.db.new")
if not aide_db_new.exists():
context.runner.run(["aideinit"], requires_root=True, check=False)
details.append("AIDE initialisé (aideinit)")
else:
details.append("AIDE déjà initialisé")
if aide_db_new.exists():
existing_db = Path("/var/lib/aide/aide.db")
if not existing_db.exists() or aide_db_new.read_bytes() != existing_db.read_bytes():
context.runner.run(["cp", "-f", str(aide_db_new), "/var/lib/aide/aide.db"], requires_root=True, check=False)
details.append("Base AIDE mise à jour")
if context.runner.command_exists("systemctl"):
context.runner.run(["systemctl", "enable", "--now", "dailyaidecheck.timer"], requires_root=True, check=False)
details.append("Timers AIDE activés")
if context.runner.command_exists("systemctl"):
context.runner.run(["systemctl", "enable", "--now", "wazuh-agent"], requires_root=True, check=False)
details.append("Wazuh agent activé (configuration server sur 192.168.1.219 à gérer manuellement)")
details.append("Utilitaires système et sécurité installés / vérifiés")
return _result(context, task, started_at, changed=changed, details=details)
def zram_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
changed = False
details: list[str] = []
ram_mb = context.runner.read_memory_mb()
zram_mb = max(512, ram_mb // 2)
defaults_content = f"""# Géré par SecureCheck
ALGO=zstd
PERCENT=50
PRIORITY=100
ZRAM_SIZE={zram_mb}
"""
start_script = """#!/bin/sh
set -eu
modprobe zram || true
sleep 1
echo zstd > /sys/block/zram0/comp_algorithm
echo ${ZRAM_SIZE}M > /sys/block/zram0/disksize
mkswap /dev/zram0
swapon -p 100 /dev/zram0
"""
stop_script = """#!/bin/sh
swapoff /dev/zram0 2>/dev/null || true
echo 1 > /sys/block/zram0/reset 2>/dev/null || true
rmmod zram 2>/dev/null || true
"""
service_content = """[Unit]
Description=SecureCheck zram swap
After=local-fs.target
Before=swap.target
[Service]
Type=oneshot
RemainAfterExit=yes
EnvironmentFile=/etc/default/securecheck-zram
ExecStart=/usr/local/bin/securecheck-zram-start
ExecStop=/usr/local/bin/securecheck-zram-stop
[Install]
WantedBy=multi-user.target
"""
changed |= context.runner.write_text_file(Path("/etc/default/securecheck-zram"), defaults_content, requires_root=True)
changed |= context.runner.write_text_file(Path("/usr/local/bin/securecheck-zram-start"), start_script, mode=0o755, requires_root=True)
changed |= context.runner.write_text_file(Path("/usr/local/bin/securecheck-zram-stop"), stop_script, mode=0o755, requires_root=True)
changed |= context.runner.write_text_file(Path("/etc/systemd/system/securecheck-zram.service"), service_content, requires_root=True)
context.runner.run(["systemctl", "daemon-reload"], requires_root=True)
context.runner.enable_service("securecheck-zram.service")
details.append(f"zram configuré à {zram_mb} Mo")
return _result(context, task, started_at, changed=changed, details=details)
def firewall_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
manager = context.system.package_manager
changed = False
details: list[str] = []
if manager == "apt-get":
pkg_report = context.runner.ensure_packages_report(["ufw"])
changed |= _append_package_details(context, details, pkg_report)
context.runner.run(["ufw", "default", "deny", "incoming"], requires_root=True)
context.runner.run(["ufw", "default", "allow", "outgoing"], requires_root=True)
ssh_rule = context.runner.run(["ufw", "status"], requires_root=True, check=False)
if "22/tcp" not in ssh_rule.stdout and "OpenSSH" not in ssh_rule.stdout:
context.runner.run(["ufw", "allow", "22/tcp"], requires_root=True)
changed = True
context.runner.run(["ufw", "--force", "enable"], requires_root=True)
details.append("Pare-feu UFW activé")
elif manager in {"dnf", "yum"}:
pkg_report = context.runner.ensure_packages_report(["firewalld"])
changed |= _append_package_details(context, details, pkg_report)
context.runner.enable_service("firewalld.service")
context.runner.run(["firewall-cmd", "--permanent", "--add-service=ssh"], requires_root=True)
context.runner.run(["firewall-cmd", "--reload"], requires_root=True)
details.append("Pare-feu firewalld activé")
else:
raise SecureCheckError("Pare-feu automatique non pris en charge sur ce système")
return _result(context, task, started_at, changed=changed, details=details)
def docker_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
manager = context.system.package_manager
changed = False
details: list[str] = []
if manager == "apt-get":
pkg_report = context.runner.ensure_packages_report(["docker.io", "docker-compose-v2"])
changed |= _append_package_details(context, details, pkg_report)
elif manager in {"dnf", "yum", "pacman"}:
pkg_report = context.runner.ensure_packages_report(["docker"])
changed |= _append_package_details(context, details, pkg_report)
else:
raise SecureCheckError("Docker n'est pas pris en charge sur ce système")
daemon_payload = {
"log-driver": "json-file",
"log-opts": {
"max-size": "10m",
"max-file": "3",
},
}
changed |= context.runner.write_json_file(Path("/etc/docker/daemon.json"), daemon_payload, requires_root=True)
context.runner.enable_service("docker.service")
context.runner.run(["usermod", "-aG", "docker", context.system.target_user], requires_root=True, check=False)
version_result = context.runner.run(["docker", "--version"], requires_root=False, check=False)
details.append(version_result.stdout.strip() or "Docker installé / vérifié")
return _result(context, task, started_at, changed=changed, details=details)
def system_hardening(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
details: list[str] = []
script_content = asset_text("system_hardening.sh")
tmp_dir = context.paths.state_dir
tmp_dir.mkdir(parents=True, exist_ok=True)
script_path: Path | None = None
try:
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, dir=tmp_dir, encoding="utf-8"
) as fh:
fh.write(script_content)
script_path = Path(fh.name)
current_mode = stat.S_IMODE(script_path.stat().st_mode)
script_path.chmod(current_mode | 0o111)
env = os.environ.copy()
env.update({
"AUTO_YES": "yes",
"AUTO_SSH_PORT": "22",
"AUTO_CHANGE_ROOT_PWD": "no",
"AUTO_DISABLE_ROOT_LOGIN": "no",
"AUTO_SKIP_LYNIS": "no",
"AUTO_ENABLE_FAIL2BAN": "yes",
"AUTO_ENABLE_UFW": "yes",
"AUTO_ENABLE_AIDE": "yes",
"AUTO_ENABLE_CLAMAV": "yes",
"AUTO_SKIP_PORTS_DETECTION": "no",
"DEBIAN_FRONTEND": "noninteractive",
})
result = context.runner.run(
["bash", str(script_path), "--unattended"],
requires_root=True,
check=False,
capture_output=True,
env=env,
)
ok_steps = [line for line in result.stdout.splitlines() if "[OK]" in line]
warn_steps = [line for line in result.stdout.splitlines() if "[WARN]" in line]
err_steps = [line for line in result.stdout.splitlines() if "[ERR]" in line]
details.append(f"{len(ok_steps)} étape(s) réussie(s)")
if warn_steps:
details.append(f"{len(warn_steps)} avertissement(s)")
if err_steps:
details.append(f"{len(err_steps)} erreur(s)")
for line in err_steps[:5]:
details.append(f" {line.strip()}")
score_line = next(
(line for line in result.stdout.splitlines() if "Score:" in line and "Lynis" in line),
None,
)
if score_line:
details.append(score_line.strip())
log_path = Path("/var/log/system_hardening.log")
if log_path.exists():
details.append(f"Log complet: {log_path}")
backup_path = next(Path("/root").glob("backup_hardening_*"), None) if Path("/root").exists() else None
if backup_path:
details.append(f"Sauvegardes: {backup_path}")
success = result.returncode == 0 and not err_steps
error = None if success else f"Le script s'est terminé avec le code {result.returncode}"
return context.make_result(task, success=success, changed=True, started_at=started_at, details=details, error=error)
finally:
if script_path and script_path.exists():
script_path.unlink(missing_ok=True)
def bind(task: TaskDefinition, func) -> TaskDefinition:
return TaskDefinition(
key=task.key,
label=task.label,
description=task.description,
category=task.category,
requires_root=task.requires_root,
default_selected=task.default_selected,
handler=lambda context, _task=task, _func=func: _func(context, _task),
)