# Copyright (c) 2013-2017 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import sys
import socket
import dns.resolver
import json
import logging
from time import sleep
# Import the dnspython rdtypes to fix the dynamic import problem when frozen.
import dns.rdtypes.ANY.TXT # @UnusedImport
from .exceptions import (IPDefinedError, ASNLookupError, BlacklistError,
WhoisLookupError, HTTPLookupError, HostLookupError,
HTTPRateLimitError, WhoisRateLimitError)
from .whois import RIR_WHOIS
from .asn import ASN_ORIGIN_WHOIS
from .utils import ipv4_is_defined, ipv6_is_defined
if sys.version_info >= (3, 3): # pragma: no cover
from ipaddress import (ip_address,
IPv4Address,
IPv6Address)
else: # pragma: no cover
from ipaddr import (IPAddress as ip_address,
IPv4Address,
IPv6Address)
try: # pragma: no cover
from urllib.request import (OpenerDirector,
ProxyHandler,
build_opener,
Request,
URLError,
HTTPError)
from urllib.parse import urlencode
except ImportError: # pragma: no cover
from urllib2 import (OpenerDirector,
ProxyHandler,
build_opener,
Request,
URLError,
HTTPError)
from urllib import urlencode
# Module-level logger used by every query method in this file.
log = logging.getLogger(__name__)

# URL template for the ARIN whois REST lookup used by Net.get_asn_http();
# {0} is replaced with the IP address string.
# POSSIBLY UPDATE TO USE RDAP
ARIN = 'http://whois.arin.net/rest/nets;q={0}?showDetails=true&showARIN=true'

# Whois host contacted on port 43 by Net.get_asn_whois().
CYMRU_WHOIS = 'whois.cymru.com'

# DNS zone templates for the TXT query made by Net.get_asn_dns(); {0} is
# replaced with the reversed address computed in Net.__init__().
IPV4_DNS_ZONE = '{0}.origin.asn.cymru.com'
IPV6_DNS_ZONE = '{0}.origin6.asn.cymru.com'

# Whois servers that Net.get_whois() refuses to query.
BLACKLIST = ['root.rwhois.net']

