from __future__ import annotations

import asyncio
import ipaddress
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import Any


def utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _ip_sort_key(ip: str) -> tuple[int, int, str]:
    """Return a sort key that orders device keys numerically by address.

    IPv4 addresses sort before IPv6 addresses; within a family the order is
    numeric (so ``10.0.0.2`` precedes ``10.0.0.10``). Keys that do not parse
    as an IP address sort last, alphabetically, instead of raising.
    """
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        return (99, 0, ip)
    return (address.version, int(address), ip)


class NetworkState:
    """In-memory store of devices, DNS queries, flows and published events.

    Mutating coroutines record their change and fan the resulting event out
    to every queue handed out by :meth:`subscribe`.
    """

    # Retention limits and thresholds; all must be positive.
    HISTORY_LIMIT = 2000  # max retained flows / events
    DNS_HISTORY_PER_DEVICE = 200  # max retained DNS items per source IP
    SUBSCRIBER_QUEUE_SIZE = 200  # capacity of each subscriber queue
    OFFLINE_GRACE = timedelta(seconds=45)  # silence tolerated before "offline"
    DNS_EDGES_PER_DEVICE = 8  # newest DNS items per device shown in topology
    RECENT_EVENTS_LIMIT = 40  # newest events included in topology payload

    def __init__(self) -> None:
        self.devices: dict[str, dict[str, Any]] = {}
        self.dns_by_ip: dict[str, deque[dict[str, Any]]] = {}
        self.flows: deque[dict[str, Any]] = deque(maxlen=self.HISTORY_LIMIT)
        self.events: deque[dict[str, Any]] = deque(maxlen=self.HISTORY_LIMIT)
        self._subscribers: set[asyncio.Queue] = set()

    async def publish(self, event: dict[str, Any]) -> None:
        """Append *event* to the event history and push it to all subscribers.

        A subscriber whose queue is full is dropped from the subscriber set
        rather than blocking the publisher.
        """
        self.events.append(event)
        overflowing: list[asyncio.Queue] = []
        for queue in self._subscribers:
            try:
                queue.put_nowait(event)
            except asyncio.QueueFull:
                # Collect first: the set must not change while it is iterated.
                overflowing.append(queue)
        self._subscribers.difference_update(overflowing)

    def subscribe(self) -> asyncio.Queue:
        """Register and return a new bounded queue that receives events."""
        queue: asyncio.Queue = asyncio.Queue(maxsize=self.SUBSCRIBER_QUEUE_SIZE)
        self._subscribers.add(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        """Stop delivering events to *queue*; unknown queues are ignored."""
        self._subscribers.discard(queue)

    async def upsert_device(self, ip: str, payload: dict[str, Any]) -> None:
        """Create or update the device record stored under *ip*.

        Truthy values in *payload* win; otherwise the previously stored value
        is kept. ``status`` and ``last_seen`` are never inherited: they
        default to ``"online"`` and the current time. Publishes
        ``device_discovered`` for a new IP, or ``device_online`` when a known
        device transitions from a non-online status to ``"online"``.
        """
        now = utc_now()
        prev = self.devices.get(ip)
        known = prev or {}

        def pick(field: str) -> Any:
            # Payload value if truthy, else whatever was stored before.
            return payload.get(field) or known.get(field)

        merged = {
            "ip": ip,
            "name": pick("name") or ip,
            "hostname": pick("hostname"),
            "mac": pick("mac"),
            "vendor": pick("vendor"),
            "type": pick("type") or "unknown",
            "status": payload.get("status") or "online",
            "last_seen": payload.get("last_seen") or now,
            "first_seen": known.get("first_seen") or now,
            "source": pick("source") or "scanner",
        }
        self.devices[ip] = merged

        if prev is None:
            kind = "device_discovered"
        elif prev.get("status") != "online" and merged["status"] == "online":
            # Only announce "online" when the device actually is online now;
            # an offline -> offline update must not emit device_online.
            kind = "device_online"
        else:
            return

        await self.publish({
            "kind": kind,
            "ip": ip,
            "device": self._serialize_device(merged),
            "ts": now.isoformat(),
        })

    async def mark_offline_missing(self, active_ips: set[str]) -> None:
        """Mark devices absent from *active_ips* as offline.

        A device is flipped to ``"offline"`` (and ``device_offline`` is
        published) only if it is not already offline and its ``last_seen`` is
        at least ``OFFLINE_GRACE`` old.
        """
        now = utc_now()
        # Iterate over a snapshot: publish() is awaited inside the loop.
        for ip, device in list(self.devices.items()):
            if ip in active_ips or device.get("status") == "offline":
                continue
            silent_for = now - self._ensure_dt(device.get("last_seen"))
            if silent_for < self.OFFLINE_GRACE:
                continue
            device["status"] = "offline"
            await self.publish({
                "kind": "device_offline",
                "ip": ip,
                "device": self._serialize_device(device),
                "ts": now.isoformat(),
            })

    async def add_dns_event(
        self,
        src_ip: str,
        qname: str,
        answers: list[str],
        ts: datetime | None = None,
    ) -> None:
        """Record a DNS query from *src_ip* and publish a ``dns_query`` event.

        The source device is upserted as online with ``source="dns"``. *ts*
        defaults to the current UTC time.
        """
        now = ts or utc_now()
        bucket = self.dns_by_ip.setdefault(
            src_ip, deque(maxlen=self.DNS_HISTORY_PER_DEVICE)
        )
        bucket.append({"qname": qname, "answers": answers, "ts": now})
        await self.upsert_device(
            src_ip, {"status": "online", "last_seen": now, "source": "dns"}
        )
        await self.publish({
            "kind": "dns_query",
            "ip": src_ip,
            "qname": qname,
            "answers": answers,
            "ts": now.isoformat(),
        })

    async def add_flow_event(
        self,
        src_ip: str,
        dst_ip: str | None,
        dst_host: str | None,
        protocol: str | None,
        bytes_count: int | None,
        ts: datetime | None = None,
    ) -> None:
        """Record a traffic flow from *src_ip* and publish a ``flow`` event.

        The stored flow normalises a missing protocol to ``"unknown"`` and a
        missing byte count to ``0``. The source device is upserted as online
        with ``source="flow"``. *ts* defaults to the current UTC time.
        """
        now = ts or utc_now()
        self.flows.append({
            "src_ip": src_ip,
            "dst_ip": dst_ip,
            "dst_host": dst_host,
            "protocol": protocol or "unknown",
            "bytes_count": bytes_count or 0,
            "ts": now,
        })
        await self.upsert_device(
            src_ip, {"status": "online", "last_seen": now, "source": "flow"}
        )
        # NOTE(review): the published event carries the raw (possibly None)
        # protocol / bytes_count, unlike the normalised stored flow. Kept
        # as-is to avoid changing the event wire format - confirm intent.
        await self.publish({
            "kind": "flow",
            "src_ip": src_ip,
            "dst_ip": dst_ip,
            "dst_host": dst_host,
            "protocol": protocol,
            "bytes_count": bytes_count,
            "ts": now.isoformat(),
        })

    def topology_payload(
        self,
        core_router_ip: str,
        core_router_name: str,
        core_switch_ip: str,
        core_switch_name: str,
    ) -> dict[str, Any]:
        """Build a graph snapshot (nodes, edges, devices, recent events).

        The graph contains a router node, a switch node, one node per known
        device (attached to the router if its IP equals *core_router_ip*,
        otherwise to the switch) and one node per distinct recently queried
        domain, linked to each device that queried it.
        """
        nodes: list[dict[str, Any]] = [
            {"id": "router", "label": core_router_name, "kind": "router", "ip": core_router_ip},
            {"id": "switch", "label": core_switch_name, "kind": "switch", "ip": core_switch_ip},
        ]
        edges: list[dict[str, Any]] = [
            {"id": "e-router-switch", "source": "router", "target": "switch", "kind": "core"},
        ]

        # Numeric address order; tolerant of IPv6 and unparsable keys.
        for ip in sorted(self.devices, key=_ip_sort_key):
            device = self.devices[ip]
            node_id = f"dev-{ip}"
            nodes.append({
                "id": node_id,
                "label": device.get("hostname") or device.get("name") or ip,
                "kind": device.get("type") or "unknown",
                "status": device.get("status") or "offline",
                "ip": ip,
                "vendor": device.get("vendor"),
                "last_seen": self._ensure_dt(device.get("last_seen")).isoformat(),
            })
            parent = "router" if ip == core_router_ip else "switch"
            edges.append({
                "id": f"e-{parent}-{node_id}",
                "source": parent,
                "target": node_id,
                "kind": "lan",
            })

        domain_nodes: dict[str, str] = {}
        dns_edge_ids: set[str] = set()
        for ip, entries in self.dns_by_ip.items():
            # A "dev-<ip>" node exists exactly when the IP is a known device.
            if ip not in self.devices:
                continue
            src_id = f"dev-{ip}"
            for item in list(entries)[-self.DNS_EDGES_PER_DEVICE:]:
                qname = item.get("qname", "")
                if not qname:
                    continue
                domain_id = domain_nodes.get(qname)
                if domain_id is None:
                    domain_id = f"domain-{len(domain_nodes) + 1}"
                    domain_nodes[qname] = domain_id
                    nodes.append({"id": domain_id, "label": qname, "kind": "domain"})
                edge_id = f"e-{src_id}-{domain_id}"
                if edge_id in dns_edge_ids:
                    # Repeated queries for one name must not yield duplicate
                    # edges sharing the same id.
                    continue
                dns_edge_ids.add(edge_id)
                edges.append({
                    "id": edge_id,
                    "source": src_id,
                    "target": domain_id,
                    "kind": "dns",
                })

        return {
            "generated_at": utc_now().isoformat(),
            "nodes": nodes,
            "edges": edges,
            "devices": [self._serialize_device(d) for d in self.devices.values()],
            "recent_events": list(self.events)[-self.RECENT_EVENTS_LIMIT:],
        }

    @staticmethod
    def _ensure_dt(value: Any) -> datetime:
        """Return *value* if it is a datetime, otherwise the current UTC time."""
        if isinstance(value, datetime):
            return value
        return utc_now()

    @staticmethod
    def _serialize_device(device: dict[str, Any]) -> dict[str, Any]:
        """Return a shallow copy of *device* with datetime stamps as ISO strings."""
        out = dict(device)
        for field in ("last_seen", "first_seen"):
            stamp = out.get(field)
            if isinstance(stamp, datetime):
                out[field] = stamp.isoformat()
        return out