mirror of https://github.com/scrapy/scrapy.git

Engine: deprecations and type hints (#5090)
parent 5b78a64fca
commit 7e23677b52
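
The deprecations in this commit all follow the same shape: the old entry point keeps working, but emits a ScrapyDeprecationWarning (with stacklevel=2, so the warning points at the caller) before falling back to the new behaviour. A rough sketch of what calling code observes; engine, request and spider are placeholders for objects of a running crawler, not definitions added by this commit:

    import warnings

    from scrapy.exceptions import ScrapyDeprecationWarning

    # 'engine', 'request' and 'spider' are assumed to come from a running crawler.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        engine.crawl(request, spider)  # legacy signature: still works, but warns
    assert caught[0].category is ScrapyDeprecationWarning

    engine.crawl(request)  # new signature: the engine uses its single open spider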
@@ -110,11 +110,10 @@ using the telnet console::
 Execution engine status

 time()-engine.start_time : 8.62972998619
-engine.has_capacity() : False
 len(engine.downloader.active) : 16
 engine.scraper.is_idle() : False
 engine.spider.name : followall
-engine.spider_is_idle(engine.spider) : False
+engine.spider_is_idle() : False
 engine.slot.closing : False
 len(engine.slot.inprogress) : 16
 len(engine.slot.scheduler.dqs or []) : 0
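
The same status report can be produced outside the telnet console. Assuming access to a running crawler's engine, the helpers in scrapy.utils.engine (the module whose diff appears further down) build it programmatically; a small usage sketch:

    from scrapy.utils.engine import get_engine_status, format_engine_status

    # 'crawler' is assumed to be a running scrapy.crawler.Crawler instance.
    pairs = get_engine_status(crawler.engine)      # list of (expression, value) pairs
    print(format_engine_status(crawler.engine))    # the formatted report shown above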
@@ -1,51 +1,61 @@
 """
-This is the Scrapy engine which controls the Scheduler, Downloader and Spiders.
+This is the Scrapy engine which controls the Scheduler, Downloader and Spider.

 For more information see docs/topics/architecture.rst

 """
 import logging
+import warnings
 from time import time
+from typing import Callable, Iterable, Iterator, Optional, Set, Union

-from twisted.internet import defer, task
+from twisted.internet.defer import Deferred, inlineCallbacks, succeed
+from twisted.internet.task import LoopingCall
 from twisted.python.failure import Failure

 from scrapy import signals
 from scrapy.core.scraper import Scraper
-from scrapy.exceptions import DontCloseSpider
+from scrapy.exceptions import DontCloseSpider, ScrapyDeprecationWarning
 from scrapy.http import Response, Request
-from scrapy.utils.misc import load_object
-from scrapy.utils.reactor import CallLaterOnce
+from scrapy.spiders import Spider
 from scrapy.utils.log import logformatter_adapter, failure_to_exc_info
+from scrapy.utils.misc import create_instance, load_object
+from scrapy.utils.reactor import CallLaterOnce


 logger = logging.getLogger(__name__)


 class Slot:

-    def __init__(self, start_requests, close_if_idle, nextcall, scheduler):
-        self.closing = False
-        self.inprogress = set()  # requests in progress
-        self.start_requests = iter(start_requests)
+    def __init__(
+        self,
+        start_requests: Iterable,
+        close_if_idle: bool,
+        nextcall: CallLaterOnce,
+        scheduler,
+    ) -> None:
+        self.closing: Optional[Deferred] = None
+        self.inprogress: Set[Request] = set()
+        self.start_requests: Optional[Iterator] = iter(start_requests)
         self.close_if_idle = close_if_idle
         self.nextcall = nextcall
         self.scheduler = scheduler
-        self.heartbeat = task.LoopingCall(nextcall.schedule)
+        self.heartbeat = LoopingCall(nextcall.schedule)

-    def add_request(self, request):
+    def add_request(self, request: Request) -> None:
         self.inprogress.add(request)

-    def remove_request(self, request):
+    def remove_request(self, request: Request) -> None:
         self.inprogress.remove(request)
         self._maybe_fire_closing()

-    def close(self):
-        self.closing = defer.Deferred()
+    def close(self) -> Deferred:
+        self.closing = Deferred()
         self._maybe_fire_closing()
         return self.closing

-    def _maybe_fire_closing(self):
-        if self.closing and not self.inprogress:
+    def _maybe_fire_closing(self) -> None:
+        if self.closing is not None and not self.inprogress:
             if self.nextcall:
                 self.nextcall.cancel()
             if self.heartbeat.running:
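
A note on the Slot changes above: closing switches from a False sentinel to Optional[Deferred], so None now means "no close requested" and a Deferred means "close pending, fires once inprogress drains". A condensed, self-contained sketch of that life cycle (heartbeat and nextcall handling omitted; this is a model, not the real Slot):

    from typing import Optional, Set

    from twisted.internet.defer import Deferred


    class SlotSketch:
        """Minimal model of the new closing logic."""

        def __init__(self) -> None:
            self.closing: Optional[Deferred] = None
            self.inprogress: Set = set()

        def close(self) -> Deferred:
            self.closing = Deferred()
            self._maybe_fire_closing()
            return self.closing

        def _maybe_fire_closing(self) -> None:
            # Fire only once a close was requested and nothing is in flight.
            if self.closing is not None and not self.inprogress:
                self.closing.callback(None)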
@@ -54,210 +64,224 @@ class Slot:


 class ExecutionEngine:

-    def __init__(self, crawler, spider_closed_callback):
+    def __init__(self, crawler, spider_closed_callback: Callable) -> None:
         self.crawler = crawler
         self.settings = crawler.settings
         self.signals = crawler.signals
         self.logformatter = crawler.logformatter
-        self.slot = None
-        self.spider = None
+        self.slot: Optional[Slot] = None
+        self.spider: Optional[Spider] = None
         self.running = False
         self.paused = False
-        self.scheduler_cls = load_object(self.settings['SCHEDULER'])
+        self.scheduler_cls = load_object(crawler.settings["SCHEDULER"])
         downloader_cls = load_object(self.settings['DOWNLOADER'])
         self.downloader = downloader_cls(crawler)
         self.scraper = Scraper(crawler)
         self._spider_closed_callback = spider_closed_callback

-    @defer.inlineCallbacks
-    def start(self):
-        """Start the execution engine"""
+    @inlineCallbacks
+    def start(self) -> Deferred:
         if self.running:
             raise RuntimeError("Engine already running")
         self.start_time = time()
         yield self.signals.send_catch_log_deferred(signal=signals.engine_started)
         self.running = True
-        self._closewait = defer.Deferred()
+        self._closewait = Deferred()
         yield self._closewait

-    def stop(self):
-        """Stop the execution engine gracefully"""
+    def stop(self) -> Deferred:
+        """Gracefully stop the execution engine"""
+        @inlineCallbacks
+        def _finish_stopping_engine(_) -> Deferred:
+            yield self.signals.send_catch_log_deferred(signal=signals.engine_stopped)
+            self._closewait.callback(None)

         if not self.running:
             raise RuntimeError("Engine not running")

         self.running = False
-        dfd = self._close_all_spiders()
-        return dfd.addBoth(lambda _: self._finish_stopping_engine())
+        dfd = self.close_spider(self.spider, reason="shutdown") if self.spider is not None else succeed(None)
+        return dfd.addBoth(_finish_stopping_engine)

-    def close(self):
-        """Close the execution engine gracefully.
-
-        If it has already been started, stop it. In all cases, close all spiders
-        and the downloader.
+    def close(self) -> Deferred:
+        """
+        Gracefully close the execution engine.
+        If it has already been started, stop it. In all cases, close the spider and the downloader.
         """
         if self.running:
-            # Will also close spiders and downloader
-            return self.stop()
-        elif self.open_spiders:
-            # Will also close downloader
-            return self._close_all_spiders()
-        else:
-            return defer.succeed(self.downloader.close())
+            return self.stop()  # will also close spider and downloader
+        if self.spider is not None:
+            return self.close_spider(self.spider, reason="shutdown")  # will also close downloader
+        return succeed(self.downloader.close())

-    def pause(self):
-        """Pause the execution engine"""
+    def pause(self) -> None:
         self.paused = True

-    def unpause(self):
-        """Resume the execution engine"""
+    def unpause(self) -> None:
         self.paused = False

-    def _next_request(self, spider):
-        slot = self.slot
-        if not slot:
-            return
+    def _next_request(self) -> None:
+        assert self.slot is not None  # typing
+        assert self.spider is not None  # typing

         if self.paused:
-            return
+            return None

-        while not self._needs_backout(spider):
-            if not self._next_request_from_scheduler(spider):
-                break
+        while not self._needs_backout() and self._next_request_from_scheduler() is not None:
+            pass

-        if slot.start_requests and not self._needs_backout(spider):
+        if self.slot.start_requests is not None and not self._needs_backout():
             try:
-                request = next(slot.start_requests)
+                request = next(self.slot.start_requests)
             except StopIteration:
-                slot.start_requests = None
+                self.slot.start_requests = None
             except Exception:
-                slot.start_requests = None
-                logger.error('Error while obtaining start requests',
-                             exc_info=True, extra={'spider': spider})
+                self.slot.start_requests = None
+                logger.error('Error while obtaining start requests', exc_info=True, extra={'spider': self.spider})
             else:
-                self.crawl(request, spider)
+                self.crawl(request)

-        if self.spider_is_idle(spider) and slot.close_if_idle:
-            self._spider_idle(spider)
+        if self.spider_is_idle() and self.slot.close_if_idle:
+            self._spider_idle()

-    def _needs_backout(self, spider):
-        slot = self.slot
+    def _needs_backout(self) -> bool:
         return (
             not self.running
-            or slot.closing
+            or self.slot.closing  # type: ignore[union-attr]
             or self.downloader.needs_backout()
-            or self.scraper.slot.needs_backout()
+            or self.scraper.slot.needs_backout()  # type: ignore[union-attr]
         )

-    def _next_request_from_scheduler(self, spider):
-        slot = self.slot
-        request = slot.scheduler.next_request()
-        if not request:
-            return
-        d = self._download(request, spider)
-        d.addBoth(self._handle_downloader_output, request, spider)
+    def _next_request_from_scheduler(self) -> Optional[Deferred]:
+        assert self.slot is not None  # typing
+        assert self.spider is not None  # typing
+
+        request = self.slot.scheduler.next_request()
+        if request is None:
+            return None
+
+        d = self._download(request, self.spider)
+        d.addBoth(self._handle_downloader_output, request, self.spider)
         d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                            exc_info=failure_to_exc_info(f),
-                                           extra={'spider': spider}))
-        d.addBoth(lambda _: slot.remove_request(request))
+                                           extra={'spider': self.spider}))
+        d.addBoth(lambda _: self.slot.remove_request(request))
         d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                            exc_info=failure_to_exc_info(f),
-                                           extra={'spider': spider}))
-        d.addBoth(lambda _: slot.nextcall.schedule())
+                                           extra={'spider': self.spider}))
+        d.addBoth(lambda _: self.slot.nextcall.schedule())
         d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                            exc_info=failure_to_exc_info(f),
-                                           extra={'spider': spider}))
+                                           extra={'spider': self.spider}))
         return d

-    def _handle_downloader_output(self, response, request, spider):
-        if not isinstance(response, (Request, Response, Failure)):
-            raise TypeError(
-                "Incorrect type: expected Request, Response or Failure, got "
-                f"{type(response)}: {response!r}"
-            )
+    def _handle_downloader_output(
+        self, result: Union[Request, Response, Failure], request: Request, spider: Spider
+    ) -> Optional[Deferred]:
+        if not isinstance(result, (Request, Response, Failure)):
+            raise TypeError(f"Incorrect type: expected Request, Response or Failure, got {type(result)}: {result!r}")

         # downloader middleware can return requests (for example, redirects)
-        if isinstance(response, Request):
-            self.crawl(response, spider)
-            return
-        # response is a Response or Failure
-        d = self.scraper.enqueue_scrape(response, request, spider)
-        d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
-                                            exc_info=failure_to_exc_info(f),
-                                            extra={'spider': spider}))
+        if isinstance(result, Request):
+            self.crawl(result)
+            return None
+
+        d = self.scraper.enqueue_scrape(result, request, spider)
+        d.addErrback(
+            lambda f: logger.error(
+                "Error while enqueuing downloader output",
+                exc_info=failure_to_exc_info(f),
+                extra={'spider': spider},
+            )
+        )
         return d

-    def spider_is_idle(self, spider):
-        if not self.scraper.slot.is_idle():
-            # scraper is not idle
+    def spider_is_idle(self, spider: Optional[Spider] = None) -> bool:
+        if spider is not None:
+            warnings.warn(
+                "Passing a 'spider' argument to ExecutionEngine.spider_is_idle is deprecated",
+                category=ScrapyDeprecationWarning,
+                stacklevel=2,
+            )
+        if self.slot is None:
+            raise RuntimeError("Engine slot not assigned")
+        if not self.scraper.slot.is_idle():  # type: ignore[union-attr]
             return False

-        if self.downloader.active:
-            # downloader has pending requests
+        if self.downloader.active:  # downloader has pending requests
             return False

-        if self.slot.start_requests is not None:
-            # not all start requests are handled
+        if self.slot.start_requests is not None:  # not all start requests are handled
             return False

         if self.slot.scheduler.has_pending_requests():
-            # scheduler has pending requests
             return False

         return True

-    @property
-    def open_spiders(self):
-        return [self.spider] if self.spider else []
+    def crawl(self, request: Request, spider: Optional[Spider] = None) -> None:
+        """Inject the request into the spider <-> downloader pipeline"""
+        if spider is not None:
+            warnings.warn(
+                "Passing a 'spider' argument to ExecutionEngine.crawl is deprecated",
+                category=ScrapyDeprecationWarning,
+                stacklevel=2,
+            )
+            if spider is not self.spider:
+                raise RuntimeError(f"The spider {spider.name!r} does not match the open spider")
+        if self.spider is None:
+            raise RuntimeError(f"No open spider to crawl: {request}")
+        self._schedule_request(request, self.spider)
+        self.slot.nextcall.schedule()  # type: ignore[union-attr]

-    def has_capacity(self):
-        """Does the engine have capacity to handle more spiders"""
-        return not bool(self.slot)
-
-    def crawl(self, request, spider):
-        if spider not in self.open_spiders:
-            raise RuntimeError(f"Spider {spider.name!r} not opened when crawling: {request}")
-        self.schedule(request, spider)
-        self.slot.nextcall.schedule()
-
-    def schedule(self, request, spider):
+    def _schedule_request(self, request: Request, spider: Spider) -> None:
         self.signals.send_catch_log(signals.request_scheduled, request=request, spider=spider)
-        if not self.slot.scheduler.enqueue_request(request):
+        if not self.slot.scheduler.enqueue_request(request):  # type: ignore[union-attr]
             self.signals.send_catch_log(signals.request_dropped, request=request, spider=spider)

-    def download(self, request, spider):
-        d = self._download(request, spider)
-        d.addBoth(self._downloaded, self.slot, request, spider)
-        return d
+    def download(self, request: Request, spider: Optional[Spider] = None) -> Deferred:
+        """Return a Deferred which fires with a Response as result, only downloader middlewares are applied"""
+        if spider is None:
+            spider = self.spider
+        else:
+            warnings.warn(
+                "Passing a 'spider' argument to ExecutionEngine.download is deprecated",
+                category=ScrapyDeprecationWarning,
+                stacklevel=2,
+            )
+            if spider is not self.spider:
+                logger.warning("The spider '%s' does not match the open spider", spider.name)
+        if spider is None:
+            raise RuntimeError(f"No open spider to crawl: {request}")
+        return self._download(request, spider).addBoth(self._downloaded, request, spider)

-    def _downloaded(self, response, slot, request, spider):
-        slot.remove_request(request)
-        return self.download(response, spider) if isinstance(response, Request) else response
+    def _downloaded(
+        self, result: Union[Response, Request], request: Request, spider: Spider
+    ) -> Union[Deferred, Response]:
+        assert self.slot is not None  # typing
+        self.slot.remove_request(request)
+        return self.download(result, spider) if isinstance(result, Request) else result

-    def _download(self, request, spider):
-        slot = self.slot
-        slot.add_request(request)
+    def _download(self, request: Request, spider: Spider) -> Deferred:
+        assert self.slot is not None  # typing

-        def _on_success(response):
-            if not isinstance(response, (Response, Request)):
-                raise TypeError(
-                    "Incorrect type: expected Response or Request, got "
-                    f"{type(response)}: {response!r}"
-                )
-            if isinstance(response, Response):
-                if response.request is None:
-                    response.request = request
-                logkws = self.logformatter.crawled(response.request, response, spider)
+        self.slot.add_request(request)
+
+        def _on_success(result: Union[Response, Request]) -> Union[Response, Request]:
+            if not isinstance(result, (Response, Request)):
+                raise TypeError(f"Incorrect type: expected Response or Request, got {type(result)}: {result!r}")
+            if isinstance(result, Response):
+                if result.request is None:
+                    result.request = request
+                logkws = self.logformatter.crawled(result.request, result, spider)
                 if logkws is not None:
-                    logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
+                    logger.log(*logformatter_adapter(logkws), extra={"spider": spider})
                 self.signals.send_catch_log(
                     signal=signals.response_received,
-                    response=response,
-                    request=response.request,
+                    response=result,
+                    request=result.request,
                     spider=spider,
                 )
-            return response
+            return result

         def _on_complete(_):
-            slot.nextcall.schedule()
+            self.slot.nextcall.schedule()
             return _

         dwld = self.downloader.fetch(request, spider)
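
The repeated "assert self.slot is not None  # typing" / "assert self.spider is not None  # typing" lines above are for the type checker rather than runtime validation: they narrow the Optional attributes to non-None inside methods that are only called while a spider is open. A minimal, Scrapy-independent illustration of the idiom:

    from typing import Optional


    class EngineLike:
        def __init__(self) -> None:
            self.slot: Optional[str] = None  # stands in for Optional[Slot]

        def use_slot(self) -> int:
            assert self.slot is not None  # typing: narrows Optional[str] to str
            return len(self.slot)  # no "possibly None" complaint after the assert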
@@ -265,58 +289,52 @@ class ExecutionEngine:
         dwld.addBoth(_on_complete)
         return dwld

-    @defer.inlineCallbacks
-    def open_spider(self, spider, start_requests=(), close_if_idle=True):
-        if not self.has_capacity():
+    @inlineCallbacks
+    def open_spider(self, spider: Spider, start_requests: Iterable = (), close_if_idle: bool = True):
+        if self.slot is not None:
             raise RuntimeError(f"No free spider slot when opening {spider.name!r}")
         logger.info("Spider opened", extra={'spider': spider})
-        nextcall = CallLaterOnce(self._next_request, spider)
-        scheduler = self.scheduler_cls.from_crawler(self.crawler)
+        nextcall = CallLaterOnce(self._next_request)
+        scheduler = create_instance(self.scheduler_cls, settings=None, crawler=self.crawler)
         start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
-        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
-        self.slot = slot
+        self.slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
         self.spider = spider
         yield scheduler.open(spider)
         yield self.scraper.open_spider(spider)
         self.crawler.stats.open_spider(spider)
         yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
-        slot.nextcall.schedule()
-        slot.heartbeat.start(5)
+        self.slot.nextcall.schedule()
+        self.slot.heartbeat.start(5)

-    def _spider_idle(self, spider):
-        """Called when a spider gets idle. This function is called when there
-        are no remaining pages to download or schedule. It can be called
-        multiple times. If some extension raises a DontCloseSpider exception
-        (in the spider_idle signal handler) the spider is not closed until the
-        next loop and this function is guaranteed to be called (at least) once
-        again for this spider.
+    def _spider_idle(self) -> None:
+        """
+        Called when a spider gets idle, i.e. when there are no remaining requests to download or schedule.
+        It can be called multiple times. If a handler for the spider_idle signal raises a DontCloseSpider
+        exception, the spider is not closed until the next loop and this function is guaranteed to be called
+        (at least) once again.
         """
-        res = self.signals.send_catch_log(signals.spider_idle, spider=spider, dont_log=DontCloseSpider)
+        assert self.spider is not None  # typing
+        res = self.signals.send_catch_log(signals.spider_idle, spider=self.spider, dont_log=DontCloseSpider)
         if any(isinstance(x, Failure) and isinstance(x.value, DontCloseSpider) for _, x in res):
-            return
+            return None
+        if self.spider_is_idle():
+            self.close_spider(self.spider, reason='finished')

-        if self.spider_is_idle(spider):
-            self.close_spider(spider, reason='finished')

-    def close_spider(self, spider, reason='cancelled'):
+    def close_spider(self, spider: Spider, reason: str = "cancelled") -> Deferred:
         """Close (cancel) spider and clear all its outstanding requests"""
+        if self.slot is None:
+            raise RuntimeError("Engine slot not assigned")
+
-        slot = self.slot
-        if slot.closing:
-            return slot.closing
-        logger.info("Closing spider (%(reason)s)",
-                    {'reason': reason},
-                    extra={'spider': spider})
+        if self.slot.closing is not None:
+            return self.slot.closing
+
-        dfd = slot.close()
+        logger.info("Closing spider (%(reason)s)", {'reason': reason}, extra={'spider': spider})
+
-        def log_failure(msg):
-            def errback(failure):
-                logger.error(
-                    msg,
-                    exc_info=failure_to_exc_info(failure),
-                    extra={'spider': spider}
-                )
+        dfd = self.slot.close()
+
+        def log_failure(msg: str) -> Callable:
+            def errback(failure: Failure) -> None:
+                logger.error(msg, exc_info=failure_to_exc_info(failure), extra={'spider': spider})
             return errback

         dfd.addBoth(lambda _: self.downloader.close())
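
open_spider above also switches scheduler construction from a direct from_crawler call to create_instance from scrapy.utils.misc, which supports schedulers exposing from_crawler, from_settings or only a plain constructor. Roughly the behaviour relied on here (a sketch, not the helper's exact implementation):

    def create_instance_sketch(objcls, settings, crawler, *args, **kwargs):
        # Prefer from_crawler, then from_settings, then the plain constructor.
        if crawler is not None and hasattr(objcls, "from_crawler"):
            return objcls.from_crawler(crawler, *args, **kwargs)
        if settings is not None and hasattr(objcls, "from_settings"):
            return objcls.from_settings(settings, *args, **kwargs)
        return objcls(*args, **kwargs)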
@@ -325,19 +343,18 @@ class ExecutionEngine:
         dfd.addBoth(lambda _: self.scraper.close_spider(spider))
         dfd.addErrback(log_failure('Scraper close failure'))

-        dfd.addBoth(lambda _: slot.scheduler.close(reason))
+        dfd.addBoth(lambda _: self.slot.scheduler.close(reason))
         dfd.addErrback(log_failure('Scheduler close failure'))

         dfd.addBoth(lambda _: self.signals.send_catch_log_deferred(
-            signal=signals.spider_closed, spider=spider, reason=reason))
+            signal=signals.spider_closed, spider=spider, reason=reason,
+        ))
         dfd.addErrback(log_failure('Error while sending spider_close signal'))

         dfd.addBoth(lambda _: self.crawler.stats.close_spider(spider, reason=reason))
         dfd.addErrback(log_failure('Stats close failure'))

-        dfd.addBoth(lambda _: logger.info("Spider closed (%(reason)s)",
-                                          {'reason': reason},
-                                          extra={'spider': spider}))
+        dfd.addBoth(lambda _: logger.info("Spider closed (%(reason)s)", {'reason': reason}, extra={'spider': spider}))

         dfd.addBoth(lambda _: setattr(self, 'slot', None))
         dfd.addErrback(log_failure('Error while unassigning slot'))
@@ -349,12 +366,26 @@ class ExecutionEngine:

         return dfd

-    def _close_all_spiders(self):
-        dfds = [self.close_spider(s, reason='shutdown') for s in self.open_spiders]
-        dlist = defer.DeferredList(dfds)
-        return dlist
+    @property
+    def open_spiders(self) -> list:
+        warnings.warn(
+            "ExecutionEngine.open_spiders is deprecated, please use ExecutionEngine.spider instead",
+            category=ScrapyDeprecationWarning,
+            stacklevel=2,
+        )
+        return [self.spider] if self.spider is not None else []

-    @defer.inlineCallbacks
-    def _finish_stopping_engine(self):
-        yield self.signals.send_catch_log_deferred(signal=signals.engine_stopped)
-        self._closewait.callback(None)
+    def has_capacity(self) -> bool:
+        warnings.warn("ExecutionEngine.has_capacity is deprecated", ScrapyDeprecationWarning, stacklevel=2)
+        return not bool(self.slot)
+
+    def schedule(self, request: Request, spider: Spider) -> None:
+        warnings.warn(
+            "ExecutionEngine.schedule is deprecated, please use "
+            "ExecutionEngine.crawl or ExecutionEngine.download instead",
+            category=ScrapyDeprecationWarning,
+            stacklevel=2,
+        )
+        if self.slot is None:
+            raise RuntimeError("Engine slot not assigned")
+        self._schedule_request(request, spider)
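
For code that still relies on the helpers deprecated in this hunk, the replacements are one-liners; 'engine' and 'request' are placeholders for a running crawler's objects:

    # was: engine.open_spiders (a list with zero or one spiders)
    spiders = [engine.spider] if engine.spider is not None else []

    # was: engine.has_capacity()
    has_capacity = engine.slot is None

    # was: engine.schedule(request, spider)
    engine.crawl(request)  # schedules the request and kicks the engine loop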
@@ -200,7 +200,7 @@ class Scraper:
         """
         assert self.slot is not None  # typing
         if isinstance(output, Request):
-            self.crawler.engine.crawl(request=output, spider=spider)
+            self.crawler.engine.crawl(request=output)
         elif is_item(output):
             self.slot.itemproc_size += 1
             dfd = self.itemproc.process_item(output, spider)
@@ -67,7 +67,7 @@ class RobotsTxtMiddleware:
                 priority=self.DOWNLOAD_PRIORITY,
                 meta={'dont_obey_robotstxt': True}
             )
-            dfd = self.crawler.engine.download(robotsreq, spider)
+            dfd = self.crawler.engine.download(robotsreq)
             dfd.addCallback(self._parse_robots, netloc, spider)
             dfd.addErrback(self._logerror, robotsreq, spider)
             dfd.addErrback(self._robots_error, netloc)
@@ -88,10 +88,8 @@ class MemoryUsage:
                 self._send_report(self.notify_mails, subj)
                 self.crawler.stats.set_value('memusage/limit_notified', 1)

-                open_spiders = self.crawler.engine.open_spiders
-                if open_spiders:
-                    for spider in open_spiders:
-                        self.crawler.engine.close_spider(spider, 'memusage_exceeded')
+                if self.crawler.engine.spider is not None:
+                    self.crawler.engine.close_spider(self.crawler.engine.spider, 'memusage_exceeded')
                 else:
                     self.crawler.stop()

@@ -173,7 +173,7 @@ class MediaPipeline:
                 errback=self.media_failed, errbackArgs=(request, info))
         else:
             self._modify_media_request(request)
-            dfd = self.crawler.engine.download(request, info.spider)
+            dfd = self.crawler.engine.download(request)
             dfd.addCallbacks(
                 callback=self.media_downloaded, callbackArgs=(request, info), callbackKeywords={'item': item},
                 errback=self.media_failed, errbackArgs=(request, info))
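
The RobotsTxtMiddleware and MediaPipeline hunks show the caller-side form of the same migration for components that download through the engine; 'self.crawler' stands for any component holding a crawler reference:

    # was: dfd = self.crawler.engine.download(request, spider)
    dfd = self.crawler.engine.download(request)  # Deferred that fires with the Response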
@@ -79,7 +79,7 @@ class Shell:
         spider = self._open_spider(request, spider)
         d = _request_deferred(request)
         d.addCallback(lambda x: (x, spider))
-        self.crawler.engine.crawl(request, spider)
+        self.crawler.engine.crawl(request)
         return d

     def _open_spider(self, request, spider):
@@ -8,11 +8,10 @@ def get_engine_status(engine):
     """Return a report of the current engine status"""
     tests = [
         "time()-engine.start_time",
-        "engine.has_capacity()",
         "len(engine.downloader.active)",
         "engine.scraper.is_idle()",
         "engine.spider.name",
-        "engine.spider_is_idle(engine.spider)",
+        "engine.spider_is_idle()",
         "engine.slot.closing",
         "len(engine.slot.inprogress)",
         "len(engine.slot.scheduler.dqs or [])",
@@ -42,7 +42,7 @@ Disallow: /some/randome/page.html
 """.encode('utf-8')
         response = TextResponse('http://site.local/robots.txt', body=ROBOTS)

-        def return_response(request, spider):
+        def return_response(request):
             deferred = Deferred()
             reactor.callFromThread(deferred.callback, response)
             return deferred
@@ -79,7 +79,7 @@ Disallow: /some/randome/page.html
         crawler.settings.set('ROBOTSTXT_OBEY', True)
         response = Response('http://site.local/robots.txt', body=b'GIF89a\xd3\x00\xfe\x00\xa2')

-        def return_response(request, spider):
+        def return_response(request):
             deferred = Deferred()
             reactor.callFromThread(deferred.callback, response)
             return deferred
@@ -102,7 +102,7 @@ Disallow: /some/randome/page.html
         crawler.settings.set('ROBOTSTXT_OBEY', True)
         response = Response('http://site.local/robots.txt')

-        def return_response(request, spider):
+        def return_response(request):
             deferred = Deferred()
             reactor.callFromThread(deferred.callback, response)
             return deferred
@@ -122,7 +122,7 @@ Disallow: /some/randome/page.html
         self.crawler.settings.set('ROBOTSTXT_OBEY', True)
         err = error.DNSLookupError('Robotstxt address not found')

-        def return_failure(request, spider):
+        def return_failure(request):
             deferred = Deferred()
             reactor.callFromThread(deferred.errback, failure.Failure(err))
             return deferred
@@ -138,7 +138,7 @@ Disallow: /some/randome/page.html
         self.crawler.settings.set('ROBOTSTXT_OBEY', True)
         err = error.DNSLookupError('Robotstxt address not found')

-        def immediate_failure(request, spider):
+        def immediate_failure(request):
             deferred = Deferred()
             deferred.errback(failure.Failure(err))
             return deferred
@@ -150,7 +150,7 @@ Disallow: /some/randome/page.html
     def test_ignore_robotstxt_request(self):
         self.crawler.settings.set('ROBOTSTXT_OBEY', True)

-        def ignore_request(request, spider):
+        def ignore_request(request):
             deferred = Deferred()
             reactor.callFromThread(deferred.errback, failure.Failure(IgnoreRequest()))
             return deferred
@@ -13,6 +13,7 @@ module with the ``runserver`` argument::
 import os
 import re
 import sys
+import warnings
 from collections import defaultdict
 from urllib.parse import urlparse

@@ -25,6 +26,7 @@ from twisted.web import server, static, util

 from scrapy import signals
 from scrapy.core.engine import ExecutionEngine
+from scrapy.exceptions import ScrapyDeprecationWarning
 from scrapy.http import Request
 from scrapy.item import Item, Field
 from scrapy.linkextractors import LinkExtractor
@@ -382,22 +384,104 @@ class EngineTest(unittest.TestCase):
         yield e.close()

     @defer.inlineCallbacks
-    def test_close_spiders_downloader(self):
-        e = ExecutionEngine(get_crawler(TestSpider), lambda _: None)
-        yield e.open_spider(TestSpider(), [])
-        self.assertEqual(len(e.open_spiders), 1)
-        yield e.close()
-        self.assertEqual(len(e.open_spiders), 0)

     @defer.inlineCallbacks
-    def test_close_engine_spiders_downloader(self):
+    def test_start_already_running_exception(self):
         e = ExecutionEngine(get_crawler(TestSpider), lambda _: None)
         yield e.open_spider(TestSpider(), [])
         e.start()
-        self.assertTrue(e.running)
-        yield e.close()
-        self.assertFalse(e.running)
-        self.assertEqual(len(e.open_spiders), 0)
+        yield self.assertFailure(e.start(), RuntimeError).addBoth(
+            lambda exc: self.assertEqual(str(exc), "Engine already running")
+        )
+        yield e.stop()

+    @defer.inlineCallbacks
+    def test_close_spiders_downloader(self):
+        with warnings.catch_warnings(record=True) as warning_list:
+            e = ExecutionEngine(get_crawler(TestSpider), lambda _: None)
+            yield e.open_spider(TestSpider(), [])
+            self.assertEqual(len(e.open_spiders), 1)
+            yield e.close()
+            self.assertEqual(len(e.open_spiders), 0)
+            self.assertEqual(warning_list[0].category, ScrapyDeprecationWarning)
+            self.assertEqual(
+                str(warning_list[0].message),
+                "ExecutionEngine.open_spiders is deprecated, please use ExecutionEngine.spider instead",
+            )

+    @defer.inlineCallbacks
+    def test_close_engine_spiders_downloader(self):
+        with warnings.catch_warnings(record=True) as warning_list:
+            e = ExecutionEngine(get_crawler(TestSpider), lambda _: None)
+            yield e.open_spider(TestSpider(), [])
+            e.start()
+            self.assertTrue(e.running)
+            yield e.close()
+            self.assertFalse(e.running)
+            self.assertEqual(len(e.open_spiders), 0)
+            self.assertEqual(warning_list[0].category, ScrapyDeprecationWarning)
+            self.assertEqual(
+                str(warning_list[0].message),
+                "ExecutionEngine.open_spiders is deprecated, please use ExecutionEngine.spider instead",
+            )

+    @defer.inlineCallbacks
+    def test_crawl_deprecated_spider_arg(self):
+        with warnings.catch_warnings(record=True) as warning_list:
+            e = ExecutionEngine(get_crawler(TestSpider), lambda _: None)
+            spider = TestSpider()
+            yield e.open_spider(spider, [])
+            e.start()
+            e.crawl(Request("data:,"), spider)
+            yield e.close()
+            self.assertEqual(warning_list[0].category, ScrapyDeprecationWarning)
+            self.assertEqual(
+                str(warning_list[0].message),
+                "Passing a 'spider' argument to ExecutionEngine.crawl is deprecated",
+            )

+    @defer.inlineCallbacks
+    def test_download_deprecated_spider_arg(self):
+        with warnings.catch_warnings(record=True) as warning_list:
+            e = ExecutionEngine(get_crawler(TestSpider), lambda _: None)
+            spider = TestSpider()
+            yield e.open_spider(spider, [])
+            e.start()
+            e.download(Request("data:,"), spider)
+            yield e.close()
+            self.assertEqual(warning_list[0].category, ScrapyDeprecationWarning)
+            self.assertEqual(
+                str(warning_list[0].message),
+                "Passing a 'spider' argument to ExecutionEngine.download is deprecated",
+            )

+    @defer.inlineCallbacks
+    def test_deprecated_schedule(self):
+        with warnings.catch_warnings(record=True) as warning_list:
+            e = ExecutionEngine(get_crawler(TestSpider), lambda _: None)
+            spider = TestSpider()
+            yield e.open_spider(spider, [])
+            e.start()
+            e.schedule(Request("data:,"), spider)
+            yield e.close()
+            self.assertEqual(warning_list[0].category, ScrapyDeprecationWarning)
+            self.assertEqual(
+                str(warning_list[0].message),
+                "ExecutionEngine.schedule is deprecated, please use "
+                "ExecutionEngine.crawl or ExecutionEngine.download instead",
+            )

+    @defer.inlineCallbacks
+    def test_deprecated_has_capacity(self):
+        with warnings.catch_warnings(record=True) as warning_list:
+            e = ExecutionEngine(get_crawler(TestSpider), lambda _: None)
+            self.assertTrue(e.has_capacity())
+            spider = TestSpider()
+            yield e.open_spider(spider, [])
+            self.assertFalse(e.has_capacity())
+            e.start()
+            yield e.close()
+            self.assertTrue(e.has_capacity())
+            self.assertEqual(warning_list[0].category, ScrapyDeprecationWarning)
+            self.assertEqual(str(warning_list[0].message), "ExecutionEngine.has_capacity is deprecated")


 if __name__ == "__main__":