Page MenuHomePhabricator
Paste P9480

(An Untitled Masterwork)
ActivePublic

Authored by Bugreporter on Oct 25 2019, 4:05 PM.
Tags
None
Referenced Files
F30880226: raw.txt
Oct 25 2019, 4:05 PM
Subscribers
None
from distutils.version import StrictVersion
from socket import error as SocketError, timeout as SocketTimeout
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import (
PoolManager, HTTPSConnectionPool,
)
from requests.packages.urllib3.exceptions import ConnectTimeoutError
from requests.packages.urllib3.util import connection
from requests.packages.urllib3.exceptions import NewConnectionError
from requests.packages.urllib3.connection import HTTPSConnection
class ForcedIPHTTPSAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
self.dest_ip = kwargs.pop('dest_ip', None)
super(ForcedIPHTTPSAdapter, self).__init__(*args, **kwargs)
def init_poolmanager(self, *args, **pool_kwargs):
pool_kwargs['dest_ip'] = self.dest_ip
self.poolmanager = ForcedIPHTTPSPoolManager(*args, **pool_kwargs)
class ForcedIPHTTPSPoolManager(PoolManager):
def __init__(self, *args, **kwargs):
self.dest_ip = kwargs.pop('dest_ip', None)
super(ForcedIPHTTPSPoolManager, self).__init__(*args, **kwargs)
def _new_pool(self, scheme, host, port, request_context=None):
kwargs = self.connection_pool_kw
assert scheme == 'https'
kwargs['dest_ip'] = self.dest_ip
return ForcedIPHTTPSConnectionPool(host, port, **kwargs)
class ForcedIPHTTPSConnectionPool(HTTPSConnectionPool):
def __init__(self, *args, **kwargs):
self.dest_ip = kwargs.pop('dest_ip', None)
super(ForcedIPHTTPSConnectionPool, self).__init__(*args, **kwargs)
def _new_conn(self):
self.num_connections += 1
actual_host = self.host
actual_port = self.port
if self.proxy is not None:
actual_host = self.proxy.host
actual_port = self.proxy.port
self.conn_kw = getattr(self, 'conn_kw', {})
self.conn_kw['dest_ip'] = self.dest_ip
conn = ForcedIPHTTPSConnection(
host=actual_host, port=actual_port,
timeout=self.timeout.connect_timeout,
strict=self.strict, **self.conn_kw)
pc = self._prepare_conn(conn)
return pc
def __str__(self):
return '%s(host=%r, port=%r, dest_ip=%s)' % (
type(self).__name__, self.host, self.port, self.dest_ip)
class ForcedIPHTTPSConnection(HTTPSConnection, object):
def __init__(self, *args, **kwargs):
self.dest_ip = kwargs.pop('dest_ip', None)
super(ForcedIPHTTPSConnection, self).__init__(*args, **kwargs)
def _new_conn(self):
extra_kw = {}
if self.source_address:
extra_kw['source_address'] = self.source_address
if getattr(self, 'socket_options', None):
extra_kw['socket_options'] = self.socket_options
dest_host = self.dest_ip if self.dest_ip else self.host
try:
conn = connection.create_connection(
(dest_host, self.port), self.timeout, **extra_kw)
except SocketTimeout as e:
raise ConnectTimeoutError(
self, "Connection to %s timed out. (connect timeout=%s)" %
(self.host, self.timeout))
except SocketError as e:
raise NewConnectionError(
self, "Failed to establish a new connection: %s" % e)
return conn
# The script above comes from: https://github.com/Roadmaster/forcediphttpsadapter/blob/master/w/adapters.py
session = requests.Session()
session.mount("https://query.wikidata.org", ForcedIPHTTPSAdapter(dest_ip='198.35.26.96'))
response = session.get(
'https://query.wikidata.org/sparql?query=SELECT%20%0A%20%20%3Fitem%20%3Frelative%20%3FitemLabel%20%20%3FrelativeLabel%20%0A%7B%0A%20%20%20%20%3Fitem%20wdt%3AP40%20%3Frelative%20.%0A%20%20%20%20%3Fitem%20wdt%3AP31%20wd%3AQ5%20%3B%20wdt%3AP21%20wd%3AQ6581097.%0A%20%20%20%20%3Frelative%20wdt%3AP31%20wd%3AQ5%20.%0A%20%20%20%20MINUS%20%7B%20%3Frelative%20wdt%3AP22%20%3Fitem%20%7D%0A%7D', headers={'Host': 'query.wikidata.org'}, verify=False)
print(response.status_code)
print(response.text)