Files
netbird-iac/watcher/netbird_watcher.py
Prox ca546ff6d8
All checks were successful
Terraform / terraform (push) Successful in 7s
added netbird-watcher script
2026-02-15 19:11:39 +02:00

349 lines
12 KiB
Python

#!/usr/bin/env python3
"""
NetBird Peer Renamer Watcher

Polls the NetBird API for consumed setup keys and automatically renames
the enrolled peers to match the setup key name.

Setup key name = desired peer name.

Usage:
    ./netbird_watcher.py --config /etc/netbird-watcher/config.json
    ./netbird_watcher.py --url https://netbird.example.com --token nbp_xxx

Environment variables (alternative to flags):
    NETBIRD_URL   - NetBird management URL
    NETBIRD_TOKEN - NetBird API token (PAT)
"""
import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
# File in which the IDs of already-processed setup keys are persisted
# (default for the --state-file flag).
DEFAULT_STATE_FILE = "/var/lib/netbird-watcher/state.json"
# Delay between polling cycles in daemon mode (default for --interval).
DEFAULT_POLL_INTERVAL = 30 # seconds
# Largest allowed gap between a setup key's `last_used` timestamp and a
# peer's `last_seen` timestamp for that peer to count as a match candidate.
PEER_MATCH_WINDOW = 60 # seconds
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
def setup_logging(verbose: bool = False) -> logging.Logger:
    """Configure process-wide logging and hand back the watcher's logger.

    Args:
        verbose: Select DEBUG as the logging level when true, INFO otherwise.

    Returns:
        The logger named "netbird-watcher".
    """
    if verbose:
        threshold = logging.DEBUG
    else:
        threshold = logging.INFO

    logging.basicConfig(
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=threshold,
    )

    watcher_logger = logging.getLogger("netbird-watcher")
    return watcher_logger
# -----------------------------------------------------------------------------
# State Management
# -----------------------------------------------------------------------------
class State:
    """Tracks which setup keys have been processed.

    The set of processed key IDs is kept in memory and mirrored to a JSON
    file of the form ``{"processed_keys": [...]}`` so it survives restarts.
    """

    def __init__(self, state_file: str):
        """Remember the state file location and load any existing state.

        Args:
            state_file: Path of the JSON file used for persistence.
        """
        self.state_file = Path(state_file)
        self.processed_keys: set[str] = set()
        self._load()

    def _load(self):
        """Populate ``processed_keys`` from disk.

        A missing file leaves the set empty. An unreadable, malformed, or
        unexpectedly shaped file is logged as a warning and treated as empty
        instead of aborting startup.
        """
        if not self.state_file.exists():
            return
        try:
            data = json.loads(self.state_file.read_text())
            keys = data.get("processed_keys", []) if isinstance(data, dict) else None
            if not isinstance(keys, list):
                # Valid JSON, but not the structure save() writes.
                raise ValueError("unexpected state file structure")
            self.processed_keys = set(keys)
        except (ValueError, TypeError, OSError) as e:
            # ValueError covers JSONDecodeError and UnicodeDecodeError;
            # TypeError covers unhashable list entries.
            logging.warning(f"Failed to load state file: {e}")
            self.processed_keys = set()

    def save(self):
        """Write the processed key IDs to the state file.

        The parent directory is created if needed. The JSON is written to a
        sibling temporary file and then moved over the real file, so an
        interrupted write cannot leave a truncated state file behind.
        """
        self.state_file.parent.mkdir(parents=True, exist_ok=True)
        # Sorted so the file content is deterministic between runs.
        data = {"processed_keys": sorted(self.processed_keys, key=str)}
        tmp_file = self.state_file.with_name(self.state_file.name + ".tmp")
        tmp_file.write_text(json.dumps(data, indent=2))
        os.replace(tmp_file, self.state_file)

    def is_processed(self, key_id: str) -> bool:
        """Return True if ``key_id`` has already been marked as processed."""
        return key_id in self.processed_keys

    def mark_processed(self, key_id: str):
        """Record ``key_id`` as processed and persist the change immediately."""
        self.processed_keys.add(key_id)
        self.save()
