Bugs Correction

This commit is contained in:
Johnny
2026-04-06 06:07:02 +02:00
parent 751dc8892c
commit 4980d8cf3c
34 changed files with 20541 additions and 35 deletions

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import curses
import re
import textwrap
from collections import defaultdict
from dataclasses import dataclass
@@ -333,10 +334,17 @@ class SecureCheckTUI:
class RunSummaryTUI:
ANSI_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")
def __init__(self, results: list[TaskResult], status_items: list[StatusItem], run_log_path: str) -> None:
self.results = results
self.status_items = status_items
self.run_log_path = run_log_path
self.scroll_offset = 0
@classmethod
def _clean(cls, text: str) -> str:
return cls.ANSI_RE.sub("", text)
def run(self) -> None:
curses.wrapper(self._main)
@@ -344,48 +352,73 @@ class RunSummaryTUI:
def _main(self, stdscr: curses.window) -> None:
curses.curs_set(0)
stdscr.keypad(True)
stdscr.timeout(5000)
_setup_colors()
while True:
self._draw(stdscr)
key = stdscr.getch()
if key == -1 or key in (ord("q"), 27, 10, 13, ord("m"), ord(" ")):
if key in (ord("q"), 27, 10, 13, ord("m"), ord(" ")):
return
if key == curses.KEY_UP and self.scroll_offset > 0:
self.scroll_offset -= 1
elif key == curses.KEY_DOWN:
self.scroll_offset += 1
def _draw(self, stdscr: curses.window) -> None:
stdscr.erase()
height, width = stdscr.getmaxyx()
ok_count = sum(1 for result in self.results if result.success)
ko_count = len(self.results) - ok_count
self._draw_box(stdscr, 0, 1, height - 1, width - 2, "Résumé d'exécution")
stdscr.addnstr(1, 3, f"OK: {ok_count} | ECHEC: {ko_count} | Retour menu auto dans 5s", width - 6, curses.color_pair(Palette.HEADER) | curses.A_BOLD)
row = 3
score_lines: list[str] = []
notif_lines: list[str] = []
entries: list[tuple[str, int]] = []
entries.append((f"OK: {ok_count} | ECHEC: {ko_count} | Appuie sur une touche pour revenir", curses.color_pair(Palette.HEADER) | curses.A_BOLD))
for result in self.results:
if row >= height - 6:
break
color = curses.color_pair(Palette.SUCCESS if result.success else Palette.ERROR)
status = "OK" if result.success else "ECHEC"
line = f"{status:<5} {result.label} ({result.duration_seconds:.1f}s)"
stdscr.addnstr(row, 3, line, width - 6, color | curses.A_BOLD)
row += 1
for detail in result.details[:2]:
if row >= height - 6:
break
stdscr.addnstr(row, 6, f"- {detail}", width - 9)
row += 1
if result.error and row < height - 6:
stdscr.addnstr(row, 6, f"- {result.error}", width - 9, curses.color_pair(Palette.ERROR))
row += 1
row += 1
stdscr.addnstr(row, 3, "Etat synthétique:", width - 6, curses.color_pair(Palette.CATEGORY) | curses.A_BOLD)
row += 1
for item in self.status_items[: max(0, height - row - 2)]:
color = curses.color_pair(Palette.SUCCESS if item.ok else Palette.ERROR)
stdscr.addnstr(row, 3, "", 1, color | curses.A_BOLD)
stdscr.addnstr(row, 5, f"[{item.category}] {item.label}: {item.detail}", width - 8)
row += 1
stdscr.addnstr(height - 2, 3, f"Log: {self.run_log_path}", width - 6, curses.color_pair(Palette.MUTED))
color = curses.color_pair(Palette.SUCCESS if result.success else Palette.ERROR)
entries.append((f"{status:<4} {result.label} ({result.duration_seconds:.1f}s)", color | curses.A_BOLD))
for detail in result.details:
clean = self._clean(detail)
if clean.startswith("Score Lynis") or clean.startswith("Hardening index"):
score_lines.append(clean)
continue
if clean.startswith("Modifications") or clean.strip().startswith(""):
notif_lines.append(clean)
continue
wrapped = textwrap.wrap(clean, width - 9) or [""]
for line in wrapped:
entries.append((f" - {line}", 0))
if result.error:
entries.append((f" - {result.error}", curses.color_pair(Palette.ERROR)))
if score_lines:
entries.insert(1, ("Lynis", curses.color_pair(Palette.CATEGORY) | curses.A_BOLD))
for idx, line in enumerate(score_lines, start=2):
entries.insert(idx, (f" {line}", curses.color_pair(Palette.SUCCESS)))
entries.append((("", 0)))
entries.append(("Etat synthétique:", curses.color_pair(Palette.CATEGORY) | curses.A_BOLD))
for item in self.status_items:
attr = curses.color_pair(Palette.SUCCESS if item.ok else Palette.ERROR) | curses.A_BOLD
entries.append((f"● [{item.category}] {item.label}: {item.detail}", attr))
if notif_lines:
entries.append((("", 0)))
entries.append(("Modifications recommandées:", curses.color_pair(Palette.ERROR) | curses.A_BOLD))
for line in notif_lines:
clean = self._clean(line)
bullet = "" if clean.strip().startswith("") else "-"
entries.append((f" {bullet} {clean.lstrip('').strip()}", curses.color_pair(Palette.MUTED)))
entries.append(("", 0))
entries.append((f"Log: {self.run_log_path}", curses.color_pair(Palette.MUTED)))
available = height - 4
max_offset = max(0, len(entries) - available)
self.scroll_offset = min(max(self.scroll_offset, 0), max_offset)
visible = entries[self.scroll_offset : self.scroll_offset + available]
self._draw_box(stdscr, 0, 1, height - 1, width - 2, "Résumé d'exécution")
for idx, (line, attr) in enumerate(visible):
stdscr.addnstr(2 + idx, 3, line, width - 6, attr)
if max_offset:
bar_pos = int((self.scroll_offset / max_offset) * (available - 1)) if max_offset else 0
stdscr.addch(2 + min(bar_pos, available - 1), width - 3, curses.ACS_CKBOARD)
def _draw_box(self, stdscr: curses.window, top: int, left: int, height: int, width: int, title: str) -> None:
stdscr.attron(curses.color_pair(Palette.PANEL))
stdscr.addch(top, left, curses.ACS_ULCORNER)

