1
0
mirror of https://github.com/scrapy/scrapy.git synced 2025-02-22 14:13:11 +00:00

Merge pull request #828 from dangra/string-headers

normalize header values to bytes (and PY3)
This commit is contained in:
Mikhail Korobov 2014-08-01 15:51:57 +06:00
commit 7183a87c35
4 changed files with 81 additions and 47 deletions

View File

@ -1,3 +1,4 @@
import six
from w3lib.http import headers_dict_to_raw from w3lib.http import headers_dict_to_raw
from scrapy.utils.datatypes import CaselessDict from scrapy.utils.datatypes import CaselessDict
@ -10,19 +11,29 @@ class Headers(CaselessDict):
super(Headers, self).__init__(seq) super(Headers, self).__init__(seq)
def normkey(self, key): def normkey(self, key):
"""Headers must not be unicode""" """Normalize key to bytes"""
if isinstance(key, unicode): return self._tobytes(key.title())
return key.title().encode(self.encoding)
return key.title()
def normvalue(self, value): def normvalue(self, value):
"""Headers must not be unicode""" """Normalize values to bytes"""
if value is None: if value is None:
value = [] value = []
elif isinstance(value, (six.text_type, bytes)):
value = [value]
elif not hasattr(value, '__iter__'): elif not hasattr(value, '__iter__'):
value = [value] value = [value]
return [x.encode(self.encoding) if isinstance(x, unicode) else x \
for x in value] return [self._tobytes(x) for x in value]
def _tobytes(self, x):
    """Coerce a single header key or value to a byte string.

    ``bytes`` input is returned unchanged, text is encoded with
    ``self.encoding``, and integers are rendered as decimal text and
    then encoded the same way.

    ``six.integer_types`` is checked rather than plain ``int`` so that
    Python 2 ``long`` values are accepted too.

    Raises ``TypeError`` for any other type.
    """
    if isinstance(x, bytes):
        return x
    if isinstance(x, six.text_type):
        return x.encode(self.encoding)
    if isinstance(x, six.integer_types):
        return six.text_type(x).encode(self.encoding)
    raise TypeError('Unsupported value type: {}'.format(type(x)))
def __getitem__(self, key): def __getitem__(self, key):
try: try:

View File

@ -30,7 +30,6 @@ tests/test_downloadermiddleware_useragent.py
tests/test_dupefilter.py tests/test_dupefilter.py
tests/test_engine.py tests/test_engine.py
tests/test_http_cookies.py tests/test_http_cookies.py
tests/test_http_headers.py
tests/test_http_request.py tests/test_http_request.py
tests/test_http_response.py tests/test_http_response.py
tests/test_link.py tests/test_link.py

View File

@ -426,7 +426,7 @@ class FTPTestCase(unittest.TestCase):
def _test(r): def _test(r):
self.assertEqual(r.status, 200) self.assertEqual(r.status, 200)
self.assertEqual(r.body, 'I have the power!') self.assertEqual(r.body, 'I have the power!')
self.assertEqual(r.headers, {'Local Filename': [''], 'Size': [17]}) self.assertEqual(r.headers, {'Local Filename': [''], 'Size': ['17']})
return self._add_test_callbacks(d, _test) return self._add_test_callbacks(d, _test)
def test_ftp_download_notexist(self): def test_ftp_download_notexist(self):
@ -446,7 +446,7 @@ class FTPTestCase(unittest.TestCase):
def _test(r): def _test(r):
self.assertEqual(r.body, local_fname) self.assertEqual(r.body, local_fname)
self.assertEqual(r.headers, {'Local Filename': ['/tmp/file.txt'], 'Size': [17]}) self.assertEqual(r.headers, {'Local Filename': ['/tmp/file.txt'], 'Size': ['17']})
self.assertTrue(os.path.exists(local_fname)) self.assertTrue(os.path.exists(local_fname))
with open(local_fname) as f: with open(local_fname) as f:
self.assertEqual(f.read(), "I have the power!") self.assertEqual(f.read(), "I have the power!")

View File

