#!/usr/bin/env python3 """ NetBird Peer Renamer Watcher Polls NetBird API for consumed setup keys and automatically renames the enrolled peers to match the setup key name. Setup key name = desired peer name. Usage: ./netbird_watcher.py --config /etc/netbird-watcher/config.json ./netbird_watcher.py --url https://netbird.example.com --token nbp_xxx Environment variables (alternative to flags): NETBIRD_URL - NetBird management URL NETBIRD_TOKEN - NetBird API token (PAT) """ import argparse import json import logging import os import sys import time from datetime import datetime, timezone from pathlib import Path from typing import Optional from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen # ----------------------------------------------------------------------------- # Configuration # ----------------------------------------------------------------------------- DEFAULT_STATE_FILE = "/var/lib/netbird-watcher/state.json" DEFAULT_POLL_INTERVAL = 30 # seconds PEER_MATCH_WINDOW = 60 # seconds - how close peer creation must be to key usage # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- def setup_logging(verbose: bool = False) -> logging.Logger: level = logging.DEBUG if verbose else logging.INFO logging.basicConfig( level=level, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) return logging.getLogger("netbird-watcher") # ----------------------------------------------------------------------------- # State Management # ----------------------------------------------------------------------------- class State: """Tracks which setup keys have been processed.""" def __init__(self, state_file: str): self.state_file = Path(state_file) self.processed_keys: set[str] = set() self._load() def _load(self): if self.state_file.exists(): try: data = json.loads(self.state_file.read_text()) self.processed_keys = set(data.get("processed_keys", [])) except (json.JSONDecodeError, IOError) as e: logging.warning(f"Failed to load state file: {e}") self.processed_keys = set() def save(self): self.state_file.parent.mkdir(parents=True, exist_ok=True) data = {"processed_keys": list(self.processed_keys)} self.state_file.write_text(json.dumps(data, indent=2)) def is_processed(self, key_id: str) -> bool: return key_id in self.processed_keys def mark_processed(self, key_id: str): self.processed_keys.add(key_id) self.save() # ----------------------------------------------------------------------------- # NetBird API Client # ----------------------------------------------------------------------------- class NetBirdAPI: """Simple NetBird API client.""" def __init__(self, url: str, token: str): self.base_url = url.rstrip("/") self.token = token self.logger = logging.getLogger("netbird-api") def _request(self, method: str, endpoint: str, data: Optional[dict] = None) -> dict: url = f"{self.base_url}/api{endpoint}" headers = { "Authorization": f"Token {self.token}", "Content-Type": "application/json", } body = json.dumps(data).encode() if data else None req = Request(url, data=body, headers=headers, method=method) try: with urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) except HTTPError as e: self.logger.error(f"HTTP {e.code} for {method} {endpoint}: {e.read().decode()}") raise except URLError as e: self.logger.error(f"URL error for {method} {endpoint}: {e}") raise def get_setup_keys(self) -> list[dict]: result = self._request("GET", "/setup-keys") return result if isinstance(result, list) else [] def get_peers(self) -> list[dict]: result = self._request("GET", "/peers") return result if isinstance(result, list) else [] def rename_peer(self, peer_id: str, new_name: str) -> dict: return self._request("PUT", f"/peers/{peer_id}", {"name": new_name}) # ----------------------------------------------------------------------------- # Watcher Logic # ----------------------------------------------------------------------------- def parse_timestamp(ts: str) -> Optional[datetime]: """Parse NetBird timestamp to datetime.""" if not ts or ts.startswith("0001-01-01"): return None try: # Handle various formats ts = ts.replace("Z", "+00:00") if "." in ts: # Truncate nanoseconds to microseconds parts = ts.split(".") frac = parts[1] tz_idx = frac.find("+") if "+" in frac else frac.find("-") if "-" in frac else len(frac) frac_sec = frac[:tz_idx][:6] # max 6 digits for microseconds tz_part = frac[tz_idx:] if tz_idx < len(frac) else "+00:00" ts = f"{parts[0]}.{frac_sec}{tz_part}" return datetime.fromisoformat(ts) except ValueError: return None def find_matching_peer( key_name: str, key_last_used: datetime, key_auto_groups: list[str], peers: list[dict], logger: logging.Logger, ) -> Optional[dict]: """ Find the peer that enrolled using this setup key. Strategy: 1. Peer must be in one of the key's auto_groups 2. Peer must have been created within PEER_MATCH_WINDOW of key usage 3. Peer name should NOT already match key name (not yet renamed) 4. Pick the closest match by creation time """ candidates = [] for peer in peers: peer_name = peer.get("name", "") peer_id = peer.get("id", "") peer_groups = [g.get("id") for g in peer.get("groups", [])] # Skip if already renamed to target name if peer_name == key_name: logger.debug(f"Peer {peer_id} already named '{key_name}', skipping") continue # Check group membership if not any(g in peer_groups for g in key_auto_groups): continue # Check creation time # Note: NetBird peer object doesn't have 'created_at' directly accessible # We use 'last_seen' or 'connected' status as proxy # Actually, let's use the peer's 'connected' first time or 'last_seen' # Looking at NetBird API, peers have 'last_seen' but not always 'created_at' # For newly enrolled peers, last_seen should be very recent peer_last_seen = parse_timestamp(peer.get("last_seen", "")) if peer_last_seen: time_diff = abs((peer_last_seen - key_last_used).total_seconds()) if time_diff <= PEER_MATCH_WINDOW: candidates.append((peer, time_diff)) logger.debug( f"Candidate peer: {peer_name} ({peer_id}), " f"last_seen={peer_last_seen}, diff={time_diff:.1f}s" ) if not candidates: return None # Return closest match candidates.sort(key=lambda x: x[1]) return candidates[0][0] def process_consumed_keys( api: NetBirdAPI, state: State, logger: logging.Logger, ) -> int: """ Process all consumed setup keys and rename their peers. Returns count of peers renamed. """ renamed_count = 0 try: setup_keys = api.get_setup_keys() peers = api.get_peers() except Exception as e: logger.error(f"Failed to fetch data from NetBird API: {e}") return 0 for key in setup_keys: key_id = key.get("id", "") key_name = key.get("name", "") used_times = key.get("used_times", 0) last_used_str = key.get("last_used", "") auto_groups = key.get("auto_groups", []) # Skip if not consumed if used_times == 0: continue # Skip if already processed if state.is_processed(key_id): continue # Skip system/default keys (optional - adjust pattern as needed) if key_name.lower() in ("default", "all"): state.mark_processed(key_id) continue last_used = parse_timestamp(last_used_str) if not last_used: logger.warning(f"Key '{key_name}' ({key_id}) has invalid last_used: {last_used_str}") state.mark_processed(key_id) continue logger.info(f"Processing consumed key: '{key_name}' (used_times={used_times})") # Find matching peer peer = find_matching_peer(key_name, last_used, auto_groups, peers, logger) if peer: peer_id = peer.get("id", "") old_name = peer.get("name", "unknown") if not peer_id: logger.error(f"Peer has no ID, cannot rename") state.mark_processed(key_id) continue try: api.rename_peer(peer_id, key_name) logger.info(f"Renamed peer '{old_name}' -> '{key_name}' (id={peer_id})") renamed_count += 1 except Exception as e: logger.error(f"Failed to rename peer {peer_id}: {e}") else: logger.warning( f"No matching peer found for key '{key_name}'. " f"Peer may not have enrolled yet or was already renamed." ) # Mark as processed regardless (to avoid infinite retries) state.mark_processed(key_id) return renamed_count # ----------------------------------------------------------------------------- # Main # ----------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser(description="NetBird Peer Renamer Watcher") parser.add_argument("--url", help="NetBird management URL") parser.add_argument("--token", help="NetBird API token") parser.add_argument("--config", help="Path to config JSON file") parser.add_argument("--state-file", default=DEFAULT_STATE_FILE, help="Path to state file") parser.add_argument("--interval", type=int, default=DEFAULT_POLL_INTERVAL, help="Poll interval in seconds") parser.add_argument("--once", action="store_true", help="Run once and exit (for cron)") parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging") args = parser.parse_args() logger = setup_logging(args.verbose) # Load config url = args.url or os.environ.get("NETBIRD_URL") token = args.token or os.environ.get("NETBIRD_TOKEN") if args.config: try: config = json.loads(Path(args.config).read_text()) url = url or config.get("url") token = token or config.get("token") except Exception as e: logger.error(f"Failed to load config file: {e}") sys.exit(1) if not url or not token: logger.error("NetBird URL and token are required. Use --url/--token, env vars, or --config") sys.exit(1) # Initialize api = NetBirdAPI(url, token) state = State(args.state_file) logger.info(f"NetBird Watcher started (url={url}, interval={args.interval}s)") if args.once: # Single run mode (for cron) count = process_consumed_keys(api, state, logger) logger.info(f"Processed {count} peer(s)") else: # Daemon mode while True: try: count = process_consumed_keys(api, state, logger) if count > 0: logger.info(f"Processed {count} peer(s) this cycle") except Exception as e: logger.exception(f"Error in processing cycle: {e}") time.sleep(args.interval) if __name__ == "__main__": main()