View File

@@ -72,6 +72,18 @@ def collect_status(system: SystemInfo) -> list[StatusItem]:
firewall_ok = False
firewall_detail = "pare-feu absent"
security.append(StatusItem("Sécurité", "Firewall", firewall_ok, firewall_detail))
apparmor_active = _service_active("apparmor") or _command_exists("apparmor_status")
security.append(StatusItem("Sécurité", "AppArmor", apparmor_active, "activé" if apparmor_active else "inactif"))
clamav_active = _service_active("clamav-daemon") or _service_active("clamav-freshclam")
security.append(StatusItem("Sécurité", "ClamAV", clamav_active, "actif" if clamav_active else "inactif"))
wazuh_active = _service_active("wazuh-agent")
security.append(StatusItem("Sécurité", "Wazuh agent", wazuh_active, "actif" if wazuh_active else "inactif"))
aide_timer_active = _service_active("aidecheck.timer")
aide_db_exists = Path("/var/lib/aide/aide.db").exists()
aide_ok = aide_timer_active or aide_db_exists
detail = "timer actif" if aide_timer_active else "db présent" if aide_db_exists else "inactif"
security.append(StatusItem("Sécurité", "AIDE", aide_ok, detail))
security.append(_binary_status("Sécurité", "Lynis", "lynis"))
security.append(_binary_status("Sécurité", "rkhunter", "rkhunter"))
@@ -81,6 +93,8 @@ def collect_status(system: SystemInfo) -> list[StatusItem]:
fail2ban_active = _command_exists("fail2ban-client") and _service_active("fail2ban.service")
services.append(StatusItem("Services", "Service Docker", docker_active, "actif" if docker_active else "inactif"))
services.append(StatusItem("Services", "Service fail2ban", fail2ban_active, "actif" if fail2ban_active else "inactif"))
avahi_running = _command_exists("avahi-daemon") and _service_active("avahi-daemon")
services.append(StatusItem("Services", "Avahi", not avahi_running, "désactivé" if not avahi_running else "actif"))
services.append(_binary_status("Services", "Docker", "docker"))
services.append(_binary_status("Services", "fail2ban", "fail2ban-client"))

