34 lines
969 B
Python
34 lines
969 B
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import re
|
|
from datetime import datetime, timezone
|
|
|
|
from .state import NetworkState
|
|
|
|
# dnsmasq style: "query[A] example.com from 192.168.88.10"
|
|
# Pattern for one query line in the log. The adjacent raw-string fragments
# are concatenated by the parser into a single regular expression.
DNS_QUERY_RE = re.compile(
    r"query\[[^\]]+\]\s+"                # bracketed tag after "query", e.g. "query[A]"
    r"(?P<qname>\S+)\s+"                 # queried name: one run of non-whitespace
    r"from\s+"                           # literal separator before the client address
    r"(?P<src_ip>\d+\.\d+\.\d+\.\d+)"    # client address: four dot-separated digit runs
)
|
|
|
|
|
|
async def tail_dns_log(state: NetworkState, path: str) -> None:
    """Follow a DNS query log and record every query line that appears.

    The file is opened once and read from its current end, so lines already
    present when the task starts are skipped. Each complete new line is
    matched against ``DNS_QUERY_RE``; on a match the client address and the
    queried name are passed to ``state.add_dns_event`` with an empty answer
    list and the current UTC time as the timestamp.

    Args:
        state: Object whose awaitable ``add_dns_event`` receives each event.
        path: Path of the log file to follow.

    Returns:
        ``None`` immediately when ``path`` is empty or is not an existing
        regular file. Otherwise the coroutine loops indefinitely and only
        ends through cancellation or an exception.
    """
    if not path or not os.path.isfile(path):
        return

    # NOTE: the file is opened once and never reopened, so a log that is
    # rotated or truncated while being followed is not picked up again.
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        f.seek(0, os.SEEK_END)

        # Text read so far for a line whose newline has not been written yet.
        pending = ""

        while True:
            chunk = f.readline()
            if not chunk:
                # Nothing new; yield to the event loop before polling again.
                await asyncio.sleep(0.5)
                continue

            pending += chunk
            if not pending.endswith("\n"):
                # The writer is mid-line. Parsing now could match a truncated
                # address (e.g. "10.0.0.1" of "10.0.0.12"), so wait for the
                # rest of the line to arrive.
                continue
            line, pending = pending, ""

            m = DNS_QUERY_RE.search(line)
            if m is None:
                continue

            await state.add_dns_event(
                src_ip=m.group("src_ip"),
                qname=m.group("qname"),
                answers=[],
                ts=datetime.now(timezone.utc),
            )
|