1
0
mirror of https://github.com/scrapy/scrapy.git synced 2025-02-24 11:24:10 +00:00

PY3 port utils.request

This commit is contained in:
Mikhail Korobov 2015-07-25 17:14:56 +02:00
parent f750ee4c00
commit 7874bb9f13
3 changed files with 23 additions and 20 deletions

View File

@ -10,6 +10,7 @@ from six.moves.urllib.parse import urlunparse
from twisted.internet.defer import Deferred
from w3lib.http import basic_auth_header
from scrapy.utils.python import to_bytes, to_native_str
from scrapy.utils.url import canonicalize_url
from scrapy.utils.httpobj import urlparse_cached
@ -44,13 +45,14 @@ def request_fingerprint(request, include_headers=None):
"""
if include_headers:
include_headers = tuple([h.lower() for h in sorted(include_headers)])
include_headers = tuple([to_bytes(h.lower())
for h in sorted(include_headers)])
cache = _fingerprint_cache.setdefault(request, {})
if include_headers not in cache:
fp = hashlib.sha1()
fp.update(request.method)
fp.update(canonicalize_url(request.url))
fp.update(request.body or '')
fp.update(to_bytes(request.method))
fp.update(to_bytes(canonicalize_url(request.url)))
fp.update(request.body or b'')
if include_headers:
for hdr in include_headers:
if hdr in request.headers:
@ -60,12 +62,14 @@ def request_fingerprint(request, include_headers=None):
cache[include_headers] = fp.hexdigest()
return cache[include_headers]
def request_authenticate(request, username, password):
    """Attach HTTP Basic credentials to ``request``, modifying it in place.

    The value built by ``basic_auth_header(username, password)`` (HTTP basic
    access authentication, RFC 2617) is stored under the request's
    ``Authorization`` header. Nothing is returned.
    """
    auth_value = basic_auth_header(username, password)
    request.headers['Authorization'] = auth_value
def request_httprepr(request):
    """Return the raw HTTP representation (as bytes) of the given request.

    The result is assembled from the request itself and is provided for
    reference only. It consists of, in order:

    * the request line ``<METHOD> <path> HTTP/1.1``, where the path is the
      URL's path (``/`` when empty) plus its params and query string;
    * a ``Host`` header taken from the URL's hostname;
    * the request headers as rendered by ``request.headers.to_string()``,
      when there are any;
    * a blank line, followed by ``request.body``.
    """
    parsed = urlparse_cached(request)
    # Drop scheme, netloc and fragment: only the path part goes on the
    # request line.
    path = urlunparse(('', '', parsed.path or '/', parsed.params, parsed.query, ''))
    # ``hostname`` is None for URLs with no network location (for example
    # ``file:///path``); emit an empty Host value instead of handing None
    # to to_bytes().
    host = parsed.hostname or b''
    parts = [
        to_bytes(request.method), b" ", to_bytes(path), b" HTTP/1.1\r\n",
        b"Host: ", to_bytes(host), b"\r\n",
    ]
    if request.headers:
        parts.append(request.headers.to_string() + b"\r\n")
    parts.append(b"\r\n")
    parts.append(request.body)
    return b"".join(parts)

View File

@ -49,7 +49,6 @@ tests/test_utils_defer.py
tests/test_utils_iterators.py
tests/test_utils_log.py
tests/test_utils_reqser.py
tests/test_utils_request.py
tests/test_utils_response.py
tests/test_utils_signal.py
tests/test_utils_template.py

View File

@ -21,15 +21,15 @@ class UtilsRequestTest(unittest.TestCase):
r1 = Request("http://www.example.com/members/offers.html")
r2 = Request("http://www.example.com/members/offers.html")
r2.headers['SESSIONID'] = "somehash"
r2.headers['SESSIONID'] = b"somehash"
self.assertEqual(request_fingerprint(r1), request_fingerprint(r2))
r1 = Request("http://www.example.com/")
r2 = Request("http://www.example.com/")
r2.headers['Accept-Language'] = 'en'
r2.headers['Accept-Language'] = b'en'
r3 = Request("http://www.example.com/")
r3.headers['Accept-Language'] = 'en'
r3.headers['SESSIONID'] = "somehash"
r3.headers['Accept-Language'] = b'en'
r3.headers['SESSIONID'] = b"somehash"
self.assertEqual(request_fingerprint(r1), request_fingerprint(r2), request_fingerprint(r3))
@ -44,7 +44,7 @@ class UtilsRequestTest(unittest.TestCase):
r1 = Request("http://www.example.com")
r2 = Request("http://www.example.com", method='POST')
r3 = Request("http://www.example.com", method='POST', body='request body')
r3 = Request("http://www.example.com", method='POST', body=b'request body')
self.assertNotEqual(request_fingerprint(r1), request_fingerprint(r2))
self.assertNotEqual(request_fingerprint(r2), request_fingerprint(r3))
@ -52,24 +52,24 @@ class UtilsRequestTest(unittest.TestCase):
# cached fingerprint must be cleared on request copy
r1 = Request("http://www.example.com")
fp1 = request_fingerprint(r1)
r2 = r1.replace(url = "http://www.example.com/other")
r2 = r1.replace(url="http://www.example.com/other")
fp2 = request_fingerprint(r2)
self.assertNotEqual(fp1, fp2)
def test_request_authenticate(self):
r = Request("http://www.example.com")
request_authenticate(r, 'someuser', 'somepass')
self.assertEqual(r.headers['Authorization'], 'Basic c29tZXVzZXI6c29tZXBhc3M=')
self.assertEqual(r.headers['Authorization'], b'Basic c29tZXVzZXI6c29tZXBhc3M=')
def test_request_httprepr(self):
r1 = Request("http://www.example.com")
self.assertEqual(request_httprepr(r1), 'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n')
self.assertEqual(request_httprepr(r1), b'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n')
r1 = Request("http://www.example.com/some/page.html?arg=1")
self.assertEqual(request_httprepr(r1), 'GET /some/page.html?arg=1 HTTP/1.1\r\nHost: www.example.com\r\n\r\n')
self.assertEqual(request_httprepr(r1), b'GET /some/page.html?arg=1 HTTP/1.1\r\nHost: www.example.com\r\n\r\n')
r1 = Request("http://www.example.com", method='POST', headers={"Content-type": "text/html"}, body="Some body")
self.assertEqual(request_httprepr(r1), 'POST / HTTP/1.1\r\nHost: www.example.com\r\nContent-Type: text/html\r\n\r\nSome body')
r1 = Request("http://www.example.com", method='POST', headers={"Content-type": b"text/html"}, body=b"Some body")
self.assertEqual(request_httprepr(r1), b'POST / HTTP/1.1\r\nHost: www.example.com\r\nContent-Type: text/html\r\n\r\nSome body')
if __name__ == "__main__":
unittest.main()