Files
network-topology-wrapper/backend/app/dnslog.py
T
2026-05-19 14:53:36 +02:00

34 lines
969 B
Python

from __future__ import annotations
import asyncio
import os
import re
from datetime import datetime, timezone
from .state import NetworkState
# Pattern for dnsmasq-style query log entries, for example:
#   "query[A] example.com from 192.168.88.10"
# Named groups: ``qname`` (the queried name) and ``src_ip`` (dotted-quad
# address of the client that issued the query).
DNS_QUERY_RE = re.compile(
    r"query\[[^\]]+\]"  # literal "query" followed by the bracketed record type
    r"\s+(?P<qname>\S+)"  # queried name: first run of non-whitespace
    r"\s+from\s+"
    r"(?P<src_ip>\d+\.\d+\.\d+\.\d+)"  # four dot-separated digit runs
)
async def tail_dns_log(state: NetworkState, path: str) -> None:
    """Follow a DNS query log and report each new query to ``state``.

    Starts at the current end of the file, so only lines appended after the
    call are processed. Every complete line matching ``DNS_QUERY_RE`` results
    in one awaited ``state.add_dns_event(...)`` call with the captured
    ``src_ip`` and ``qname``, an empty ``answers`` list, and the current UTC
    time as ``ts`` (the time the line was read, not a timestamp parsed from
    the log).

    Args:
        state: Object exposing an awaitable ``add_dns_event`` method.
        path: Log file to follow. An empty path, or one that is not an
            existing regular file, makes the function return immediately.

    Returns:
        None. Once tailing has started the loop never exits on its own; it
        runs until the task is cancelled or an exception propagates.
    """
    if not path or not os.path.isfile(path):
        return

    try:
        log_file = open(path, "r", encoding="utf-8", errors="ignore")
    except FileNotFoundError:
        # The file disappeared between the isfile() check and open(); treat
        # it the same as a file that was missing from the start.
        return

    with log_file:
        # Skip everything already in the log; only new entries are reported.
        log_file.seek(0, os.SEEK_END)

        # Text read so far for a line whose terminating newline has not been
        # written yet.
        pending = ""
        while True:
            chunk = log_file.readline()
            if not chunk:
                # Nothing new yet: yield to the event loop before polling again.
                await asyncio.sleep(0.5)
                continue

            pending += chunk
            if not pending.endswith("\n"):
                # readline() hit EOF in the middle of a line the writer is
                # still producing. Matching now could capture a truncated
                # address (e.g. "192.168.88.1" of "192.168.88.10"), so wait
                # for the rest of the line.
                continue

            line, pending = pending, ""
            match = DNS_QUERY_RE.search(line)
            if match is None:
                continue

            await state.add_dns_event(
                src_ip=match.group("src_ip"),
                qname=match.group("qname"),
                answers=[],
                ts=datetime.now(timezone.utc),
            )