mirror of
https://github.com/scrapy/scrapy.git
synced 2025-02-21 06:52:59 +00:00
Merge pull request #4259 from scrapy/asyncio-mw
Asyncio support in downloader middlewares
This commit is contained in:
commit
ce618fb6f2
@ -17,7 +17,7 @@ matrix:
|
||||
- env: TOXENV=pinned
|
||||
python: 3.5
|
||||
- env: TOXENV=py35-asyncio
|
||||
python: 3.5
|
||||
python: 3.5.2
|
||||
- env: TOXENV=py36
|
||||
python: 3.6
|
||||
- env: TOXENV=py37
|
||||
|
@ -8,7 +8,7 @@ from twisted.internet import defer
|
||||
from scrapy.exceptions import _InvalidOutput
|
||||
from scrapy.http import Request, Response
|
||||
from scrapy.middleware import MiddlewareManager
|
||||
from scrapy.utils.defer import mustbe_deferred
|
||||
from scrapy.utils.defer import mustbe_deferred, deferred_from_coro
|
||||
from scrapy.utils.conf import build_component_list
|
||||
|
||||
|
||||
@ -33,7 +33,7 @@ class DownloaderMiddlewareManager(MiddlewareManager):
|
||||
@defer.inlineCallbacks
|
||||
def process_request(request):
|
||||
for method in self.methods['process_request']:
|
||||
response = yield method(request=request, spider=spider)
|
||||
response = yield deferred_from_coro(method(request=request, spider=spider))
|
||||
if response is not None and not isinstance(response, (Response, Request)):
|
||||
raise _InvalidOutput('Middleware %s.process_request must return None, Response or Request, got %s' % \
|
||||
(method.__self__.__class__.__name__, response.__class__.__name__))
|
||||
@ -48,7 +48,7 @@ class DownloaderMiddlewareManager(MiddlewareManager):
|
||||
defer.returnValue(response)
|
||||
|
||||
for method in self.methods['process_response']:
|
||||
response = yield method(request=request, response=response, spider=spider)
|
||||
response = yield deferred_from_coro(method(request=request, response=response, spider=spider))
|
||||
if not isinstance(response, (Response, Request)):
|
||||
raise _InvalidOutput('Middleware %s.process_response must return Response or Request, got %s' % \
|
||||
(method.__self__.__class__.__name__, type(response)))
|
||||
@ -60,7 +60,7 @@ class DownloaderMiddlewareManager(MiddlewareManager):
|
||||
def process_exception(_failure):
|
||||
exception = _failure.value
|
||||
for method in self.methods['process_exception']:
|
||||
response = yield method(request=request, exception=exception, spider=spider)
|
||||
response = yield deferred_from_coro(method(request=request, exception=exception, spider=spider))
|
||||
if response is not None and not isinstance(response, (Response, Request)):
|
||||
raise _InvalidOutput('Middleware %s.process_exception must return None, Response or Request, got %s' % \
|
||||
(method.__self__.__class__.__name__, type(response)))
|
||||
|
@ -1,10 +1,14 @@
|
||||
"""
|
||||
Helper functions for dealing with Twisted deferreds
|
||||
"""
|
||||
import asyncio
|
||||
import inspect
|
||||
|
||||
from twisted.internet import defer, task
|
||||
from twisted.python import failure
|
||||
|
||||
from scrapy.exceptions import IgnoreRequest
|
||||
from scrapy.utils.asyncio import is_asyncio_reactor_installed
|
||||
|
||||
|
||||
def defer_fail(_failure):
|
||||
@ -114,3 +118,25 @@ def iter_errback(iterable, errback, *a, **kw):
|
||||
break
|
||||
except Exception:
|
||||
errback(failure.Failure(), *a, **kw)
|
||||
|
||||
|
||||
def _isfuture(o):
    """Return whether *o* is an asyncio Future-like object.

    Uses ``asyncio.isfuture`` when the running interpreter provides it and
    falls back to an ``isinstance`` check against ``asyncio.Future``
    otherwise.
    """
    # workaround for Python before 3.5.3 not having asyncio.isfuture
    if hasattr(asyncio, 'isfuture'):
        return asyncio.isfuture(o)
    return isinstance(o, asyncio.Future)
