# # # an attempt at doing DNS queries in Python (why not?) # # Written March 1994, Wim Lewis # Updated mid-1996 to support MX RRs, and made into a "real" module # Release 1, September 1996 # # import string # errors that can occur DNSParseError = 'DNSParseError'; # mapping of symbolic names to assigned encodings RRTYPE = { 1: 'A', 2: 'NS', 3: 'MD', # obsolete per rfc1035 4: 'MF', # obsolete per rfc1035 5: 'CNAME', 6: 'SOA', 7: 'MB', # experimental per rfc1035 8: 'MG', # experimental (rfc1035) 9: 'MR', # experimental (rfc1035) 10: 'NULL', # experimental (rfc1035) 11: 'WKS', 12: 'PTR', 13: 'HINFO', 14: 'MINFO', 15: 'MX', 16: 'TXT', # The following are QTYPEs but not TYPEs. 252: 'AXFR', 253: 'MAILB', 254: 'MAILA', # obsolete per rfc1035 255: '*' } RRCLASS = { 1: 'IN', # Internet information 2: 'CS', # CSNET information 3: 'CH', # CHAOS information 4: 'HS', # Hesiod information # The following is a QCLASS but not a CLASS. 255: '*' # request for info in any class } opcode_names = ( 'QUERY', 'IQUERY', 'STATUS', '<3???>', '<4???>', '<5???>', '<6???>', '<7???>', '<8???>', '<9???>', '<10???>', '<11???>', '<12???>', '<13???>', '<14???>', '<15???>' ) rcode_names = { 0:'No error', 1:'Format error', 2:'Server failure', 3:'Name error', 4:'Not implemented', 5:'Refused' } for i in range(6,15): rcode_names[i] = ''; # A mapping from RR type names to the classes that implement them # This is filled in as the subclasses are implemented, below TYPE2CLASS = { } # and inverse mappings :-) def invertMap(map): inv = { } for k in map.keys(): inv[map[k]] = k return inv # # invTYPE = invertMap(RRTYPE) invCLASS = invertMap(RRCLASS) # ## ## DNS needs to be able to encode and decode 8, 16, and 32 bit wide ## integers in big-endian form ## encodeInt8 = chr; # routine to encode 8bit unsigned qty. decodeInt8 = ord; def encodeInt16(int): # routine to encode 16bit unsigned qty. (hi, lo) = divmod(int, 256); return encodeInt8(hi) + encodeInt8(lo); def decodeInt16(str): return (decodeInt8(str[0]) << 8) + decodeInt8(str[1]) def encodeInt32(int): # routine to encode 32bit unsigned qty. (hi, lo) = divmod(int, 65536); return encodeInt16(hi) + encodeInt16(lo); def decodeInt32(str): return (((( decodeInt8(str[0]) << 8 ) + decodeInt8(str[1]) << 8 ) + decodeInt8(str[2]) << 8 ) + decodeInt8(str[3]) ) # # a debugging routine def hexdump(str, *etc): width = 2 if etc: width = etc[0] count = 0 for chr in str: if count != 0: print ' ', print hexbyte(ord(chr)), count = count + 1 if count >= width: print '' count = 0 # def hexbyte(i): return ( string.hexdigits[ ( i & 0xF0 ) >> 4 ] + string.hexdigits[ ( i & 0x0F ) ] ) # ## ## Routines to manipulate domain names (encode, repr, decode) ## # domain names are represented as tuples: i.e., "foo.bar.edu" is # represented as ('foo', 'bar', 'edu'). The root domain is the # empty tuple. def encodeDomain(domain): components = [] for label in domain: components.append( encodeInt8(len(label)) + label ) components.append( encodeInt8(0) ) return string.joinfields(components, '') def reprDomain(domain): if domain: return string.joinfields(domain, '.') else: return '.' def decodeDomain(str, *msg): # returns (domain, unused-portion) if len(msg): msg = msg[0] else: msg = None domain = ( ) done = 0 idx = 0 remnant = None while not done: if idx >= len(str): raise DNSParseError, 'Overrun while parsing domain name' clen = ord(str[idx]) if(clen >= 192): # a compressed name if not msg: raise DNSParseError, 'no context avail. for name decompression' if remnant == None: remnant = str[idx + 2:] idx = ( (clen & 0x3F ) << 8) + ord(str[idx+1]) str = msg elif clen >= 64: raise DNSParseError, 'unknown domain length label '+`clen` elif clen > 0: domain = domain + (str[idx+1 : idx+1+clen],) # inefficient. feh. idx = idx + clen + 1 else: # clen == 0 done = 1 if remnant == None: remnant = str[idx+1:] return( domain, remnant ) # ## ## The Question class encodes a question in DNS. It's like a bonsai RR ## which only has the domain, class, and type fields. ## class Question: name = () type = '*' rrclass = '*' def encode(self): return ( encodeDomain(self.name) + encodeInt16(invTYPE[self.type]) + encodeInt16(invCLASS[self.rrclass] ) ) def repr(self): return reprDomain(self.name) + ' ' + self.rrclass + ' ' + self.type def __repr__(self): return '<' + self.repr() + '>' def decodeQuestion(str, msg): q = Question() (q.name, str) = decodeDomain(str, msg) q.type = RRTYPE[decodeInt16(str[0:2])] q.rrclass = RRCLASS[decodeInt16(str[2:4])] return (q, str[4:]) ## ## The ResourceRecord class is an abstract class giving the common ## behavior of all DNS resource records: encode and repr; plus ## a separate routine to decode into the appropriate subclass ## class ResourceRecord: name = () type = '*' rrclass = '*' TTL = 0 def encode(self): rdata = self.encodeRdata() return ( encodeDomain(self.name) + encodeInt16(invTYPE[self.type]) + encodeInt16(invCLASS[self.rrclass]) + encodeInt32(self.TTL) + encodeInt16(len(rdata)) + rdata ) def repr(self): return string.joinfields( (reprDomain(self.name), repr(self.TTL), self.rrclass, self.type, self.reprRdata() ), ' ' ) def __repr__(self): return '<'+self.repr()+'>' # end of abstract RR class def decodeRR(str, msg): (rrname, str) = decodeDomain(str, msg) rrtype = RRTYPE [decodeInt16(str[0:2])] rr = TYPE2CLASS[rrtype]() rr.name = rrname rr.rrclass = RRCLASS[decodeInt16(str[2:4])] rr.TTL = decodeInt32(str[4:8]) rdataLen = decodeInt16(str[8:10]) rr.decodeRdata(str[10:(10 + rdataLen)], msg) return (rr, str[10 + rdataLen:]) # # utility routine to encode a sequence of RRs def encodeRRs(rrs): out = [ ] for i in rrs: out.append(i.encode()) return out # ## ## Subclass for the "A" type resource record. Defines routines to ## manipulate the rdata field: encodeRdata, reprRdata, decodeRdata ## class A_RR (ResourceRecord): type = 'A' address = (0,0,0,0) def encodeRdata(self): return string.joinfields( (encodeInt8(self.address[0]), encodeInt8(self.address[1]), encodeInt8(self.address[2]), encodeInt8(self.address[3])), '') def reprRdata(self): foo = [] for octet in self.address: foo.append(repr(octet)) return string.joinfields(foo, '.') def decodeRdata(self, str, msg): self.address = ( decodeInt8(str[0]) , decodeInt8(str[1]) , decodeInt8(str[2]) , decodeInt8(str[3]) ) if len(str) != 4: raise DNSParseError, 'length of A RDATA is '+`len(str)`+'???' TYPE2CLASS['A'] = A_RR; ## ## Subclass for the "CNAME" resource record. ## class CNAME_RR (ResourceRecord): type = 'CNAME' cname = () def encodeRdata(self): return encodeDomain(self.cname) def reprRdata(self): return reprDomain(self.cname) def decodeRdata(self, str, msg): (self.cname, str) = decodeDomain(str, msg) if str != '': raise DNSParseError, 'garbage at end of CNAME RDATA: '+`str` TYPE2CLASS['CNAME'] = CNAME_RR; ## ## Subclass for the SOA resource record. ## class SOA_RR (ResourceRecord): type = 'SOA' mname = () rname = () serial = 0 refresh = 0 retry = 0 expire = 0 minimum = 0 def encodeRdata(self): return (encodeDomain(self.mname) + encodeDomain(self.rname) + encodeInt32(self.serial) + encodeInt32(self.refresh) + encodeInt32(self.retry) + encodeInt32(self.expire) + encodeInt32(self.minimum)) def reprRdata(self): return ('('+reprDomain(self.mname)+ ' '+reprDomain(self.rname)+ ' '+repr(self.serial)+ ' '+repr(self.refresh)+ ' '+repr(self.retry)+ ' '+repr(self.expire)+ ' '+repr(self.minimum)+')') def decodeRdata(self, str, msg): (self.mname, str) = decodeDomain(str, msg) (self.rname, str) = decodeDomain(str, msg) self.serial = decodeInt32(str[0:4]) self.refresh = decodeInt32(str[4:8]) self.retry = decodeInt32(str[8:12]) self.expire = decodeInt32(str[12:16]) self.minimum = decodeInt32(str[16:20]) if(len(str) > 20): raise DNSParseError, 'garbage at end of SOA RDATA: '+`str` TYPE2CLASS['SOA'] = SOA_RR ## ## Subclass for the "NS" resource record. ## class NS_RR (ResourceRecord): type = 'NS' nsdname = ( ) def encodeRdata(self): return encodeDomain(self.nsdname) def reprRdata(self): return reprDomain(self.nsdname) def decodeRdata(self, str, msg): (self.nsdname, str) = decodeDomain(str, msg) if str != '': raise DNSParseError, 'garbage at end of NS RDATA: '+`str` TYPE2CLASS['NS'] = NS_RR ## ## Subclass for the MX resource record ## class MX_RR (ResourceRecord): type = 'MX' preference = 0 exchanger = ( ) def encodeRdata(self): return (encodeInt16(self.preference) + encodeDomain(self.exchanger)); def reprRdata(self): return (repr(self.preference)+' '+reprDomain(self.exchanger)) def decodeRdata(self, str, msg): print '[MX RDATA: '+`str`+']' if(len(str) < 5): raise DNSParseError, 'Truncated MX RDATA' self.preference = decodeInt16(str[0:2]) (self.exchanger, str) = decodeDomain(str[2:], msg) if(len(str) > 0): raise DNSParseError, 'garbage at end of MX RDATA: '+`str` TYPE2CLASS['MX'] = MX_RR ## ## A class to represent DNS messages. Has decode (as __init__) and encode, ## but no repr as that would be too cumbersome; instead, it has a __print__ ## method. ## class DNSMessage: print 'defining DNSMessage' # ivars: id, qr, opcode, aa, tc, rd, ra, z, rcode, # qdrecs, anrecs, nsrecs, arrecs id = 0 qr = 0 opcode = 0 aa = 0 tc = 0 rd = 0 ra = 0 z = 0 # must always be zero (rfc1035) rcode = 0 def __init__(self, *args): # do basic setup print 'initializing dnsmessage' self.qdrecs = [] self.anrecs = [] self.nsrecs = [] self.arrecs = [] # semantics: if callled with no 'args', creates an empty message; # otherwise, decodes a message from the supplied string. if args: msg = args[0] self.id = decodeInt16(msg[0:2]) buf = decodeInt8(msg[2]) self.qr = ( buf & 0x80 ) >> 7 self.opcode = ( buf & 0x78 ) >> 3 self.aa = ( buf & 0x04 ) >> 2 self.tc = ( buf & 0x02 ) >> 1 self.rd = ( buf & 0x01 ) buf = decodeInt8(msg[3]) self.ra = ( buf & 0x80 ) >> 7 self.z = ( buf & 0x70 ) >> 4 self.rcode = ( buf & 0x0f ) qdcount = decodeInt16(msg[4:6]) ancount = decodeInt16(msg[6:8]) nscount = decodeInt16(msg[8:10]) arcount = decodeInt16(msg[10:12]) (self.qdrecs, buf) = self._getquestions(qdcount, msg[12:], msg) (self.anrecs, buf) = self._getrecs(ancount, buf, msg) (self.nsrecs, buf) = self._getrecs(nscount, buf, msg) (self.arrecs, buf) = self._getrecs(arcount, buf, msg) self.trailer = buf def _getrecs(self, count, buf, msg): list = [ ] while len(list) < count: (rec, buf) = decodeRR(buf, msg) list.append(rec) return (list, buf) def _getquestions(self, count, buf, msg): list = [ ] while len(list) < count: (rec, buf) = decodeQuestion(buf, msg) list.append(rec) return (list, buf) # def encode(self): hdr = encodeInt16(self.id) + \ encodeInt8(128*self.qr + 8*self.opcode + \ 4*self.aa + 2*self.tc + self.rd ) + \ encodeInt8(128*self.ra + self.rcode) + \ encodeInt16(len(self.qdrecs)) + \ encodeInt16(len(self.anrecs)) + \ encodeInt16(len(self.nsrecs)) + \ encodeInt16(len(self.arrecs)); return hdr + string.joinfields( encodeRRs(self.qdrecs + self.anrecs + self.nsrecs + self.arrecs) , '') def _printRRs(self, rrs, name): if rrs: print name+' records: ('+`len(rrs)`+')' for i in rrs: print ' ', i else: print '(no '+name+' records)' def __print__(s): print 'DNSmessage id='+`s.id`, 'qr='+(('QUERY','RESPONSE')[s.qr]), print 'opcode='+`opcode_names[s.opcode]`, if s.aa: print 'auth.', if s.tc: print 'trunc.', print if s.rd: print '(recursion desired)', if s.ra: print '(recursion avail.)', if s.z: print '(z='+`s.z`+')', print 'rcode='+`s.rcode`, rcode_names[s.rcode] s._printRRs(s.qdrecs, 'question') s._printRRs(s.anrecs, 'answer') s._printRRs(s.nsrecs, 'authority') s._printRRs(s.arrecs, 'additional') # #