Files
2026-05-19 14:53:36 +02:00

101 lines
2.6 KiB
Python

from __future__ import annotations
import asyncio
import ipaddress
import socket
from typing import Any
from .state import NetworkState, utc_now
# TCP ports tried, in this order, when probing a host; the first port that
# accepts a connection is the one reported for that host.
COMMON_PORTS = (22, 53, 80, 443, 8291)

# Lower-case hostname fragment -> vendor display name. Entries are checked in
# insertion order and the first fragment found in the hostname wins.
VENDOR_HINTS = {
    "mikrotik": "MikroTik",
    "cisco": "Cisco",
    "ubiquiti": "Ubiquiti",
    "raspberry": "Raspberry Pi",
}
def guess_device_type(hostname: str | None, open_port: int | None) -> str:
    """Classify a device from its hostname and the port that answered.

    Checks run in a fixed order and the first match wins: router names,
    switch names, access-point names, then the open port. Hostname matching
    is case-insensitive.

    Args:
        hostname: Reverse-DNS name of the device, or None/empty if unknown.
        open_port: The TCP port that accepted a connection, or None.

    Returns:
        One of "unknown", "router", "switch", "ap", "mikrotik", "dns" or
        "client". "unknown" is returned only when there is neither a
        hostname nor an open port to go on.
    """
    if not hostname and open_port is None:
        return "unknown"

    hn = (hostname or "").lower()

    if "rb4011" in hn or "router" in hn:
        return "router"
    if "crs" in hn or "switch" in hn or "slm" in hn:
        return "switch"

    # Access-point markers must be whole words: a bare substring test for
    # "ap" also fires on names such as "laptop" or "apples-iphone". Splitting
    # on every non-letter keeps "ap-01", "ap2" and "cap-ac" matching.
    ap_words = {"ap", "cap", "hap", "wap"}
    words = "".join(ch if ch.isalpha() else " " for ch in hn).split()
    if "wifi" in hn or not ap_words.isdisjoint(words):
        return "ap"

    if open_port == 8291:
        return "mikrotik"
    if open_port == 53:
        return "dns"
    return "client"
def guess_vendor(hostname: str | None) -> str | None:
    """Return the vendor whose hint fragment appears in *hostname*.

    The comparison is case-insensitive. Fragments from ``VENDOR_HINTS`` are
    tried in the mapping's iteration order and the first one contained in the
    hostname decides the result.

    Args:
        hostname: Device hostname, or None/empty when it is not known.

    Returns:
        The vendor display name, or None when there is no hostname or no
        fragment matches.
    """
    if not hostname:
        return None

    lowered = hostname.lower()
    candidates = (
        vendor for token, vendor in VENDOR_HINTS.items() if token in lowered
    )
    return next(candidates, None)
async def probe_host(ip: str, timeout: float) -> tuple[bool, int | None]:
    """Check whether *ip* accepts a TCP connection on any common port.

    The ports in ``COMMON_PORTS`` are tried one after another; probing stops
    at the first port that accepts a connection.

    Args:
        ip: Address of the host to probe.
        timeout: Maximum time in seconds to wait for each connection attempt.

    Returns:
        ``(True, port)`` for the first port that accepted a connection, or
        ``(False, None)`` if no port did.
    """
    for port in COMMON_PORTS:
        try:
            _reader, writer = await asyncio.wait_for(
                asyncio.open_connection(ip, port), timeout=timeout
            )
        except (OSError, asyncio.TimeoutError):
            # Refused, unreachable or timed out: try the next port.
            # NOTE(review): a refused connection is counted as "no answer"
            # even though it suggests a live host; confirm that is intended.
            continue

        # The connection succeeded, so the host is up. Closing is only
        # cleanup; an error while closing (e.g. the peer reset the
        # connection first) must not discard the successful probe.
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass
        return True, port

    return False, None
def reverse_dns(ip: str) -> str | None:
    """Resolve *ip* to its primary hostname with a reverse lookup.

    This is a synchronous call to ``socket.gethostbyaddr`` and blocks until
    the resolver answers. Lookup failures are not propagated.

    Args:
        ip: Address to resolve.

    Returns:
        The primary hostname reported for the address, or None if the lookup
        raised any exception.
    """
    try:
        primary_name, _aliases, _addresses = socket.gethostbyaddr(ip)
    except Exception:
        return None
    return primary_name
async def scan_subnet(
    state: NetworkState,
    subnet_cidr: str,
    timeout_seconds: float,
    concurrency: int,
) -> set[str]:
    """Probe every host address in a subnet and record the ones that answer.

    Each host is probed with ``probe_host``. For every host that answers, a
    reverse-DNS lookup is performed and ``state.upsert_device`` is awaited
    with the IP and a record containing the keys hostname, name, type,
    vendor, status, last_seen and source.

    Args:
        state: Store that receives one ``upsert_device`` call per live host.
        subnet_cidr: Network in CIDR notation; host bits may be set
            (parsed with ``strict=False``).
        timeout_seconds: Per-connection timeout handed to ``probe_host``.
        concurrency: Maximum number of hosts processed at the same time;
            must be at least 1.

    Returns:
        The set of IP address strings that answered a probe.

    Raises:
        ValueError: If ``concurrency`` is less than 1 or ``subnet_cidr`` is
            not a valid network.
    """
    if concurrency < 1:
        # A semaphore with zero permits would park every probe forever;
        # asyncio itself already rejects negative values with ValueError.
        raise ValueError("concurrency must be at least 1")

    network = ipaddress.ip_network(subnet_cidr, strict=False)
    active: set[str] = set()
    sem = asyncio.Semaphore(concurrency)
    loop = asyncio.get_running_loop()

    async def one(ip_obj: Any) -> None:
        ip = str(ip_obj)
        async with sem:
            is_up, port = await probe_host(ip, timeout_seconds)
            if not is_up:
                return
            # reverse_dns blocks on the system resolver. Run it in the
            # default executor so the event loop (and with it every other
            # probe's timeout) is not stalled while it waits.
            hostname = await loop.run_in_executor(None, reverse_dns, ip)
            active.add(ip)
            await state.upsert_device(
                ip,
                {
                    "hostname": hostname,
                    "name": hostname or ip,
                    "type": guess_device_type(hostname, port),
                    "vendor": guess_vendor(hostname),
                    "status": "online",
                    "last_seen": utc_now(),
                    "source": "scanner",
                },
            )

    await asyncio.gather(*(one(ip) for ip in network.hosts()))
    return active