lin
2025-07-30 fcd736bf35fd93b563e9bbf594f2aa7b62028cc9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more information
## Copyright (C) Philippe Biondi <phil@secdev.org>
## This program is published under a GPLv2 license
 
"""
Resolve Autonomous Systems (AS).
"""
 
 
from __future__ import absolute_import
import socket, errno
from scapy.config import conf
from scapy.compat import *
 
class AS_resolver:
    """Resolve IP addresses to their origin AS over a line-oriented TCP service.

    One connection is opened per resolve() call.  If ``options`` is set, it
    is sent as the first line and one reply is read back before any lookup.
    Each address is then sent on its own line and the reply is scanned for
    ``origin:`` and ``descr:`` fields.

    Subclasses customise the target by overriding the ``server`` and
    ``options`` class attributes.
    """
    # Host to connect to; None here, so it must come from a subclass or
    # from the constructor.
    server = None
    # Line sent once right after connecting (skipped when empty or None).
    options = "-k"

    def __init__(self, server=None, port=43, options=None):
        """Override the class-level defaults for this instance.

        :param server: host to query; None keeps the class attribute.
        :param port: TCP port to connect to.
        :param options: option line sent after connecting; None keeps the
            class attribute.
        """
        if server is not None:
            self.server = server
        self.port = port
        if options is not None:
            self.options = options

    def _start(self):
        """Open the connection and, if options are set, send them."""
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.s.connect((self.server, self.port))
            if self.options:
                # sendall(): plain send() may transmit only part of the data.
                self.s.sendall(self.options.encode("utf8") + b"\n")
                # Read and discard whatever the server answers to the options.
                self.s.recv(8192)
        except Exception:
            # Do not leak the descriptor when the handshake fails.
            self.s.close()
            raise

    def _stop(self):
        """Close the connection opened by _start()."""
        self.s.close()

    def _parse_whois(self, txt):
        """Extract the AS number and description from a raw reply.

        :param txt: reply as bytes.
        :returns: ``(asn, desc)``.  ``asn`` is the text of the first
            non-empty ``origin:`` field, or None when there is none.
            ``desc`` is the ``descr:`` fields seen up to the point where
            both values are known, joined by a literal backslash + ``n``.
        """
        asn, desc = None, b""
        for l in txt.splitlines():
            if not asn and l.startswith(b"origin:"):
                asn = plain_str(l[7:].strip())
            if l.startswith(b"descr:"):
                if desc:
                    # desc is bytes, so the separator must be bytes as well
                    # (a str literal raises TypeError on Python 3).  It is a
                    # raw literal on purpose: backslash + "n", not a newline.
                    desc += br"\n"
                desc += l[6:].strip()
            if asn is not None and desc:
                break
        return asn, plain_str(desc.strip())

    def _resolve_one(self, ip):
        """Query one address on the open connection.

        :param ip: address to look up, as text.
        :returns: ``(ip, asn, desc)``; ``asn`` is None when the reply has no
            ``origin:`` field.
        """
        self.s.sendall(("%s\n" % ip).encode("utf8"))
        x = b""
        # Keep reading until the reply contains "%" or "source".
        while not (b"%" in x or b"source" in x):
            chunk = self.s.recv(8192)
            if not chunk:
                # The peer closed the connection: recv() would keep
                # returning b"" and this loop would never terminate.
                break
            x += chunk
        asn, desc = self._parse_whois(x)
        return ip, asn, desc

    def resolve(self, *ips):
        """Resolve several addresses over a single connection.

        :param ips: addresses to look up.
        :returns: list of ``(ip, asn, desc)`` tuples, one per address for
            which an AS number was found; the others are left out.
        :raises socket.error: on connection or transmission failure.
        """
        self._start()
        try:
            ret = []
            for ip in ips:
                ip, asn, desc = self._resolve_one(ip)
                if asn is not None:
                    ret.append((ip, asn, desc))
        finally:
            # Always release the socket, even when a lookup fails.
            self._stop()
        return ret
 
class AS_resolver_riswhois(AS_resolver):
    """AS_resolver preset that queries riswhois.ripe.net.

    Only the class-level defaults differ from AS_resolver; connection
    handling and reply parsing are inherited unchanged.
    """
    # Host to connect to.
    server = "riswhois.ripe.net"
    # Option line that AS_resolver._start() sends once after connecting.
    options = "-k -M -1"
 
 