# Mapping of organization identifiers to registry names.
# NOTE(review): not referenced by the code visible in this module; presumably
# consumed by other modules of the package -- confirm before changing.
ORG_MAP = {
    'ARIN': 'arin',
    'VR-ARIN': 'arin',
    'RIPE': 'ripencc',
    'APNIC': 'apnic',
    'LACNIC': 'lacnic',
    'AFRINIC': 'afrinic',
    'DNIC': 'arin',
}
class Net:
    """
    The class for performing network queries.

    Args:
        address: An IPv4 or IPv6 address in string format, or an
            IPv4Address/IPv6Address object (used as-is).
        timeout: The default timeout for socket connections in seconds.
        proxy_opener: The urllib.request.OpenerDirector request for proxy
            support or None.
        allow_permutations: Use additional methods if DNS lookups to Cymru
            fail.

    Raises:
        IPDefinedError: The address provided is defined (does not need to be
            resolved).
    """

    def __init__(self, address, timeout=5, proxy_opener=None,
                 allow_permutations=True):

        # Address objects are stored untouched; anything else is parsed so
        # that the ipaddress package performs the validation.
        if isinstance(address, (IPv4Address, IPv6Address)):
            self.address = address
        else:
            self.address = ip_address(address)

        # Default timeout for socket connections.
        self.timeout = timeout

        # Allow other than DNS lookups for ASNs.
        self.allow_permutations = allow_permutations

        # The resolver shares the socket timeout for both the per-query
        # timeout and the total lifetime.
        self.dns_resolver = dns.resolver.Resolver()
        self.dns_resolver.timeout = timeout
        self.dns_resolver.lifetime = timeout

        # Proxy opener; anything that is not an OpenerDirector is replaced
        # by a default opener built around ProxyHandler.
        if isinstance(proxy_opener, OpenerDirector):
            self.opener = proxy_opener
        else:
            handler = ProxyHandler()
            self.opener = build_opener(handler)

        # IP address in string format for use in queries.
        self.address_str = str(self.address)

        # Determine the IP version, 4 or 6.
        self.version = self.address.version

        # Check if no ASN/whois resolution needs to occur.
        if self.version == 4:
            is_defined = ipv4_is_defined(self.address_str)
        else:
            is_defined = ipv6_is_defined(self.address_str)

        if is_defined[0]:
            raise IPDefinedError(
                'IPv{0} address {1} is already defined as {2} via '
                '{3}.'.format(
                    4 if self.version == 4 else 6,
                    self.address_str, is_defined[1], is_defined[2]
                )
            )

        if self.version == 4:

            # Reverse the IPv4Address octets for the DNS ASN query.
            octets = self.address_str.split('.')
            octets.reverse()
            self.reversed = '.'.join(octets)
            self.dns_zone = IPV4_DNS_ZONE.format(self.reversed)

        else:

            # Explode the IPv6Address to fill in any missing 0's.
            groups = self.address.exploded.split(':')

            # Cymru seems to timeout when the IPv6 address has trailing
            # '0000' groups. Remove these groups.
            while groups and groups[-1] == '0000':
                groups.pop()

            # Reverse the remaining hex digits and separate each one with a
            # dot for the DNS ASN query.
            digits = ''.join(groups)[::-1]
            self.reversed = '.'.join(digits)
            self.dns_zone = IPV6_DNS_ZONE.format(self.reversed)

    def lookup_asn(self, *args, **kwargs):
        """
        Temporary wrapper for IP ASN lookups (moved to
        asn.IPASN.lookup()). This will be removed in a future
        release (1.0.0).

        Args:
            *args: Positional arguments passed through to IPASN.lookup().
            **kwargs: Keyword arguments passed through to IPASN.lookup().

        Returns:
            Tuple: The IPASN.lookup() result followed by None (the second
                element is always None in this wrapper).
        """

        from warnings import warn
        warn('Net.lookup_asn() has been deprecated and will be removed. '
             'You should now use asn.IPASN.lookup() for IP ASN lookups.')

        # Imported here rather than at module level.
        from .asn import IPASN

        response = None
        ipasn = IPASN(self)
        return ipasn.lookup(*args, **kwargs), response

    def _socket_query(self, server, port, query):
        """
        The function for sending a single text query over a TCP socket and
        reading the reply until the remote side closes the connection. The
        socket is always closed, including when an error is raised.

        Args:
            server: The host to connect to.
            port: The network port to connect on.
            query: The query string to send (encoded before sending).

        Returns:
            Bytes: The raw, undecoded response.

        Raises:
            socket.timeout, socket.error: Propagated unchanged so callers
                can apply their own retry handling.
        """

        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:

            conn.settimeout(self.timeout)
            conn.connect((server, port))

            # sendall() keeps sending until the whole query is transmitted.
            conn.sendall(query.encode())

            # Collect raw chunks; decoding is left to the caller so that a
            # multi-byte character split across two chunks cannot break it.
            chunks = []
            while True:

                chunk = conn.recv(4096)
                if not chunk:
                    break

                chunks.append(chunk)

        finally:

            conn.close()

        return b''.join(chunks)

    def get_asn_dns(self):
        """
        The function for retrieving ASN information for an IP address from
        Cymru via port 53 (DNS).

        Returns:
            String: The raw ASN data.

        Raises:
            ASNLookupError: The ASN lookup failed.
        """

        try:

            log.debug('ASN query for {0}'.format(self.dns_zone))
            data = self.dns_resolver.query(self.dns_zone, 'TXT')

            # Only the first TXT record is returned.
            return str(data[0])

        except (dns.resolver.NXDOMAIN, dns.resolver.NoNameservers,
                dns.resolver.NoAnswer, dns.exception.Timeout) as e:

            raise ASNLookupError(
                'ASN lookup failed (DNS {0}) for {1}.'.format(
                    e.__class__.__name__, self.address_str)
            )

        except Exception:  # pragma: no cover

            raise ASNLookupError(
                'ASN lookup failed for {0}.'.format(self.address_str)
            )

    def get_asn_whois(self, retry_count=3):
        """
        The function for retrieving ASN information for an IP address from
        Cymru via port 43/tcp (WHOIS).

        Args:
            retry_count: The number of times to retry in case socket errors,
                timeouts, connection resets, etc. are encountered.

        Returns:
            String: The raw ASN data.

        Raises:
            ASNLookupError: The ASN lookup failed.
        """

        try:

            log.debug('ASN query for {0}'.format(self.address_str))

            # Query the Cymru whois server, and store the results.
            query = ' -r -a -c -p -f -o {0}{1}'.format(
                self.address_str, '\r\n')
            data = self._socket_query(CYMRU_WHOIS, 43, query).decode()

            return str(data)

        except (socket.timeout, socket.error) as e:  # pragma: no cover

            log.debug('ASN query socket error: {0}'.format(e))
            if retry_count > 0:

                log.debug('ASN query retrying (count: {0})'.format(
                    str(retry_count)))
                return self.get_asn_whois(retry_count - 1)

            else:

                raise ASNLookupError(
                    'ASN lookup failed for {0}.'.format(self.address_str)
                )

        except Exception:  # pragma: no cover

            raise ASNLookupError(
                'ASN lookup failed for {0}.'.format(self.address_str)
            )

    def get_asn_http(self, retry_count=3):
        """
        The function for retrieving ASN information for an IP address from
        Arin via port 80 (HTTP). Currently limited to fetching asn_registry
        through a Arin whois (REST) lookup. The other values are returned as
        None to keep a consistent dict output. This should be used as a last
        chance fallback call behind ASN DNS & ASN Whois lookups.

        Args:
            retry_count: The number of times to retry in case socket errors,
                timeouts, connection resets, etc. are encountered.

        Returns:
            Dictionary: The ASN data in json format.

        Raises:
            ASNLookupError: The ASN lookup failed.
        """

        try:

            # Lets attempt to get the ASN registry information from
            # ARIN.
            log.debug('ASN query for {0}'.format(self.address_str))
            response = self.get_http_json(
                url=str(ARIN).format(self.address_str),
                retry_count=retry_count,
                headers={'Accept': 'application/json'}
            )

            return response

        except (socket.timeout, socket.error) as e:  # pragma: no cover

            log.debug('ASN query socket error: {0}'.format(e))
            if retry_count > 0:

                log.debug('ASN query retrying (count: {0})'.format(
                    str(retry_count)))
                return self.get_asn_http(retry_count=retry_count-1)

            else:

                raise ASNLookupError(
                    'ASN lookup failed for {0}.'.format(self.address_str)
                )

        except Exception:

            # Any other failure, including the HTTP errors raised by
            # get_http_json(), is reported as an ASN lookup failure.
            raise ASNLookupError(
                'ASN lookup failed for {0}.'.format(self.address_str)
            )

    def get_asn_origin_whois(self, asn_registry='radb', asn=None,
                             retry_count=3, server=None, port=43):
        """
        The function for retrieving CIDR info for an ASN via whois.

        Args:
            asn_registry: The source to run the query against
                (asn.ASN_ORIGIN_WHOIS).
            asn: The ASN string (required).
            retry_count: The number of times to retry in case socket errors,
                timeouts, connection resets, etc. are encountered.
            server: An optional server to connect to. Defaults to RADB.
            port: The network port to connect on.

        Returns:
            String: The raw ASN origin whois data.

        Raises:
            WhoisLookupError: The ASN origin whois lookup failed.
            WhoisRateLimitError: The ASN origin Whois request rate limited and
                retries were exhausted.
        """

        try:

            if server is None:
                server = ASN_ORIGIN_WHOIS[asn_registry]['server']

            log.debug('ASN origin WHOIS query for {0} at {1}:{2}'.format(
                asn, server, port))

            # Prep the query.
            query = ' -i origin {0}{1}'.format(asn, '\r\n')

            # Query the whois server, and store the results.
            response = self._socket_query(server, port, query).decode()

            # TODO: this was taken from get_whois(). Need to test rate limiting
            if 'Query rate limit exceeded' in response:  # pragma: no cover

                if retry_count > 0:

                    log.debug('ASN origin WHOIS query rate limit exceeded. '
                              'Waiting...')
                    sleep(1)
                    return self.get_asn_origin_whois(
                        asn_registry=asn_registry, asn=asn,
                        retry_count=retry_count-1,
                        server=server, port=port
                    )

                else:

                    raise WhoisRateLimitError(
                        'ASN origin Whois lookup failed for {0}. Rate limit '
                        'exceeded, wait and try again (possibly a '
                        'temporary block).'.format(asn))

            elif ('error 501' in response or 'error 230' in response
                  ):  # pragma: no cover

                # Converted to WhoisLookupError by the generic handler below.
                log.debug('ASN origin WHOIS query error: {0}'.format(response))
                raise ValueError

            return str(response)

        except (socket.timeout, socket.error) as e:

            log.debug('ASN origin WHOIS query socket error: {0}'.format(e))
            if retry_count > 0:

                log.debug('ASN origin WHOIS query retrying (count: {0})'
                          ''.format(str(retry_count)))
                return self.get_asn_origin_whois(
                    asn_registry=asn_registry, asn=asn,
                    retry_count=retry_count-1, server=server, port=port
                )

            else:

                raise WhoisLookupError(
                    'ASN origin WHOIS lookup failed for {0}.'.format(asn)
                )

        except WhoisRateLimitError:  # pragma: no cover

            raise

        except Exception:  # pragma: no cover

            raise WhoisLookupError(
                'ASN origin WHOIS lookup failed for {0}.'.format(asn)
            )

    def get_whois(self, asn_registry='arin', retry_count=3, server=None,
                  port=43, extra_blacklist=None):
        """
        The function for retrieving whois or rwhois information for an IP
        address via any port. Defaults to port 43/tcp (WHOIS).

        Args:
            asn_registry: The NIC to run the query against.
            retry_count: The number of times to retry in case socket errors,
                timeouts, connection resets, etc. are encountered.
            server: An optional server to connect to. If provided, the
                RIR_WHOIS server lookup for asn_registry is skipped.
            port: The network port to connect on.
            extra_blacklist: A list of blacklisted whois servers in addition to
                the global BLACKLIST.

        Returns:
            String: The raw whois data.

        Raises:
            BlacklistError: Raised if the whois server provided is in the
                global BLACKLIST or extra_blacklist.
            WhoisLookupError: The whois lookup failed.
            WhoisRateLimitError: The Whois request rate limited and retries
                were exhausted.
        """

        try:

            extra_bl = extra_blacklist if extra_blacklist else []

            if server in BLACKLIST or server in extra_bl:
                raise BlacklistError(
                    'The server {0} is blacklisted.'.format(server)
                )

            if server is None:
                server = RIR_WHOIS[asn_registry]['server']

            log.debug('WHOIS query for {0} at {1}:{2}'.format(
                self.address_str, server, port))

            # Prep the query; the 'arin' registry gets an 'n + ' prefix.
            query = self.address_str + '\r\n'
            if asn_registry == 'arin':
                query = 'n + {0}'.format(query)

            # Query the whois server, and store the results. Bytes that are
            # not ASCII are dropped.
            response = self._socket_query(server, port, query).decode(
                'ascii', 'ignore')

            if 'Query rate limit exceeded' in response:  # pragma: no cover

                if retry_count > 0:

                    log.debug('WHOIS query rate limit exceeded. Waiting...')
                    sleep(1)
                    return self.get_whois(
                        asn_registry=asn_registry, retry_count=retry_count-1,
                        server=server, port=port,
                        extra_blacklist=extra_blacklist
                    )

                else:

                    raise WhoisRateLimitError(
                        'Whois lookup failed for {0}. Rate limit '
                        'exceeded, wait and try again (possibly a '
                        'temporary block).'.format(self.address_str))

            elif ('error 501' in response or 'error 230' in response
                  ):  # pragma: no cover

                # Converted to WhoisLookupError by the generic handler below.
                log.debug('WHOIS query error: {0}'.format(response))
                raise ValueError

            return str(response)

        except (socket.timeout, socket.error) as e:

            log.debug('WHOIS query socket error: {0}'.format(e))
            if retry_count > 0:

                log.debug('WHOIS query retrying (count: {0})'.format(
                    str(retry_count)))
                return self.get_whois(
                    asn_registry=asn_registry, retry_count=retry_count-1,
                    server=server, port=port, extra_blacklist=extra_blacklist
                )

            else:

                raise WhoisLookupError(
                    'WHOIS lookup failed for {0}.'.format(self.address_str)
                )

        except WhoisRateLimitError:  # pragma: no cover

            raise

        except BlacklistError:

            raise

        except Exception:  # pragma: no cover

            raise WhoisLookupError(
                'WHOIS lookup failed for {0}.'.format(self.address_str)
            )

    def get_http_json(self, url=None, retry_count=3, rate_limit_timeout=120,
                      headers=None):
        """
        The function for retrieving a json result via HTTP.

        Args:
            url: The URL to retrieve.
            retry_count: The number of times to retry in case socket errors,
                timeouts, connection resets, etc. are encountered.
            rate_limit_timeout: The number of seconds to wait before retrying
                when a rate limit notice is returned via rdap+json.
            headers: The HTTP headers dictionary. The Accept header defaults
                to 'application/rdap+json'.

        Returns:
            Dictionary: The data in json format.

        Raises:
            HTTPLookupError: The HTTP lookup failed.
            HTTPRateLimitError: The HTTP request rate limited and retries
                were exhausted.
        """

        if headers is None:
            headers = {'Accept': 'application/rdap+json'}

        try:

            # Create the connection for the whois query.
            log.debug('HTTP query for {0} at {1}'.format(
                self.address_str, url))
            conn = Request(url, headers=headers)
            data = self.opener.open(conn, timeout=self.timeout)

            # Read the whole body, then release the response right away so
            # the connection is not held open during parsing or retries.
            try:
                try:
                    raw = data.readall()
                except AttributeError:  # pragma: no cover
                    raw = data.read()
            finally:
                data.close()

            d = json.loads(raw.decode('utf-8', 'ignore'))

            try:
                # Tests written but commented out. I do not want to send a
                # flood of requests on every test.
                for tmp in d['notices']:  # pragma: no cover
                    if tmp['title'] == 'Rate Limit Notice':

                        log.debug('RDAP query rate limit exceeded.')

                        if retry_count > 0:

                            log.debug('Waiting {0} seconds...'.format(
                                str(rate_limit_timeout)))
                            sleep(rate_limit_timeout)
                            return self.get_http_json(
                                url=url, retry_count=retry_count-1,
                                rate_limit_timeout=rate_limit_timeout,
                                headers=headers
                            )

                        else:

                            raise HTTPRateLimitError(
                                'HTTP lookup failed for {0}. Rate limit '
                                'exceeded, wait and try again (possibly a '
                                'temporary block).'.format(url))

            except (KeyError, IndexError):  # pragma: no cover

                # No usable 'notices' entry; nothing to check.
                pass

            return d

        except HTTPError as e:  # pragma: no cover

            # RIPE is producing this HTTP error rather than a JSON error.
            if e.code == 429:

                log.debug('HTTP query rate limit exceeded.')

                if retry_count > 0:

                    log.debug('Waiting {0} seconds...'.format(
                        str(rate_limit_timeout)))
                    sleep(rate_limit_timeout)
                    return self.get_http_json(
                        url=url, retry_count=retry_count - 1,
                        rate_limit_timeout=rate_limit_timeout,
                        headers=headers
                    )

                else:

                    raise HTTPRateLimitError(
                        'HTTP lookup failed for {0}. Rate limit '
                        'exceeded, wait and try again (possibly a '
                        'temporary block).'.format(url))

            else:

                raise HTTPLookupError('HTTP lookup failed for {0} with error '
                                      'code {1}.'.format(url, str(e.code)))

        except (URLError, socket.timeout, socket.error) as e:

            # Check needed for Python 2.6, also why URLError is caught.
            # Only socket-level failures are retried; other URL errors fail
            # immediately.
            try:  # pragma: no cover
                if not isinstance(e.reason, (socket.timeout, socket.error)):
                    raise HTTPLookupError('HTTP lookup failed for {0}.'
                                          ''.format(url))
            except AttributeError:  # pragma: no cover

                # The exception has no 'reason' attribute; treat it as a
                # socket-level failure.
                pass

            log.debug('HTTP query socket error: {0}'.format(e))
            if retry_count > 0:

                log.debug('HTTP query retrying (count: {0})'.format(
                    str(retry_count)))

                return self.get_http_json(
                    url=url, retry_count=retry_count-1,
                    rate_limit_timeout=rate_limit_timeout, headers=headers
                )

            else:

                raise HTTPLookupError('HTTP lookup failed for {0}.'.format(
                    url))

        except (HTTPLookupError, HTTPRateLimitError):  # pragma: no cover

            raise

        except Exception:  # pragma: no cover

            raise HTTPLookupError('HTTP lookup failed for {0}.'.format(url))

    def get_host(self, retry_count=3):
        """
        The function for retrieving host information for an IP address.

        While the lookup runs, the process-wide socket default timeout is set
        to self.timeout if no default is currently set; the previous value is
        restored afterwards, whether the lookup succeeds or fails.

        Args:
            retry_count: The number of times to retry in case socket errors,
                timeouts, connection resets, etc. are encountered.

        Returns:
            Tuple: hostname, aliaslist, ipaddrlist

        Raises:
            HostLookupError: The host lookup failed.
        """

        try:

            previous_timeout = socket.getdefaulttimeout()
            default_timeout_set = not previous_timeout

            if default_timeout_set:
                socket.setdefaulttimeout(self.timeout)

            try:

                log.debug('Host query for {0}'.format(self.address_str))
                return socket.gethostbyaddr(self.address_str)

            finally:

                # Undo the global change even when the lookup raised, so a
                # failed query cannot leak the timeout into unrelated code.
                if default_timeout_set:
                    socket.setdefaulttimeout(previous_timeout)

        except (socket.timeout, socket.error) as e:

            log.debug('Host query socket error: {0}'.format(e))
            if retry_count > 0:

                log.debug('Host query retrying (count: {0})'.format(
                    str(retry_count)))

                return self.get_host(retry_count - 1)

            else:

                raise HostLookupError(
                    'Host lookup failed for {0}.'.format(self.address_str)
                )

        except Exception:  # pragma: no cover

            raise HostLookupError(
                'Host lookup failed for {0}.'.format(self.address_str)
            )

    def get_http_raw(self, url=None, retry_count=3, headers=None,
                     request_type='GET', form_data=None):
        """
        The function for retrieving a raw HTML result via HTTP.

        Args:
            url: The URL to retrieve.
            retry_count: The number of times to retry in case socket errors,
                timeouts, connection resets, etc. are encountered.
            headers: The HTTP headers dictionary. The Accept header defaults
                to 'text/html'.
            request_type: 'GET' or 'POST'
            form_data: Dictionary of form POST data

        Returns:
            String: The raw data.

        Raises:
            HTTPLookupError: The HTTP lookup failed.
        """

        if headers is None:
            headers = {'Accept': 'text/html'}

        enc_form_data = None
        if form_data:
            enc_form_data = urlencode(form_data)
            try:
                # Py 2 inspection will alert on the encoding arg, no harm done.
                enc_form_data = bytes(enc_form_data, encoding='ascii')
            except TypeError:  # pragma: no cover
                pass

        try:

            # Create the connection for the HTTP query.
            log.debug('HTTP query for {0} at {1}'.format(
                self.address_str, url))
            try:
                # Py 2 inspection alert bypassed by using kwargs dict.
                conn = Request(url=url, data=enc_form_data, headers=headers,
                               **{'method': request_type})
            except TypeError:  # pragma: no cover
                # Request() rejected the 'method' keyword; build it without.
                conn = Request(url=url, data=enc_form_data, headers=headers)
            data = self.opener.open(conn, timeout=self.timeout)

            # Read the whole body, then release the response right away.
            try:
                try:
                    raw = data.readall()
                except AttributeError:  # pragma: no cover
                    raw = data.read()
            finally:
                data.close()

            # Bytes that are not ASCII are dropped.
            return str(raw.decode('ascii', 'ignore'))

        except (URLError, socket.timeout, socket.error) as e:

            # Check needed for Python 2.6, also why URLError is caught.
            # Only socket-level failures are retried; other URL errors fail
            # immediately.
            try:  # pragma: no cover
                if not isinstance(e.reason, (socket.timeout, socket.error)):
                    raise HTTPLookupError('HTTP lookup failed for {0}.'
                                          ''.format(url))
            except AttributeError:  # pragma: no cover

                # The exception has no 'reason' attribute; treat it as a
                # socket-level failure.
                pass

            log.debug('HTTP query socket error: {0}'.format(e))
            if retry_count > 0:

                log.debug('HTTP query retrying (count: {0})'.format(
                    str(retry_count)))

                return self.get_http_raw(
                    url=url, retry_count=retry_count - 1, headers=headers,
                    request_type=request_type, form_data=form_data
                )

            else:

                raise HTTPLookupError('HTTP lookup failed for {0}.'.format(
                    url))

        except HTTPLookupError:  # pragma: no cover

            raise

        except Exception:  # pragma: no cover

            raise HTTPLookupError('HTTP lookup failed for {0}.'.format(url))