#!/usr/bin/python3
# -*- coding: UTF-8 -*-

'''
dnstest version 1.7
(c) 2015-2024 Jan ONDREJ (SAL)

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

Features:
  - check DNS zones on all nameservers and their matches

Basic usage: dnstest.py [-v] [-l] domain [nameservers ...]

Parameters:
  -v   verbose usage
  -l   zone listing

Reverse DNS query: dnstest.py -r [-v] IP_address [nameservers ...]

ENUM query: dnstest.py -e [-v] phone_number [nameservers ...]

RBL query: dnstest.py -b blacklist_domain ipv4_address
'''

import sys
import socket
import getopt
import ipaddress
import re

import dns.resolver
import dns.zone
import dns.query
import dns.reversename
import dns.dnssec
import dns.rdatatype
import dns.message
# These three were previously only available through transitive imports.
import dns.exception
import dns.name
import dns.rcode

# Timeout (seconds) for per-nameserver queries and zone transfers.
TIMEOUT = 1
# Output row layout: label, colored status, message.
FMT = "%-15s %s %s"


class color:
    '''
    Class for displaying colorful error messages.

    The OK / ERROR / WARNING attributes hold ready-made colored status
    strings padded to 8 characters.
    '''

    def __init__(self):
        self.OK = self.green("OK")
        self.ERROR = self.red("ERROR")
        self.WARNING = self.yellow("WARNING")

    def green(self, s):
        '''Return s padded to 8 characters, wrapped in bold green.'''
        return '\033[1;32m%-8s\033[m' % s

    def red(self, s):
        '''Return s padded to 8 characters, wrapped in bold red.'''
        return '\033[1;31m%-8s\033[m' % s

    def yellow(self, s):
        '''Return s padded to 8 characters, wrapped in bold yellow.'''
        return '\033[1;33m%-8s\033[m' % s


# The class name is rebound to its single instance; the rest of the file
# uses color.OK, color.ERROR and color.WARNING.
color = color()


def print_block(text, color, rows):
    '''
    Print several rows under one label.

    The label is printed only on the first row, following rows have an
    empty label column. Nothing is printed for empty rows.
    '''
    if rows:
        print(FMT % (text, color, rows[0]))
        for row in rows[1:]:
            print(FMT % ("", color, row))


class DNSError(Exception):
    '''
    Lookup failure which is returned (not raised) by get_any, so results
    from several nameservers can be compared with each other.
    '''

    def __init__(self, name):
        super().__init__(name)
        self.name = name

    def __str__(self):
        return self.name

    def __eq__(self, other):
        # NotImplemented lets Python fall back to its default handling, so
        # a DNSError never compares equal to a regular answer and "!="
        # is the exact inverse of "==". The former __ne__ returned False
        # for foreign types, which hid error-vs-answer mismatches.
        if not isinstance(other, DNSError):
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        # Defined together with __eq__ to keep instances hashable.
        return hash(self.name)


