
Bump ruff, switch from black to ruff-format (#6631)

Andrey Rakhmatullin 2025-01-27 14:07:09 +04:00 committed by GitHub
parent c03fb2abb8
commit cec0aeca58
42 changed files with 151 additions and 172 deletions


@ -1,13 +1,10 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.4
rev: v0.9.3
hooks:
- id: ruff
args: [ --fix ]
- repo: https://github.com/psf/black.git
rev: 24.10.0
hooks:
- id: black
- id: ruff-format
- repo: https://github.com/adamchainz/blacken-docs
rev: 1.19.1
hooks:


@ -188,9 +188,9 @@ class Command(ScrapyCommand):
return True
return False
assert (
self.crawler_process is not None
), "crawler_process must be set before calling run"
assert self.crawler_process is not None, (
"crawler_process must be set before calling run"
)
try:
spidercls = self.crawler_process.spider_loader.load(name)
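The hunk above shows the pattern that accounts for most of this diff: when an assert with a message no longer fits on one line, the old Black output wrapped the condition in parentheses, while ruff-format keeps the condition on the assert line and parenthesises the message instead. A minimal, hypothetical sketch of the two equivalent layouts (run_check and its argument are illustrative, not Scrapy code):

# Hypothetical sketch; crawler_process stands in for whatever attribute the
# real command checks.  Both layouts compile to the same assert.
def run_check(crawler_process) -> None:
    # Layout the old Black config produced: parentheses around the condition.
    assert (
        crawler_process is not None
    ), "crawler_process must be set before calling run"

    # Layout after switching to ruff-format: bare condition, parenthesised message.
    assert crawler_process is not None, (
        "crawler_process must be set before calling run"
    )


run_check(object())  # passes; run_check(None) would raise AssertionError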


@ -34,13 +34,12 @@ class DownloadHandlerProtocol(Protocol):
class DownloadHandlers:
def __init__(self, crawler: Crawler):
self._crawler: Crawler = crawler
self._schemes: dict[str, str | Callable[..., Any]] = (
{}
) # stores acceptable schemes on instancing
self._handlers: dict[str, DownloadHandlerProtocol] = (
{}
) # stores instanced handlers for schemes
self._notconfigured: dict[str, str] = {} # remembers failed handlers
# stores acceptable schemes on instancing
self._schemes: dict[str, str | Callable[..., Any]] = {}
# stores instanced handlers for schemes
self._handlers: dict[str, DownloadHandlerProtocol] = {}
# remembers failed handlers
self._notconfigured: dict[str, str] = {}
handlers: dict[str, str | Callable[..., Any]] = without_none_values(
cast(
"dict[str, str | Callable[..., Any]]",


@ -193,7 +193,7 @@ class Stream:
url.netloc == str(self._protocol.metadata["uri"].host, "utf-8")
or url.netloc == str(self._protocol.metadata["uri"].netloc, "utf-8")
or url.netloc
== f'{self._protocol.metadata["ip_address"]}:{self._protocol.metadata["uri"].port}'
== f"{self._protocol.metadata['ip_address']}:{self._protocol.metadata['uri'].port}"
)
def _get_request_headers(self) -> list[tuple[str, str]]:
@ -339,7 +339,7 @@ class Stream:
if self._log_warnsize:
self.metadata["reached_warnsize"] = True
warning_msg = (
f'Received more ({self._response["flow_controlled_size"]}) bytes than download '
f"Received more ({self._response['flow_controlled_size']}) bytes than download "
f"warn size ({self._download_warnsize}) in request {self._request}"
)
logger.warning(warning_msg)
@ -445,7 +445,7 @@ class Stream:
ResponseFailed(
[
Failure(
f'Remote peer {self._protocol.metadata["ip_address"]} sent RST_STREAM',
f"Remote peer {self._protocol.metadata['ip_address']} sent RST_STREAM",
ProtocolError,
)
]
@ -465,7 +465,7 @@ class Stream:
InvalidHostname(
self._request,
str(self._protocol.metadata["uri"].host, "utf-8"),
f'{self._protocol.metadata["ip_address"]}:{self._protocol.metadata["uri"].port}',
f"{self._protocol.metadata['ip_address']}:{self._protocol.metadata['uri'].port}",
)
)
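The hunks in this file only flip quotes: single-quoted f-strings with double-quoted keys inside the replacement fields become double-quoted f-strings with single-quoted keys. A small hypothetical sketch showing that the two spellings produce the same string (the metadata dict below stands in for self._protocol.metadata):

from types import SimpleNamespace

# Hypothetical stand-ins for the protocol metadata used in the hunk above.
metadata = {"ip_address": "192.0.2.1", "uri": SimpleNamespace(port=443)}

old_style = f'{metadata["ip_address"]}:{metadata["uri"].port}'  # outer ', inner "
new_style = f"{metadata['ip_address']}:{metadata['uri'].port}"  # outer ", inner '

assert old_style == new_style == "192.0.2.1:443"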


@ -54,8 +54,7 @@ class CookiesMiddleware:
) -> None:
for cookie in cookies:
cookie_domain = cookie.domain
if cookie_domain.startswith("."):
cookie_domain = cookie_domain[1:]
cookie_domain = cookie_domain.removeprefix(".")
hostname = urlparse_cached(request).hostname
assert hostname is not None
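Beyond pure reformatting, this hunk swaps the startswith-and-slice idiom for str.removeprefix, available since Python 3.9. The two forms are equivalent; a quick hypothetical check:

def strip_leading_dot_old(cookie_domain: str) -> str:
    # Idiom before the change: explicit check plus slicing.
    if cookie_domain.startswith("."):
        cookie_domain = cookie_domain[1:]
    return cookie_domain


def strip_leading_dot_new(cookie_domain: str) -> str:
    # Idiom after the change: str.removeprefix (Python 3.9+) returns the
    # string unchanged when the prefix is absent, so no guard is needed.
    return cookie_domain.removeprefix(".")


for domain in (".example.com", "example.com"):
    assert strip_leading_dot_old(domain) == strip_leading_dot_new(domain)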


@ -89,5 +89,5 @@ class OffsiteMiddleware:
warnings.warn(message)
else:
domains.append(re.escape(domain))
regex = rf'^(.*\.)?({"|".join(domains)})$'
regex = rf"^(.*\.)?({'|'.join(domains)})$"
return re.compile(regex)


@ -63,7 +63,9 @@ class RobotsTxtMiddleware:
if request.url.startswith("data:") or request.url.startswith("file:"):
return None
d: Deferred[RobotParser | None] = maybeDeferred(
self.robot_parser, request, spider # type: ignore[call-overload]
self.robot_parser,
request,
spider, # type: ignore[call-overload]
)
d2: Deferred[None] = d.addCallback(self.process_request_2, request, spider)
return d2


@ -19,7 +19,7 @@ if TYPE_CHECKING:
def get_header_size(
headers: dict[str, list[str | bytes] | tuple[str | bytes, ...]]
headers: dict[str, list[str | bytes] | tuple[str | bytes, ...]],
) -> int:
size = 0
for key, value in headers.items():
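Another recurring mechanical change, visible in the hunk above: when a lone parameter forces a signature onto multiple lines, the reformatted output adds a trailing comma after it, where the old output left the comma off. A hypothetical signature in the new layout:

# Hypothetical sketch mirroring the layout change above; the name and body
# are illustrative, not the real get_header_size.
def get_header_size_sketch(
    headers: dict[str, list[str]],  # trailing comma now follows the last parameter
) -> int:
    # Rough size estimate: length of each key plus the lengths of its values.
    return sum(len(key) + sum(len(v) for v in values) for key, values in headers.items())


print(get_header_size_sketch({"Accept": ["text/html"]}))  # prints 15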


@ -84,9 +84,7 @@ class TelnetConsole(protocol.ServerFactory):
"""An implementation of IPortal"""
@defers
def login(
self_, credentials, mind, *interfaces
): # pylint: disable=no-self-argument
def login(self_, credentials, mind, *interfaces): # pylint: disable=no-self-argument
if not (
credentials.username == self.username.encode("utf8")
and credentials.checkPassword(self.password.encode("utf8"))


@ -105,7 +105,8 @@ class Headers(CaselessDict):
def values(self) -> list[bytes | None]: # type: ignore[override]
return [
self[k] for k in self.keys() # pylint: disable=consider-using-dict-items
self[k]
for k in self.keys() # pylint: disable=consider-using-dict-items
]
def to_string(self) -> bytes:


@ -24,7 +24,6 @@ from scrapy.http.request import Request
from scrapy.utils.python import is_listlike, to_bytes
if TYPE_CHECKING:
# typing.Self requires Python 3.11
from typing_extensions import Self


@ -94,8 +94,7 @@ class Response(object_ref):
return self.request.meta # type: ignore[union-attr]
except AttributeError:
raise AttributeError(
"Response.meta not available, this response "
"is not tied to any request"
"Response.meta not available, this response is not tied to any request"
)
@property


@ -25,7 +25,6 @@ from scrapy.utils.response import get_base_url
from scrapy.utils.url import url_has_any_extension, url_is_from_any_domain
if TYPE_CHECKING:
from lxml.html import HtmlElement
from scrapy import Selector


@ -202,7 +202,9 @@ class S3FilesStore:
return cast(
"Deferred[dict[str, Any]]",
deferToThread(
self.s3_client.head_object, Bucket=self.bucket, Key=key_name # type: ignore[attr-defined]
self.s3_client.head_object, # type: ignore[attr-defined]
Bucket=self.bucket,
Key=key_name,
),
)


@ -81,8 +81,7 @@ class Selector(_ParselSelector, object_ref):
):
if response is not None and text is not None:
raise ValueError(
f"{self.__class__.__name__}.__init__() received "
"both response and text"
f"{self.__class__.__name__}.__init__() received both response and text"
)
st = _st(response, type)


@ -539,7 +539,7 @@ def iter_default_settings() -> Iterable[tuple[str, Any]]:
def overridden_settings(
settings: Mapping[_SettingsKeyT, Any]
settings: Mapping[_SettingsKeyT, Any],
) -> Iterable[tuple[str, Any]]:
"""Return an iterable of the settings that have been overridden"""
for name, defvalue in iter_default_settings():


@ -333,7 +333,7 @@ TEMPLATES_DIR = str((Path(__file__).parent / ".." / "templates").resolve())
URLLENGTH_LIMIT = 2083
USER_AGENT = f'Scrapy/{import_module("scrapy").__version__} (+https://scrapy.org)'
USER_AGENT = f"Scrapy/{import_module('scrapy').__version__} (+https://scrapy.org)"
TELNETCONSOLE_ENABLED = 1
TELNETCONSOLE_PORT = [6023, 6073]


@ -110,7 +110,7 @@ class OffsiteMiddleware:
warnings.warn(message, PortWarning)
else:
domains.append(re.escape(domain))
regex = rf'^(.*\.)?({"|".join(domains)})$'
regex = rf"^(.*\.)?({'|'.join(domains)})$"
return re.compile(regex)
def spider_opened(self, spider: Spider) -> None:


@ -147,16 +147,24 @@ def _pickle_serialize(obj: Any) -> bytes:
# queue.*Queue aren't subclasses of queue.BaseQueue
_PickleFifoSerializationDiskQueue = _serializable_queue(
_with_mkdir(queue.FifoDiskQueue), _pickle_serialize, pickle.loads # type: ignore[arg-type]
_with_mkdir(queue.FifoDiskQueue), # type: ignore[arg-type]
_pickle_serialize,
pickle.loads,
)
_PickleLifoSerializationDiskQueue = _serializable_queue(
_with_mkdir(queue.LifoDiskQueue), _pickle_serialize, pickle.loads # type: ignore[arg-type]
_with_mkdir(queue.LifoDiskQueue), # type: ignore[arg-type]
_pickle_serialize,
pickle.loads,
)
_MarshalFifoSerializationDiskQueue = _serializable_queue(
_with_mkdir(queue.FifoDiskQueue), marshal.dumps, marshal.loads # type: ignore[arg-type]
_with_mkdir(queue.FifoDiskQueue), # type: ignore[arg-type]
marshal.dumps,
marshal.loads,
)
_MarshalLifoSerializationDiskQueue = _serializable_queue(
_with_mkdir(queue.LifoDiskQueue), marshal.dumps, marshal.loads # type: ignore[arg-type]
_with_mkdir(queue.LifoDiskQueue), # type: ignore[arg-type]
marshal.dumps,
marshal.loads,
)
# public queue classes


@ -22,8 +22,7 @@ class DataAction(argparse.Action):
option_string: str | None = None,
) -> None:
value = str(values)
if value.startswith("$"):
value = value[1:]
value = value.removeprefix("$")
setattr(namespace, self.dest, value)
@ -96,7 +95,7 @@ def curl_to_request_kwargs(
parsed_args, argv = curl_parser.parse_known_args(curl_args[1:])
if argv:
msg = f'Unrecognized options: {", ".join(argv)}'
msg = f"Unrecognized options: {', '.join(argv)}"
if ignore_unknown_options:
warnings.warn(msg)
else:


@ -377,7 +377,7 @@ def deferred_from_coro(o: _T) -> Deferred | _T:
def deferred_f_from_coro_f(
coro_f: Callable[_P, Coroutine[Any, Any, _T]]
coro_f: Callable[_P, Coroutine[Any, Any, _T]],
) -> Callable[_P, Deferred[_T]]:
"""Converts a coroutine function into a function that returns a Deferred.


@ -71,7 +71,7 @@ def xmliter(obj: Response | str | bytes, nodename: str) -> Iterator[Selector]:
nodetext = (
document_header
+ match.group().replace(
nodename, f'{nodename} {" ".join(namespaces.values())}', 1
nodename, f"{nodename} {' '.join(namespaces.values())}", 1
)
+ header_end
)


@ -16,7 +16,6 @@ from scrapy.settings import Settings, _SettingsKeyT
from scrapy.utils.versions import get_versions
if TYPE_CHECKING:
from scrapy.crawler import Crawler
from scrapy.logformatter import LogFormatterResult


@ -119,8 +119,7 @@ def to_unicode(
return text
if not isinstance(text, (bytes, str)):
raise TypeError(
"to_unicode must receive a bytes or str "
f"object, got {type(text).__name__}"
f"to_unicode must receive a bytes or str object, got {type(text).__name__}"
)
if encoding is None:
encoding = "utf-8"
@ -183,7 +182,7 @@ _SelfT = TypeVar("_SelfT")
def memoizemethod_noargs(
method: Callable[Concatenate[_SelfT, _P], _T]
method: Callable[Concatenate[_SelfT, _P], _T],
) -> Callable[Concatenate[_SelfT, _P], _T]:
"""Decorator to cache the result of a method (without arguments) using a
weak reference to its object
@ -313,7 +312,7 @@ def without_none_values(iterable: Iterable[_KT]) -> Iterable[_KT]: ...
def without_none_values(
iterable: Mapping[_KT, _VT] | Iterable[_KT]
iterable: Mapping[_KT, _VT] | Iterable[_KT],
) -> dict[_KT, _VT] | Iterable[_KT]:
"""Return a copy of ``iterable`` with all ``None`` entries removed.


@ -338,9 +338,9 @@ class BrokenStartRequestsSpider(FollowAllSpider):
if self.fail_yielding:
2 / 0
assert (
self.seedsseen
), "All start requests consumed before any download happened"
assert self.seedsseen, (
"All start requests consumed before any download happened"
)
def parse(self, response):
self.seedsseen.append(response.meta.get("seed"))


@ -529,7 +529,7 @@ class ContractsManagerTest(unittest.TestCase):
return TestItem()
with MockServer() as mockserver:
contract_doc = f'@url {mockserver.url("/status?n=200")}'
contract_doc = f"@url {mockserver.url('/status?n=200')}"
TestSameUrlSpider.parse_first.__doc__ = contract_doc
TestSameUrlSpider.parse_second.__doc__ = contract_doc
@ -567,7 +567,6 @@ class CustomFailContractPostProcess(Contract):
class CustomContractPrePostProcess(unittest.TestCase):
def setUp(self):
self.results = TextTestResult(stream=None, descriptions=False, verbosity=0)


@ -94,7 +94,7 @@ class TestHttpProxyMiddleware(TestCase):
def test_proxy_auth_encoding(self):
# utf-8 encoding
os.environ["http_proxy"] = "https://m\u00E1n:pass@proxy:3128"
os.environ["http_proxy"] = "https://m\u00e1n:pass@proxy:3128"
mw = HttpProxyMiddleware(auth_encoding="utf-8")
req = Request("http://scrapytest.org")
assert mw.process_request(req, spider) is None
@ -103,7 +103,7 @@ class TestHttpProxyMiddleware(TestCase):
# proxy from request.meta
req = Request(
"http://scrapytest.org", meta={"proxy": "https://\u00FCser:pass@proxy:3128"}
"http://scrapytest.org", meta={"proxy": "https://\u00fcser:pass@proxy:3128"}
)
assert mw.process_request(req, spider) is None
self.assertEqual(req.meta["proxy"], "https://proxy:3128")
@ -120,7 +120,7 @@ class TestHttpProxyMiddleware(TestCase):
# proxy from request.meta, latin-1 encoding
req = Request(
"http://scrapytest.org", meta={"proxy": "https://\u00FCser:pass@proxy:3128"}
"http://scrapytest.org", meta={"proxy": "https://\u00fcser:pass@proxy:3128"}
)
assert mw.process_request(req, spider) is None
self.assertEqual(req.meta["proxy"], "https://proxy:3128")
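The hunks in this test file only change the case of hex digits in escape sequences (\u00E1 becomes \u00e1, and so on), presumably the updated formatter normalising escapes to lowercase. The literals are identical at runtime; a quick hypothetical check:

# \u00E1 and \u00e1 spell the same code point (á); only the case of the hex
# digits in the source changes, the resulting strings are equal.
assert "m\u00E1n" == "m\u00e1n" == "mán"
assert "\u00FCser" == "\u00fcser" == "üser"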


@ -55,12 +55,12 @@ class Base:
assert isinstance(req2, Request)
self.assertEqual(req2.url, url2)
self.assertEqual(req2.method, "GET")
assert (
"Content-Type" not in req2.headers
), "Content-Type header must not be present in redirected request"
assert (
"Content-Length" not in req2.headers
), "Content-Length header must not be present in redirected request"
assert "Content-Type" not in req2.headers, (
"Content-Type header must not be present in redirected request"
)
assert "Content-Length" not in req2.headers, (
"Content-Length header must not be present in redirected request"
)
assert not req2.body, f"Redirected body must be empty, not '{req2.body}'"
def test_max_redirect_times(self):
@ -1243,12 +1243,12 @@ class MetaRefreshMiddlewareTest(Base.Test):
assert isinstance(req2, Request)
self.assertEqual(req2.url, "http://example.org/newpage")
self.assertEqual(req2.method, "GET")
assert (
"Content-Type" not in req2.headers
), "Content-Type header must not be present in redirected request"
assert (
"Content-Length" not in req2.headers
), "Content-Length header must not be present in redirected request"
assert "Content-Type" not in req2.headers, (
"Content-Type header must not be present in redirected request"
)
assert "Content-Length" not in req2.headers, (
"Content-Length header must not be present in redirected request"
)
assert not req2.body, f"Redirected body must be empty, not '{req2.body}'"
def test_ignore_tags_default(self):


@ -93,6 +93,6 @@ def test_params():
_, actual = downloader._get_slot(request, spider=None)
expected = Slot(**params)
for param in params:
assert getattr(expected, param) == getattr(
actual, param
), f"Slot.{param}: {getattr(expected, param)!r} != {getattr(actual, param)!r}"
assert getattr(expected, param) == getattr(actual, param), (
f"Slot.{param}: {getattr(expected, param)!r} != {getattr(actual, param)!r}"
)


@ -294,9 +294,9 @@ class EngineTest(unittest.TestCase):
]
urls_visited = {rp[0].url for rp in run.respplug}
urls_expected = {run.geturl(p) for p in must_be_visited}
assert (
urls_expected <= urls_visited
), f"URLs not visited: {list(urls_expected - urls_visited)}"
assert urls_expected <= urls_visited, (
f"URLs not visited: {list(urls_expected - urls_visited)}"
)
def _assert_scheduled_requests(self, run: CrawlerRun, count=None):
self.assertEqual(count, len(run.reqplug))
@ -496,9 +496,9 @@ def test_request_scheduled_signal(caplog):
drop_request = Request("https://drop.example")
caplog.set_level(DEBUG)
engine._schedule_request(drop_request, spider)
assert scheduler.enqueued == [
keep_request
], f"{scheduler.enqueued!r} != [{keep_request!r}]"
assert scheduler.enqueued == [keep_request], (
f"{scheduler.enqueued!r} != [{keep_request!r}]"
)
crawler.signals.disconnect(signal_handler, request_scheduled)


@ -67,6 +67,6 @@ class HeadersReceivedEngineTest(EngineTest):
must_be_visited = ["/", "/redirect", "/redirected"]
urls_visited = {rp[0].url for rp in run.respplug}
urls_expected = {run.geturl(p) for p in must_be_visited}
assert (
urls_expected <= urls_visited
), f"URLs not visited: {list(urls_expected - urls_visited)}"
assert urls_expected <= urls_visited, (
f"URLs not visited: {list(urls_expected - urls_visited)}"
)


@ -390,14 +390,14 @@ class CsvItemExporterTest(BaseItemExporterTest):
def test_errors_default(self):
with self.assertRaises(UnicodeEncodeError):
self.assertExportResult(
item={"text": "W\u0275\u200Brd"},
item={"text": "W\u0275\u200brd"},
expected=None,
encoding="windows-1251",
)
def test_errors_xmlcharrefreplace(self):
self.assertExportResult(
item={"text": "W\u0275\u200Brd"},
item={"text": "W\u0275\u200brd"},
include_headers_line=False,
expected="W&#629;&#8203;rd\r\n",
encoding="windows-1251",


@ -1190,8 +1190,7 @@ class FeedExportTest(FeedExportTestBase):
"csv": b"baz,egg,foo\r\n,spam1,bar1\r\n",
"json": b'[\n{"hello": "world2", "foo": "bar2"}\n]',
"jsonlines": (
b'{"foo": "bar1", "egg": "spam1"}\n'
b'{"hello": "world2", "foo": "bar2"}\n'
b'{"foo": "bar1", "egg": "spam1"}\n{"hello": "world2", "foo": "bar2"}\n'
),
"xml": (
b'<?xml version="1.0" encoding="utf-8"?>\n<items>\n<item>'
@ -2289,9 +2288,9 @@ class BatchDeliveriesTest(FeedExportTestBase):
settings.update(
{
"FEEDS": {
self._random_temp_filename()
/ "jl"
/ self._file_mark: {"format": "jl"},
self._random_temp_filename() / "jl" / self._file_mark: {
"format": "jl"
},
},
}
)
@ -2311,9 +2310,9 @@ class BatchDeliveriesTest(FeedExportTestBase):
settings.update(
{
"FEEDS": {
self._random_temp_filename()
/ "csv"
/ self._file_mark: {"format": "csv"},
self._random_temp_filename() / "csv" / self._file_mark: {
"format": "csv"
},
},
}
)
@ -2331,9 +2330,9 @@ class BatchDeliveriesTest(FeedExportTestBase):
settings.update(
{
"FEEDS": {
self._random_temp_filename()
/ "xml"
/ self._file_mark: {"format": "xml"},
self._random_temp_filename() / "xml" / self._file_mark: {
"format": "xml"
},
},
}
)
@ -2352,12 +2351,12 @@ class BatchDeliveriesTest(FeedExportTestBase):
settings.update(
{
"FEEDS": {
self._random_temp_filename()
/ "xml"
/ self._file_mark: {"format": "xml"},
self._random_temp_filename()
/ "json"
/ self._file_mark: {"format": "json"},
self._random_temp_filename() / "xml" / self._file_mark: {
"format": "xml"
},
self._random_temp_filename() / "json" / self._file_mark: {
"format": "json"
},
},
}
)
@ -2384,9 +2383,9 @@ class BatchDeliveriesTest(FeedExportTestBase):
settings.update(
{
"FEEDS": {
self._random_temp_filename()
/ "pickle"
/ self._file_mark: {"format": "pickle"},
self._random_temp_filename() / "pickle" / self._file_mark: {
"format": "pickle"
},
},
}
)
@ -2406,9 +2405,9 @@ class BatchDeliveriesTest(FeedExportTestBase):
settings.update(
{
"FEEDS": {
self._random_temp_filename()
/ "marshal"
/ self._file_mark: {"format": "marshal"},
self._random_temp_filename() / "marshal" / self._file_mark: {
"format": "marshal"
},
},
}
)
@ -2455,9 +2454,9 @@ class BatchDeliveriesTest(FeedExportTestBase):
for fmt in ("json", "jsonlines", "xml", "csv"):
settings = {
"FEEDS": {
self._random_temp_filename()
/ fmt
/ self._file_mark: {"format": fmt},
self._random_temp_filename() / fmt / self._file_mark: {
"format": fmt
},
},
"FEED_EXPORT_BATCH_ITEM_COUNT": 1,
"FEED_STORE_EMPTY": False,
@ -2478,9 +2477,9 @@ class BatchDeliveriesTest(FeedExportTestBase):
for fmt, expctd in formats:
settings = {
"FEEDS": {
self._random_temp_filename()
/ fmt
/ self._file_mark: {"format": fmt},
self._random_temp_filename() / fmt / self._file_mark: {
"format": fmt
},
},
"FEED_STORE_EMPTY": True,
"FEED_EXPORT_INDENT": None,
@ -2520,25 +2519,19 @@ class BatchDeliveriesTest(FeedExportTestBase):
settings = {
"FEEDS": {
self._random_temp_filename()
/ "json"
/ self._file_mark: {
self._random_temp_filename() / "json" / self._file_mark: {
"format": "json",
"indent": 0,
"fields": ["bar"],
"encoding": "utf-8",
},
self._random_temp_filename()
/ "xml"
/ self._file_mark: {
self._random_temp_filename() / "xml" / self._file_mark: {
"format": "xml",
"indent": 2,
"fields": ["foo"],
"encoding": "latin-1",
},
self._random_temp_filename()
/ "csv"
/ self._file_mark: {
self._random_temp_filename() / "csv" / self._file_mark: {
"format": "csv",
"indent": None,
"fields": ["foo", "bar"],
@ -2563,9 +2556,7 @@ class BatchDeliveriesTest(FeedExportTestBase):
}
settings = {
"FEEDS": {
self._random_temp_filename()
/ "json"
/ self._file_mark: {
self._random_temp_filename() / "json" / self._file_mark: {
"format": "json",
"indent": None,
"encoding": "utf-8",
@ -2591,8 +2582,7 @@ class BatchDeliveriesTest(FeedExportTestBase):
]
settings = {
"FEEDS": {
self._random_temp_filename()
/ "%(batch_id)d": {
self._random_temp_filename() / "%(batch_id)d": {
"format": "json",
},
},


@ -226,9 +226,9 @@ class RequestTest(unittest.TestCase):
self.assertEqual(r1.flags, r2.flags)
# make sure cb_kwargs dict is shallow copied
assert (
r1.cb_kwargs is not r2.cb_kwargs
), "cb_kwargs must be a shallow copy, not identical"
assert r1.cb_kwargs is not r2.cb_kwargs, (
"cb_kwargs must be a shallow copy, not identical"
)
self.assertEqual(r1.cb_kwargs, r2.cb_kwargs)
# make sure meta dict is shallow copied
@ -236,9 +236,9 @@ class RequestTest(unittest.TestCase):
self.assertEqual(r1.meta, r2.meta)
# make sure headers attribute is shallow copied
assert (
r1.headers is not r2.headers
), "headers must be a shallow copy, not identical"
assert r1.headers is not r2.headers, (
"headers must be a shallow copy, not identical"
)
self.assertEqual(r1.headers, r2.headers)
self.assertEqual(r1.encoding, r2.encoding)
self.assertEqual(r1.dont_filter, r2.dont_filter)


@ -99,9 +99,9 @@ class BaseResponseTest(unittest.TestCase):
self.assertEqual(r1.flags, r2.flags)
# make sure headers attribute is shallow copied
assert (
r1.headers is not r2.headers
), "headers must be a shallow copy, not identical"
assert r1.headers is not r2.headers, (
"headers must be a shallow copy, not identical"
)
self.assertEqual(r1.headers, r2.headers)
def test_copy_meta(self):


@ -289,9 +289,7 @@ class ItemMetaTest(unittest.TestCase):
class ItemMetaClassCellRegression(unittest.TestCase):
def test_item_meta_classcell_regression(self):
class MyItem(Item, metaclass=ItemMeta):
def __init__(
self, *args, **kwargs
): # pylint: disable=useless-parent-delegation
def __init__(self, *args, **kwargs): # pylint: disable=useless-parent-delegation
# This call to super() trigger the __classcell__ propagation
# requirement. When not done properly raises an error:
# TypeError: __class__ set to <class '__main__.MyItem'>


@ -215,7 +215,7 @@ class FilesPipelineTestCase(unittest.TestCase):
class CustomFilesPipeline(FilesPipeline):
def file_path(self, request, response=None, info=None, item=None):
return f'full/{item.get("path")}'
return f"full/{item.get('path')}"
file_path = CustomFilesPipeline.from_crawler(
get_crawler(None, {"FILES_STORE": self.tempdir})


@ -35,7 +35,6 @@ def _mocked_download_func(request, info):
class UserDefinedPipeline(MediaPipeline):
def media_to_download(self, request, info, *, item=None):
pass
@ -376,7 +375,6 @@ class MediaPipelineTestCase(BaseMediaPipelineTestCase):
class MediaPipelineAllowRedirectSettingsTestCase(unittest.TestCase):
def _assert_request_no3xx(self, pipeline_class, settings):
pipe = pipeline_class(crawler=get_crawler(None, settings))
request = Request("http://url")
@ -403,11 +401,9 @@ class MediaPipelineAllowRedirectSettingsTestCase(unittest.TestCase):
self.assertNotIn(status, request.meta["handle_httpstatus_list"])
def test_subclass_standard_setting(self):
self._assert_request_no3xx(UserDefinedPipeline, {"MEDIA_ALLOW_REDIRECTS": True})
def test_subclass_specific_setting(self):
self._assert_request_no3xx(
UserDefinedPipeline, {"USERDEFINEDPIPELINE_MEDIA_ALLOW_REDIRECTS": True}
)


@ -27,10 +27,7 @@ class BaseRobotParserTest:
def test_allowed(self):
robotstxt_robotstxt_body = (
b"User-agent: * \n"
b"Disallow: /disallowed \n"
b"Allow: /allowed \n"
b"Crawl-delay: 10"
b"User-agent: * \nDisallow: /disallowed \nAllow: /allowed \nCrawl-delay: 10"
)
rp = self.parser_cls.from_crawler(
crawler=None, robotstxt_body=robotstxt_robotstxt_body
@ -140,7 +137,7 @@ class DecodeRobotsTxtTest(unittest.TestCase):
self.assertEqual(decoded_content, "User-agent: *\nDisallow: /\n")
def test_decode_non_utf8(self):
robotstxt_body = b"User-agent: *\n\xFFDisallow: /\n"
robotstxt_body = b"User-agent: *\n\xffDisallow: /\n"
decoded_content = decode_robotstxt(robotstxt_body, spider=None)
self.assertEqual(decoded_content, "User-agent: *\nDisallow: /\n")


@ -107,9 +107,9 @@ class SelectorTestCase(unittest.TestCase):
"""Check that classes are using slots and are weak-referenceable"""
x = Selector(text="")
weakref.ref(x)
assert not hasattr(
x, "__dict__"
), f"{x.__class__.__name__} does not use __slots__"
assert not hasattr(x, "__dict__"), (
f"{x.__class__.__name__} does not use __slots__"
)
def test_selector_bad_args(self):
with self.assertRaisesRegex(ValueError, "received both response and text"):


@ -158,18 +158,18 @@ class ResponseUtilsTest(unittest.TestCase):
)
assert open_in_browser(r1, _openfunc=check_base_url), "Inject base url"
assert open_in_browser(
r2, _openfunc=check_base_url
), "Inject base url with argumented head"
assert open_in_browser(
r3, _openfunc=check_base_url
), "Inject unique base url with misleading tag"
assert open_in_browser(
r4, _openfunc=check_base_url
), "Inject unique base url with misleading comment"
assert open_in_browser(
r5, _openfunc=check_base_url
), "Inject unique base url with conditional comment"
assert open_in_browser(r2, _openfunc=check_base_url), (
"Inject base url with argumented head"
)
assert open_in_browser(r3, _openfunc=check_base_url), (
"Inject unique base url with misleading tag"
)
assert open_in_browser(r4, _openfunc=check_base_url), (
"Inject unique base url with misleading comment"
)
assert open_in_browser(r5, _openfunc=check_base_url), (
"Inject unique base url with conditional comment"
)
def test_open_in_browser_redos_comment(self):
MAX_CPU_TIME = 0.02
@ -240,6 +240,6 @@ class ResponseUtilsTest(unittest.TestCase):
),
)
def test_remove_html_comments(input_body, output_body):
assert (
_remove_html_comments(input_body) == output_body
), f"{_remove_html_comments(input_body)=} == {output_body=}"
assert _remove_html_comments(input_body) == output_body, (
f"{_remove_html_comments(input_body)=} == {output_body=}"
)


@ -321,9 +321,9 @@ class GuessSchemeTest(unittest.TestCase):
def create_guess_scheme_t(args):
def do_expected(self):
url = guess_scheme(args[0])
assert url.startswith(
args[1]
), f"Wrong scheme guessed: for `{args[0]}` got `{url}`, expected `{args[1]}...`"
assert url.startswith(args[1]), (
f"Wrong scheme guessed: for `{args[0]}` got `{url}`, expected `{args[1]}...`"
)
return do_expected