pep8 isort
This commit is contained in:
parent
d6200d6302
commit
aac3793b35
3 changed files with 192 additions and 188 deletions
|
|
@ -3,31 +3,32 @@
|
|||
# from https://github.com/nusenu/trustor-poc
|
||||
# with minor refactoring to make the code more Pythonic.
|
||||
|
||||
import os
|
||||
import sys
|
||||
import datetime
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
import requests
|
||||
from stem.control import Controller
|
||||
from stem.util.tor_tools import *
|
||||
# from urllib.parse import urlparse
|
||||
# from stem.util.tor_tools import *
|
||||
from urllib3.util import parse_url as urlparse
|
||||
|
||||
try:
|
||||
# unbound is not on pypi
|
||||
from unbound import ub_ctx,RR_TYPE_TXT,RR_CLASS_IN
|
||||
from unbound import RR_CLASS_IN, RR_TYPE_TXT, ub_ctx
|
||||
except:
|
||||
ub_ctx = RR_TYPE_TXT = RR_CLASS_IN = None
|
||||
|
||||
global LOG
|
||||
import logging
|
||||
import warnings
|
||||
|
||||
warnings.filterwarnings('ignore')
|
||||
LOG = logging.getLogger()
|
||||
|
||||
# download this python library from
|
||||
# https://github.com/erans/torcontactinfoparser
|
||||
#sys.path.append('/home/....')
|
||||
# sys.path.append('/home/....')
|
||||
try:
|
||||
from torcontactinfo import TorContactInfoParser
|
||||
except:
|
||||
|
|
@ -42,7 +43,7 @@ def is_valid_hostname(hostname):
|
|||
return False
|
||||
if hostname[-1] == ".":
|
||||
hostname = hostname[:-1] # strip exactly one dot from the right, if present
|
||||
allowed = re.compile("(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
|
||||
allowed = re.compile("(?!-)[A-Z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
|
||||
return all(allowed.match(x) for x in hostname.split("."))
|
||||
|
||||
def read_local_trust_config(trust_config):
|
||||
|
|
@ -127,7 +128,7 @@ def get_controller(address='127.0.0.1', port=9151, password=''):
|
|||
'''
|
||||
|
||||
try:
|
||||
#controller = Controller.from_socket_file(path=torsocketpath)
|
||||
# controller = Controller.from_socket_file(path=torsocketpath)
|
||||
controller = Controller.from_port(address=address, port=port)
|
||||
controller.authenticate(password=password)
|
||||
except Exception as e:
|
||||
|
|
@ -155,7 +156,7 @@ def find_validation_candidates(controller,
|
|||
{ 'emeraldonion.org' : { 'uri-rsa': ['044600FD968728A6F220D5347AD897F421B757C0', '09DCA3360179C6C8A5A20DDDE1C54662965EF1BA']}}
|
||||
'''
|
||||
# https://github.com/nusenu/ContactInfo-Information-Sharing-Specification#proof
|
||||
accepted_proof_types = ['uri-rsa','dns-rsa']
|
||||
accepted_proof_types = ['uri-rsa', 'dns-rsa']
|
||||
|
||||
# https://github.com/nusenu/ContactInfo-Information-Sharing-Specification#ciissversion
|
||||
accepted_ciissversions = ['2']
|
||||
|
|
@ -186,15 +187,15 @@ def find_validation_candidates(controller,
|
|||
if parsed_ci['ciissversion'] in accepted_ciissversions and prooftype in accepted_proof_types:
|
||||
if ciurl.startswith('http://') or ciurl.startswith('https://'):
|
||||
try:
|
||||
domain=urlparse(ciurl).netloc
|
||||
domain = urlparse(ciurl).netloc
|
||||
except:
|
||||
LOG.warning('failed to parse domain %s' % ciurl)
|
||||
domain='error'
|
||||
domain = 'error'
|
||||
continue
|
||||
else:
|
||||
domain=ciurl
|
||||
domain = ciurl
|
||||
if not is_valid_hostname(domain):
|
||||
domain='error'
|
||||
domain = 'error'
|
||||
continue
|
||||
# we can ignore relays that do not claim to be operated by a trusted operator
|
||||
# if we do not accept all
|
||||
|
|
@ -204,19 +205,19 @@ def find_validation_candidates(controller,
|
|||
if prooftype in result[domain].keys():
|
||||
result[domain][prooftype].append(fingerprint)
|
||||
else:
|
||||
result[domain] = { prooftype : [fingerprint] }
|
||||
result[domain] = {prooftype: [fingerprint]}
|
||||
# mixed proof types are not allowd as per spec but we are not strict here
|
||||
LOG.warning('%s is using mixed prooftypes %s' % (domain, prooftype))
|
||||
else:
|
||||
result[domain] = {prooftype : [fingerprint]}
|
||||
result[domain] = {prooftype: [fingerprint]}
|
||||
return result
|
||||
|
||||
def oDownloadUrlRequests(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050):
|
||||
# socks proxy used for outbound web requests (for validation of proofs)
|
||||
proxy = {'https': 'socks5h://' +host +':' +str(port)}
|
||||
proxy = {'https': "socks5h://{host}:{port}"}
|
||||
# we use this UA string when connecting to webservers to fetch rsa-fingerprint.txt proof files
|
||||
# https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#uri-rsa
|
||||
headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'}
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'}
|
||||
|
||||
LOG.debug("fetching %s...." % uri)
|
||||
try:
|
||||
|
|
@ -250,31 +251,32 @@ def oDownloadUrlRequests(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050):
|
|||
if not oReqResp.headers['Content-Type'].startswith('text/plain'):
|
||||
raise TrustorError(f"HTTP Content-Type != text/plain")
|
||||
|
||||
#check for redirects (not allowed as per spec)
|
||||
# check for redirects (not allowed as per spec)
|
||||
if oReqResp.url != uri:
|
||||
LOG.error(f'Redirect detected %s vs %s (final)' % (uri, oReqResp.url))
|
||||
raise TrustorError(f'Redirect detected %s vs %s (final)' % (uri, oReqResp.url))
|
||||
LOG.error(f'Redirect detected {uri} vs %s (final)' % (oReqResp.url))
|
||||
raise TrustorError(f'Redirect detected {uri} vs %s (final)' % (oReqResp.url))
|
||||
return oReqResp
|
||||
|
||||
logging.getLogger("urllib3").setLevel(logging.INFO)
|
||||
#import urllib3.contrib.pyopenssl
|
||||
#urllib3.contrib.pyopenssl.inject_into_urllib3()
|
||||
# import urllib3.contrib.pyopenssl
|
||||
# urllib3.contrib.pyopenssl.inject_into_urllib3()
|
||||
|
||||
import ipaddress
|
||||
|
||||
import urllib3.util
|
||||
import ipaddress
|
||||
|
||||
|
||||
def ballow_subdomain_matching(hostname, dnsnames):
|
||||
for elt in dnsnames:
|
||||
if len(hostname.split('.')) > len(elt.split('.')) and \
|
||||
hostname.endswith(elt):
|
||||
if len(hostname.split('.')) > len(elt.split('.')) and hostname.endswith(elt):
|
||||
# parent
|
||||
return True
|
||||
return False
|
||||
|
||||
from urllib3.util.ssl_match_hostname import (CertificateError,
|
||||
match_hostname,
|
||||
_dnsname_match,
|
||||
_ipaddress_match,
|
||||
)
|
||||
from urllib3.util.ssl_match_hostname import (CertificateError, _dnsname_match,
|
||||
_ipaddress_match)
|
||||
|
||||
|
||||
def my_match_hostname(cert, hostname):
|
||||
"""Verify that *cert* (in decoded format as returned by
|
||||
SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 and RFC 6125
|
||||
|
|
@ -341,10 +343,10 @@ def my_match_hostname(cert, hostname):
|
|||
raise CertificateError(
|
||||
"no appropriate commonName or subjectAltName fields were found"
|
||||
)
|
||||
match_hostname = my_match_hostname
|
||||
from urllib3.util.ssl_ import (
|
||||
is_ipaddress,
|
||||
)
|
||||
urllib3.util.ssl_match_hostname.match_hostname = my_match_hostname
|
||||
from urllib3.util.ssl_ import is_ipaddress
|
||||
|
||||
|
||||
def _my_match_hostname(cert, asserted_hostname):
|
||||
# Our upstream implementation of ssl.match_hostname()
|
||||
# only applies this normalization to IP addresses so it doesn't
|
||||
|
|
@ -364,11 +366,12 @@ def _my_match_hostname(cert, asserted_hostname):
|
|||
# the cert when catching the exception, if they want to
|
||||
e._peer_cert = cert
|
||||
raise
|
||||
from urllib3.connection import _match_hostname, HTTPSConnection
|
||||
urllib3.connection._match_hostname = _my_match_hostname
|
||||
|
||||
from urllib3.contrib.socks import SOCKSProxyManager
|
||||
from urllib3 import Retry
|
||||
|
||||
|
||||
# from urllib3 import Retry
|
||||
def oDownloadUrlUrllib3(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050):
|
||||
"""Theres no need to use requests here and it
|
||||
adds too many layers on the SSL to be able to get at things
|
||||
|
|
@ -384,7 +387,7 @@ def oDownloadUrlUrllib3(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050):
|
|||
|
||||
# we use this UA string when connecting to webservers to fetch rsa-fingerprint.txt proof files
|
||||
# https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#uri-rsa
|
||||
headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'}
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'}
|
||||
|
||||
LOG.debug("fetching %s...." % uri)
|
||||
try:
|
||||
|
|
@ -419,7 +422,7 @@ def oDownloadUrlUrllib3(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050):
|
|||
if not oReqResp.headers['Content-Type'].startswith('text/plain'):
|
||||
raise TrustorError(f"HTTP Content-Type != text/plain")
|
||||
|
||||
#check for redirects (not allowed as per spec)
|
||||
# check for redirects (not allowed as per spec)
|
||||
if oReqResp.geturl() != uri:
|
||||
LOG.error(f'Redirect detected %s vs %s (final)' % (uri, oReqResp.geturl()))
|
||||
raise TrustorError(f'Redirect detected %s vs %s (final)' % (uri, oReqResp.geturl()))
|
||||
|
|
@ -427,10 +430,12 @@ def oDownloadUrlUrllib3(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050):
|
|||
|
||||
return oReqResp
|
||||
import urllib3.connectionpool
|
||||
from urllib3.connection import HTTPSConnection
|
||||
|
||||
urllib3.connectionpool.VerifiedHTTPSConnection = HTTPSConnection
|
||||
|
||||
def lDownloadUrlFps(domain, sCAfile, timeout=30, host='127.0.0.1', port=9050):
|
||||
uri="https://"+domain+"/.well-known/tor-relay/rsa-fingerprint.txt"
|
||||
uri = f"https://{domain}/.well-known/tor-relay/rsa-fingerprint.txt"
|
||||
o = oDownloadUrlRequests(uri, sCAfile, timeout=timeout, host=host, port=port)
|
||||
well_known_content = o.text.upper().strip().split('\n')
|
||||
well_known_content = [i for i in well_known_content if i and len(i) == 40]
|
||||
|
|
@ -460,7 +465,7 @@ def validate_proofs(candidates, validation_cache_file, timeout=20, host='127.0.0
|
|||
LOG.error('%s:%s:%s' % (fingerprint, domain, prooftype))
|
||||
elif prooftype == 'dns-rsa' and ub_ctx:
|
||||
for fingerprint in candidates[domain][prooftype]:
|
||||
fp_domain = fingerprint+'.'+domain
|
||||
fp_domain = fingerprint + '.' + domain
|
||||
if idns_validate(fp_domain,
|
||||
libunbound_resolv_file='resolv.conf',
|
||||
dnssec_DS_file='dnssec-root-trust',
|
||||
|
|
@ -488,7 +493,6 @@ def idns_validate(domain,
|
|||
# this is not the system wide /etc/resolv.conf
|
||||
# use dnscrypt-proxy to encrypt your DNS and route it via tor's SOCKSPort
|
||||
|
||||
|
||||
ctx = ub_ctx()
|
||||
if (os.path.isfile(libunbound_resolv_file)):
|
||||
ctx.resolvconf(libunbound_resolv_file)
|
||||
|
|
@ -526,12 +530,10 @@ def configure_tor(controller, trusted_fingerprints, exitonly=True):
|
|||
try:
|
||||
controller.set_conf('ExitNodes', trusted_fingerprints)
|
||||
LOG.error('limited exits to %s relays' % relay_count)
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa
|
||||
LOG.exception('Failed to set ExitNodes tor config to trusted relays')
|
||||
sys.exit(20)
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
CAfile = '/etc/ssl/certs/ca-certificates.crt'
|
||||
trust_config = 'trust_config'
|
||||
|
|
@ -542,12 +544,12 @@ if __name__ == '__main__':
|
|||
trusted_fingerprints = read_local_validation_cache(validation_cache_file,
|
||||
trusted_domains=trusted_domains)
|
||||
# tor ControlPort password
|
||||
controller_password=''
|
||||
controller_password = ''
|
||||
# tor ControlPort IP
|
||||
controller_address = '127.0.0.1'
|
||||
timeout = 20
|
||||
port = 9050
|
||||
controller = get_controller(address=controller_address,password=controller_password)
|
||||
controller = get_controller(address=controller_address, password=controller_password)
|
||||
|
||||
r = find_validation_candidates(controller,
|
||||
validation_cache=trusted_fingerprints,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue