130 lines
3.5 KiB
Python
130 lines
3.5 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
from contextlib import asynccontextmanager
|
|
from pathlib import Path
|
|
|
|
from fastapi import FastAPI
|
|
from fastapi.responses import FileResponse, StreamingResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
|
|
from .dnslog import tail_dns_log
|
|
from .scanner import scan_subnet
|
|
from .schemas import DhcpIngestEvent, DnsIngestEvent, FlowIngestEvent
|
|
from .settings import settings
|
|
from .state import NetworkState
|
|
|
|
state = NetworkState()
|
|
|
|
|
|
async def scanner_loop() -> None:
|
|
while True:
|
|
active_ips = await scan_subnet(
|
|
state=state,
|
|
subnet_cidr=settings.scan_subnet,
|
|
timeout_seconds=settings.scan_timeout_seconds,
|
|
concurrency=settings.scan_concurrency,
|
|
)
|
|
await state.mark_offline_missing(active_ips)
|
|
await asyncio.sleep(settings.scan_interval_seconds)
|
|
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(_: FastAPI):
|
|
tasks = [asyncio.create_task(scanner_loop(), name="scanner-loop")]
|
|
if settings.dns_log_path:
|
|
tasks.append(asyncio.create_task(tail_dns_log(state, settings.dns_log_path), name="dns-tail-loop"))
|
|
try:
|
|
yield
|
|
finally:
|
|
for task in tasks:
|
|
task.cancel()
|
|
for task in tasks:
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
|
|
app = FastAPI(title="Network Topology Wrapper", lifespan=lifespan)
|
|
|
|
static_dir = Path(__file__).resolve().parents[2] / "frontend"
|
|
app.mount("/static", StaticFiles(directory=static_dir), name="static")
|
|
|
|
|
|
@app.get("/")
|
|
def index() -> FileResponse:
|
|
return FileResponse(static_dir / "index.html")
|
|
|
|
|
|
@app.get("/healthz")
|
|
def healthz() -> dict[str, str]:
|
|
return {"status": "ok"}
|
|
|
|
|
|
@app.get("/api/topology")
|
|
def api_topology() -> dict:
|
|
return state.topology_payload(
|
|
core_router_ip=settings.core_router_ip,
|
|
core_router_name=settings.core_router_name,
|
|
core_switch_ip=settings.core_switch_ip,
|
|
core_switch_name=settings.core_switch_name,
|
|
)
|
|
|
|
|
|
@app.post("/api/ingest/dns")
|
|
async def ingest_dns(event: DnsIngestEvent) -> dict[str, str]:
|
|
await state.add_dns_event(
|
|
src_ip=event.src_ip,
|
|
qname=event.qname,
|
|
answers=event.answers,
|
|
ts=event.ts,
|
|
)
|
|
return {"status": "accepted"}
|
|
|
|
|
|
@app.post("/api/ingest/dhcp")
|
|
async def ingest_dhcp(event: DhcpIngestEvent) -> dict[str, str]:
|
|
await state.upsert_device(
|
|
ip=event.ip,
|
|
payload={
|
|
"mac": event.mac,
|
|
"hostname": event.hostname,
|
|
"name": event.hostname or event.ip,
|
|
"status": "online" if event.state != "offline" else "offline",
|
|
"source": "dhcp",
|
|
"type": "client",
|
|
},
|
|
)
|
|
return {"status": "accepted"}
|
|
|
|
|
|
@app.post("/api/ingest/flow")
|
|
async def ingest_flow(event: FlowIngestEvent) -> dict[str, str]:
|
|
await state.add_flow_event(
|
|
src_ip=event.src_ip,
|
|
dst_ip=event.dst_ip,
|
|
dst_host=event.dst_host,
|
|
protocol=event.protocol,
|
|
bytes_count=event.bytes_count,
|
|
ts=event.ts,
|
|
)
|
|
return {"status": "accepted"}
|
|
|
|
|
|
@app.get("/api/events/stream")
|
|
async def events_stream() -> StreamingResponse:
|
|
queue = state.subscribe()
|
|
|
|
async def stream():
|
|
try:
|
|
while True:
|
|
event = await queue.get()
|
|
payload = json.dumps(event, ensure_ascii=True)
|
|
yield f"data: {payload}\n\n"
|
|
finally:
|
|
state.unsubscribe(queue)
|
|
|
|
return StreamingResponse(stream(), media_type="text/event-stream")
|