@ -17,49 +17,47 @@ class HeadersTest(unittest.TestCase):
self.assertEqual(h.get('Accept'), None) self.assertEqual(h.get('Accept'), None)
self.assertEqual(h.getlist('Accept'), []) self.assertEqual(h.getlist('Accept'), [])
self.assertEqual(h.get('Accept', '*/*'), '*/*') self.assertEqual(h.get('Accept', '*/*'), b'*/*')
self.assertEqual(h.getlist('Accept', '*/*'), ['*/*']) self.assertEqual(h.getlist('Accept', '*/*'), [b'*/*'])
self.assertEqual(h.getlist('Accept', ['text/html', 'images/jpeg']), ['text/html','images/jpeg']) self.assertEqual(h.getlist('Accept', ['text/html', 'images/jpeg']),
[b'text/html', b'images/jpeg'])
def test_single_value(self): def test_single_value(self):
h = Headers() h = Headers()
h['Content-Type'] = 'text/html' h['Content-Type'] = 'text/html'
self.assertEqual(h['Content-Type'], 'text/html') self.assertEqual(h['Content-Type'], b'text/html')
self.assertEqual(h.get('Content-Type'), 'text/html') self.assertEqual(h.get('Content-Type'), b'text/html')
self.assertEqual(h.getlist('Content-Type'), ['text/html']) self.assertEqual(h.getlist('Content-Type'), [b'text/html'])
def test_multivalue(self): def test_multivalue(self):
h = Headers() h = Headers()
h['X-Forwarded-For'] = hlist = ['ip1', 'ip2'] h['X-Forwarded-For'] = hlist = ['ip1', 'ip2']
self.assertEqual(h['X-Forwarded-For'], 'ip2') self.assertEqual(h['X-Forwarded-For'], b'ip2')
self.assertEqual(h.get('X-Forwarded-For'), 'ip2') self.assertEqual(h.get('X-Forwarded-For'), b'ip2')
self.assertEqual(h.getlist('X-Forwarded-For'), hlist) self.assertEqual(h.getlist('X-Forwarded-For'), [b'ip1', b'ip2'])
assert h.getlist('X-Forwarded-For') is not hlist assert h.getlist('X-Forwarded-For') is not hlist
def test_encode_utf8(self): def test_encode_utf8(self):
h = Headers({u'key': u'\xa3'}, encoding='utf-8') h = Headers({u'key': u'\xa3'}, encoding='utf-8')
key, val = dict(h).items()[0] key, val = dict(h).popitem()
assert isinstance(key, str), key assert isinstance(key, bytes), key
assert isinstance(val[0], str), val[0] assert isinstance(val[0], bytes), val[0]
self.assertEqual(val[0], '\xc2\xa3') self.assertEqual(val[0], b'\xc2\xa3')
def test_encode_latin1(self): def test_encode_latin1(self):
h = Headers({u'key': u'\xa3'}, encoding='latin1') h = Headers({u'key': u'\xa3'}, encoding='latin1')
key, val = dict(h).items()[0] key, val = dict(h).popitem()
self.assertEqual(val[0], '\xa3') self.assertEqual(val[0], b'\xa3')
def test_encode_multiple(self): def test_encode_multiple(self):
h = Headers({u'key': [u'\xa3']}, encoding='utf-8') h = Headers({u'key': [u'\xa3']}, encoding='utf-8')
key, val = dict(h).items()[0] key, val = dict(h).popitem()
self.assertEqual(val[0], '\xc2\xa3') self.assertEqual(val[0], b'\xc2\xa3')
def test_delete_and_contains(self): def test_delete_and_contains(self):
h = Headers() h = Headers()
h['Content-Type'] = 'text/html' h['Content-Type'] = 'text/html'
assert 'Content-Type' in h assert 'Content-Type' in h
del h['Content-Type'] del h['Content-Type']
assert 'Content-Type' not in h assert 'Content-Type' not in h
@ -72,7 +70,7 @@ class HeadersTest(unittest.TestCase):
h = Headers() h = Headers()
olist = h.setdefault('X-Forwarded-For', 'ip1') olist = h.setdefault('X-Forwarded-For', 'ip1')
self.assertEqual(h.getlist('X-Forwarded-For'), ['ip1']) self.assertEqual(h.getlist('X-Forwarded-For'), [b'ip1'])
assert h.getlist('X-Forwarded-For') is olist assert h.getlist('X-Forwarded-For') is olist
def test_iterables(self): def test_iterables(self):
@ -80,18 +78,24 @@ class HeadersTest(unittest.TestCase):
h = Headers(idict) h = Headers(idict)
self.assertDictEqual(dict(h), self.assertDictEqual(dict(h),
{'Content-Type': ['text/html'], 'X-Forwarded-For': ['ip1', 'ip2']}) {b'Content-Type': [b'text/html'],
self.assertSortedEqual(h.keys(), ['X-Forwarded-For', 'Content-Type']) b'X-Forwarded-For': [b'ip1', b'ip2']})
self.assertSortedEqual(h.items(), [('X-Forwarded-For', ['ip1', 'ip2']), ('Content-Type', ['text/html'])]) self.assertSortedEqual(h.keys(),
self.assertSortedEqual(h.iteritems(), [('X-Forwarded-For', ['ip1', 'ip2']), ('Content-Type', ['text/html'])]) [b'X-Forwarded-For', b'Content-Type'])
self.assertSortedEqual(h.items(),
self.assertSortedEqual(h.values(), ['ip2', 'text/html']) [(b'X-Forwarded-For', [b'ip1', b'ip2']),
(b'Content-Type', [b'text/html'])])
self.assertSortedEqual(h.iteritems(),
[(b'X-Forwarded-For', [b'ip1', b'ip2']),
(b'Content-Type', [b'text/html'])])
self.assertSortedEqual(h.values(), [b'ip2', b'text/html'])
def test_update(self): def test_update(self):
h = Headers() h = Headers()
h.update({'Content-Type': 'text/html', 'X-Forwarded-For': ['ip1', 'ip2']}) h.update({'Content-Type': 'text/html',
self.assertEqual(h.getlist('Content-Type'), ['text/html']) 'X-Forwarded-For': ['ip1', 'ip2']})
self.assertEqual(h.getlist('X-Forwarded-For'), ['ip1', 'ip2']) self.assertEqual(h.getlist('Content-Type'), [b'text/html'])
self.assertEqual(h.getlist('X-Forwarded-For'), [b'ip1', b'ip2'])
def test_copy(self): def test_copy(self):
h1 = Headers({'header1': ['value1', 'value2']}) h1 = Headers({'header1': ['value1', 'value2']})
@ -104,25 +108,25 @@ class HeadersTest(unittest.TestCase):
def test_appendlist(self): def test_appendlist(self):
h1 = Headers({'header1': 'value1'}) h1 = Headers({'header1': 'value1'})
h1.appendlist('header1', 'value3') h1.appendlist('header1', 'value3')
self.assertEqual(h1.getlist('header1'), ['value1', 'value3']) self.assertEqual(h1.getlist('header1'), [b'value1', b'value3'])
h1 = Headers() h1 = Headers()
h1.appendlist('header1', 'value1') h1.appendlist('header1', 'value1')
h1.appendlist('header1', 'value3') h1.appendlist('header1', 'value3')
self.assertEqual(h1.getlist('header1'), ['value1', 'value3']) self.assertEqual(h1.getlist('header1'), [b'value1', b'value3'])
def test_setlist(self): def test_setlist(self):
h1 = Headers({'header1': 'value1'}) h1 = Headers({'header1': 'value1'})
self.assertEqual(h1.getlist('header1'), ['value1']) self.assertEqual(h1.getlist('header1'), [b'value1'])
h1.setlist('header1', ['value2', 'value3']) h1.setlist('header1', [b'value2', b'value3'])
self.assertEqual(h1.getlist('header1'), ['value2', 'value3']) self.assertEqual(h1.getlist('header1'), [b'value2', b'value3'])
def test_setlistdefault(self): def test_setlistdefault(self):
h1 = Headers({'header1': 'value1'}) h1 = Headers({'header1': 'value1'})
h1.setlistdefault('header1', ['value2', 'value3']) h1.setlistdefault('header1', ['value2', 'value3'])
h1.setlistdefault('header2', ['value2', 'value3']) h1.setlistdefault('header2', ['value2', 'value3'])
self.assertEqual(h1.getlist('header1'), ['value1']) self.assertEqual(h1.getlist('header1'), [b'value1'])
self.assertEqual(h1.getlist('header2'), ['value2', 'value3']) self.assertEqual(h1.getlist('header2'), [b'value2', b'value3'])
def test_none_value(self): def test_none_value(self):
h1 = Headers() h1 = Headers()
@ -131,3 +135,23 @@ class HeadersTest(unittest.TestCase):
h1.setdefault('foo', 'bar') h1.setdefault('foo', 'bar')
self.assertEqual(h1.get('foo'), None) self.assertEqual(h1.get('foo'), None)
self.assertEqual(h1.getlist('foo'), []) self.assertEqual(h1.getlist('foo'), [])
def test_int_value(self):
    # Integers supplied through the constructor, item assignment,
    # setdefault() and setlist() should all come back as byte strings.
    headers = Headers({'hey': 5})
    headers['foo'] = 1
    headers.setdefault('bar', 2)
    headers.setlist('buz', [1, 'dos', 3])

    expected_by_name = (
        ('foo', [b'1']),
        ('bar', [b'2']),
        ('buz', [b'1', b'dos', b'3']),
        ('hey', [b'5']),
    )
    for name, expected in expected_by_name:
        self.assertEqual(headers.getlist(name), expected)
def test_invalid_value(self):
    # The constructor, item assignment, setdefault() and setlist() must
    # each reject a value of an unsupported type with a TypeError.
    pattern = 'Unsupported value type'
    attempts = (
        (Headers, ({'foo': object()},)),
        (Headers().__setitem__, ('foo', object())),
        (Headers().setdefault, ('foo', object())),
        (Headers().setlist, ('foo', [object()])),
    )
    for func, args in attempts:
        self.assertRaisesRegexp(TypeError, pattern, func, *args)