commit 70bec7b8427d070338f0f1ba25bdfe1bc7efa188
Author: Lauri Võsandi
Date:   Mon Aug 15 21:50:51 2022 +0300

    Initial commit

diff --git a/.drone.yml b/.drone.yml
new file mode 100644
index 0000000..e7b5dfa
--- /dev/null
+++ b/.drone.yml
@@ -0,0 +1,2 @@
+kind: template
+load: docker.yaml
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..9de2cd0
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,5 @@
+FROM harbor.k-space.ee/k-space/microservice-base
+RUN pip install psutil scapy
+RUN apk add libpcap
+ADD netstat-sniffer.py /netstat-sniffer.py
+ENTRYPOINT /netstat-sniffer.py
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..72e7955
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,7 @@
+version: '3.7'
+services:
+  app:
+    image: harbor.k-space.ee/k-space/netstat-sniffer
+    network_mode: host
+    build:
+      context: .
diff --git a/netstat-sniffer.py b/netstat-sniffer.py
new file mode 100755
index 0000000..ca28bbd
--- /dev/null
+++ b/netstat-sniffer.py
@@ -0,0 +1,131 @@
+#!/usr/bin/env python
+
+import collections
+import psutil
+from datetime import datetime
+from math import inf, ceil
+from prometheus_client import start_http_server, Counter, Gauge, Histogram
+from scapy.all import AsyncSniffer, DNS
+from time import sleep, time
+
+# For interesting ports we will record entries even if reverse DNS lookup fails
+INTERESTING_PORTS = 53, 80, 443, 27017, 3306, 5432, 6379, 16379, 26379, 11211
+EPHEMERAL_PORT_RANGE_START = 32000
+
+reverse_mapping = {}
+mapping = {}
+
+dns_snooped_packets = Counter(
+    "netstat_sniffer_dns_snooped_packets",
+    "Count of snooped packets on port 53")
+dns_reverse_map_entries = Gauge(
+    "netstat_sniffer_dns_reverse_map_entries",
+    "Number of entries in the DNS reverse lookup table")
+dns_reverse_lookup_failures_total = Counter(
+    "netstat_sniffer_dns_reverse_lookup_failures_total",
+    "Reverse DNS records missing in the DNS snooper cache")
+histogram_outbound = Histogram(
+    "netstat_sniffer_outbound_connection_duration_seconds",
+    "Outbound connection duration in seconds",
+    labelnames=("addr", "port"),
+    buckets=(1, 5, 10, 50, 100, 500, 1000, inf))
+outbound_connection_count = Gauge(
+    "netstat_sniffer_outbound_connection_count",
+    "Total outbound connections",
+    labelnames=("addr", "port", "status"))
+
+
+def normalize_addr(j):
+    if j.startswith("::ffff:"):
+        # Normalize IPv6 mapped addresses
+        j = j[7:]
+    return j
+
+
+def poller():
+    prev_labels = set()
+    while True:
+        counts = collections.Counter()
+        now = time()
+        sleep(ceil(now) - now)
+        now = datetime.utcnow()
+        for j in psutil.net_connections():
+            if not j.raddr:
+                # Skip listening sockets
+                continue
+            laddr, raddr = normalize_addr(j.laddr[0]), normalize_addr(j.raddr[0])
+            if laddr.startswith("127."):
+                # Skip localhost
+                continue
+
+            key = laddr, j.laddr[1], raddr, j.raddr[1]
+
+            if key not in mapping:
+                mapping[key] = {"first_seen": now}
+
+            mapping[key]["last_seen"] = now
+            mapping[key]["status"] = j.status
+
+        # Observe connection duration when connection disappears
+        for key, value in list(mapping.items()):
+            laddr, lport, raddr, rport = key
+            duration = (value["last_seen"] - value["first_seen"]).total_seconds()
+            if duration > 2:
+                if value["status"] == "SYN_SENT":
+                    counts[(raddr, rport, "pending")] += 1
+                elif value["status"] == "ESTABLISHED" and lport >= EPHEMERAL_PORT_RANGE_START and rport < EPHEMERAL_PORT_RANGE_START:
+                    counts[(raddr, rport, "established")] += 1
+
+            if value["last_seen"] < now:
+                mapping.pop(key)
+                if rport >= EPHEMERAL_PORT_RANGE_START:
+                    NotImplemented
+                else:
+                    try:
+                        raddr = reverse_mapping[raddr]
+                    except KeyError:
+                        dns_reverse_lookup_failures_total.inc()
+                    else:
+                        histogram_outbound.labels(raddr, rport).observe(duration)
+
+        used_labels = set()
+        for (raddr, rport, status), count in counts.items():
+            try:
+                raddr = reverse_mapping[raddr]
+            except KeyError:
+                dns_reverse_lookup_failures_total.inc()
+            key = raddr, rport, status
+            outbound_connection_count.labels(*key).set(count)
+            used_labels.add(key)
+            prev_labels.discard(key)
+            print("Adding label:", key)
+
+        for key in prev_labels:
+            print("Removing label:", key)
+            outbound_connection_count.remove(*key)
+
+        prev_labels = used_labels
+
+
+def process_packet(p):
+    dns_snooped_packets.inc()
+    if not p.haslayer(DNS):
+        return
+    if not p.an:
+        return
+    for an in p.an:
+        reverse_mapping[an.rdata] = an.rrname.decode("ascii").lower().rstrip(".")
+    dns_reverse_map_entries.set(len(reverse_mapping))
+
+
+if __name__ == "__main__":
+    start_http_server(39680)
+
+    # Sniff default route (?)
+    sniffer = AsyncSniffer(prn=process_packet, filter="port 53", store=False)
+    sniffer.start()
+
+    # Sniff local DNS cache
+    sniffer2 = AsyncSniffer(iface="lo", prn=process_packet, filter="port 53", store=False)
+    sniffer2.start()
+    poller()