#!/usr/bin/env python
"""Prometheus exporter for outbound connection metrics with DNS snooping.

DNS answers seen on port 53 are recorded in ``reverse_mapping`` so that remote
addresses reported by ``psutil.net_connections()`` can be labelled with the
name that was looked up instead of the raw address.
"""
import collections
from datetime import datetime, timezone
from math import inf, ceil
from time import sleep, time

import psutil
from prometheus_client import start_http_server, Counter, Gauge, Histogram
from scapy.all import AsyncSniffer, DNS

# For interesting ports we will record entries even if reverse DNS lookup fails
# NOTE(review): nothing in this file references INTERESTING_PORTS yet -- confirm
# whether the behaviour described above is still planned.
INTERESTING_PORTS = 53, 80, 443, 27017, 3306, 5432, 6379, 16379, 26379, 11211

# Ports at or above this value are treated as ephemeral (client side) ports.
EPHEMERAL_PORT_RANGE_START = 32000

# rdata of a snooped DNS answer -> lower-cased record name without trailing dot
reverse_mapping = {}

# (laddr, lport, raddr, rport) -> {"first_seen", "last_seen", "status"}
mapping = {}

dns_snooped_packets = Counter(
    "netstat_sniffer_dns_snooped_packets",
    "Count of snooped packets on port 53")
dns_reverse_map_entries = Gauge(
    "netstat_sniffer_dns_reverse_map_entries",
    "Number of entries in the DNS reverse lookup table")
dns_reverse_lookup_failures_total = Counter(
    "netstat_sniffer_dns_reverse_lookup_failures_total",
    "Reverse DNS records missing in the DNS snooper cache")
histogram_outbound = Histogram(
    "netstat_sniffer_outbound_connection_duration_seconds",
    "Outbound connection duration in seconds",
    labelnames=("addr", "port"),
    buckets=(1, 5, 10, 50, 100, 500, 1000, inf))
outbound_connection_count = Gauge(
    "netstat_sniffer_outbound_connection_count",
    "Total outbound connections",
    labelnames=("addr", "port", "status"))


def normalize_addr(j):
    """Return address string ``j`` without a leading ``::ffff:`` prefix."""
    prefix = "::ffff:"
    if j.startswith(prefix):
        # Normalize IPv6 mapped addresses
        j = j[len(prefix):]
    return j


def poller():
    """Poll the connection table forever and update the exported metrics.

    Each iteration sleeps until the next whole second, then:

    * records first/last seen timestamps and status for every connection that
      has a remote address and whose local address does not start with
      ``127.``;
    * counts connections tracked for more than two seconds as ``pending``
      (``SYN_SENT``) or ``established`` (``ESTABLISHED`` from an ephemeral
      local port to a non-ephemeral remote port);
    * forgets connections that were not seen in this iteration and, for
      non-ephemeral remote ports with a known reverse mapping, observes their
      duration in ``histogram_outbound``;
    * publishes the counts via ``outbound_connection_count`` and removes label
      sets that were published in the previous iteration but not in this one.

    This function never returns.
    """
    prev_labels = set()
    while True:
        counts = collections.Counter()

        # Sleep until the next whole wall-clock second.
        now = time()
        sleep(ceil(now) - now)
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the value is only compared with and subtracted from values produced
        # by this same call.
        now = datetime.now(timezone.utc)

        for conn in psutil.net_connections():
            if not conn.raddr:
                # Skip listening sockets
                continue
            laddr = normalize_addr(conn.laddr[0])
            raddr = normalize_addr(conn.raddr[0])
            if laddr.startswith("127."):
                # Skip localhost
                continue
            key = laddr, conn.laddr[1], raddr, conn.raddr[1]
            entry = mapping.setdefault(key, {"first_seen": now})
            entry["last_seen"] = now
            entry["status"] = conn.status

        # Observe connection duration when connection disappears
        for key, value in list(mapping.items()):
            laddr, lport, raddr, rport = key
            duration = (
                value["last_seen"] - value["first_seen"]).total_seconds()
            if duration > 2:
                if value["status"] == "SYN_SENT":
                    counts[(raddr, rport, "pending")] += 1
                elif (value["status"] == "ESTABLISHED"
                        and lport >= EPHEMERAL_PORT_RANGE_START
                        and rport < EPHEMERAL_PORT_RANGE_START):
                    counts[(raddr, rport, "established")] += 1
            if value["last_seen"] < now:
                # Not seen in this iteration: the connection is gone.
                mapping.pop(key)
                # Durations are only observed for non-ephemeral remote ports;
                # nothing is recorded for ephemeral ones.
                if rport < EPHEMERAL_PORT_RANGE_START:
                    try:
                        name = reverse_mapping[raddr]
                    except KeyError:
                        dns_reverse_lookup_failures_total.inc()
                    else:
                        histogram_outbound.labels(name, rport).observe(
                            duration)

        # Several remote addresses may resolve to the same name, so sum the
        # counts per final label set before publishing; setting the gauge once
        # per address would let the last address overwrite the others.
        label_counts = collections.Counter()
        for (raddr, rport, status), count in counts.items():
            try:
                raddr = reverse_mapping[raddr]
            except KeyError:
                # Fall back to the raw address as the label value.
                dns_reverse_lookup_failures_total.inc()
            label_counts[(raddr, rport, status)] += count

        used_labels = set()
        for key, count in label_counts.items():
            outbound_connection_count.labels(*key).set(count)
            if key not in prev_labels:
                print("Adding label:", key)
            used_labels.add(key)

        # Drop label sets that were published last time but not this time.
        for key in prev_labels - used_labels:
            print("Removing label:", key)
            outbound_connection_count.remove(*key)
        prev_labels = used_labels


def process_packet(p):
    """Record the answers of a sniffed DNS packet in ``reverse_mapping``.

    Every packet increments ``dns_snooped_packets``. Packets without a DNS
    layer or without answers are otherwise ignored. Answer records that lack
    ``rdata``/``rrname``, whose ``rdata`` is not a ``str``/``bytes`` value, or
    whose name is not ASCII are skipped so that a single unusual record cannot
    raise out of the sniffer callback.
    """
    dns_snooped_packets.inc()
    if not p.haslayer(DNS):
        return
    if not p.an:
        return
    for an in p.an:
        rdata = getattr(an, "rdata", None)
        rrname = getattr(an, "rrname", None)
        # NOTE(review): presumably address records carry rdata as str and some
        # other record types carry non-hashable values -- confirm against the
        # scapy version in use.
        if not isinstance(rdata, (str, bytes)):
            continue
        if not isinstance(rrname, bytes):
            continue
        try:
            name = rrname.decode("ascii")
        except UnicodeDecodeError:
            continue
        reverse_mapping[rdata] = name.lower().rstrip(".")
    dns_reverse_map_entries.set(len(reverse_mapping))


if __name__ == "__main__":
    # Expose the metrics over HTTP on port 39680.
    start_http_server(39680)

    # Sniff default route (?)
    sniffer = AsyncSniffer(prn=process_packet, filter="port 53", store=False)
    sniffer.start()

    # Sniff local DNS cache
    sniffer2 = AsyncSniffer(
        iface="lo", prn=process_packet, filter="port 53", store=False)
    sniffer2.start()

    poller()