
Core logic improvement: wait for the Downloader and Scraper to close the spiders before going on to finish closing them

Pablo Hoffman 2010-06-01 13:49:01 -03:00
parent 9523cab25c
commit 4595c92cc2
3 changed files with 36 additions and 35 deletions
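The thrust of the change, before the diffs: each component's close_spider() now returns a Twisted Deferred that the component fires once its last outstanding work for the spider has drained, and the engine chains the rest of its teardown onto that instead of polling for idleness. A minimal, self-contained sketch of that hand-off (the names close_component and finish_closing are illustrative, not Scrapy's API):

from twisted.internet import defer

def close_component():
    # Illustrative stand-in for Downloader/Scraper.close_spider(): hand back
    # a Deferred that the component will fire once its work has drained.
    d = defer.Deferred()
    return d

def finish_closing(_):
    print('component drained; safe to finish closing the spider')

d = close_component()
d.addBoth(finish_closing)  # teardown continues only after the Deferred fires
d.callback(None)           # fired by hand here to simulate the drain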


@@ -139,6 +139,7 @@ class Downloader(object):
         site = self.sites.get(spider)
         if site and site.closing and not site.active:
             del self.sites[spider]
+            site.closing.callback(None)

     def _download(self, site, request, spider):
         # The order is very important for the following deferreds. Do not change!
@@ -165,9 +166,7 @@ class Downloader(object):

     def open_spider(self, spider):
         """Allocate resources to begin processing a spider"""
-        if spider in self.sites:
-            raise RuntimeError('Downloader spider already opened: %s' % spider)
-
+        assert spider not in self.sites, "Spider already opened: %s" % spider
         self.sites[spider] = SpiderInfo(
             download_delay=getattr(spider, 'download_delay', None),
             max_concurrent_requests=getattr(spider, 'max_concurrent_requests', None)
@@ -175,13 +174,12 @@ class Downloader(object):

     def close_spider(self, spider):
         """Free any resources associated with the given spider"""
+        assert spider in self.sites, "Spider not opened: %s" % spider
         site = self.sites.get(spider)
-        if not site or site.closing:
-            raise RuntimeError('Downloader spider already closed: %s' % spider)
-        site.closing = True
+        site.closing = defer.Deferred()
         site.cancel_request_calls()
         self._process_queue(spider)
-
+        return site.closing

     def is_idle(self):
         return not self.sites
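Note how the two halves of this file's change fit together: close_spider() creates site.closing and returns it, while _process_queue() (first hunk above) deletes the site and fires the Deferred once no requests remain active. A condensed, hedged sketch of that life cycle outside Scrapy (Site here is a stub, not the real SpiderInfo):

from twisted.internet import defer

class Site(object):
    def __init__(self):
        self.active = set()   # in-flight requests
        self.closing = None   # set to a Deferred by close_spider()

sites = {'myspider': Site()}
site = sites['myspider']
site.active.add('request-1')

# close_spider(): mark the site as closing and hand the Deferred to the caller
site.closing = defer.Deferred()
site.closing.addBoth(lambda _: print('downloader done with myspider'))

def process_queue():
    # the _process_queue()-style check: closing and nothing active -> fire
    if site.closing and not site.active:
        del sites['myspider']
        site.closing.callback(None)

process_queue()                    # one request still active: nothing happens
site.active.discard('request-1')   # last request finishes...
process_queue()                    # ...now the Deferred fires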


@@ -153,6 +153,8 @@ class ExecutionEngine(object):
             log.msg("Unable to crawl Request with no callback: %s" % request,
                 level=log.ERROR, spider=spider)
             return
+        if spider in self.closing: # ignore requests for spiders being closed
+            return
         schd = mustbe_deferred(self.schedule, request, spider)
         # FIXME: we can't log errors because we would be preventing them from
         # propagating to the request errback. This should be fixed after the
@@ -242,43 +244,41 @@ class ExecutionEngine(object):

     def close_spider(self, spider, reason='cancelled'):
         """Close (cancel) spider and clear all its outstanding requests"""
-        if spider not in self.closing:
-            log.msg("Closing spider (%s)" % reason, spider=spider)
-            self.closing[spider] = reason
-            self.downloader.close_spider(spider)
-            self.scheduler.clear_pending_requests(spider)
-            return self._finish_closing_spider_if_idle(spider)
-        return defer.succeed(None)
+        if spider in self.closing:
+            return defer.succeed(None)
+        log.msg("Closing spider (%s)" % reason, spider=spider)
+        self.closing[spider] = reason
+        self.scheduler.clear_pending_requests(spider)
+        dfd = self.downloader.close_spider(spider)
+        dfd.addBoth(lambda _: self.scheduler.close_spider(spider))
+        dfd.addErrback(log.err, "Unhandled error in scheduler.close_spider()", \
+            spider=spider)
+        dfd.addBoth(lambda _: self.scraper.close_spider(spider))
+        dfd.addErrback(log.err, "Unhandled error in scraper.close_spider()", \
+            spider=spider)
+        dfd.addBoth(lambda _: self._finish_closing_spider(spider))
+        if self.killed:
+            return self._finish_closing_spider(spider)
+        return dfd

     def _close_all_spiders(self):
         dfds = [self.close_spider(s, reason='shutdown') for s in self.open_spiders]
         dlist = defer.DeferredList(dfds)
         return dlist

-    def _finish_closing_spider_if_idle(self, spider):
-        """Call _finish_closing_spider if spider is idle"""
-        if self.spider_is_idle(spider) or self.killed:
-            return self._finish_closing_spider(spider)
-        else:
-            dfd = defer.Deferred()
-            dfd.addCallback(self._finish_closing_spider_if_idle)
-            delay = 5 if self.running else 1
-            reactor.callLater(delay, dfd.callback, spider)
-            return dfd
-
     def _finish_closing_spider(self, spider):
         """This function is called after the spider has been closed"""
-        self.scheduler.close_spider(spider)
-        self.scraper.close_spider(spider)
         reason = self.closing.pop(spider, 'finished')
         send_catch_log(signal=signals.spider_closed, sender=self.__class__, \
             spider=spider, reason=reason)
-        stats.close_spider(spider, reason=reason)
         call = self._next_request_calls.pop(spider, None)
         if call and call.active():
             call.cancel()
-        dfd = defer.maybeDeferred(spiders.close_spider, spider)
-        dfd.addErrback(log.err, "Unhandled error on SpiderManager.close_spider()",
+        dfd = defer.maybeDeferred(stats.close_spider, spider, reason=reason)
+        dfd.addErrback(log.err, "Unhandled error in stats.close_spider()",
             spider=spider)
+        dfd.addBoth(lambda _: spiders.close_spider(spider))
+        dfd.addErrback(log.err, "Unhandled error in spiders.close_spider()",
+            spider=spider)
         dfd.addBoth(lambda _: log.msg("Spider closed (%s)" % reason, spider=spider))
         dfd.addBoth(lambda _: self._spider_closed_callback(spider))
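The addBoth/addErrback pairing used above is worth calling out: addBoth makes each teardown step run whether or not the previous one failed, and the following addErrback(log.err, ...) logs that step's failure and clears it so the chain keeps going. A standalone sketch of the pattern (the step functions are made up for illustration):

import sys
from twisted.internet import defer
from twisted.python import log

log.startLogging(sys.stdout)

def close_scheduler():
    print('scheduler closed')

def close_scraper():
    raise RuntimeError('scraper blew up')   # a failing teardown step

def finish_closing():
    print('finished closing spider')

dfd = defer.Deferred()                      # stands in for downloader.close_spider()
dfd.addBoth(lambda _: close_scheduler())
dfd.addErrback(log.err, "Unhandled error in close_scheduler()")
dfd.addBoth(lambda _: close_scraper())
dfd.addErrback(log.err, "Unhandled error in close_scraper()")  # failure logged here
dfd.addBoth(lambda _: finish_closing())     # still runs despite the failure
dfd.callback(None)                          # the downloader reports it has drained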


@@ -29,6 +29,7 @@ class SpiderInfo(object):
         self.active = set()
         self.active_size = 0
         self.itemproc_size = 0
+        self.closing = None

     def add_response_request(self, response, request):
         deferred = defer.Deferred()
@@ -68,16 +69,15 @@ class Scraper(object):

     def open_spider(self, spider):
         """Open the given spider for scraping and allocate resources for it"""
-        if spider in self.sites:
-            raise RuntimeError('Scraper spider already opened: %s' % spider)
+        assert spider not in self.sites, "Spider already opened: %s" % spider
         self.sites[spider] = SpiderInfo()
         self.itemproc.open_spider(spider)

     def close_spider(self, spider):
         """Close a spider being scraped and release its resources"""
-        if spider not in self.sites:
-            raise RuntimeError('Scraper spider already closed: %s' % spider)
-        self.sites.pop(spider)
+        assert spider in self.sites, "Spider not opened: %s" % spider
+        site = self.sites[spider]
+        site.closing = defer.Deferred()
         self.itemproc.close_spider(spider)

     def is_idle(self):
@@ -93,6 +93,9 @@ class Scraper(object):
         #     spider=spider)
         def finish_scraping(_):
             site.finish_response(response)
+            if site.closing and site.is_idle():
+                del self.sites[spider]
+                site.closing.callback(None)
             self._scrape_next(spider, site)
             return _
         dfd.addBoth(finish_scraping)
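The Scraper mirrors the Downloader: close_spider() only marks the site by creating site.closing, and finish_scraping() fires it once the last in-flight response has been processed. A hedged, self-contained sketch of that idle check (SiteStub and the response strings are illustrative, not the real SpiderInfo):

from twisted.internet import defer

class SiteStub(object):
    def __init__(self):
        self.active = set()       # responses currently being scraped
        self.closing = None       # set to a Deferred by close_spider()
    def is_idle(self):
        return not self.active

sites = {'myspider': SiteStub()}
site = sites['myspider']
site.active.add('response-1')

# close_spider(): just mark the site; resources are released later
site.closing = defer.Deferred()
site.closing.addBoth(lambda _: print('scraper done with myspider'))

def finish_scraping(response):
    # runs after each response; fires site.closing once everything has drained
    site.active.discard(response)
    if site.closing and site.is_idle():
        del sites['myspider']
        site.closing.callback(None)

finish_scraping('response-1')   # last response done -> the Deferred fires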