Mirror of https://github.com/scrapy/scrapy.git (synced 2025-02-25 14:23:43 +00:00)

Commit fb8e24acb5 (parent e8543ca1f9): more downloader cleanup and fixed a bug which was preventing domains from being properly closed.
@@ -11,7 +11,8 @@ from scrapy.spider import spiders
 from scrapy.core.downloader.middleware import DownloaderMiddlewareManager
+from scrapy.core.downloader.handlers import download_any
 from scrapy.conf import settings
-from scrapy.utils.defer import chain_deferred, mustbe_deferred
+from scrapy.utils.defer import mustbe_deferred
 from scrapy import log


 class SiteInfo(object):
@@ -33,20 +34,14 @@ class SiteInfo(object):
         self.queue = []
         self.active = set()
         self.transferring = set()
-        self.closed = False
+        self.closing = False
         self.lastseen = None

-    def is_idle(self):
-        return not (self.active or self.transferring)
-
-    def capacity(self):
+    def free_transfer_slots(self):
         return self.max_concurrent_requests - len(self.transferring)

-    def outstanding(self):
-        return len(self.active) + len(self.queue)
-
     def needs_backout(self):
-        return self.outstanding() > (2 * self.max_concurrent_requests)
+        return len(self.queue) > 2 * self.max_concurrent_requests


 class Downloader(object):
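The new needs_backout() rule is a plain queue-length check instead of going through outstanding(). As a rough standalone illustration (plain Python, not Scrapy code): with max_concurrent_requests = 2, a domain asks the engine to back out once more than four requests are queued.

    max_concurrent_requests = 2
    queue = ['r1', 'r2', 'r3', 'r4']
    print(len(queue) > 2 * max_concurrent_requests)   # False: 4 is not greater than 4
    queue.append('r5')
    print(len(queue) > 2 * max_concurrent_requests)   # True: stop feeding this domain for now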
@@ -74,12 +69,13 @@ class Downloader(object):
         not be downloaded from site.
         """
         site = self.sites[spider.domain_name]
-        if site.closed:
-            raise IgnoreRequest('Can\'t fetch on a closed domain')
+        if site.closing:
+            raise IgnoreRequest('Can\'t fetch on a closing domain')

         site.active.add(request)
         def _deactivate(_):
             site.active.remove(request)
+            self._close_if_idle(spider.domain_name)
             return _

         return self.middleware.download(self.enqueue, request, spider).addBoth(_deactivate)
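The _close_if_idle() call added inside _deactivate is what lets a finishing request trigger the close check. The surrounding pattern is standard Twisted: addBoth() fires on success and on failure alike, so the request always leaves the active set and the result (or Failure) is passed through untouched. A self-contained sketch of that pattern, where track() and close_if_idle are illustrative names rather than Scrapy APIs:

    from twisted.internet import defer

    def track(deferred, active, request, close_if_idle):
        active.add(request)
        def _deactivate(result):
            active.remove(request)   # runs on success and on failure
            close_if_idle()          # give the domain a chance to close once drained
            return result            # pass the result (or Failure) through unchanged
        return deferred.addBoth(_deactivate)

    d = defer.Deferred()
    active = set()
    track(d, active, 'http://example.com/page', close_if_idle=lambda: None)
    d.callback('response body')      # _deactivate runs here; active is empty again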
@@ -117,12 +113,15 @@ class Downloader(object):
         site.lastseen = now

         # Process requests in queue if there are free slots to transfer for this site
-        while site.queue and site.capacity() > 0:
+        while site.queue and site.free_transfer_slots() > 0:
             request, deferred = site.queue.pop(0)
             self._download(site, request, spider).chainDeferred(deferred)

         # Free site resources if domain was asked to be closed and it is idle.
-        if site.closed and site.is_idle():
+        self._close_if_idle(domain)
+
+    def _close_if_idle(self, domain):
+        site = self.sites.get(domain)
+        if site and site.closing and not site.active:
             del self.sites[domain]
             self.engine.closed_domain(domain)  # notify engine.
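Together with the fetch() change above, the close sequence becomes: close_domain() marks the site as closing, new fetches are refused, and _close_if_idle() removes the site and notifies the engine only once nothing is active any more. A minimal standalone model of that lifecycle, using trimmed-down stand-ins for the real classes (finish_request here mirrors the _close_if_idle() call added to _deactivate):

    class _Site(object):
        def __init__(self):
            self.active = set()
            self.closing = False

    class _MiniDownloader(object):
        def __init__(self):
            self.sites = {'example.com': _Site()}
            self.closed_domains = []

        def close_domain(self, domain):
            self.sites[domain].closing = True       # stop accepting new requests
            self._close_if_idle(domain)             # may close right away if already idle

        def finish_request(self, domain, request):
            self.sites[domain].active.remove(request)
            self._close_if_idle(domain)             # re-check after every finished request

        def _close_if_idle(self, domain):
            site = self.sites.get(domain)
            if site and site.closing and not site.active:
                del self.sites[domain]
                self.closed_domains.append(domain)  # stands in for engine.closed_domain()

    d = _MiniDownloader()
    d.sites['example.com'].active.add('req-1')
    d.close_domain('example.com')             # still draining: nothing happens yet
    d.finish_request('example.com', 'req-1')  # last request done -> domain is removed
    print(d.closed_domains)                   # ['example.com']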
@@ -148,29 +147,13 @@ class Downloader(object):
     def close_domain(self, domain):
         """Free any resources associated with the given domain"""
         site = self.sites.get(domain)
-        if not site or site.closed:
+        if not site or site.closing:
             raise RuntimeError('Downloader domain already closed: %s' % domain)

-        site.closed = True
+        site.closing = True
         spider = spiders.fromdomain(domain)
         self.process_queue(spider)

-    def needs_backout(self, domain):
-        site = self.sites.get(domain)
-        return (site.needs_backout() if site else True)
-
-    # Most of the following functions must be reviewed to decide if are really needed
-    def domain_is_open(self, domain):
-        return domain in self.sites
-
-    def outstanding(self, domain):
-        """The number of outstanding requests for a domain
-        This includes both active requests and pending requests.
-        """
-        site = self.sites.get(domain)
-        if site:
-            return site.outstanding()
-
     def has_capacity(self):
         """Does the downloader have capacity to handle more domains"""
         return len(self.sites) < self.concurrent_domains
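The guard in close_domain() means a domain can only be put into the closing state once: a second call, or a call for an unknown domain, is an error, and fetch() (above) refuses new requests while the domain is closing. A condensed standalone version of just that guard:

    class _Site(object):
        closing = False

    def close_domain(sites, domain):
        # same check as above: unknown or already-closing domains are rejected
        site = sites.get(domain)
        if not site or site.closing:
            raise RuntimeError('Downloader domain already closed: %s' % domain)
        site.closing = True

    sites = {'example.com': _Site()}
    close_domain(sites, 'example.com')       # ok: marks the site as closing
    try:
        close_domain(sites, 'example.com')   # second call trips the guard
    except RuntimeError as e:
        print(e)                             # Downloader domain already closed: example.com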
@@ -164,7 +164,7 @@ class ExecutionEngine(object):
         self.open_domain(domain, spider)
         return domain

-    def next_request(self, spider, breakloop=True):
+    def next_request(self, spider, now=False):
         """Scrape the next request for the domain passed.

         The next request to be scraped is retrieved from the scheduler and
@@ -175,18 +175,16 @@ class ExecutionEngine(object):
         if self.paused:
             return reactor.callLater(5, self.next_request, spider)

-        if breakloop:
-            # delaying make reentrant call to next_request safe
-            return reactor.callLater(0, self.next_request, spider, breakloop=False)
+        # call next_request in next reactor loop by default
+        if not now:
+            return reactor.callLater(0, self.next_request, spider, now=True)

         domain = spider.domain_name

-        # check that the engine is still running and domain is open
-        if not self.running:
-            return
-
-        # backout enqueing downloads if domain needs it
-        if domain in self.closing or self.downloader.needs_backout(domain):
+        if not self.running or \
+                domain in self.closing or \
+                self.domain_is_closed(domain) or \
+                self.downloader.sites[domain].needs_backout():
             return

         # Next pending request from scheduler
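The breakloop flag becomes now, but the idea is the same: unless told otherwise, next_request() reschedules itself with reactor.callLater(0, ...) so the real work happens on a fresh reactor iteration, keeping reentrant calls from nesting. A tiny standalone demonstration of that pattern (not the engine itself):

    from twisted.internet import reactor

    def next_request(label, now=False):
        if not now:
            # defer the real work to the next reactor iteration
            return reactor.callLater(0, next_request, label, True)
        print('processing', label)       # runs later, at the top of a fresh call stack

    next_request('example.com')            # returns immediately; work happens a tick later
    reactor.callLater(0.1, reactor.stop)   # let the queued call run, then exit
    reactor.run()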
@@ -208,12 +206,12 @@ class ExecutionEngine(object):
     def domain_is_idle(self, domain):
         scraping = self._scraping.get(domain)
         pending = self.scheduler.domain_has_pending_requests(domain)
-        downloading = domain in self.downloader.sites and self.downloader.sites[domain].outstanding() > 0
+        downloading = domain in self.downloader.sites and self.downloader.sites[domain].active
         haspipe = not self.pipeline.domain_is_idle(domain)
         return not (pending or downloading or haspipe or scraping)

-    def domain_is_open(self, domain):
-        return domain in self.downloader.sites
+    def domain_is_closed(self, domain):
+        return domain not in self.downloader.sites

     @property
     def open_domains(self):
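The downloading check now leans on the truthiness of the per-site active set rather than the removed outstanding() count, so only requests actually handed to the downloader keep a domain from looking idle. For clarity:

    active = set()
    print(bool(active))      # False: nothing in flight, the domain may be idle
    active.add('request-1')
    print(bool(active))      # True: still downloading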
@@ -283,9 +281,11 @@ class ExecutionEngine(object):

     def schedule(self, request, spider):
         domain = spider.domain_name
         if domain in self.closing:
             raise IgnoreRequest()
-        if not self.downloader.domain_is_open(domain):
+        if not self.scheduler.domain_is_open(domain):
+            self.scheduler.open_domain(domain)
+        if self.domain_is_closed(domain): # scheduler auto-open
             self.domain_scheduler.add_domain(domain)
         self.next_request(spider)
         return self.scheduler.enqueue_request(domain, request)
@@ -429,7 +429,7 @@ class ExecutionEngine(object):
         "len(self.downloader.sites[domain].queue)",
         "len(self.downloader.sites[domain].active)",
         "len(self.downloader.sites[domain].transferring)",
-        "self.downloader.sites[domain].closed",
+        "self.downloader.sites[domain].closing",
         "self.downloader.sites[domain].lastseen",
         "self.pipeline.domain_is_idle(domain)",
        "len(self.pipeline.domaininfo[domain])",