import socket import threading import time from typing import Dict, Optional from zeroconf import ( DNSAddress, DNSOutgoing, DNSRecord, DNSQuestion, RecordUpdateListener, Zeroconf, ) _CLASS_IN = 1 _FLAGS_QR_QUERY = 0x0000 # query _TYPE_A = 1 class HostResolver(RecordUpdateListener): def __init__(self, name: str): self.name = name self.address: Optional[bytes] = None def update_record(self, zc: Zeroconf, now: float, record: DNSRecord) -> None: if record is None: return if record.type == _TYPE_A: assert isinstance(record, DNSAddress) if record.name == self.name: self.address = record.address def request(self, zc: Zeroconf, timeout: float) -> bool: now = time.time() delay = 0.2 next_ = now + delay last = now + timeout try: zc.add_listener(self, None) while self.address is None: if last <= now: # Timeout return False if next_ <= now: out = DNSOutgoing(_FLAGS_QR_QUERY) out.add_question(DNSQuestion(self.name, _TYPE_A, _CLASS_IN)) zc.send(out) next_ = now + delay delay *= 2 time.sleep(min(next_, last) - now) now = time.time() finally: zc.remove_listener(self) return True class DashboardStatus(RecordUpdateListener, threading.Thread): PING_AFTER = 15 * 1000 # Send new mDNS request after 15 seconds OFFLINE_AFTER = PING_AFTER * 2 # Offline if no mDNS response after 30 seconds def __init__(self, zc: Zeroconf, on_update) -> None: threading.Thread.__init__(self) self.zc = zc self.query_hosts: set[str] = set() self.key_to_host: Dict[str, str] = {} self.stop_event = threading.Event() self.query_event = threading.Event() self.on_update = on_update def update_record(self, zc: Zeroconf, now: float, record: DNSRecord) -> None: pass def request_query(self, hosts: Dict[str, str]) -> None: self.query_hosts = set(hosts.values()) self.key_to_host = hosts self.query_event.set() def stop(self) -> None: self.stop_event.set() self.query_event.set() def host_status(self, key: str) -> bool: entries = self.zc.cache.entries_with_name(key) if not entries: return False now = time.time() * 1000 return any( (entry.created + DashboardStatus.OFFLINE_AFTER) >= now for entry in entries ) def run(self) -> None: self.zc.add_listener(self, None) while not self.stop_event.is_set(): self.on_update( {key: self.host_status(host) for key, host in self.key_to_host.items()} ) now = time.time() * 1000 for host in self.query_hosts: entries = self.zc.cache.entries_with_name(host) if not entries or all( (entry.created + DashboardStatus.PING_AFTER) <= now for entry in entries ): out = DNSOutgoing(_FLAGS_QR_QUERY) out.add_question(DNSQuestion(host, _TYPE_A, _CLASS_IN)) self.zc.send(out) self.query_event.wait() self.query_event.clear() self.zc.remove_listener(self) class EsphomeZeroconf(Zeroconf): def resolve_host(self, host: str, timeout=3.0): info = HostResolver(host) if info.request(self, timeout): return socket.inet_ntoa(info.address) return None