mirror of
https://github.com/scrapy/scrapy.git
synced 2025-02-23 23:23:44 +00:00
194 lines
8.0 KiB
Python
194 lines
8.0 KiB
Python
from io import BytesIO
|
|
from unittest import TestCase, SkipTest
|
|
from os.path import join
|
|
from gzip import GzipFile
|
|
|
|
from scrapy.spiders import Spider
|
|
from scrapy.http import Response, Request, HtmlResponse
|
|
from scrapy.downloadermiddlewares.httpcompression import HttpCompressionMiddleware, \
|
|
ACCEPTED_ENCODINGS
|
|
from tests import tests_datadir
|
|
from w3lib.encoding import resolve_encoding
|
|
|
|
|
|
SAMPLEDIR = join(tests_datadir, 'compressed')
|
|
|
|
FORMAT = {
|
|
'gzip': ('html-gzip.bin', 'gzip'),
|
|
'x-gzip': ('html-gzip.bin', 'gzip'),
|
|
'rawdeflate': ('html-rawdeflate.bin', 'deflate'),
|
|
'zlibdeflate': ('html-zlibdeflate.bin', 'deflate'),
|
|
'br': ('html-br.bin', 'br')
|
|
}
|
|
|
|
|
|
class HttpCompressionTest(TestCase):
|
|
|
|
def setUp(self):
|
|
self.spider = Spider('foo')
|
|
self.mw = HttpCompressionMiddleware()
|
|
|
|
def _getresponse(self, coding):
|
|
if coding not in FORMAT:
|
|
raise ValueError()
|
|
|
|
samplefile, contentencoding = FORMAT[coding]
|
|
|
|
with open(join(SAMPLEDIR, samplefile), 'rb') as sample:
|
|
body = sample.read()
|
|
|
|
headers = {
|
|
'Server': 'Yaws/1.49 Yet Another Web Server',
|
|
'Date': 'Sun, 08 Mar 2009 00:41:03 GMT',
|
|
'Content-Length': len(body),
|
|
'Content-Type': 'text/html',
|
|
'Content-Encoding': contentencoding,
|
|
}
|
|
|
|
response = Response('http://scrapytest.org/', body=body, headers=headers)
|
|
response.request = Request('http://scrapytest.org', headers={'Accept-Encoding': 'gzip,deflate'})
|
|
return response
|
|
|
|
def test_process_request(self):
|
|
request = Request('http://scrapytest.org')
|
|
assert 'Accept-Encoding' not in request.headers
|
|
self.mw.process_request(request, self.spider)
|
|
self.assertEqual(request.headers.get('Accept-Encoding'),
|
|
b','.join(ACCEPTED_ENCODINGS))
|
|
|
|
def test_process_response_gzip(self):
|
|
response = self._getresponse('gzip')
|
|
request = response.request
|
|
|
|
self.assertEqual(response.headers['Content-Encoding'], b'gzip')
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is not response
|
|
assert newresponse.body.startswith(b'<!DOCTYPE')
|
|
assert 'Content-Encoding' not in newresponse.headers
|
|
|
|
def test_process_response_br(self):
|
|
try:
|
|
import brotli
|
|
except ImportError:
|
|
raise SkipTest("no brotli")
|
|
response = self._getresponse('br')
|
|
request = response.request
|
|
self.assertEqual(response.headers['Content-Encoding'], b'br')
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is not response
|
|
assert newresponse.body.startswith(b"<!DOCTYPE")
|
|
assert 'Content-Encoding' not in newresponse.headers
|
|
|
|
def test_process_response_rawdeflate(self):
|
|
response = self._getresponse('rawdeflate')
|
|
request = response.request
|
|
|
|
self.assertEqual(response.headers['Content-Encoding'], b'deflate')
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is not response
|
|
assert newresponse.body.startswith(b'<!DOCTYPE')
|
|
assert 'Content-Encoding' not in newresponse.headers
|
|
|
|
def test_process_response_zlibdelate(self):
|
|
response = self._getresponse('zlibdeflate')
|
|
request = response.request
|
|
|
|
self.assertEqual(response.headers['Content-Encoding'], b'deflate')
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is not response
|
|
assert newresponse.body.startswith(b'<!DOCTYPE')
|
|
assert 'Content-Encoding' not in newresponse.headers
|
|
|
|
def test_process_response_plain(self):
|
|
response = Response('http://scrapytest.org', body=b'<!DOCTYPE...')
|
|
request = Request('http://scrapytest.org')
|
|
|
|
assert not response.headers.get('Content-Encoding')
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is response
|
|
assert newresponse.body.startswith(b'<!DOCTYPE')
|
|
|
|
def test_multipleencodings(self):
|
|
response = self._getresponse('gzip')
|
|
response.headers['Content-Encoding'] = ['uuencode', 'gzip']
|
|
request = response.request
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert newresponse is not response
|
|
self.assertEqual(newresponse.headers.getlist('Content-Encoding'), [b'uuencode'])
|
|
|
|
def test_process_response_encoding_inside_body(self):
|
|
headers = {
|
|
'Content-Type': 'text/html',
|
|
'Content-Encoding': 'gzip',
|
|
}
|
|
f = BytesIO()
|
|
plainbody = b"""<html><head><title>Some page</title><meta http-equiv="Content-Type" content="text/html; charset=gb2312">"""
|
|
zf = GzipFile(fileobj=f, mode='wb')
|
|
zf.write(plainbody)
|
|
zf.close()
|
|
response = Response("http;//www.example.com/", headers=headers, body=f.getvalue())
|
|
request = Request("http://www.example.com/")
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert isinstance(newresponse, HtmlResponse)
|
|
self.assertEqual(newresponse.body, plainbody)
|
|
self.assertEqual(newresponse.encoding, resolve_encoding('gb2312'))
|
|
|
|
def test_process_response_force_recalculate_encoding(self):
|
|
headers = {
|
|
'Content-Type': 'text/html',
|
|
'Content-Encoding': 'gzip',
|
|
}
|
|
f = BytesIO()
|
|
plainbody = b"""<html><head><title>Some page</title><meta http-equiv="Content-Type" content="text/html; charset=gb2312">"""
|
|
zf = GzipFile(fileobj=f, mode='wb')
|
|
zf.write(plainbody)
|
|
zf.close()
|
|
response = HtmlResponse("http;//www.example.com/page.html", headers=headers, body=f.getvalue())
|
|
request = Request("http://www.example.com/")
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
assert isinstance(newresponse, HtmlResponse)
|
|
self.assertEqual(newresponse.body, plainbody)
|
|
self.assertEqual(newresponse.encoding, resolve_encoding('gb2312'))
|
|
|
|
def test_process_response_gzipped_contenttype(self):
|
|
response = self._getresponse('gzip')
|
|
response.headers['Content-Type'] = 'application/gzip'
|
|
request = response.request
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
self.assertIs(newresponse, response)
|
|
self.assertEqual(response.headers['Content-Encoding'], b'gzip')
|
|
self.assertEqual(response.headers['Content-Type'], b'application/gzip')
|
|
|
|
def test_process_response_gzip_app_octetstream_contenttype(self):
|
|
response = self._getresponse('gzip')
|
|
response.headers['Content-Type'] = 'application/octet-stream'
|
|
request = response.request
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
self.assertIs(newresponse, response)
|
|
self.assertEqual(response.headers['Content-Encoding'], b'gzip')
|
|
self.assertEqual(response.headers['Content-Type'], b'application/octet-stream')
|
|
|
|
def test_process_response_gzip_binary_octetstream_contenttype(self):
|
|
response = self._getresponse('x-gzip')
|
|
response.headers['Content-Type'] = 'binary/octet-stream'
|
|
request = response.request
|
|
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
self.assertIs(newresponse, response)
|
|
self.assertEqual(response.headers['Content-Encoding'], b'gzip')
|
|
self.assertEqual(response.headers['Content-Type'], b'binary/octet-stream')
|
|
|
|
def test_process_response_head_request_no_decode_required(self):
|
|
response = self._getresponse('gzip')
|
|
response.headers['Content-Type'] = 'application/gzip'
|
|
request = response.request
|
|
request.method = 'HEAD'
|
|
response = response.replace(body = None)
|
|
newresponse = self.mw.process_response(request, response, self.spider)
|
|
self.assertIs(newresponse, response)
|
|
self.assertEquals(response.body, b'')
|