class get_any:
    '''
    Callable which queries one record set on a given nameserver.

    Calling the instance with a nameserver IP returns [ttl, sorted texts]
    or a DNSError. Human readable details of every call are collected
    in verbose_texts.
    '''
    # Class level fallback only, every instance gets its own list.
    verbose_texts = []

    def __init__(self, name, domain, rdtype=None, filter="."):
        '''
        name    -- subdomain label, "@" means the domain itself
        domain  -- zone name
        rdtype  -- record type, resolver default is used if None
        filter  -- case insensitive regex, only matching records are kept
        '''
        self.kwargs = {}
        if name == "@":
            self.name = domain
        else:
            self.name = name+"."+domain
        if rdtype is not None:
            self.kwargs["rdtype"] = rdtype
        self.filter = re.compile(filter, re.I)
        self.verbose_texts = []

    def __call__(self, ns):
        '''Query nameserver ns, return extracted answer or DNSError.'''
        try:
            rds = self.resolver(ns).resolve(self.name, **self.kwargs)
        except dns.resolver.NoAnswer:
            self.verbose_texts.append('\tNoAnswer from %s' % ns)
            return DNSError('NoAnswer')
        except dns.resolver.NoNameservers:
            self.verbose_texts.append('\tNoNameservers from %s' % ns)
            return DNSError('NoNS')
        except dns.resolver.NXDOMAIN:
            self.verbose_texts.append('\tNXdomain from %s' % ns)
            return DNSError('NXdomain')
        except dns.exception.Timeout:
            self.verbose_texts.append('\tTimeout from %s' % ns)
            return DNSError('Timeout')
        ret = self.extract(rds)
        self.verbose_texts.append('\t%s from %s' % (ret, ns))
        for rd in rds:
            if rd.rdtype == dns.rdatatype.A:
                name = rd.to_text()
                self.verbose_texts.append(
                    "\t%s -> %s" % (name, self.reverse(name)))
            elif rd.rdtype == dns.rdatatype.MX:
                self._describe_mx(rd)
        return ret

    def _describe_mx(self, rd):
        '''Log addresses and reverse names of one MX record target.'''
        prio, name = rd.to_text().split(" ", 1)
        resolved = None
        for ipv in "A", "AAAA":
            try:
                for answer in dns.resolver.Resolver().resolve(name, ipv):
                    resolved = answer.to_text()
                    try:
                        reverse = self.reverse(resolved)
                    except Exception as err:
                        # Best effort, the failure is shown in the output.
                        reverse = "E:%s" % err
                    self.verbose_texts.append(
                        "\t%s -> %s -> %s" % (name, resolved, reverse)
                    )
            except dns.resolver.NoAnswer:
                pass
            except dns.resolver.NXDOMAIN:
                pass
        if resolved is None:
            print(FMT % ("", color.WARNING,
                         "Unable to resolve %s!" % name))

    def extract(self, rds):
        '''Return [ttl, sorted texts of records matching the filter].'''
        return [rds.ttl, sorted([
            rd.to_text() for rd in rds if self.filter.search(rd.to_text())
        ])]

    def resolver(self, ns):
        '''Return a resolver which asks only ns, limited by TIMEOUT.'''
        dr = dns.resolver.Resolver()
        dr.nameservers = [ns]
        dr.timeout = TIMEOUT
        dr.lifetime = TIMEOUT
        return dr

    def fmt_ttl(self, value):
        '''
        Format TTL using the largest unit which divides it exactly.
        The value is returned unchanged if it is not a multiple
        of a minute.
        '''
        groups = [
            (3600*24*7, " weeks"),
            (3600*24, " days"),
            (3600, "h"),
            (60, "m")
        ]
        for val, name in groups:
            if value % val == 0:
                return "%d%s" % (value // val, name)
        return value

    def fmt(self, value):
        '''Format a result of __call__ for printing.'''
        if isinstance(value, (str, DNSError)):
            return value
        return "%s [ttl=%s]" % (', '.join(value[1]), self.fmt_ttl(value[0]))

    def reverse(self, value):
        '''Return PTR name of IP address value or None on NXDOMAIN.'''
        rname = dns.reversename.from_address(value)
        try:
            return dns.resolver.resolve(rname.to_text(), 'PTR')[0].to_text()
        except dns.resolver.NXDOMAIN:
            return None


class get_serial(get_any):
    '''Callable which returns SOA serial number of a domain.'''

    def __init__(self, domain):
        self.kwargs = dict(rdtype="SOA")
        self.name = domain
        # Own list, otherwise texts are appended to the list shared
        # on get_any class level.
        self.verbose_texts = []

    def extract(self, rds):
        '''Return [ttl, sorted serial numbers as strings].'''
        return [rds.ttl, sorted([str(x.serial) for x in rds])]


def check_count(nameservers, min_count=2):
    '''Return OK status if there are at least min_count nameservers.'''
    if len(nameservers) >= min_count:
        return color.OK
    return color.WARNING


def _is_ip_literal(text):
    '''Return True if text is a valid IPv4 or IPv6 address.'''
    try:
        ipaddress.ip_address(text)
    except ValueError:
        return False
    return True


def name2ip(*hostnames):
    '''
    Return list of IPv4 and IPv6 addresses of all hostnames.

    IP address literals are returned as they are, so nameservers can be
    given by address too. Missing AAAA records are ignored, other
    resolver errors are propagated to the caller.
    '''
    ns_ips = []
    for name in hostnames:
        if _is_ip_literal(name):
            ns_ips.append(name)
            continue
        for answer in dns.resolver.resolve(name, "A"):
            ns_ips.append(answer.to_text())
        try:
            for answer in dns.resolver.resolve(name, "AAAA"):
                ns_ips.append(answer.to_text())
        except dns.resolver.NoAnswer:
            pass
    return ns_ips


class nameservers:
    '''
    DNS client class.

    Holds nameservers of a domain (nameservers), all their addresses
    (ns_ips) and mapping from nameserver to its addresses (ns2ip).
    '''
    nameservers = []
    ipaddr = re.compile('^([0-9.]+|[0-9a-fA-F:]+)$')

    def __init__(self, domain, nss=None, verbose=False):
        '''
        domain  -- zone name
        nss     -- nameserver names or IP addresses, if empty they are
                   loaded from NS records of the domain
        '''
        self.domain = domain
        self.verbose = verbose
        if nss:
            nss2 = []
            for ns in nss:
                # Make hostnames absolute, addresses are left untouched.
                if not self.ipaddr.search(ns) and not ns.endswith("."):
                    ns += "."
                nss2.append(ns)
            self.set_dns(nss2)
            self.load_ns_ips()
        else:
            self.load_dns()

    def load_dns(self):
        '''Load nameservers from NS records of the domain.'''
        resolver = dns.resolver.Resolver()
        resolver.timeout = 5
        resolver.lifetime = resolver.timeout
        answer = resolver.resolve(self.domain, 'NS')
        self.nameservers = sorted([
            x.to_text() for x in answer.response.answer[0].items
        ])
        self.load_ns_ips()
        return self.nameservers

    def set_dns(self, nameservers):
        '''Set list of nameservers.'''
        self.nameservers = nameservers

    def load_ns_ips(self):
        '''Rebuild ns_ips and ns2ip from nameservers, return ns_ips.'''
        self.ns_ips = []
        self.ns2ip = {}
        for ns in self.nameservers:
            if self.ipaddr.search(ns):
                ips = [ns]
            else:
                ips = name2ip(ns)
            self.ns2ip[ns] = ips
            self.ns_ips.extend(ips)
        return self.ns_ips

    def resolver(self, ns):
        '''Return a resolver which asks only nameserver ns.'''
        dnsr = dns.resolver.Resolver()
        dnsr.nameservers = [ns]
        return dnsr

    def _primary_lookup_failed(self, message):
        '''Report SOA failure and use first nameserver as primary.'''
        print(FMT % ("", color.ERROR, message))
        self.primary = list(self.ns2ip.keys())[0]
        return False

    def load_primary_ns(self):
        '''
        Set self.primary from the SOA record asked on first nameserver.

        Returns True if the primary is one of known nameservers. If it is
        not, it is added to nameservers and False is returned. False is
        returned on SOA lookup failure too.
        '''
        resolver = self.resolver(self.ns_ips[0])
        try:
            answer = resolver.resolve(self.domain, 'SOA')
        except dns.resolver.NoAnswer:
            return self._primary_lookup_failed(
                "Unable to retrieve SOA record!")
        except dns.resolver.NXDOMAIN:
            return self._primary_lookup_failed("Domain does not exist!")
        except dns.resolver.NoNameservers:
            return self._primary_lookup_failed("No nameservers found!")
        self.primary = answer[0].mname.to_text()
        if self.primary not in self.ns2ip:
            # load zone from primary NS
            print("Not a primary DNS, loading primary DNS zone from %s"
                  % self.primary)
            self.nameservers.append(self.primary)
            self.load_ns_ips()
            return False
        return True

    def check_parent(self):
        '''
        Check if parent domain NS servers are set same as in domain itself.

        Prints the result and returns True on match. Raises Exception
        if the parent nameserver answers with an error code.
        '''
        domain, tld = self.domain.split(".", 1)
        tld_ns = nameservers(tld)
        ns_ip = list(tld_ns.ns2ip.values())[0][0]
        query = dns.message.make_query(self.domain, dns.rdatatype.NS)
        response = dns.query.udp(query, ns_ip, timeout=5)
        if response.rcode() != dns.rcode.NOERROR:
            if response.rcode() == dns.rcode.NXDOMAIN:
                raise Exception('%s does not exist.' % domain)
            else:
                raise Exception(
                    'Error %s' % dns.rcode.to_text(response.rcode()))
        # Use authority section if present, answer section otherwise.
        if len(response.authority) > 0:
            rrset = response.authority[0]
        else:
            rrset = response.answer[0]
        nss = [x.to_text() for x in rrset]
        if sorted(nss) == sorted(self.nameservers):
            print(FMT % ("Parent NS:", color.OK, "Match OK"))
            return True
        print(FMT % ("Parent NS:", color.ERROR, "%s != %s" % (
            sorted(nss), sorted(self.nameservers)
        )))
        return False


class zone_checker:
    '''
    Zone checker.

    This class is used to retrieve and check contents of a transferable
    zones. Zones are stored in self.zones by nameserver IP address,
    an empty dict is stored for servers where the transfer failed.
    '''
    nameservers = []

    def __init__(self, domain, dnsc, verbose=False):
        '''
        domain  -- zone name
        dnsc    -- nameservers instance for this domain
        '''
        self.domain = domain
        self.dnsc = dnsc
        self.verbose = verbose
        self.zones = {}
        for ns in dnsc.ns_ips:
            self.transfer(ns)

    def values(self):
        '''Return list of zones, one for every tried nameserver.'''
        return list(self.zones.values())

    def keys(self):
        '''Return node names of the zone loaded from primary NS.'''
        primary_zone = self.zones[self.dnsc.ns2ip[self.dnsc.primary][0]]
        return [str(x) for x in primary_zone.keys()]

    def cmp(self, text, fx):
        '''
        Call fx on every nameserver address and print one result row
        labeled text: ERROR on mismatch or timeout, WARNING if the
        common result is a DNSError, OK otherwise.
        '''
        value0 = None
        errors = []
        for ns_ip in self.dnsc.ns_ips:
            try:
                value = fx(ns_ip)
            except dns.exception.Timeout:
                errors.append("Timeout on %s" % ns_ip)
                continue
            if value0 is None:
                # First obtained value is the reference for others.
                value0 = value
            elif value != value0:
                errors.append("%s != %s" % (fx.fmt(value0), fx.fmt(value)))
        if errors:
            print_block(text, color.ERROR, errors)
        elif isinstance(value0, DNSError):
            print(FMT % (text, color.WARNING, value0))
        else:
            print(FMT % (text, color.OK, fx.fmt(value0)))
        if self.verbose:
            print("\n".join(fx.verbose_texts))

    def transfer(self, server):
        '''
        Transfer the zone from server and store it in self.zones.
        Errors are printed only in verbose mode, an empty dict stays
        stored for the server in this case.
        '''
        self.zones[server] = {}
        try:
            self.zones[server] = dns.zone.from_xfr(
                dns.query.xfr(server, self.domain,
                              timeout=TIMEOUT, lifetime=TIMEOUT)
            )
        except dns.exception.FormError as err:
            if self.verbose:
                print("  Transfer error: %s from %s (form error)"
                      % (err, server))
        except dns.exception.Timeout as err:
            if self.verbose:
                print("  Transfer error: %s from %s (timeout)"
                      % (err, server))
        except dns.exception.DNSException as err:
            if self.verbose:
                print("  Transfer error: %s from %s" % (err, server))
        except socket.error as err:
            if self.verbose:
                print("  Transfer error: %s from %s (socket error)"
                      % (err, server))
        except EOFError as err:
            if self.verbose:
                print("  Transfer error: %s from %s (EOF)" % (err, server))

    def sorted_rows(self, value):
        '''Return lines of value sorted.'''
        return '\n'.join(sorted(value.split('\n')))

    def joined_node(self, z, name):
        '''
        Return sorted text of all records of node name.

        z is a nameserver name or an already loaded zone. Returns ''
        for an empty zone and "missing subdomain" for a missing node.
        '''
        # z may be a zone object or an empty dict, which are possibly
        # unhashable, so it is compared with the list of keys instead
        # of using dict lookup.
        if z in list(self.dnsc.ns2ip.keys()):
            z = self.zones[self.dnsc.ns2ip[z][0]]
        if not z:
            return ''
        node = z.get_node(name)
        if node is None:
            return "missing subdomain"
        rds = node.rdatasets
        return '\n'.join(sorted([self.sorted_rows(x.to_text()) for x in rds]))

    def dnssec_check(self):
        '''
        Validate DNSKEY records on every nameserver and print result.
        The check stops with a warning on first server without expected
        DNSKEY answer.
        '''
        errors = 0
        request = dns.message.make_query(
            self.domain, dns.rdatatype.DNSKEY, want_dnssec=True
        )
        name = dns.name.from_text(self.domain)
        for ns_ip in self.dnsc.ns_ips:
            response = dns.query.udp(request, ns_ip)
            # Exactly two rrsets are expected in the answer section;
            # the first is validated against the second below.
            if response.rcode() != 0 or len(response.answer) != 2:
                print(FMT % ("DNSSEC:", color.WARNING, "NO DNSKEY?"))
                return
            try:
                dns.dnssec.validate(
                    response.answer[0], response.answer[1],
                    {name: response.answer[0]}
                )
            except dns.dnssec.ValidationFailure as err:
                errors += 1
                print(FMT % ("DNSSEC:", color.ERROR,
                             "%s on %s" % (err, ns_ip)))
        if errors == 0:
            print(FMT % ("DNSSEC:", color.OK, "Present and validated"))


class subdomain_checker(zone_checker):
    '''
    Subdomain checker.

    This class is used to check some predefined subdomains of a zone,
    which can't be transferred (transfer disabled on server).
    '''
    subdomains = ['@', 'www', '*', 'mail', 'smtp', 'pop3', 'imap']
    rdtypes = 'A|AAAA|MX|CNAME|NS|TXT'.split("|")

    def keys(self):
        '''Return predefined subdomains.'''
        return self.subdomains

    def values(self):
        '''Return nameserver addresses, they are queried directly.'''
        return self.dnsc.ns_ips

    def joined_node(self, z, name):
        '''
        Return sorted text of records of all rdtypes of subdomain name,
        asked on nameserver z (name or IP address). Lookup failures
        are skipped.
        '''
        if name == "@":
            name = self.domain
        else:
            name = name+"."+self.domain
        if z in self.dnsc.ns2ip:
            z = self.dnsc.ns2ip[z][0]
        resolver = self.dnsc.resolver(z)
        ret = []
        for rdtype in self.rdtypes:
            try:
                for answer in resolver.resolve(name, rdtype):
                    ret.append("IN %s %s" % (rdtype, answer.to_text()))
            except dns.resolver.NoAnswer:
                pass
            except dns.resolver.NoNameservers:
                pass
            except dns.resolver.NXDOMAIN:
                pass
        return '\n'.join(sorted(ret))


def main(domain, nss, args):
    '''
    Main function, which can be called from other scripts.

    domain  -- absolute zone name (with trailing dot)
    nss     -- nameservers to test, loaded from DNS if empty
    args    -- container of options ('-v', '-l', '--skip-ipv6',
               '--skip-wild'), only membership is tested
    '''
    verbose = '-v' in args
    list_all = '-l' in args
    print(FMT % ("Domain:", color.OK, domain))
    sys.stdout.flush()
    try:
        dnsc = nameservers(domain, nss, verbose)
    except dns.resolver.NoNameservers as err:
        print(FMT % ("Nameservers:", color.ERROR, "No nameservers found!"))
        print(err)
        return
    except dns.resolver.NXDOMAIN as err:
        print(FMT % ("Nameservers:", color.ERROR, "%s not found!" % domain))
        print(err)
        return
    except dns.exception.Timeout as err:
        print(FMT % ("Nameservers:", color.ERROR, "Timed out!"))
        print(err)
        return
    print_block(
        "Nameservers:", check_count(dnsc.nameservers),
        [
            "%s: %s" % (ns.rstrip("."), ", ".join(ips))
            for ns, ips in dnsc.ns2ip.items()
        ]
    )
    if verbose:
        for key, value in dnsc.ns2ip.items():
            print("\t%s\t%s" % (key, ', '.join(value)))
    dnsc.check_parent()
    zone = zone_checker(domain, dnsc, verbose)
    if dnsc.load_primary_ns():
        print(FMT % ("Primary NS:", color.OK, dnsc.primary))
    else:
        print(FMT % ("Primary NS:", color.ERROR,
                     "Testing on non primary NS: %s" % dnsc.primary))
        zone.transfer(dnsc.ns2ip[dnsc.primary][0])
    zone.cmp("Serial match:", get_serial(domain))
    # Failed transfers leave an empty dict in the zones, so the list of
    # zones is never empty. Test if at least one zone has been loaded.
    if not any(zone.values()):
        print(FMT % ("Zone match:", color.WARNING, "Transfer failed!"))
        # switch to check at least some subdomains
        zone = subdomain_checker(domain, dnsc, verbose)
    differences = []
    matches = []
    last_name = None
    for name in zone.keys():
        value0 = zone.joined_node(dnsc.primary, name)
        if list_all:
            for x in value0.split("\n"):
                if x:
                    # Name is printed only on first row of the node.
                    label = "" if last_name == name else name
                    print("%-15s %s" % (label, x.replace(" ", "\t", 2)))
                    last_name = name
        for z in zone.values():
            text = zone.joined_node(z, name)
            if value0 == text:
                matches.append(text)
            else:
                differences.append([name, value0, text])
    if list_all:
        sys.exit()
    if not differences and matches:
        print(FMT % ("Zone match:", color.OK, "%d servers, %s matches"
                     % (len(zone.values()), len(matches))))
    elif not differences and not matches:
        print(FMT % ("Zone match:", color.WARNING, "%d servers, %s matches"
                     % (len(zone.values()), len(matches))))
    else:
        print(FMT % ("Zone match:", color.ERROR,
                     "Differences: %d" % len(differences)))
        if len(differences) < 20 or verbose:
            for name, primary_ret, zone_ret in differences:
                print("Subdomain: %s" % name)
                print("\t%s" % primary_ret)
                print("\t%s" % zone_ret)
    zone.cmp("A match:", get_any("@", domain, "A"))
    zone.cmp("MX match:", get_any("@", domain, "MX"))
    zone.cmp("DMARC match:", get_any("_dmarc", domain, "TXT", '^"v=dmarc'))
    zone.cmp("SPF match:", get_any("@", domain, "TXT", '^"v=spf1'))
    zone.cmp("WWW match:", get_any("www", domain))
    if '--skip-ipv6' not in args:
        zone.cmp("WWW IPv6 match:", get_any("www", domain, "AAAA"))
    if '--skip-wild' not in args:
        zone.cmp("* match:", get_any("*", domain))
    zone.dnssec_check()


def _print_answers(resolver, q, type):
    '''Print all answers for q from resolver, or a not found message.'''
    try:
        for answer in resolver.resolve(q, type):
            print("%s IN %s %s" % (q, type, answer.to_text()))
    except dns.resolver.NXDOMAIN:
        print("%s: Not found." % q)


def query(q, type, nss=None, verbose=False):
    '''
    Print records of given type for name q.

    The query is sent to every nameserver in nss (names or IP
    addresses), or to system resolver if nss is empty.
    '''
    if nss:
        for ns in nss:
            if verbose:
                print("Query: %s @ %s" % (q, ns))
            resolver = dns.resolver.Resolver()
            resolver.nameservers = name2ip(ns)
            _print_answers(resolver, q, type)
    else:
        if verbose:
            print("Query:", q)
        _print_answers(dns.resolver.Resolver(), q, type)


def reverse_query(host, nss=None, verbose=False):
    '''Print PTR records of IPv4 or IPv6 address host.'''
    if ":" in host:
        # IPv6
        if '::' in host:
            # Expand "::" to the missing zero groups.
            host = host.replace("::", ":0"*(8-host.count(":"))+":")
        # Every group is zero padded to 4 digits and written backwards,
        # digit by digit.
        revdns = '.'.join(reversed([
            '.'.join(list(reversed('0000'+x))[:4])
            for x in host.split(':')
        ]))+".ip6.arpa"
        query(revdns, "PTR", nss, verbose)
    else:
        revdns = '.'.join(reversed(host.split('.')))+".in-addr.arpa"
        query(revdns, "PTR", nss, verbose)


def enum_query(host, nss=None, verbose=False):
    '''Print NAPTR records (ENUM) of phone number host.'''
    revdns = '.'.join(list(reversed(host)))+".e164.arpa."
    query(revdns, "NAPTR", nss, verbose)


def rbl_query(host, domains, verbose=False):
    '''Print A records of IPv4 address host in blacklist domains.'''
    for domain in domains:
        revdns = '.'.join(list(reversed(host.split('.'))))\
            + '.'+domain.rstrip('.')+'.'
        query(revdns, "A", verbose=verbose)


if __name__ == "__main__":
    if len(sys.argv) == 1:
        print(__doc__.strip())
        sys.exit()

    try:
        opts, argv = getopt.gnu_getopt(sys.argv[1:], 'lvreb:',
                                       ['skip-ipv6', 'skip-wild'])
        kwargs = dict(opts)
    except getopt.GetoptError as msg:
        # Exceptions can't be indexed in python3, print the message.
        print("Error:", msg)
        sys.exit(2)

    if '-r' in kwargs:
        reverse_query(argv[0], argv[1:], '-v' in kwargs)
    elif '-e' in kwargs:
        enum_query(argv[0], argv[1:], '-v' in kwargs)
    elif '-b' in kwargs:
        for ip in argv:
            rbl_query(ip, [v for k, v in opts if k == '-b'], '-v' in kwargs)
    else:
        domain = argv.pop(0).rstrip(".")+"."
        # Convert international domain names to their IDNA form.
        domain = domain.encode('idna').decode("latin-1")
        main(domain, argv, kwargs)