mirror of https://github.com/scrapy/scrapy.git
Correctly detect when all managed crawlers are done in CrawlerRunner
commit 8ddf0811a8
parent 68954fa503
@@ -53,7 +53,8 @@ class Command(ScrapyCommand):
         # The crawler is created this way since the Shell manually handles the
         # crawling engine, so the set up in the crawl method won't work
-        crawler = self.crawler_process._create_logged_crawler(spidercls)
+        crawler = self.crawler_process._create_crawler(spidercls)
+        self.crawler_process._setup_crawler_logging(crawler)
         # The Shell class needs a persistent engine in the crawler
         crawler.engine = crawler._create_engine()
         crawler.engine.start()
@@ -76,22 +76,21 @@ class CrawlerRunner(object):
         smcls = load_object(settings['SPIDER_MANAGER_CLASS'])
         self.spiders = smcls.from_settings(settings.frozencopy())
         self.crawlers = set()
-        self.crawl_deferreds = set()
+        self._active = set()
 
     def crawl(self, spidercls, *args, **kwargs):
-        crawler = self._create_logged_crawler(spidercls)
-        self.crawlers.add(crawler)
-
-        d = crawler.crawl(*args, **kwargs)
-        self.crawl_deferreds.add(d)
-        return d
-
-    def _create_logged_crawler(self, spidercls):
         crawler = self._create_crawler(spidercls)
-        log_observer = log.start_from_crawler(crawler)
-        if log_observer:
-            crawler.signals.connect(log_observer.stop, signals.engine_stopped)
-        return crawler
+        self._setup_crawler_logging(crawler)
+        self.crawlers.add(crawler)
+        d = crawler.crawl(*args, **kwargs)
+        self._active.add(d)
+
+        def _done(result):
+            self.crawlers.discard(crawler)
+            self._active.discard(d)
+            return result
+
+        return d.addBoth(_done)
 
     def _create_crawler(self, spidercls):
         if isinstance(spidercls, six.string_types):
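The interesting part of this hunk is the bookkeeping around self._active: every deferred returned by crawler.crawl() is added to the set and discarded again in an addBoth callback, so the set always reflects the crawls still in flight while the caller still sees the crawl's own result. A minimal standalone sketch of that pattern (not Scrapy code; the Tracker class and its names are illustrative, and it assumes only that Twisted is installed):

    from twisted.internet import defer

    class Tracker(object):
        def __init__(self):
            self._active = set()

        def track(self, d):
            self._active.add(d)

            def _done(result):
                # Runs on success and on failure; returning `result` keeps the
                # outcome visible to callbacks chained after track().
                self._active.discard(d)
                return result

            return d.addBoth(_done)

    tracker = Tracker()
    d = defer.Deferred()
    tracker.track(d)
    assert len(tracker._active) == 1   # still in flight
    d.callback('finished')
    assert not tracker._active         # emptied by the addBoth callback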
@@ -100,13 +99,22 @@ class CrawlerRunner(object):
         crawler_settings = self.settings.copy()
         spidercls.update_settings(crawler_settings)
         crawler_settings.freeze()
-
-        crawler = Crawler(spidercls, crawler_settings)
-        return crawler
+        return Crawler(spidercls, crawler_settings)
+
+    def _setup_crawler_logging(self, crawler):
+        log_observer = log.start_from_crawler(crawler)
+        if log_observer:
+            crawler.signals.connect(log_observer.stop, signals.engine_stopped)
 
     def stop(self):
         return defer.DeferredList(c.stop() for c in self.crawlers)
 
+    @defer.inlineCallbacks
+    def join(self):
+        """Wait for all managed crawlers to complete"""
+        while self._active:
+            yield defer.DeferredList(self._active)
+
 
 class CrawlerProcess(CrawlerRunner):
     """A class to run multiple scrapy crawlers in a process simultaneously"""
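The new join() is what the commit title refers to: it keeps yielding a DeferredList of whatever is currently in self._active until the set is empty, so crawls scheduled while others are still running are waited for as well. A hedged usage sketch of a CrawlerRunner driven from a script, assuming a Scrapy checkout that includes this change (ExampleSpider and the settings handling are placeholders, not part of the commit):

    import scrapy
    from twisted.internet import reactor
    from scrapy.crawler import CrawlerRunner
    from scrapy.utils.project import get_project_settings

    class ExampleSpider(scrapy.Spider):
        # Placeholder spider used only for this sketch.
        name = 'example'
        start_urls = ['http://example.com']

        def parse(self, response):
            pass

    runner = CrawlerRunner(get_project_settings())
    runner.crawl(ExampleSpider)
    runner.crawl(ExampleSpider)            # both deferreds end up in runner._active

    d = runner.join()                      # fires only once _active is empty
    d.addBoth(lambda _: reactor.stop())    # CrawlerRunner leaves the reactor to the caller
    reactor.run()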
@@ -135,13 +143,15 @@ class CrawlerProcess(CrawlerRunner):
 
     def start(self, stop_after_crawl=True):
         if stop_after_crawl:
-            d = defer.DeferredList(self.crawl_deferreds)
+            d = self.join()
+            # Don't start the reactor if the deferreds are already fired
             if d.called:
-                # Don't start the reactor if the deferreds are already fired
                 return
             d.addBoth(lambda _: self._stop_reactor())
 
         if self.settings.getbool('DNSCACHE_ENABLED'):
             reactor.installResolver(CachingThreadedResolver(reactor))
 
         reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
         reactor.run(installSignalHandlers=False)  # blocking call
 
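With join() available, start(stop_after_crawl=True) now stops the reactor only after every managed crawl has completed, rather than after a one-shot DeferredList built from whatever deferreds existed when start() was called. A hedged sketch of the usual script-level usage (the spider is a placeholder, not part of the commit):

    import scrapy
    from scrapy.crawler import CrawlerProcess
    from scrapy.utils.project import get_project_settings

    class ExampleSpider(scrapy.Spider):
        # Placeholder spider used only for this sketch.
        name = 'example'
        start_urls = ['http://example.com']

        def parse(self, response):
            pass

    process = CrawlerProcess(get_project_settings())
    process.crawl(ExampleSpider)
    process.crawl(ExampleSpider)   # tracked in the same _active set as the first crawl
    process.start()                # blocking; stops the reactor once join() reports done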