# -----------------------------------------------------------------------------
# NetBird API Client
# -----------------------------------------------------------------------------
class NetBirdAPI:
    """Simple NetBird API client built on urllib."""

    def __init__(self, url: str, token: str):
        """Store the connection settings.

        Args:
            url: NetBird management URL; trailing slashes are removed.
            token: API token sent in the ``Authorization: Token ...`` header.
        """
        self.base_url = url.rstrip("/")
        self.token = token
        self.logger = logging.getLogger("netbird-api")

    @staticmethod
    def _decode_body(raw: bytes):
        """Decode a JSON response body; an empty body yields an empty dict."""
        text = raw.decode().strip()
        return json.loads(text) if text else {}

    def _request(self, method: str, endpoint: str, data: Optional[dict] = None) -> dict:
        """Send one request to ``{base_url}/api{endpoint}`` and decode the reply.

        Args:
            method: HTTP method, e.g. "GET" or "PUT".
            endpoint: Path below ``/api``, starting with a slash.
            data: Optional JSON payload. ``None`` means no request body.

        Returns:
            The decoded JSON response (list endpoints yield a list), or an
            empty dict when the server sends no body.

        Raises:
            HTTPError: The server answered with an error status.
            URLError: The server could not be reached.
        """
        url = f"{self.base_url}/api{endpoint}"
        headers = {
            "Authorization": f"Token {self.token}",
            "Content-Type": "application/json",
        }
        # `is not None` so that an explicit empty payload ({}) is still sent.
        body = json.dumps(data).encode() if data is not None else None
        req = Request(url, data=body, headers=headers, method=method)
        try:
            with urlopen(req, timeout=30) as resp:
                return self._decode_body(resp.read())
        except HTTPError as e:
            # Decode leniently: a non-UTF-8 error page must not replace the
            # HTTPError with a UnicodeDecodeError.
            detail = e.read().decode("utf-8", errors="replace")
            self.logger.error(f"HTTP {e.code} for {method} {endpoint}: {detail}")
            raise
        except URLError as e:
            self.logger.error(f"URL error for {method} {endpoint}: {e}")
            raise

    def get_setup_keys(self) -> list[dict]:
        """Return all setup keys, or an empty list if the reply is not a list."""
        result = self._request("GET", "/setup-keys")
        return result if isinstance(result, list) else []

    def get_peers(self) -> list[dict]:
        """Return all peers, or an empty list if the reply is not a list."""
        result = self._request("GET", "/peers")
        return result if isinstance(result, list) else []

    def rename_peer(self, peer_id: str, new_name: str) -> dict:
        """Set the name of peer ``peer_id`` to ``new_name`` via PUT.

        Returns:
            The decoded API response.
        """
        # NOTE(review): only `name` is sent. Confirm against the NetBird API
        # that PUT /peers/{id} leaves other peer settings unchanged when they
        # are omitted from the payload.
        return self._request("PUT", f"/peers/{peer_id}", {"name": new_name})
