mirror of https://github.com/scrapy/scrapy.git
implement download timeouts based on deferred cancellation

commit 7729f939e9
parent 495acba223
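
The approach, in outline: when the request starts, schedule d.cancel() with
reactor.callLater(), and disarm the delayed call from a callback that runs if
the request finishes first. A minimal standalone sketch of that pattern (the
helper name with_timeout is illustrative, not part of this commit):

    from twisted.internet import reactor

    def with_timeout(d, timeout):
        # Arrange for the Deferred to be cancelled after `timeout` seconds.
        delayed_call = reactor.callLater(timeout, d.cancel)

        def disarm(result):
            # The operation completed (or failed) before the timeout fired,
            # so cancel the pending d.cancel() call and pass the result on.
            if delayed_call.active():
                delayed_call.cancel()
            return result

        d.addBoth(disarm)
        return d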
@@ -12,6 +12,7 @@ from twisted.web.http_headers import Headers as TxHeaders
 from twisted.web.http import PotentialDataLoss
 from twisted.web.iweb import IBodyProducer
 from twisted.internet.endpoints import TCP4ClientEndpoint
+from twisted.internet.error import TimeoutError

 from scrapy.http import Headers
 from scrapy.responsetypes import responsetypes
@@ -49,20 +50,27 @@ class ScrapyAgent(object):
         self._pool = pool

     def download_request(self, request):
+        timeout = request.meta.get('download_timeout') or self._connectTimeout
         url = urldefrag(request.url)[0]
         method = request.method
         headers = TxHeaders(request.headers)
         bodyproducer = _RequestBodyProducer(request.body) if request.body else None
-        agent = self._get_agent(request)
+        agent = self._get_agent(request, timeout)
         start_time = time()
         d = agent.request(method, url, headers, bodyproducer)
-        d.addBoth(self._download_latency, request, start_time)
-        d.addCallback(self._agentrequest_downloaded, request)
-        d.addErrback(self._agentrequest_failed, request)
+        d.addBoth(self._both_cb, request, start_time, url, timeout)
+        d.addCallback(self._downloaded, request)
+        self._timeout_cl = reactor.callLater(timeout, d.cancel)
         return d

-    def _get_agent(self, request):
-        timeout = request.meta.get('download_timeout') or self._connectTimeout
+    def _both_cb(self, result, request, start_time, url, timeout):
+        request.meta['download_latency'] = time() - start_time
+        if self._timeout_cl.active():
+            self._timeout_cl.cancel()
+            return result
+        raise TimeoutError("Getting %s took longer than %s seconds." % (url, timeout))
+
+    def _get_agent(self, request, timeout):
         bindaddress = request.meta.get('bindaddress') or self._bindAddress
         proxy = request.meta.get('proxy')
         if proxy:
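
Why the raise at the end of _both_cb works: cancelling a Deferred that has no
custom canceller fires its errback chain with defer.CancelledError, and since
_both_cb is attached with addBoth() it runs for that failure too. Finding its
delayed call no longer active, it knows the timeout fired and replaces the
cancellation failure with a descriptive TimeoutError. A quick illustration
(assuming the default canceller):

    from twisted.internet import defer

    d = defer.Deferred()
    failures = []
    d.addErrback(failures.append)
    d.cancel()
    # The recorded Failure wraps CancelledError:
    assert failures[0].check(defer.CancelledError)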
@@ -74,11 +82,7 @@ class ScrapyAgent(object):
         return self._Agent(reactor, contextFactory=self._contextFactory,
             connectTimeout=timeout, bindAddress=bindaddress, pool=self._pool)

-    def _download_latency(self, any_, request, start_time):
-        request.meta['download_latency'] = time() - start_time
-        return any_
-
-    def _agentrequest_downloaded(self, txresponse, request):
+    def _downloaded(self, txresponse, request):
         if txresponse.length == 0:
             return self._build_response(('', None), txresponse, request)
         finished = defer.Deferred()
@@ -95,11 +99,6 @@ class ScrapyAgent(object):
         respcls = responsetypes.from_args(headers=headers, url=url)
         return respcls(url=url, status=status, headers=headers, body=body)

-    def _agentrequest_failed(self, failure, request):
-        # be clear it is an HTTP failure with new downloader
-        log.err(failure, 'HTTP11 failure: %s' % request)
-        return failure
-

 class _RequestBodyProducer(object):
     implements(IBodyProducer)
@@ -105,7 +105,7 @@ class HttpTestCase(unittest.TestCase):
         return d

     def test_timeout_download_from_spider(self):
-        request = Request(self.getURL('wait'), meta=dict(download_timeout=0.000001))
+        request = Request(self.getURL('wait'), meta=dict(download_timeout=0.1))
         d = self.download_request(request, BaseSpider('foo'))
         return self.assertFailure(d, defer.TimeoutError, error.TimeoutError)
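
Note that trial's assertFailure() accepts several expected exception types, so
the test passes whether the failure arrives as defer.TimeoutError or as the
error.TimeoutError that _both_cb raises. A minimal standalone trial test
(illustrative names, not part of this commit) exercising the same pattern:

    from twisted.internet import defer, reactor
    from twisted.internet.error import TimeoutError
    from twisted.trial import unittest

    class TimeoutPatternTest(unittest.TestCase):
        def test_cancel_converts_to_timeout(self):
            d = defer.Deferred()  # stands in for a request that never completes
            timeout_cl = reactor.callLater(0.1, d.cancel)

            def both_cb(result):
                if timeout_cl.active():
                    timeout_cl.cancel()
                    return result
                raise TimeoutError("took longer than 0.1 seconds")

            d.addBoth(both_cb)
            return self.assertFailure(d, TimeoutError)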