class AS_resolver_radb(AS_resolver):
    """AS_resolver preset that queries whois.ra.net.

    Only the class-level defaults differ from AS_resolver; connection
    handling and reply parsing are inherited unchanged.
    """
    # Host to connect to.
    server = "whois.ra.net"
    # Option line that AS_resolver._start() sends once after connecting.
    options = "-k -M"
    
 
class AS_resolver_cymru(AS_resolver):
    """AS_resolver variant that queries whois.cymru.com in one bulk request.

    All addresses are sent in a single ``begin`` ... ``end`` request and the
    whole reply is read until the server closes the connection.
    """
    server = "whois.cymru.com"
    # No option line: this class overrides resolve() and never calls the
    # inherited _start().
    options = None

    def resolve(self, *ips):
        """Resolve all addresses with a single bulk query.

        :param ips: addresses to look up, as text.
        :returns: list of ``(ip, asn, desc)`` tuples as built by parse().
        :raises socket.error: on connection or transmission failure.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((self.server, self.port))
            query = (b"begin\r\n" +
                     b"\r\n".join(ip.encode("utf8") for ip in ips) +
                     b"\r\nend\r\n")
            # sendall(): a long address list may not fit in one send() call,
            # which would silently truncate the request.
            s.sendall(query)
            chunks = []
            while True:
                chunk = s.recv(8192)
                if chunk == b"":
                    # The server closed the connection: reply is complete.
                    break
                chunks.append(chunk)
        finally:
            # Release the socket on success and on error alike.
            s.close()

        return self.parse(b"".join(chunks))

    def parse(self, data):
        """Parse bulk cymru data.

        The first line of ``data`` is skipped (presumably a banner/header --
        confirm against the service documentation).  Every other line that
        contains ``|`` is read as ``asn | ip | description``.

        :param data: raw reply as bytes.
        :returns: list of ``(ip, "AS<asn>", desc)`` tuples.  Rows whose AS
            field is ``NA`` and rows with fewer than three fields are
            skipped.
        """
        ASNlist = []
        for l in data.splitlines()[1:]:
            l = plain_str(l)
            if "|" not in l:
                continue
            # Split on the first two separators only, so a description that
            # itself contains "|" no longer breaks the unpacking.
            fields = [elt.strip() for elt in l.split('|', 2)]
            if len(fields) != 3:
                continue
            asn, ip, desc = fields
            if asn == "NA":
                continue
            asn = "AS%s" % asn
            ASNlist.append((ip, asn, desc))
        return ASNlist
 
class AS_resolver_multi(AS_resolver):
    """Try several resolvers in turn until every address is resolved.

    Each resolver is only asked about the addresses the previous ones did
    not resolve.
    """
    # Default chain, instantiated once when the class is defined.
    resolvers_list = ( AS_resolver_riswhois(),AS_resolver_radb(),AS_resolver_cymru() )

    def __init__(self, *reslist):
        """Use ``reslist`` as the resolver chain, or the default when empty.

        :param reslist: resolver instances, tried in the given order.
        """
        if reslist:
            self.resolvers_list = reslist

    def resolve(self, *ips):
        """Resolve addresses using the resolver chain.

        :param ips: addresses to look up.
        :returns: list of ``(ip, asn, desc)`` tuples.
        :raises RuntimeError: when the number of results differs from the
            number of requested addresses.
        :raises socket.error: when a resolver fails with an error other than
            connection refused, timed out or connection reset.
        """
        if not ips:
            # Nothing to look up: do not open a connection for it.
            return []
        todo = ips
        ret = []
        for ASres in self.resolvers_list:
            try:
                res = ASres.resolve(*todo)
            except socket.error as e:
                # e.errno rather than e[0]: exceptions are not subscriptable
                # on Python 3.
                if e.errno in (errno.ECONNREFUSED, errno.ETIMEDOUT, errno.ECONNRESET):
                    # This provider is unreachable: fall back to the next.
                    continue
                # Any other failure must not fall through, as ``res`` would
                # be unbound or left over from the previous resolver.
                raise
            resolved = {ip for ip, _asn, _desc in res}
            todo = [ip for ip in todo if ip not in resolved]
            ret += res
            if not todo:
                break
        if len(ips) != len(ret):
            raise RuntimeError("Could not contact whois providers")
        return ret
 
 
# Register a multi-provider resolver instance on the global configuration
# object when this module is imported.
conf.AS_resolver = AS_resolver_multi()