# -----------------------------------------------------------------------------
# Watcher Logic
# -----------------------------------------------------------------------------
def parse_timestamp(ts: str) -> Optional[datetime]:
    """Parse a NetBird timestamp into a timezone-aware datetime.

    Handles a trailing ``Z``, explicit ``+HH:MM`` / ``-HH:MM`` offsets, and
    fractional seconds of any length (truncated or zero-padded to
    microseconds). A timestamp without any offset is taken to be UTC, so the
    result is always timezone-aware and safe to subtract from other results.

    Args:
        ts: Timestamp string as returned by the API.

    Returns:
        The parsed datetime, or ``None`` for empty input, non-string input,
        the zero value (``0001-01-01...``), or an unparseable string.
    """
    if not ts or not isinstance(ts, str) or ts.startswith("0001-01-01"):
        return None

    normalized = ts.replace("Z", "+00:00")
    head, dot, tail = normalized.partition(".")
    if dot:
        # Split the fraction from a trailing UTC offset, if there is one.
        tz_idx = next((i for i, ch in enumerate(tail) if ch in "+-"), len(tail))
        # Exactly 6 fractional digits: truncates nanoseconds and pads short
        # fractions, which fromisoformat rejects on older Python versions.
        frac_sec = tail[:tz_idx][:6].ljust(6, "0")
        normalized = f"{head}.{frac_sec}{tail[tz_idx:]}"

    try:
        parsed = datetime.fromisoformat(normalized)
    except ValueError:
        return None

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
def find_matching_peer(
    key_name: str,
    key_last_used: datetime,
    key_auto_groups: list[str],
    peers: list[dict],
    logger: logging.Logger,
) -> Optional[dict]:
    """
    Find the peer that most likely enrolled using this setup key.

    Strategy:
    1. Peer name must NOT already equal the key name (not yet renamed)
    2. Peer must be in at least one of the key's auto_groups
    3. Peer's `last_seen` must be within PEER_MATCH_WINDOW seconds of the
       key's last use (`last_seen` is used as a stand-in for enrollment time)
    4. Among the remaining candidates, pick the smallest time difference

    Args:
        key_name: Setup key name, i.e. the desired peer name.
        key_last_used: When the setup key was last used.
        key_auto_groups: Group IDs the key assigns; None is treated as empty.
        peers: Peer objects as returned by the API.
        logger: Logger for debug output.

    Returns:
        The best matching peer dict, or None if no peer qualifies.
    """
    # None is tolerated because a JSON `null` decodes to None.
    wanted_groups = set(key_auto_groups or [])
    candidates = []

    for peer in peers:
        peer_name = peer.get("name", "")
        peer_id = peer.get("id", "")

        # Skip if already renamed to target name
        if peer_name == key_name:
            logger.debug(f"Peer {peer_id} already named '{key_name}', skipping")
            continue

        # Check group membership; `"groups": null` counts as no groups.
        peer_groups = {
            g.get("id") for g in (peer.get("groups") or []) if isinstance(g, dict)
        }
        if wanted_groups.isdisjoint(peer_groups):
            continue

        # Compare the peer's last_seen with the key's last use.
        peer_last_seen = parse_timestamp(peer.get("last_seen", ""))
        if not peer_last_seen:
            continue

        time_diff = abs((peer_last_seen - key_last_used).total_seconds())
        if time_diff <= PEER_MATCH_WINDOW:
            candidates.append((peer, time_diff))
            logger.debug(
                f"Candidate peer: {peer_name} ({peer_id}), "
                f"last_seen={peer_last_seen}, diff={time_diff:.1f}s"
            )

    if not candidates:
        return None

    # Return closest match (the first one wins on ties)
    return min(candidates, key=lambda item: item[1])[0]
