mirror of
https://github.com/scrapy/scrapy.git
synced 2025-02-23 09:43:50 +00:00
- change ``failIf`` to ``assertFalse`` - change ``assertEquals`` to ``assertEqual`` - change ``assert_`` to ``assertTrue`` https://docs.python.org/2/library/unittest.html#deprecated-aliases
252 lines
10 KiB
Python
252 lines
10 KiB
Python
from io import BytesIO
|
|
from unittest import TestCase, SkipTest
|
|
from os.path import join
|
|
from gzip import GzipFile
|
|
|
|
from scrapy.spiders import Spider
|
|
from scrapy.http import Response, Request, HtmlResponse
|
|
from scrapy.downloadermiddlewares.httpcompression import HttpCompressionMiddleware, \
|
|
ACCEPTED_ENCODINGS
|
|
from scrapy.responsetypes import responsetypes
|
|
from scrapy.utils.gz import gunzip
|
|
from tests import tests_datadir
|
|
from w3lib.encoding import resolve_encoding
|
|
|
|
|
|
# Directory (under the shared test data dir) holding the pre-compressed
# sample bodies that the tests below load.
SAMPLEDIR = join(tests_datadir, 'compressed')

# Maps a coding label used by the tests to a pair of
# (sample file name inside SAMPLEDIR, value for the Content-Encoding header).
# 'gzip' and 'x-gzip' deliberately share the same sample and header value,
# as do the two deflate variants' header value.
FORMAT = {
    'gzip': ('html-gzip.bin', 'gzip'),
    'x-gzip': ('html-gzip.bin', 'gzip'),
    'rawdeflate': ('html-rawdeflate.bin', 'deflate'),
    'zlibdeflate': ('html-zlibdeflate.bin', 'deflate'),
    'br': ('html-br.bin', 'br'),
}
|
|
|
|
|
|
class HttpCompressionTest(TestCase):
|
|
|
|
def setUp(self):
|
|
self.spider = Spider('foo')
|
|
self.mw = HttpCompressionMiddleware()
|
|
|
|
def _getresponse(self, coding):
|
|
if coding not in FORMAT:
|
|
raise ValueError()
|
|
|
|
samplefile, contentencoding = FORMAT[coding]
|
|
|
|
with open(join(SAMPLEDIR, samplefile), 'rb') as sample:
|
|
body = sample.read()
|
|
|
|
headers = {
|
|
'Server': 'Yaws/1.49 Yet Another Web Server',
|
|
'Date': 'Sun, 08 Mar 2009 00:41:03 GMT',
|
|
'Content-Length': len(body),
|
|
'Content-Type': 'text/html',
|
|
'Content-Encoding': contentencoding,
|
|
}
|
|
|
|
response = Response('http://scrapytest.org/', body=body, headers=headers)
|
|
response.request = Request('http://scrapytest.org', headers={'Accept-Encoding': 'gzip,deflate'})
|
|
return response
|
|
|
|
def test_process_request(self):
|
|
request = Request('http://scrapytest.org')
|
|
assert 'Accept-Encoding' not in request.headers
|
|
self.mw.process_request(request, self.spider)
|
|
self.assertEqual(request.headers.get('Accept-Encoding'),
|
|
b','.join(ACCEPTED_ENCODINGS))
|
|
|
|
def test_process_response_gzip(self):
|
|
response = self._getresponse('gzip')
|
|
request = response.request
|
|
|
|
self.assertEqual(response.headers['Content-Encoding'], b'gzip')
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is not response
|
|
assert newresponse.body.startswith(b'<!DOCTYPE')
|
|
assert 'Content-Encoding' not in newresponse.headers
|
|
|
|
def test_process_response_br(self):
|
|
try:
|
|
import brotli
|
|
except ImportError:
|
|
raise SkipTest("no brotli")
|
|
response = self._getresponse('br')
|
|
request = response.request
|
|
self.assertEqual(response.headers['Content-Encoding'], b'br')
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is not response
|
|
assert newresponse.body.startswith(b"<!DOCTYPE")
|
|
assert 'Content-Encoding' not in newresponse.headers
|
|
|
|
def test_process_response_rawdeflate(self):
|
|
response = self._getresponse('rawdeflate')
|
|
request = response.request
|
|
|
|
self.assertEqual(response.headers['Content-Encoding'], b'deflate')
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is not response
|
|
assert newresponse.body.startswith(b'<!DOCTYPE')
|
|
assert 'Content-Encoding' not in newresponse.headers
|
|
|
|
def test_process_response_zlibdelate(self):
|
|
response = self._getresponse('zlibdeflate')
|
|
request = response.request
|
|
|
|
self.assertEqual(response.headers['Content-Encoding'], b'deflate')
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is not response
|
|
assert newresponse.body.startswith(b'<!DOCTYPE')
|
|
assert 'Content-Encoding' not in newresponse.headers
|
|
|
|
def test_process_response_plain(self):
|
|
response = Response('http://scrapytest.org', body=b'<!DOCTYPE...')
|
|
request = Request('http://scrapytest.org')
|
|
|
|
assert not response.headers.get('Content-Encoding')
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is response
|
|
assert newresponse.body.startswith(b'<!DOCTYPE')
|
|
|
|
def test_multipleencodings(self):
|
|
response = self._getresponse('gzip')
|
|
response.headers['Content-Encoding'] = ['uuencode', 'gzip']
|
|
request = response.request
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is not response
|
|
self.assertEqual(newresponse.headers.getlist('Content-Encoding'), [b'uuencode'])
|
|
|
|
def test_process_response_encoding_inside_body(self):
|
|
headers = {
|
|
'Content-Type': 'text/html',
|
|
'Content-Encoding': 'gzip',
|
|
}
|
|
f = BytesIO()
|
|
plainbody = b"""<html><head><title>Some page</title><meta http-equiv="Content-Type" content="text/html; charset=gb2312">"""
|
|
zf = GzipFile(fileobj=f, mode='wb')
|
|
zf.write(plainbody)
|
|
zf.close()
|
|
response = Response("http;//www.example.com/", headers=headers, body=f.getvalue())
|
|
request = Request("http://www.example.com/")
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert isinstance(newresponse, HtmlResponse)
|
|
self.assertEqual(newresponse.body, plainbody)
|
|
self.assertEqual(newresponse.encoding, resolve_encoding('gb2312'))
|
|
|
|
def test_process_response_force_recalculate_encoding(self):
|
|
headers = {
|
|
'Content-Type': 'text/html',
|
|
'Content-Encoding': 'gzip',
|
|
}
|
|
f = BytesIO()
|
|
plainbody = b"""<html><head><title>Some page</title><meta http-equiv="Content-Type" content="text/html; charset=gb2312">"""
|
|
zf = GzipFile(fileobj=f, mode='wb')
|
|
zf.write(plainbody)
|
|
zf.close()
|
|
response = HtmlResponse("http;//www.example.com/page.html", headers=headers, body=f.getvalue())
|
|
request = Request("http://www.example.com/")
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert isinstance(newresponse, HtmlResponse)
|
|
self.assertEqual(newresponse.body, plainbody)
|
|
self.assertEqual(newresponse.encoding, resolve_encoding('gb2312'))
|
|
|
|
def test_process_response_no_content_type_header(self):
|
|
headers = {
|
|
'Content-Encoding': 'identity',
|
|
}
|
|
plainbody = b"""<html><head><title>Some page</title><meta http-equiv="Content-Type" content="text/html; charset=gb2312">"""
|
|
respcls = responsetypes.from_args(url="http://www.example.com/index", headers=headers, body=plainbody)
|
|
response = respcls("http://www.example.com/index", headers=headers, body=plainbody)
|
|
request = Request("http://www.example.com/index")
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert isinstance(newresponse, respcls)
|
|
self.assertEqual(newresponse.body, plainbody)
|
|
self.assertEqual(newresponse.encoding, resolve_encoding('gb2312'))
|
|
|
|
def test_process_response_gzipped_contenttype(self):
|
|
response = self._getresponse('gzip')
|
|
response.headers['Content-Type'] = 'application/gzip'
|
|
request = response.request
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
self.assertIsNot(newresponse, response)
|
|
self.assertTrue(newresponse.body.startswith(b'<!DOCTYPE'))
|
|
self.assertNotIn('Content-Encoding', newresponse.headers)
|
|
|
|
def test_process_response_gzip_app_octetstream_contenttype(self):
|
|
response = self._getresponse('gzip')
|
|
response.headers['Content-Type'] = 'application/octet-stream'
|
|
request = response.request
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
self.assertIsNot(newresponse, response)
|
|
self.assertTrue(newresponse.body.startswith(b'<!DOCTYPE'))
|
|
self.assertNotIn('Content-Encoding', newresponse.headers)
|
|
|
|
def test_process_response_gzip_binary_octetstream_contenttype(self):
|
|
response = self._getresponse('x-gzip')
|
|
response.headers['Content-Type'] = 'binary/octet-stream'
|
|
request = response.request
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
self.assertIsNot(newresponse, response)
|
|
self.assertTrue(newresponse.body.startswith(b'<!DOCTYPE'))
|
|
self.assertNotIn('Content-Encoding', newresponse.headers)
|
|
|
|
def test_process_response_gzipped_gzip_file(self):
|
|
"""Test that a gzip Content-Encoded .gz file is gunzipped
|
|
only once by the middleware, leaving gunzipping of the file
|
|
to upper layers.
|
|
"""
|
|
headers = {
|
|
'Content-Type': 'application/gzip',
|
|
'Content-Encoding': 'gzip',
|
|
}
|
|
# build a gzipped file (here, a sitemap)
|
|
f = BytesIO()
|
|
plainbody = b"""<?xml version="1.0" encoding="UTF-8"?>
|
|
<urlset xmlns="http://www.google.com/schemas/sitemap/0.84">
|
|
<url>
|
|
<loc>http://www.example.com/</loc>
|
|
<lastmod>2009-08-16</lastmod>
|
|
<changefreq>daily</changefreq>
|
|
<priority>1</priority>
|
|
</url>
|
|
<url>
|
|
<loc>http://www.example.com/Special-Offers.html</loc>
|
|
<lastmod>2009-08-16</lastmod>
|
|
<changefreq>weekly</changefreq>
|
|
<priority>0.8</priority>
|
|
</url>
|
|
</urlset>"""
|
|
gz_file = GzipFile(fileobj=f, mode='wb')
|
|
gz_file.write(plainbody)
|
|
gz_file.close()
|
|
|
|
# build a gzipped response body containing this gzipped file
|
|
r = BytesIO()
|
|
gz_resp = GzipFile(fileobj=r, mode='wb')
|
|
gz_resp.write(f.getvalue())
|
|
gz_resp.close()
|
|
|
|
response = Response("http;//www.example.com/", headers=headers, body=r.getvalue())
|
|
request = Request("http://www.example.com/")
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
self.assertEqual(gunzip(newresponse.body), plainbody)
|
|
|
|
def test_process_response_head_request_no_decode_required(self):
|
|
response = self._getresponse('gzip')
|
|
response.headers['Content-Type'] = 'application/gzip'
|
|
request = response.request
|
|
request.method = 'HEAD'
|
|
response = response.replace(body = None)
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
self.assertIs(newresponse, response)
|
|
self.assertEqual(response.body, b'')
|