"""Subnet scanner: TCP-probe every host in a network and record responders."""

from __future__ import annotations

import asyncio
import ipaddress
import re
import socket
from typing import Any

from .state import NetworkState, utc_now

# TCP ports tried, in this order, when deciding whether a host is up.
# guess_device_type() gives special meaning to 53 ("dns") and 8291 ("mikrotik").
COMMON_PORTS = (22, 53, 80, 443, 8291)

# Lower-case hostname substring -> vendor label returned by guess_vendor().
VENDOR_HINTS = {
    "mikrotik": "MikroTik",
    "cisco": "Cisco",
    "ubiquiti": "Ubiquiti",
    "raspberry": "Raspberry Pi",
}

# "ap" (optionally prefixed h/u/w, e.g. "hap", "uap", "wap") standing on its
# own between non-letters: matches "ap-01", "office-ap", "uap-ac-lite", "ap2"
# but not "laptop", "apple-tv" or "snapshot", which a plain substring test
# would misclassify as access points.
_AP_NAME = re.compile(r"(?<![a-z])[huw]?ap(?![a-z])")


def guess_device_type(hostname: str | None, open_port: int | None) -> str:
    """Guess a coarse device type from the hostname and the port that answered.

    Hostname hints take precedence over port hints.

    Args:
        hostname: Reverse-DNS name of the host, or ``None``/empty if unknown.
        open_port: The TCP port that accepted a connection, or ``None``.

    Returns:
        One of ``"unknown"`` (no hostname and no port), ``"router"``,
        ``"switch"``, ``"ap"``, ``"mikrotik"``, ``"dns"`` or ``"client"``.
    """
    if not hostname and open_port is None:
        return "unknown"

    hn = (hostname or "").lower()
    if "rb4011" in hn or "router" in hn:
        return "router"
    if "crs" in hn or "switch" in hn or "slm" in hn:
        return "switch"
    # "ap" is matched as a delimited token rather than a raw substring so that
    # ordinary client names containing the letters "ap" are not caught.
    if "cap" in hn or "wifi" in hn or _AP_NAME.search(hn):
        return "ap"

    if open_port == 8291:
        return "mikrotik"
    if open_port == 53:
        return "dns"
    return "client"


def guess_vendor(hostname: str | None) -> str | None:
    """Return the vendor label whose hint appears in *hostname*, if any.

    The comparison is case-insensitive; the first matching entry of
    ``VENDOR_HINTS`` (in insertion order) wins. Returns ``None`` when the
    hostname is missing/empty or no hint matches.
    """
    if not hostname:
        return None
    hn = hostname.lower()
    for token, vendor in VENDOR_HINTS.items():
        if token in hn:
            return vendor
    return None


async def probe_host(ip: str, timeout: float) -> tuple[bool, int | None]:
    """Try to open a TCP connection to *ip* on each of ``COMMON_PORTS``.

    Args:
        ip: Address to probe.
        timeout: Per-port connect timeout in seconds.

    Returns:
        ``(True, port)`` for the first port that accepts a connection, or
        ``(False, None)`` if every port fails or times out.
    """
    for port in COMMON_PORTS:
        try:
            _reader, writer = await asyncio.wait_for(
                asyncio.open_connection(ip, port), timeout=timeout
            )
        except (OSError, asyncio.TimeoutError):
            # Refused, unreachable or timed out: try the next port.
            continue

        # The connection succeeded, so the host is up. Errors while tearing
        # the socket down (e.g. the peer resetting it) must not turn this
        # result into a miss.
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass
        return True, port
    return False, None


def reverse_dns(ip: str) -> str | None:
    """Return the primary reverse-DNS name for *ip*, or ``None`` on failure.

    This performs a blocking resolver call; from a coroutine, run it in an
    executor instead of calling it directly.
    """
    try:
        return socket.gethostbyaddr(ip)[0]
    except OSError:
        # socket.herror / socket.gaierror are OSError subclasses.
        return None


async def scan_subnet(
    state: NetworkState,
    subnet_cidr: str,
    timeout_seconds: float,
    concurrency: int,
) -> set[str]:
    """Probe every host address in *subnet_cidr* and upsert the responders.

    Each responding host is reverse-resolved and written to *state* via
    ``state.upsert_device`` with ``status="online"`` and ``source="scanner"``.

    Args:
        state: Store that receives one ``upsert_device`` call per live host.
        subnet_cidr: Network in CIDR notation; host bits may be set
            (parsed with ``strict=False``).
        timeout_seconds: Per-port connect timeout passed to ``probe_host``.
        concurrency: Maximum number of hosts processed at the same time.

    Returns:
        The set of IP address strings that answered on a probed port.

    Raises:
        ValueError: If *subnet_cidr* is not a valid network or *concurrency*
            is less than 1.
    """
    network = ipaddress.ip_network(subnet_cidr, strict=False)
    if concurrency < 1:
        # A zero-sized limit would otherwise stall the scan forever.
        raise ValueError("concurrency must be at least 1")

    loop = asyncio.get_running_loop()
    active: set[str] = set()
    # One lazy iterator shared by all workers: addresses are pulled on demand,
    # so memory stays proportional to `concurrency`, not to the subnet size.
    # Pulling the next address never awaits, so workers cannot interleave
    # inside the iterator.
    pending = iter(network.hosts())

    async def worker() -> None:
        for ip_obj in pending:
            ip = str(ip_obj)
            is_up, port = await probe_host(ip, timeout_seconds)
            if not is_up:
                continue
            # gethostbyaddr blocks; run it off the event loop so other
            # probes keep making progress while the resolver waits.
            hostname = await loop.run_in_executor(None, reverse_dns, ip)
            active.add(ip)
            details: dict[str, Any] = {
                "hostname": hostname,
                "name": hostname or ip,
                "type": guess_device_type(hostname, port),
                "vendor": guess_vendor(hostname),
                "status": "online",
                "last_seen": utc_now(),
                "source": "scanner",
            }
            await state.upsert_device(ip, details)

    await asyncio.gather(*(worker() for _ in range(concurrency)))
    return active