def process_consumed_keys(
    api: NetBirdAPI,
    state: State,
    logger: logging.Logger,
) -> int:
    """
    Process all consumed setup keys and rename their peers.

    Every consumed key that is not yet recorded in `state` is handled once:
    the best matching peer (if any) is renamed to the key's name, and the key
    is then marked as processed whether or not a rename happened.

    Args:
        api: Client used to list setup keys and peers and to rename peers.
        state: Persistent record of already-processed key IDs.
        logger: Logger for progress and error messages.

    Returns:
        Count of peers renamed. 0 is also returned when the initial API
        fetch fails.
    """
    try:
        setup_keys = api.get_setup_keys()
        peers = api.get_peers()
    except Exception as e:
        logger.error(f"Failed to fetch data from NetBird API: {e}")
        return 0

    renamed_count = 0
    # The `peers` snapshot is not refreshed after a rename, so remember which
    # peers were renamed in this cycle and keep them from matching a second key.
    renamed_peer_ids = set()

    for key in setup_keys:
        # `or` fallbacks also cover fields that are present but null, which
        # would otherwise raise and abort the loop at the same key each cycle.
        key_id = key.get("id", "")
        key_name = key.get("name") or ""
        used_times = key.get("used_times") or 0
        last_used_str = key.get("last_used") or ""
        auto_groups = key.get("auto_groups") or []

        # Skip if not consumed
        if used_times == 0:
            continue

        # Skip if already processed
        if state.is_processed(key_id):
            continue

        # Skip system/default keys (optional - adjust pattern as needed)
        if key_name.lower() in ("default", "all"):
            state.mark_processed(key_id)
            continue

        last_used = parse_timestamp(last_used_str)
        if not last_used:
            logger.warning(f"Key '{key_name}' ({key_id}) has invalid last_used: {last_used_str}")
            state.mark_processed(key_id)
            continue

        logger.info(f"Processing consumed key: '{key_name}' (used_times={used_times})")

        # Find matching peer among those not already renamed this cycle
        available_peers = [p for p in peers if p.get("id") not in renamed_peer_ids]
        peer = find_matching_peer(key_name, last_used, auto_groups, available_peers, logger)

        if peer:
            peer_id = peer.get("id", "")
            old_name = peer.get("name", "unknown")
            if not peer_id:
                logger.error("Peer has no ID, cannot rename")
                state.mark_processed(key_id)
                continue
            try:
                api.rename_peer(peer_id, key_name)
                logger.info(f"Renamed peer '{old_name}' -> '{key_name}' (id={peer_id})")
                renamed_count += 1
                renamed_peer_ids.add(peer_id)
            except Exception as e:
                logger.error(f"Failed to rename peer {peer_id}: {e}")
        else:
            logger.warning(
                f"No matching peer found for key '{key_name}'. "
                f"Peer may not have enrolled yet or was already renamed."
            )

        # Mark as processed regardless (to avoid infinite retries)
        state.mark_processed(key_id)

    return renamed_count
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="NetBird Peer Renamer Watcher")
parser.add_argument("--url", help="NetBird management URL")
parser.add_argument("--token", help="NetBird API token")
parser.add_argument("--config", help="Path to config JSON file")
parser.add_argument("--state-file", default=DEFAULT_STATE_FILE, help="Path to state file")
parser.add_argument("--interval", type=int, default=DEFAULT_POLL_INTERVAL, help="Poll interval in seconds")
parser.add_argument("--once", action="store_true", help="Run once and exit (for cron)")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging")
args = parser.parse_args()
logger = setup_logging(args.verbose)
# Load config
url = args.url or os.environ.get("NETBIRD_URL")
token = args.token or os.environ.get("NETBIRD_TOKEN")
if args.config:
try:
config = json.loads(Path(args.config).read_text())
url = url or config.get("url")
token = token or config.get("token")
except Exception as e:
logger.error(f"Failed to load config file: {e}")
sys.exit(1)
if not url or not token:
logger.error("NetBird URL and token are required. Use --url/--token, env vars, or --config")
sys.exit(1)
# Initialize
api = NetBirdAPI(url, token)
state = State(args.state_file)
logger.info(f"NetBird Watcher started (url={url}, interval={args.interval}s)")
if args.once:
# Single run mode (for cron)
count = process_consumed_keys(api, state, logger)
logger.info(f"Processed {count} peer(s)")
else:
# Daemon mode
while True:
try:
count = process_consumed_keys(api, state, logger)
if count > 0:
logger.info(f"Processed {count} peer(s) this cycle")
except Exception as e:
logger.exception(f"Error in processing cycle: {e}")
time.sleep(args.interval)
# Start the watcher only when this file is executed as a script.
if __name__ == "__main__":
    main()