1
0
mirror of https://github.com/scrapy/scrapy.git synced 2025-02-06 11:00:46 +00:00
scrapy/tests/test_downloadermiddleware_httpcompression.py
2024-12-12 20:33:35 +05:00

702 lines
27 KiB
Python

from gzip import GzipFile
from io import BytesIO
from logging import WARNING
from pathlib import Path
from unittest import SkipTest, TestCase
from testfixtures import LogCapture
from w3lib.encoding import resolve_encoding
from scrapy.downloadermiddlewares.httpcompression import (
ACCEPTED_ENCODINGS,
HttpCompressionMiddleware,
)
from scrapy.exceptions import IgnoreRequest, NotConfigured
from scrapy.http import HtmlResponse, Request, Response
from scrapy.responsetypes import responsetypes
from scrapy.spiders import Spider
from scrapy.utils.gz import gunzip
from scrapy.utils.test import get_crawler
from tests import tests_datadir
# Directory containing the pre-compressed sample response bodies.
SAMPLEDIR = Path(tests_datadir, "compressed")

# Maps a test format id to a (sample file name, Content-Encoding value) pair.
FORMAT = {
    "gzip": ("html-gzip.bin", "gzip"),
    "x-gzip": ("html-gzip.bin", "gzip"),
    "rawdeflate": ("html-rawdeflate.bin", "deflate"),
    "zlibdeflate": ("html-zlibdeflate.bin", "deflate"),
    "gzip-deflate": ("html-gzip-deflate.bin", "gzip, deflate"),
    "gzip-deflate-gzip": ("html-gzip-deflate-gzip.bin", "gzip, deflate, gzip"),
    "br": ("html-br.bin", "br"),
    # $ zstd raw.html --content-size -o html-zstd-static-content-size.bin
    "zstd-static-content-size": ("html-zstd-static-content-size.bin", "zstd"),
    # $ zstd raw.html --no-content-size -o html-zstd-static-no-content-size.bin
    "zstd-static-no-content-size": ("html-zstd-static-no-content-size.bin", "zstd"),
    # $ cat raw.html | zstd -o html-zstd-streaming-no-content-size.bin
    "zstd-streaming-no-content-size": (
        "html-zstd-streaming-no-content-size.bin",
        "zstd",
    ),
}
# Decompression-bomb samples: tiny compressed payloads that expand to ~11.5 MB.
# Compressed sizes: br 34 B, deflate 27 968 B, gzip 27 988 B, zstd 1 096 B;
# each decompresses to 11 511 612 B.
FORMAT.update(
    (f"bomb-{fmt}", (f"bomb-{fmt}.bin", fmt))
    for fmt in ("br", "deflate", "gzip", "zstd")
)
class HttpCompressionTest(TestCase):
    """Tests for ``HttpCompressionMiddleware``.

    Covers: ``Accept-Encoding`` request-header injection, decoding of
    gzip/deflate/br/zstd response bodies (including stacked encodings),
    stats bookkeeping, and protection against decompression bombs via
    ``DOWNLOAD_MAXSIZE`` / ``DOWNLOAD_WARNSIZE`` (setting, spider attribute
    and request meta variants).
    """

    def setUp(self):
        self.crawler = get_crawler(Spider)
        self.spider = self.crawler._create_spider("scrapytest.org")
        self.mw = HttpCompressionMiddleware.from_crawler(self.crawler)
        self.crawler.stats.open_spider(self.spider)

    def _skip_unless_br(self):
        # brotli support is provided by either the ``brotli`` or the
        # ``brotlicffi`` package; skip when neither is installed.
        try:
            try:
                import brotli  # noqa: F401
            except ImportError:
                import brotlicffi  # noqa: F401
        except ImportError:
            raise SkipTest("no brotli")

    def _skip_unless_zstd(self):
        # zstd support requires the ``zstandard`` package.
        try:
            import zstandard  # noqa: F401
        except ImportError:
            raise SkipTest("no zstd support (zstandard)")

    def _getresponse(self, coding):
        """Return a Response whose body is the pre-compressed sample for
        *coding* (a key of ``FORMAT``), with a matching Content-Encoding
        header and an attached Request.

        Raises ValueError for an unknown *coding*.
        """
        if coding not in FORMAT:
            raise ValueError
        samplefile, contentencoding = FORMAT[coding]
        body = (SAMPLEDIR / samplefile).read_bytes()
        headers = {
            "Server": "Yaws/1.49 Yet Another Web Server",
            "Date": "Sun, 08 Mar 2009 00:41:03 GMT",
            "Content-Length": len(body),
            "Content-Type": "text/html",
            "Content-Encoding": contentencoding,
        }
        response = Response("http://scrapytest.org/", body=body, headers=headers)
        response.request = Request(
            "http://scrapytest.org", headers={"Accept-Encoding": "gzip, deflate"}
        )
        return response

    def assertStatsEqual(self, key, value):
        # Include the full stats dump in the failure message for easier
        # debugging.
        self.assertEqual(
            self.crawler.stats.get_value(key, spider=self.spider),
            value,
            str(self.crawler.stats.get_stats(self.spider)),
        )

    def _assert_warnsize_logged(self, mw, response, spider):
        """Process *response* through *mw* and assert that the middleware
        logs the standard over-warnsize warning (and nothing else)."""
        with LogCapture(
            "scrapy.downloadermiddlewares.httpcompression",
            propagate=False,
            level=WARNING,
        ) as log:
            mw.process_response(response.request, response, spider)
        log.check(
            (
                "scrapy.downloadermiddlewares.httpcompression",
                "WARNING",
                (
                    "<200 http://scrapytest.org/> body size after "
                    "decompression (11511612 B) is larger than the download "
                    "warning size (10000000 B)."
                ),
            ),
        )

    def test_setting_false_compression_enabled(self):
        # COMPRESSION_ENABLED=False must prevent middleware construction.
        self.assertRaises(
            NotConfigured,
            HttpCompressionMiddleware.from_crawler,
            get_crawler(settings_dict={"COMPRESSION_ENABLED": False}),
        )

    def test_setting_default_compression_enabled(self):
        self.assertIsInstance(
            HttpCompressionMiddleware.from_crawler(get_crawler()),
            HttpCompressionMiddleware,
        )

    def test_setting_true_compression_enabled(self):
        self.assertIsInstance(
            HttpCompressionMiddleware.from_crawler(
                get_crawler(settings_dict={"COMPRESSION_ENABLED": True})
            ),
            HttpCompressionMiddleware,
        )

    def test_process_request(self):
        # The middleware advertises every supported encoding on the request.
        request = Request("http://scrapytest.org")
        assert "Accept-Encoding" not in request.headers
        self.mw.process_request(request, self.spider)
        self.assertEqual(
            request.headers.get("Accept-Encoding"), b", ".join(ACCEPTED_ENCODINGS)
        )

    def test_process_response_gzip(self):
        response = self._getresponse("gzip")
        request = response.request
        self.assertEqual(response.headers["Content-Encoding"], b"gzip")
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is not response
        assert newresponse.body.startswith(b"<!DOCTYPE")
        assert "Content-Encoding" not in newresponse.headers
        self.assertStatsEqual("httpcompression/response_count", 1)
        self.assertStatsEqual("httpcompression/response_bytes", 74837)

    def test_process_response_br(self):
        self._skip_unless_br()
        response = self._getresponse("br")
        request = response.request
        self.assertEqual(response.headers["Content-Encoding"], b"br")
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is not response
        assert newresponse.body.startswith(b"<!DOCTYPE")
        assert "Content-Encoding" not in newresponse.headers
        self.assertStatsEqual("httpcompression/response_count", 1)
        self.assertStatsEqual("httpcompression/response_bytes", 74837)

    def test_process_response_zstd(self):
        # All zstd sample variants (with/without content size, streaming)
        # must decode to the same raw content.
        self._skip_unless_zstd()
        raw_content = None
        for check_key in FORMAT:
            if not check_key.startswith("zstd-"):
                continue
            response = self._getresponse(check_key)
            request = response.request
            self.assertEqual(response.headers["Content-Encoding"], b"zstd")
            newresponse = self.mw.process_response(request, response, self.spider)
            if raw_content is None:
                raw_content = newresponse.body
            else:
                assert raw_content == newresponse.body
            assert newresponse is not response
            assert newresponse.body.startswith(b"<!DOCTYPE")
            assert "Content-Encoding" not in newresponse.headers

    def test_process_response_rawdeflate(self):
        response = self._getresponse("rawdeflate")
        request = response.request
        self.assertEqual(response.headers["Content-Encoding"], b"deflate")
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is not response
        assert newresponse.body.startswith(b"<!DOCTYPE")
        assert "Content-Encoding" not in newresponse.headers
        self.assertStatsEqual("httpcompression/response_count", 1)
        self.assertStatsEqual("httpcompression/response_bytes", 74840)

    def test_process_response_zlibdelate(self):
        response = self._getresponse("zlibdeflate")
        request = response.request
        self.assertEqual(response.headers["Content-Encoding"], b"deflate")
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is not response
        assert newresponse.body.startswith(b"<!DOCTYPE")
        assert "Content-Encoding" not in newresponse.headers
        self.assertStatsEqual("httpcompression/response_count", 1)
        self.assertStatsEqual("httpcompression/response_bytes", 74840)

    def test_process_response_plain(self):
        # No Content-Encoding header: the response passes through untouched
        # and no stats are recorded.
        response = Response("http://scrapytest.org", body=b"<!DOCTYPE...")
        request = Request("http://scrapytest.org")
        assert not response.headers.get("Content-Encoding")
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is response
        assert newresponse.body.startswith(b"<!DOCTYPE")
        self.assertStatsEqual("httpcompression/response_count", None)
        self.assertStatsEqual("httpcompression/response_bytes", None)

    def test_multipleencodings(self):
        # Unknown encodings preceding a known one must be left in place.
        response = self._getresponse("gzip")
        response.headers["Content-Encoding"] = ["uuencode", "gzip"]
        request = response.request
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is not response
        self.assertEqual(newresponse.headers.getlist("Content-Encoding"), [b"uuencode"])

    def test_multi_compression_single_header(self):
        response = self._getresponse("gzip-deflate")
        request = response.request
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is not response
        assert "Content-Encoding" not in newresponse.headers
        assert newresponse.body.startswith(b"<!DOCTYPE")

    def test_multi_compression_single_header_invalid_compression(self):
        # Decoding stops at the first unknown encoding; the remaining
        # (unapplied) encodings are preserved in the header.
        response = self._getresponse("gzip-deflate")
        response.headers["Content-Encoding"] = [b"gzip, foo, deflate"]
        request = response.request
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is not response
        self.assertEqual(
            newresponse.headers.getlist("Content-Encoding"), [b"gzip", b"foo"]
        )

    def test_multi_compression_multiple_header(self):
        response = self._getresponse("gzip-deflate")
        response.headers["Content-Encoding"] = ["gzip", "deflate"]
        request = response.request
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is not response
        assert "Content-Encoding" not in newresponse.headers
        assert newresponse.body.startswith(b"<!DOCTYPE")

    def test_multi_compression_multiple_header_invalid_compression(self):
        response = self._getresponse("gzip-deflate")
        response.headers["Content-Encoding"] = ["gzip", "foo", "deflate"]
        request = response.request
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is not response
        self.assertEqual(
            newresponse.headers.getlist("Content-Encoding"), [b"gzip", b"foo"]
        )

    def test_multi_compression_single_and_multiple_header(self):
        response = self._getresponse("gzip-deflate-gzip")
        response.headers["Content-Encoding"] = ["gzip", "deflate, gzip"]
        request = response.request
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is not response
        assert "Content-Encoding" not in newresponse.headers
        assert newresponse.body.startswith(b"<!DOCTYPE")

    def test_multi_compression_single_and_multiple_header_invalid_compression(self):
        response = self._getresponse("gzip-deflate")
        response.headers["Content-Encoding"] = ["gzip", "foo,deflate"]
        request = response.request
        newresponse = self.mw.process_response(request, response, self.spider)
        assert newresponse is not response
        self.assertEqual(
            newresponse.headers.getlist("Content-Encoding"), [b"gzip", b"foo"]
        )

    def test_process_response_encoding_inside_body(self):
        # After decompression the middleware re-derives the response class,
        # and the charset declared in the body meta tag takes effect.
        headers = {
            "Content-Type": "text/html",
            "Content-Encoding": "gzip",
        }
        f = BytesIO()
        plainbody = (
            b"<html><head><title>Some page</title>"
            b'<meta http-equiv="Content-Type" content="text/html; charset=gb2312">'
        )
        zf = GzipFile(fileobj=f, mode="wb")
        zf.write(plainbody)
        zf.close()
        response = Response(
            # fixed typo: was "http;//www.example.com/"
            "http://www.example.com/", headers=headers, body=f.getvalue()
        )
        request = Request("http://www.example.com/")
        newresponse = self.mw.process_response(request, response, self.spider)
        assert isinstance(newresponse, HtmlResponse)
        self.assertEqual(newresponse.body, plainbody)
        self.assertEqual(newresponse.encoding, resolve_encoding("gb2312"))
        self.assertStatsEqual("httpcompression/response_count", 1)
        self.assertStatsEqual("httpcompression/response_bytes", len(plainbody))

    def test_process_response_force_recalculate_encoding(self):
        headers = {
            "Content-Type": "text/html",
            "Content-Encoding": "gzip",
        }
        f = BytesIO()
        plainbody = (
            b"<html><head><title>Some page</title>"
            b'<meta http-equiv="Content-Type" content="text/html; charset=gb2312">'
        )
        zf = GzipFile(fileobj=f, mode="wb")
        zf.write(plainbody)
        zf.close()
        response = HtmlResponse(
            # fixed typo: was "http;//www.example.com/page.html"
            "http://www.example.com/page.html", headers=headers, body=f.getvalue()
        )
        request = Request("http://www.example.com/")
        newresponse = self.mw.process_response(request, response, self.spider)
        assert isinstance(newresponse, HtmlResponse)
        self.assertEqual(newresponse.body, plainbody)
        self.assertEqual(newresponse.encoding, resolve_encoding("gb2312"))
        self.assertStatsEqual("httpcompression/response_count", 1)
        self.assertStatsEqual("httpcompression/response_bytes", len(plainbody))

    def test_process_response_no_content_type_header(self):
        headers = {
            "Content-Encoding": "identity",
        }
        plainbody = (
            b"<html><head><title>Some page</title>"
            b'<meta http-equiv="Content-Type" content="text/html; charset=gb2312">'
        )
        respcls = responsetypes.from_args(
            url="http://www.example.com/index", headers=headers, body=plainbody
        )
        response = respcls(
            "http://www.example.com/index", headers=headers, body=plainbody
        )
        request = Request("http://www.example.com/index")
        newresponse = self.mw.process_response(request, response, self.spider)
        assert isinstance(newresponse, respcls)
        self.assertEqual(newresponse.body, plainbody)
        self.assertEqual(newresponse.encoding, resolve_encoding("gb2312"))
        self.assertStatsEqual("httpcompression/response_count", 1)
        self.assertStatsEqual("httpcompression/response_bytes", len(plainbody))

    def test_process_response_gzipped_contenttype(self):
        response = self._getresponse("gzip")
        response.headers["Content-Type"] = "application/gzip"
        request = response.request
        newresponse = self.mw.process_response(request, response, self.spider)
        self.assertIsNot(newresponse, response)
        self.assertTrue(newresponse.body.startswith(b"<!DOCTYPE"))
        self.assertNotIn("Content-Encoding", newresponse.headers)
        self.assertStatsEqual("httpcompression/response_count", 1)
        self.assertStatsEqual("httpcompression/response_bytes", 74837)

    def test_process_response_gzip_app_octetstream_contenttype(self):
        response = self._getresponse("gzip")
        response.headers["Content-Type"] = "application/octet-stream"
        request = response.request
        newresponse = self.mw.process_response(request, response, self.spider)
        self.assertIsNot(newresponse, response)
        self.assertTrue(newresponse.body.startswith(b"<!DOCTYPE"))
        self.assertNotIn("Content-Encoding", newresponse.headers)
        self.assertStatsEqual("httpcompression/response_count", 1)
        self.assertStatsEqual("httpcompression/response_bytes", 74837)

    def test_process_response_gzip_binary_octetstream_contenttype(self):
        response = self._getresponse("x-gzip")
        response.headers["Content-Type"] = "binary/octet-stream"
        request = response.request
        newresponse = self.mw.process_response(request, response, self.spider)
        self.assertIsNot(newresponse, response)
        self.assertTrue(newresponse.body.startswith(b"<!DOCTYPE"))
        self.assertNotIn("Content-Encoding", newresponse.headers)
        self.assertStatsEqual("httpcompression/response_count", 1)
        self.assertStatsEqual("httpcompression/response_bytes", 74837)

    def test_process_response_gzipped_gzip_file(self):
        """Test that a gzip Content-Encoded .gz file is gunzipped
        only once by the middleware, leaving gunzipping of the file
        to upper layers.
        """
        headers = {
            "Content-Type": "application/gzip",
            "Content-Encoding": "gzip",
        }
        # build a gzipped file (here, a sitemap)
        f = BytesIO()
        plainbody = b"""<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.google.com/schemas/sitemap/0.84">
<url>
<loc>http://www.example.com/</loc>
<lastmod>2009-08-16</lastmod>
<changefreq>daily</changefreq>
<priority>1</priority>
</url>
<url>
<loc>http://www.example.com/Special-Offers.html</loc>
<lastmod>2009-08-16</lastmod>
<changefreq>weekly</changefreq>
<priority>0.8</priority>
</url>
</urlset>"""
        gz_file = GzipFile(fileobj=f, mode="wb")
        gz_file.write(plainbody)
        gz_file.close()
        # build a gzipped response body containing this gzipped file
        r = BytesIO()
        gz_resp = GzipFile(fileobj=r, mode="wb")
        gz_resp.write(f.getvalue())
        gz_resp.close()
        response = Response(
            # fixed typo: was "http;//www.example.com/"
            "http://www.example.com/", headers=headers, body=r.getvalue()
        )
        request = Request("http://www.example.com/")
        newresponse = self.mw.process_response(request, response, self.spider)
        self.assertEqual(gunzip(newresponse.body), plainbody)
        self.assertStatsEqual("httpcompression/response_count", 1)
        self.assertStatsEqual("httpcompression/response_bytes", 230)

    def test_process_response_head_request_no_decode_required(self):
        # HEAD responses have no body, so there is nothing to decode and the
        # original response must pass through unchanged.
        response = self._getresponse("gzip")
        response.headers["Content-Type"] = "application/gzip"
        request = response.request
        request.method = "HEAD"
        response = response.replace(body=None)
        newresponse = self.mw.process_response(request, response, self.spider)
        self.assertIs(newresponse, response)
        self.assertEqual(response.body, b"")
        self.assertStatsEqual("httpcompression/response_count", None)
        self.assertStatsEqual("httpcompression/response_bytes", None)

    def _test_compression_bomb_setting(self, compression_id):
        # DOWNLOAD_MAXSIZE (setting) must abort decompression of a bomb.
        settings = {"DOWNLOAD_MAXSIZE": 10_000_000}
        crawler = get_crawler(Spider, settings_dict=settings)
        spider = crawler._create_spider("scrapytest.org")
        mw = HttpCompressionMiddleware.from_crawler(crawler)
        mw.open_spider(spider)
        response = self._getresponse(f"bomb-{compression_id}")
        self.assertRaises(
            IgnoreRequest,
            mw.process_response,
            response.request,
            response,
            spider,
        )

    def test_compression_bomb_setting_br(self):
        self._skip_unless_br()
        self._test_compression_bomb_setting("br")

    def test_compression_bomb_setting_deflate(self):
        self._test_compression_bomb_setting("deflate")

    def test_compression_bomb_setting_gzip(self):
        self._test_compression_bomb_setting("gzip")

    def test_compression_bomb_setting_zstd(self):
        self._skip_unless_zstd()
        self._test_compression_bomb_setting("zstd")

    def _test_compression_bomb_spider_attr(self, compression_id):
        # download_maxsize as a spider attribute must abort decompression.
        class DownloadMaxSizeSpider(Spider):
            download_maxsize = 10_000_000

        crawler = get_crawler(DownloadMaxSizeSpider)
        spider = crawler._create_spider("scrapytest.org")
        mw = HttpCompressionMiddleware.from_crawler(crawler)
        mw.open_spider(spider)
        response = self._getresponse(f"bomb-{compression_id}")
        self.assertRaises(
            IgnoreRequest,
            mw.process_response,
            response.request,
            response,
            spider,
        )

    def test_compression_bomb_spider_attr_br(self):
        self._skip_unless_br()
        self._test_compression_bomb_spider_attr("br")

    def test_compression_bomb_spider_attr_deflate(self):
        self._test_compression_bomb_spider_attr("deflate")

    def test_compression_bomb_spider_attr_gzip(self):
        self._test_compression_bomb_spider_attr("gzip")

    def test_compression_bomb_spider_attr_zstd(self):
        self._skip_unless_zstd()
        self._test_compression_bomb_spider_attr("zstd")

    def _test_compression_bomb_request_meta(self, compression_id):
        # download_maxsize in request meta must abort decompression.
        crawler = get_crawler(Spider)
        spider = crawler._create_spider("scrapytest.org")
        mw = HttpCompressionMiddleware.from_crawler(crawler)
        mw.open_spider(spider)
        response = self._getresponse(f"bomb-{compression_id}")
        response.meta["download_maxsize"] = 10_000_000
        self.assertRaises(
            IgnoreRequest,
            mw.process_response,
            response.request,
            response,
            spider,
        )

    def test_compression_bomb_request_meta_br(self):
        self._skip_unless_br()
        self._test_compression_bomb_request_meta("br")

    def test_compression_bomb_request_meta_deflate(self):
        self._test_compression_bomb_request_meta("deflate")

    def test_compression_bomb_request_meta_gzip(self):
        self._test_compression_bomb_request_meta("gzip")

    def test_compression_bomb_request_meta_zstd(self):
        self._skip_unless_zstd()
        self._test_compression_bomb_request_meta("zstd")

    def _test_download_warnsize_setting(self, compression_id):
        # DOWNLOAD_WARNSIZE (setting): decompression proceeds but logs a
        # warning when the decompressed body exceeds the threshold.
        settings = {"DOWNLOAD_WARNSIZE": 10_000_000}
        crawler = get_crawler(Spider, settings_dict=settings)
        spider = crawler._create_spider("scrapytest.org")
        mw = HttpCompressionMiddleware.from_crawler(crawler)
        mw.open_spider(spider)
        response = self._getresponse(f"bomb-{compression_id}")
        self._assert_warnsize_logged(mw, response, spider)

    def test_download_warnsize_setting_br(self):
        self._skip_unless_br()
        self._test_download_warnsize_setting("br")

    def test_download_warnsize_setting_deflate(self):
        self._test_download_warnsize_setting("deflate")

    def test_download_warnsize_setting_gzip(self):
        self._test_download_warnsize_setting("gzip")

    def test_download_warnsize_setting_zstd(self):
        self._skip_unless_zstd()
        self._test_download_warnsize_setting("zstd")

    def _test_download_warnsize_spider_attr(self, compression_id):
        # download_warnsize as a spider attribute triggers the same warning.
        class DownloadWarnSizeSpider(Spider):
            download_warnsize = 10_000_000

        crawler = get_crawler(DownloadWarnSizeSpider)
        spider = crawler._create_spider("scrapytest.org")
        mw = HttpCompressionMiddleware.from_crawler(crawler)
        mw.open_spider(spider)
        response = self._getresponse(f"bomb-{compression_id}")
        self._assert_warnsize_logged(mw, response, spider)

    def test_download_warnsize_spider_attr_br(self):
        self._skip_unless_br()
        self._test_download_warnsize_spider_attr("br")

    def test_download_warnsize_spider_attr_deflate(self):
        self._test_download_warnsize_spider_attr("deflate")

    def test_download_warnsize_spider_attr_gzip(self):
        self._test_download_warnsize_spider_attr("gzip")

    def test_download_warnsize_spider_attr_zstd(self):
        self._skip_unless_zstd()
        self._test_download_warnsize_spider_attr("zstd")

    def _test_download_warnsize_request_meta(self, compression_id):
        # download_warnsize in request meta triggers the same warning.
        crawler = get_crawler(Spider)
        spider = crawler._create_spider("scrapytest.org")
        mw = HttpCompressionMiddleware.from_crawler(crawler)
        mw.open_spider(spider)
        response = self._getresponse(f"bomb-{compression_id}")
        response.meta["download_warnsize"] = 10_000_000
        self._assert_warnsize_logged(mw, response, spider)

    def test_download_warnsize_request_meta_br(self):
        self._skip_unless_br()
        self._test_download_warnsize_request_meta("br")

    def test_download_warnsize_request_meta_deflate(self):
        self._test_download_warnsize_request_meta("deflate")

    def test_download_warnsize_request_meta_gzip(self):
        self._test_download_warnsize_request_meta("gzip")

    def test_download_warnsize_request_meta_zstd(self):
        self._skip_unless_zstd()
        self._test_download_warnsize_request_meta("zstd")