# audit_log.py — Ansible callback plugin for fleet-ops audit trail # Logs each playbook run to /audit/-.jsonl # Format: one JSON object per line (JSONL), one file per playbook per day from __future__ import annotations from ansible.plugins.callback import CallbackBase import json import os from datetime import datetime, timezone DOCUMENTATION = """ name: audit_log type: notification short_description: Write structured audit log to /audit/ description: - Appends a JSON record for each playbook run to /audit/-.jsonl - Fields: timestamp, completed, playbook, check_mode, outcome, summary, tasks options: {} """ class CallbackModule(CallbackBase): CALLBACK_VERSION = 2.0 CALLBACK_TYPE = "notification" CALLBACK_NAME = "audit_log" CALLBACK_NEEDS_WHITELIST = True AUDIT_DIR = "/audit" def __init__(self): super().__init__() self._playbook_name = "unknown" self._start_time = None self._check_mode = False self._task_results: list[dict] = [] def v2_playbook_on_start(self, playbook): self._playbook_name = os.path.basename(playbook._file_name) self._start_time = datetime.now(timezone.utc).isoformat() self._task_results = [] def v2_playbook_on_play_start(self, play): self._check_mode = play.check_mode def _record(self, result, status: str): entry = { "host": result._host.name, "task": result._task.get_name(), "status": status, } if status == "failed": entry["msg"] = str(result._result.get("msg", "")) self._task_results.append(entry) def v2_runner_on_ok(self, result): self._record(result, "ok") def v2_runner_on_failed(self, result, ignore_errors=False): self._record(result, "failed" if not ignore_errors else "ignored") def v2_runner_on_unreachable(self, result): self._record(result, "unreachable") def v2_runner_on_skipped(self, result): self._record(result, "skipped") def v2_playbook_on_stats(self, stats): end_time = datetime.now(timezone.utc).isoformat() summary = {} for host in stats.processed: summary[host] = { "ok": stats.ok.get(host, 0), "changed": stats.changed.get(host, 0), "failed": stats.failures.get(host, 0), "unreachable": stats.dark.get(host, 0), "skipped": stats.skipped.get(host, 0), } any_failed = any( v["failed"] > 0 or v["unreachable"] > 0 for v in summary.values() ) outcome = "failed" if any_failed else "success" if self._check_mode: outcome = f"check-{outcome}" log_entry = { "timestamp": self._start_time, "completed": end_time, "playbook": self._playbook_name, "check_mode": self._check_mode, "outcome": outcome, "summary": summary, "tasks": self._task_results, } os.makedirs(self.AUDIT_DIR, exist_ok=True) date_str = (self._start_time or end_time)[:10] playbook_stem = self._playbook_name.replace(".yml", "").replace(".yaml", "") log_file = os.path.join(self.AUDIT_DIR, f"{date_str}-{playbook_stem}.jsonl") try: with open(log_file, "a") as f: f.write(json.dumps(log_entry) + "\n") except OSError as e: self._display.warning(f"audit_log: could not write to {log_file}: {e}")