"""FastAPI application for the Network Topology Wrapper.

Wires together the shared ``NetworkState``, the background subnet scanner,
the optional DNS log tailer, the ingest endpoints, and a server-sent-events
stream of state events.
"""

from __future__ import annotations

import asyncio
import json
import logging
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import FastAPI
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles

from .dnslog import tail_dns_log
from .scanner import scan_subnet
from .schemas import DhcpIngestEvent, DnsIngestEvent, FlowIngestEvent
from .settings import settings
from .state import NetworkState

logger = logging.getLogger(__name__)

# Single process-wide state object shared by the background tasks and routes.
state = NetworkState()


async def scanner_loop() -> None:
    """Scan the configured subnet forever, sleeping between passes.

    Each pass calls ``scan_subnet`` and hands the result to
    ``state.mark_offline_missing``. A failing pass is logged and the loop
    continues, so a single error does not permanently stop scanning.
    Cancellation still ends the loop: ``asyncio.CancelledError`` derives from
    ``BaseException`` and is therefore not caught by ``except Exception``.
    """
    while True:
        try:
            active_ips = await scan_subnet(
                state=state,
                subnet_cidr=settings.scan_subnet,
                timeout_seconds=settings.scan_timeout_seconds,
                concurrency=settings.scan_concurrency,
            )
            await state.mark_offline_missing(active_ips)
        except Exception:
            logger.exception("Subnet scan failed; retrying after the scan interval")
        await asyncio.sleep(settings.scan_interval_seconds)


@asynccontextmanager
async def lifespan(_: FastAPI):
    """Run the background tasks for the lifetime of the application.

    Starts the scanner loop, plus the DNS log tailer when
    ``settings.dns_log_path`` is set. On shutdown every task is cancelled and
    awaited; a task that ended with a real error is logged rather than
    allowed to abort the shutdown of the remaining tasks.
    """
    tasks = [asyncio.create_task(scanner_loop(), name="scanner-loop")]
    if settings.dns_log_path:
        tasks.append(
            asyncio.create_task(
                tail_dns_log(state, settings.dns_log_path),
                name="dns-tail-loop",
            )
        )
    try:
        yield
    finally:
        for task in tasks:
            task.cancel()
        # return_exceptions=True collects each task's outcome instead of
        # raising the first one, so every task is awaited.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for task, result in zip(tasks, results):
            # CancelledError is a BaseException, so expected cancellations
            # are skipped here and only genuine failures are reported.
            if isinstance(result, Exception):
                logger.error(
                    "Background task %s failed",
                    task.get_name(),
                    exc_info=result,
                )


app = FastAPI(title="Network Topology Wrapper", lifespan=lifespan)

# The frontend directory sits two levels above this file's directory.
static_dir = Path(__file__).resolve().parents[2] / "frontend"
app.mount("/static", StaticFiles(directory=static_dir), name="static")


@app.get("/")
def index() -> FileResponse:
    """Serve the frontend's ``index.html``."""
    return FileResponse(static_dir / "index.html")


@app.get("/healthz")
def healthz() -> dict[str, str]:
    """Return a static liveness response."""
    return {"status": "ok"}


@app.get("/api/topology")
def api_topology() -> dict:
    """Return the topology payload built by the shared state.

    The core router and core switch identities are taken from settings.
    """
    return state.topology_payload(
        core_router_ip=settings.core_router_ip,
        core_router_name=settings.core_router_name,
        core_switch_ip=settings.core_switch_ip,
        core_switch_name=settings.core_switch_name,
    )


@app.post("/api/ingest/dns")
async def ingest_dns(event: DnsIngestEvent) -> dict[str, str]:
    """Record a DNS event in the shared state."""
    await state.add_dns_event(
        src_ip=event.src_ip,
        qname=event.qname,
        answers=event.answers,
        ts=event.ts,
    )
    return {"status": "accepted"}


@app.post("/api/ingest/dhcp")
async def ingest_dhcp(event: DhcpIngestEvent) -> dict[str, str]:
    """Create or update a client device from a DHCP event.

    The device is marked ``"offline"`` only when the event's state is exactly
    ``"offline"``; any other state counts as online. The display name falls
    back to the IP address when no hostname is supplied.
    """
    status = "offline" if event.state == "offline" else "online"
    await state.upsert_device(
        ip=event.ip,
        payload={
            "mac": event.mac,
            "hostname": event.hostname,
            "name": event.hostname or event.ip,
            "status": status,
            "source": "dhcp",
            "type": "client",
        },
    )
    return {"status": "accepted"}


@app.post("/api/ingest/flow")
async def ingest_flow(event: FlowIngestEvent) -> dict[str, str]:
    """Record a network flow event in the shared state."""
    await state.add_flow_event(
        src_ip=event.src_ip,
        dst_ip=event.dst_ip,
        dst_host=event.dst_host,
        protocol=event.protocol,
        bytes_count=event.bytes_count,
        ts=event.ts,
    )
    return {"status": "accepted"}


@app.get("/api/events/stream")
async def events_stream() -> StreamingResponse:
    """Stream state events to the client as server-sent events.

    Each event taken from the subscription queue is JSON-encoded and emitted
    as one ``data:`` frame. The queue is unsubscribed when the generator is
    closed.
    """
    queue = state.subscribe()

    async def stream():
        try:
            while True:
                event = await queue.get()
                payload = json.dumps(event, ensure_ascii=True)
                yield f"data: {payload}\n\n"
        finally:
            # Runs when the response generator is closed or fails.
            state.unsubscribe(queue)

    return StreamingResponse(stream(), media_type="text/event-stream")