mirror of https://github.com/scrapy/scrapy.git (synced 2025-02-24 22:43:57 +00:00)

Merge pull request #206 from dangra/downloader-enhancements

AutoThrottle and Downloader enhancements

This commit is contained in commit 12475fccbe
@@ -37,12 +37,6 @@ This adjusts download delays and concurrency based on the following rules:
   :setting:`AUTOTHROTTLE_START_DELAY`
2. when a response is received, the download delay is adjusted to the
   average of previous download delay and the latency of the response.
3. after :setting:`AUTOTHROTTLE_CONCURRENCY_CHECK_PERIOD` responses have
   passed, the average latency of this period is checked against the previous
   one and:

   * if the latency remained constant (within standard deviation limits), it is increased
   * if the latency has increased (beyond standard deviation limits) and the concurrency is higher than 1, the concurrency is decreased

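As a rough illustration of the averaging rule in point 2 above, a minimal sketch with made-up numbers (not taken from the Scrapy source)::

    # Hypothetical values: previous download delay of 5.0 s, observed response
    # latency of 1.0 s. The delay is pulled toward the latency by averaging.
    prev_delay = 5.0
    latency = 1.0
    new_delay = (prev_delay + latency) / 2.0  # -> 3.0 s
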
.. note:: The AutoThrottle extension honours the standard Scrapy settings for
   concurrency and delay. This means that it will never set a download delay
@@ -55,11 +49,11 @@ The settings used to control the AutoThrottle extension are:

* :setting:`AUTOTHROTTLE_ENABLED`
* :setting:`AUTOTHROTTLE_START_DELAY`
* :setting:`AUTOTHROTTLE_CONCURRENCY_CHECK_PERIOD`
* :setting:`AUTOTHROTTLE_MAX_DELAY`
* :setting:`AUTOTHROTTLE_DEBUG`
* :setting:`DOWNLOAD_DELAY`
* :setting:`CONCURRENT_REQUESTS_PER_DOMAIN`
* :setting:`CONCURRENT_REQUESTS_PER_IP`
* :setting:`DOWNLOAD_DELAY`

For more information see :ref:`autothrottle-algorithm`.

@@ -81,14 +75,14 @@ Default: ``5.0``

The initial download delay (in seconds).

.. setting:: AUTOTHROTTLE_CONCURRENCY_CHECK_PERIOD
.. setting:: AUTOTHROTTLE_MAX_DELAY

AUTOTHROTTLE_CONCURRENCY_CHECK_PERIOD
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
AUTOTHROTTLE_MAX_DELAY
~~~~~~~~~~~~~~~~~~~~~~

Default: ``10``
Default: ``60.0``

How many responses should pass to perform concurrency adjustments.
The maximum download delay (in seconds) to be set in case of high latencies.

.. setting:: AUTOTHROTTLE_DEBUG

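These are plain Scrapy settings, so a project would typically enable and tune the extension from its ``settings.py``. A minimal sketch (only the setting names come from the documentation above; the values are illustrative, not recommendations)::

    # settings.py (illustrative values)
    AUTOTHROTTLE_ENABLED = True
    AUTOTHROTTLE_START_DELAY = 5.0   # initial download delay, in seconds
    AUTOTHROTTLE_MAX_DELAY = 60.0    # upper bound applied when latencies are high
    AUTOTHROTTLE_DEBUG = True        # log every delay adjustment
    DOWNLOAD_DELAY = 0.5             # acts as the floor AutoThrottle will never go below
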
@@ -1,106 +1,69 @@
import logging
from scrapy.exceptions import NotConfigured
from scrapy import signals
from scrapy.utils.httpobj import urlparse_cached
from scrapy.resolver import dnscache


class AutoThrottle(object):

    def __init__(self, crawler):
        settings = crawler.settings
        if not settings.getbool('AUTOTHROTTLE_ENABLED'):
            raise NotConfigured
        self.crawler = crawler
        crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self.response_received, signal=signals.response_received)
        self.START_DELAY = settings.getfloat("AUTOTHROTTLE_START_DELAY", 5.0)
        self.CONCURRENCY_CHECK_PERIOD = settings.getint("AUTOTHROTTLE_CONCURRENCY_CHECK_PERIOD", 10)
        self.MAX_CONCURRENCY = self._max_concurency(settings)
        self.MIN_DOWNLOAD_DELAY = self._min_download_delay(settings)
        self.DEBUG = settings.getbool("AUTOTHROTTLE_DEBUG")
        self.last_latencies = [self.START_DELAY]
        self.last_lat = self.START_DELAY, 0.0
        if not crawler.settings.getbool('AUTOTHROTTLE_ENABLED'):
            raise NotConfigured

    def _min_download_delay(self, settings):
        return max(settings.getfloat("AUTOTHROTTLE_MIN_DOWNLOAD_DELAY"),
                   settings.getfloat("DOWNLOAD_DELAY"))

    def _max_concurency(self, settings):
        delay = self._min_download_delay(settings)
        if delay == 0:
            candidates = ["AUTOTHROTTLE_MAX_CONCURRENCY",
                          "CONCURRENT_REQUESTS_PER_DOMAIN", "CONCURRENT_REQUESTS_PER_IP"]
            candidates = [settings.getint(x) for x in candidates]
            candidates = [x for x in candidates if x > 0]
            if candidates:
                return min(candidates)
        return 1
        self.debug = crawler.settings.getbool("AUTOTHROTTLE_DEBUG")
        crawler.signals.connect(self._spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self._response_downloaded, signal=signals.response_downloaded)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def spider_opened(self, spider):
        if hasattr(spider, "download_delay"):
            self.MIN_DOWNLOAD_DELAY = spider.download_delay
        spider.download_delay = self.START_DELAY
        if hasattr(spider, "max_concurrent_requests"):
            self.MAX_CONCURRENCY = spider.max_concurrent_requests
        # override in order to avoid to initialize slot with concurrency > 1
        spider.max_concurrent_requests = 1
    def _spider_opened(self, spider):
        self.mindelay = self._min_delay(spider)
        self.maxdelay = self._max_delay(spider)
        spider.download_delay = self._start_delay(spider)

    def response_received(self, response, spider):
        key, slot = self._get_slot(response.request)
        latency = response.meta.get('download_latency')

        if not latency or not slot:
    def _min_delay(self, spider):
        s = self.crawler.settings
        return getattr(spider, 'download_delay', 0.0) or \
            s.getfloat('AUTOTHROTTLE_MIN_DOWNLOAD_DELAY') or \
            s.getfloat('DOWNLOAD_DELAY')

    def _max_delay(self, spider):
        return self.crawler.settings.getfloat('AUTOTHROTTLE_MAX_DELAY', 60.0)

    def _start_delay(self, spider):
        return max(self.mindelay, self.crawler.settings.getfloat('AUTOTHROTTLE_START_DELAY', 5.0))

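A rough sketch of how those three helpers resolve the effective bounds, using made-up setting values (only the 5.0 and 60.0 defaults appear in the code above; everything else is illustrative and does not use a real Crawler object):

    # Illustrative only: mimics _min_delay / _max_delay / _start_delay resolution.
    spider_download_delay = 0.0          # spider defines no download_delay attribute
    AUTOTHROTTLE_MIN_DOWNLOAD_DELAY = 0.0
    DOWNLOAD_DELAY = 2.0                 # hypothetical project setting

    mindelay = spider_download_delay or AUTOTHROTTLE_MIN_DOWNLOAD_DELAY or DOWNLOAD_DELAY  # -> 2.0
    maxdelay = 60.0                      # AUTOTHROTTLE_MAX_DELAY default
    startdelay = max(mindelay, 5.0)      # AUTOTHROTTLE_START_DELAY default -> 5.0
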
    def _response_downloaded(self, response, request, spider):
        key, slot = self._get_slot(request, spider)
        latency = request.meta.get('download_latency')
        if latency is None or slot is None:
            return

        olddelay = slot.delay
        self._adjust_delay(slot, latency, response)
        self._check_concurrency(slot, latency)
        if self.debug:
            diff = slot.delay - olddelay
            size = len(response.body)
            conc = len(slot.transferring)
            msg = "slot: %s | conc:%2d | delay:%5d ms (%+d) | latency:%5d ms | size:%6d bytes" % \
                  (key, conc, slot.delay * 1000, diff * 1000, latency * 1000, size)
            spider.log(msg, level=logging.INFO)

        if self.DEBUG:
            spider.log("slot: %s | conc:%2d | delay:%5d ms | latency:%5d ms | size:%6d bytes" % \
                (key, slot.concurrency, slot.delay*1000, \
                latency*1000, len(response.body)))

    def _get_slot(self, request):
        downloader = self.crawler.engine.downloader
        key = urlparse_cached(request).hostname or ''
        if downloader.ip_concurrency:
            key = dnscache.get(key, key)
        return key, downloader.slots.get(key) or downloader.inactive_slots.get(key)

    def _check_concurrency(self, slot, latency):
        latencies = self.last_latencies
        latencies.append(latency)
        if len(latencies) == self.CONCURRENCY_CHECK_PERIOD:
            curavg, curdev = avg_stdev(latencies)
            preavg, predev = self.last_lat
            self.last_lat = curavg, curdev
            del latencies[:]
            if curavg > preavg + predev:
                if slot.concurrency > 1:
                    slot.concurrency -= 1
            elif slot.concurrency < self.MAX_CONCURRENCY:
                slot.concurrency += 1
    def _get_slot(self, request, spider):
        key = request.meta.get('download_slot')
        return key, self.crawler.engine.downloader.slots.get(key)

    def _adjust_delay(self, slot, latency, response):
        """Define delay adjustment policy"""
        # if latency is bigger than old delay, then use latency instead of mean. Works better with problematic sites
        new_delay = (slot.delay + latency) / 2.0 if latency < slot.delay else latency
        # If latency is bigger than old delay, then use latency instead of mean.
        # It works better with problematic sites
        new_delay = min(max(self.mindelay, latency, (slot.delay + latency) / 2.0), self.maxdelay)

        if new_delay < self.MIN_DOWNLOAD_DELAY:
            new_delay = self.MIN_DOWNLOAD_DELAY

        # dont adjust delay if response status != 200 and new delay is smaller than old one,
        # as error pages (and redirections) are usually small and so tend to reduce latency, thus provoking a positive feedback
        # by reducing delay instead of increase.
        # Dont adjust delay if response status != 200 and new delay is smaller
        # than old one, as error pages (and redirections) are usually small and
        # so tend to reduce latency, thus provoking a positive feedback by
        # reducing delay instead of increase.
        if response.status == 200 or new_delay > slot.delay:
            slot.delay = new_delay

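A worked example of the new clamped adjustment (the formula is taken from _adjust_delay above; the concrete numbers are made up):

    # Suppose mindelay = 1.0, maxdelay = 60.0, the slot's current delay is 4.0 s
    # and the measured download_latency is 10.0 s.
    mindelay, maxdelay = 1.0, 60.0
    delay, latency = 4.0, 10.0

    # max() picks the largest of: the floor, the raw latency, and the mean of
    # old delay and latency; min() then caps the result at maxdelay.
    new_delay = min(max(mindelay, latency, (delay + latency) / 2.0), maxdelay)
    print(new_delay)  # -> 10.0: the raw latency wins, so the delay ramps up quickly

    # With a fast response (latency 1.0 s) the delay only drifts down via the mean:
    new_delay = min(max(mindelay, 1.0, (delay + 1.0) / 2.0), maxdelay)
    print(new_delay)  # -> 2.5
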
def avg_stdev(lst):
    """Return average and standard deviation of the given list"""
    avg = sum(lst)/len(lst)
    sdsq = sum((x-avg) ** 2 for x in lst)
    stdev = (sdsq / (len(lst) -1)) ** 0.5
    return avg, stdev

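For reference, this is how the (now removed) standard-deviation check used avg_stdev: a new average latency above the previous average plus one standard deviation was treated as a slowdown. A minimal sketch with invented latency samples:

    previous = [0.9, 1.0, 1.1, 1.0, 1.0]   # hypothetical earlier window, in seconds
    current = [1.4, 1.6, 1.5, 1.7, 1.5]    # hypothetical latest window

    preavg, predev = avg_stdev(previous)
    curavg, curdev = avg_stdev(current)

    if curavg > preavg + predev:
        print("latency increased beyond one stdev -> decrease concurrency")
    else:
        print("latency roughly constant -> concurrency may be increased")
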
@@ -2,20 +2,18 @@ import random
import warnings
from time import time
from collections import deque
from functools import partial

from twisted.internet import reactor, defer
from twisted.python.failure import Failure

from scrapy.utils.defer import mustbe_deferred
from scrapy.utils.httpobj import urlparse_cached
from scrapy.resolver import dnscache
from scrapy.exceptions import ScrapyDeprecationWarning
from scrapy import signals
from scrapy import log
from .middleware import DownloaderMiddlewareManager
from .handlers import DownloadHandlers


class Slot(object):
    """Downloader slot"""

@@ -34,7 +32,7 @@ class Slot(object):

    def download_delay(self):
        if self.randomize_delay:
            return random.uniform(0.5*self.delay, 1.5*self.delay)
            return random.uniform(0.5 * self.delay, 1.5 * self.delay)
        return self.delay

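When randomize_delay is enabled, each request waits a uniformly random time between half and one and a half times the slot delay, exactly as download_delay above computes it. A quick sketch of the resulting range, with an arbitrary base delay:

    import random

    delay = 2.0  # hypothetical slot delay, in seconds
    samples = [random.uniform(0.5 * delay, 1.5 * delay) for _ in range(5)]
    # Every sample falls in [1.0, 3.0]; jittering the delay makes the request
    # timing less regular than a fixed, constant pause.
    assert all(1.0 <= s <= 3.0 for s in samples)
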
@@ -42,7 +40,7 @@ def _get_concurrency_delay(concurrency, spider, settings):
    delay = settings.getfloat('DOWNLOAD_DELAY')
    if hasattr(spider, 'DOWNLOAD_DELAY'):
        warnings.warn("%s.DOWNLOAD_DELAY attribute is deprecated, use %s.download_delay instead" %
            (type(spider).__name__, type(spider).__name__))
                      (type(spider).__name__, type(spider).__name__))
        delay = spider.DOWNLOAD_DELAY
    if hasattr(spider, 'download_delay'):
        delay = spider.download_delay
@@ -50,8 +48,8 @@ def _get_concurrency_delay(concurrency, spider, settings):
    # TODO: remove for Scrapy 0.15
    c = settings.getint('CONCURRENT_REQUESTS_PER_SPIDER')
    if c:
        warnings.warn("CONCURRENT_REQUESTS_PER_SPIDER setting is deprecated, " \
            "use CONCURRENT_REQUESTS_PER_DOMAIN instead", ScrapyDeprecationWarning)
        warnings.warn("CONCURRENT_REQUESTS_PER_SPIDER setting is deprecated, "
                      "use CONCURRENT_REQUESTS_PER_DOMAIN instead", ScrapyDeprecationWarning)
        concurrency = c
    # ----------------------------

@@ -73,51 +71,48 @@ class Downloader(object):
        self.domain_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN')
        self.ip_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_IP')
        self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)
        self.inactive_slots = {}

    def fetch(self, request, spider):
        key, slot = self._get_slot(request, spider)

        self.active.add(request)
        slot.active.add(request)
        def _deactivate(response):
            self.active.remove(request)
            slot.active.remove(request)
            if not slot.active: # remove empty slots
                self.inactive_slots[key] = self.slots.pop(key)

            return response

        dlfunc = partial(self._enqueue_request, slot=slot)
        dfd = self.middleware.download(dlfunc, request, spider)
        self.active.add(request)
        dfd = self.middleware.download(self._enqueue_request, request, spider)
        return dfd.addBoth(_deactivate)

    def needs_backout(self):
        return len(self.active) >= self.total_concurrency

    def _get_slot(self, request, spider):
        key = self._get_slot_key(request, spider)
        if key not in self.slots:
            conc = self.ip_concurrency if self.ip_concurrency else self.domain_concurrency
            conc, delay = _get_concurrency_delay(conc, spider, self.settings)
            self.slots[key] = Slot(conc, delay, self.settings)

        return key, self.slots[key]

    def _get_slot_key(self, request, spider):
        if 'download_slot' in request.meta:
            return request.meta['download_slot']

        key = urlparse_cached(request).hostname or ''
        if self.ip_concurrency:
            key = dnscache.get(key, key)
        if key not in self.slots:
            if key in self.inactive_slots:
                self.slots[key] = self.inactive_slots.pop(key)
            else:
                if self.ip_concurrency:
                    concurrency = self.ip_concurrency
                else:
                    concurrency = self.domain_concurrency
                concurrency, delay = _get_concurrency_delay(concurrency, spider, self.settings)
                self.slots[key] = Slot(concurrency, delay, self.settings)
        return key, self.slots[key]

    def _enqueue_request(self, request, spider, slot):
        def _downloaded(response):
            self.signals.send_catch_log(signal=signals.response_downloaded, \
                response=response, request=request, spider=spider)
        return key

    def _enqueue_request(self, request, spider):
        key, slot = self._get_slot(request, spider)
        request.meta['download_slot'] = key

        def _deactivate(response):
            slot.active.remove(request)
            return response

        deferred = defer.Deferred().addCallback(_downloaded)
        slot.active.add(request)
        deferred = defer.Deferred().addBoth(_deactivate)
        slot.queue.append((request, deferred))
        self._process_queue(spider, slot)
        return deferred
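One practical consequence of the new _get_slot_key logic is that a request can be pinned to an explicit download slot through its meta, instead of being grouped by hostname (or by resolved IP when CONCURRENT_REQUESTS_PER_IP is set). A minimal sketch, assuming a spider callback and made-up URLs and slot name:

    from scrapy.http import Request

    # Inside a hypothetical Spider subclass:
    def start_requests(self):
        # Both requests share the slot "my-pool", so they share one delay and one
        # concurrency budget regardless of hostname (names are illustrative).
        yield Request("http://example.com/a", meta={"download_slot": "my-pool"})
        yield Request("http://cdn.example.com/b", meta={"download_slot": "my-pool"})
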
@@ -152,17 +147,28 @@ class Downloader(object):
        # 1. Create the download deferred
        dfd = mustbe_deferred(self.handlers.download_request, request, spider)

        # 2. After response arrives, remove the request from transferring
        # 2. Notify response_downloaded listeners about the recent download
        # before querying queue for next request
        def _downloaded(response):
            self.signals.send_catch_log(signal=signals.response_downloaded,
                                        response=response,
                                        request=request,
                                        spider=spider)
            return response
        dfd.addCallback(_downloaded)

        # 3. After response arrives, remove the request from transferring
        # state to free up the transferring slot so it can be used by the
        # following requests (perhaps those which came from the downloader
        # middleware itself)
        slot.transferring.add(request)

        def finish_transferring(_):
            slot.transferring.remove(request)
            self._process_queue(spider, slot)
            return _

        return dfd.addBoth(finish_transferring)

    def is_idle(self):
        return not self.slots

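The response_downloaded signal fired in step 2 above is what the reworked AutoThrottle extension listens to. Any other extension can subscribe the same way; a minimal sketch of such a listener (the class name and log message are made up):

    from scrapy import signals


    class DownloadLogger(object):
        """Hypothetical extension: logs every downloaded response."""

        def __init__(self, crawler):
            crawler.signals.connect(self.response_downloaded,
                                    signal=signals.response_downloaded)

        @classmethod
        def from_crawler(cls, crawler):
            return cls(crawler)

        def response_downloaded(self, response, request, spider):
            spider.log("downloaded %s (%d bytes)" % (request.url, len(response.body)))
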
@@ -154,7 +154,7 @@ class ExecutionEngine(object):
        scraper_idle = spider in self.scraper.slots \
            and self.scraper.slots[spider].is_idle()
        pending = self.slots[spider].scheduler.has_pending_requests()
        downloading = bool(self.downloader.slots)
        downloading = bool(self.downloader.active)
        idle = scraper_idle and not (pending or downloading)
        return idle