|
||||
|
||||
|
||||
def deferred_from_coro(o):
    """Convert a coroutine or other awaitable into a Deferred.

    A Deferred is returned unchanged, and so is any object that is neither
    a future nor an awaitable, so the result of a callable can be passed in
    without first checking whether that callable was a coroutine function.
    """
    if isinstance(o, defer.Deferred):
        return o
    if _isfuture(o) or inspect.isawaitable(o):
        if not is_asyncio_reactor_installed():
            # wrapping the coroutine directly into a Deferred, this doesn't work correctly with coroutines
            # that use asyncio, e.g. "await asyncio.sleep(1)"
            return defer.ensureDeferred(o)
        # wrapping the coroutine into a Future and then into a Deferred, this requires AsyncioSelectorReactor
        return defer.Deferred.fromFuture(asyncio.ensure_future(o))
    return o
|
||||
|
@ -1,7 +1,7 @@
|
||||
"""
|
||||
This module contains some assorted functions used in tests
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from importlib import import_module
|
||||
@ -96,3 +96,10 @@ def assert_samelines(testcase, text1, text2, msg=None):
|
||||
line endings between platforms
|
||||
"""
|
||||
testcase.assertEqual(text1.splitlines(), text2.splitlines(), msg)
|
||||
|
||||
|
||||
def get_from_asyncio_queue(value):
|
||||
q = asyncio.Queue()
|
||||
getter = q.get()
|
||||
q.put_nowait(value)
|
||||
return getter
|
||||
|
@ -1,5 +1,8 @@
|
||||
import asyncio
|
||||
from unittest import mock
|
||||
|
||||
from pytest import mark
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import Deferred
|
||||
from twisted.trial.unittest import TestCase
|
||||
from twisted.python.failure import Failure
|
||||
@ -8,7 +11,7 @@ from scrapy.http import Request, Response
|
||||
from scrapy.spiders import Spider
|
||||
from scrapy.exceptions import _InvalidOutput
|
||||
from scrapy.core.downloader.middleware import DownloaderMiddlewareManager
|
||||
from scrapy.utils.test import get_crawler
|
||||
from scrapy.utils.test import get_crawler, get_from_asyncio_queue
|
||||
from scrapy.utils.python import to_bytes
|
||||
|
||||
|
||||
@ -206,3 +209,47 @@ class MiddlewareUsingDeferreds(ManagerTestCase):
|
||||
|
||||
self.assertIs(results[0], resp)
|
||||
self.assertFalse(download_func.called)
|
||||
|
||||
|
||||
class MiddlewareUsingCoro(ManagerTestCase):
    """Middlewares using asyncio coroutines should work"""

    def test_asyncdef(self):
        """An ``async def`` process_request that awaits a Deferred returns its response."""
        resp = Response('http://example.com/index.html')

        class CoroMiddleware:
            async def process_request(self, request, spider):
                await defer.succeed(42)
                return resp

        self.mwman._add_middleware(CoroMiddleware())
        req = Request('http://example.com/index.html')
        download_func = mock.MagicMock()
        dfd = self.mwman.download(download_func, req, self.spider)
        results = []
        dfd.addBoth(results.append)
        self._wait(dfd)

        # the middleware's response is the result and the download function is never called
        self.assertIs(results[0], resp)
        self.assertFalse(download_func.called)

    @mark.only_asyncio()
    def test_asyncdef_asyncio(self):
        """An ``async def`` process_request that awaits asyncio primitives returns its response."""
        resp = Response('http://example.com/index.html')

        class CoroMiddleware:
            async def process_request(self, request, spider):
                await asyncio.sleep(0.1)
                result = await get_from_asyncio_queue(resp)
                return result

        self.mwman._add_middleware(CoroMiddleware())
        req = Request('http://example.com/index.html')
        download_func = mock.MagicMock()
        dfd = self.mwman.download(download_func, req, self.spider)
        results = []
        dfd.addBoth(results.append)
        self._wait(dfd)

        # the middleware's response is the result and the download function is never called
        self.assertIs(results[0], resp)
        self.assertFalse(download_func.called)
|
||||
|
Loading…
x
Reference in New Issue
Block a user