View File

@@ -1,15 +1,29 @@
from __future__ import annotations
import json
import re
from datetime import datetime
from pathlib import Path
from .assets import asset_text
from .executor import ExecutionContext, SecureCheckError
from .executor import CommandResult, ExecutionContext, SecureCheckError
from .models import TaskDefinition, TaskResult
P10K_REMOTE_URL = "https://git.h3campus.fr/Johnny/Install_zsh/raw/branch/main/.p10k.zsh"
P10K_THEME_GIT_URL = "https://github.com/romkatv/powerlevel10k.git"
AIDE_DEFAULT_CONF = """database=file:/var/lib/aide/aide.db
gzip_dbout=yes
group=default
dbinfo=file:/var/lib/aide/aide.db
dbinfo=file:/var/lib/aide/aide.db.gz
verbose=5
report_url=file:/var/log/aide/aide.log
checksum=sha512
file = p+u+g+s+m+acl+selinux+xattrs+sha512
"""
def _result(
@@ -98,6 +112,23 @@ system_name = securecheck
return _result(context, task, started_at, changed=changed, details=details)
def _parse_lynis_result(result: CommandResult) -> tuple[int | None, int | None, list[str]]:
score = None
hardening = None
issues: list[str] = []
for line in result.stdout.splitlines():
stripped = line.strip()
lowered = stripped.lower()
if match := re.search(r"final score\s*:\s*(\d+)", stripped, re.IGNORECASE):
score = int(match.group(1))
elif match := re.search(r"hardening index score\s*:\s*(\d+)", stripped, re.IGNORECASE):
hardening = int(match.group(1))
if any(keyword in lowered for keyword in ("warning", "suggest", "recommend", "failed", "error")):
if stripped and not stripped.startswith("Tip"):
issues.append(stripped)
return score, hardening, sorted(set(issues))
def lynis_audit(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
started_at = datetime.now()
details: list[str] = []
@@ -115,8 +146,23 @@ def lynis_audit(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
).strip() + "\n"
report_path = _write_report(context, "lynis", report_body)
details.append(f"Rapport Lynis: {report_path}")
success = result.returncode == 0
return context.make_result(task, success=success, changed=changed, started_at=started_at, details=details, error=None if success else "Lynis a remonté une erreur")
score, hardening, issues = _parse_lynis_result(result)
if score is not None:
details.append(f"Score Lynis: {score}")
if hardening is not None:
details.append(f"Hardening index: {hardening}")
if issues:
details.append("Modifications recommandées Lynis :")
details.extend(f"{issue}" for issue in issues[:10])
success = result.returncode == 0 and not issues
return context.make_result(
task,
success=success,
changed=changed,
started_at=started_at,
details=details,
error=None if success else "Lynis a détecté des recommandations",
)
def rootkit_check(context: ExecutionContext, task: TaskDefinition) -> TaskResult:
@@ -310,15 +356,92 @@ def utilities_setup(context: ExecutionContext, task: TaskDefinition) -> TaskResu
"ca-certificates",
]
elif manager in {"dnf", "yum"}:
packages = ["ncdu", "git", "curl", "fail2ban", "htop", "nmon", "duf", "net-tools", "tmux", "tree", "vim-enhanced"]
packages = [
"ncdu",
"git",
"curl",
"fail2ban",
"htop",
"nmon",
"duf",
"net-tools",
"tmux",
"tree",
"vim-enhanced",
"libpam-tmpdir",
"clamav",
"apparmor",
"wazuh-agent",
"aide",
"aide-common",
]
else:
packages = ["ncdu", "git", "curl", "htop", "nmon", "duf", "net-tools", "tmux", "tree", "vim"]
packages = [
"ncdu",
"git",
"curl",
"htop",
"nmon",
"duf",
"net-tools",
"tmux",
"tree",
"vim",
"libpam-tmpdir",
"clamav",
"apparmor",
"wazuh-agent",
"aide",
"aide-common",
]
details: list[str] = []
pkg_report = context.runner.ensure_packages_report(packages)
changed = _append_package_details(context, details, pkg_report)
if context.runner.command_exists("systemctl") and context.runner.command_exists("fail2ban-client"):
context.runner.enable_service("fail2ban.service")
if context.runner.command_exists("systemctl") and context.runner.command_exists("avahi-daemon"):
context.runner.run(["systemctl", "disable", "--now", "avahi-daemon"], requires_root=True, check=False)
details.append("Service avahi-daemon stoppé/désactivé")
if context.runner.package_available("apparmor") or context.runner.command_exists("apparmor_status"):
context.runner.run(["systemctl", "enable", "--now", "apparmor"], requires_root=True, check=False)
details.append("AppArmor activé")
if context.runner.package_available("clamav") or context.runner.command_exists("clamd"):
context.runner.run(["systemctl", "enable", "--now", "clamav-freshclam"], requires_root=True, check=False)
context.runner.run(["systemctl", "enable", "--now", "clamav-daemon"], requires_root=True, check=False)
details.append("ClamAV (daemon + freshclam) activé")
if context.runner.package_available("aide") or context.runner.package_available("aide-common"):
aide_conf_path = Path("/etc/aide/aide.conf")
if not aide_conf_path.exists() or aide_conf_path.read_text(encoding="utf-8") != AIDE_DEFAULT_CONF:
context.runner.write_text_file(aide_conf_path, AIDE_DEFAULT_CONF, mode=0o644, requires_root=True)
details.append("Configuration AIDE appliquée")
default_env = Path("/etc/default/aide")
if not default_env.exists() or default_env.read_text(encoding="utf-8") != 'MAILTO=""\n':
context.runner.write_text_file(default_env, 'MAILTO=""\n', mode=0o644, requires_root=True)
details.append("MAILTO AIDE désactivé")
aide_db_new = Path("/var/lib/aide/aide.db.new")
if not aide_db_new.exists():
context.runner.run(["aideinit"], requires_root=True, check=False)
details.append("AIDE initialisé (aideinit)")
else:
details.append("AIDE déjà initialisé")
if aide_db_new.exists():
existing_db = Path("/var/lib/aide/aide.db")
if not existing_db.exists() or aide_db_new.read_bytes() != existing_db.read_bytes():
context.runner.run(["cp", "-f", str(aide_db_new), "/var/lib/aide/aide.db"], requires_root=True, check=False)
details.append("Base AIDE mise à jour")
if context.runner.command_exists("systemctl"):
context.runner.run(["systemctl", "enable", "--now", "aidecheck.timer"], requires_root=True, check=False)
context.runner.run(["systemctl", "enable", "--now", "dailyaidecheck.timer"], requires_root=True, check=False)
details.append("Timers AIDE activés")
if context.runner.command_exists("systemctl"):
context.runner.run(["systemctl", "enable", "--now", "wazuh-agent"], requires_root=True, check=False)
details.append("Wazuh agent activé (configuration server sur 192.168.1.219 à gérer manuellement)")
details.append("Utilitaires système et sécurité installés / vérifiés")
return _result(context, task, started_at, changed=changed, details=details)