from __future__ import annotations import asyncio import os import re from datetime import datetime, timezone from .state import NetworkState # dnsmasq style: "query[A] example.com from 192.168.88.10" DNS_QUERY_RE = re.compile(r"query\[[^\]]+\]\s+(?P\S+)\s+from\s+(?P\d+\.\d+\.\d+\.\d+)") async def tail_dns_log(state: NetworkState, path: str) -> None: if not path: return if not os.path.isfile(path): return with open(path, "r", encoding="utf-8", errors="ignore") as f: f.seek(0, os.SEEK_END) while True: line = f.readline() if not line: await asyncio.sleep(0.5) continue m = DNS_QUERY_RE.search(line) if not m: continue src_ip = m.group("src_ip") qname = m.group("qname") await state.add_dns_event(src_ip=src_ip, qname=qname, answers=[], ts=datetime.now(timezone.utc))