1
0
mirror of https://github.com/scrapy/scrapy.git synced 2025-02-14 13:32:05 +00:00

Add a test for an async item_scraped handler.

This commit is contained in:
Andrey Rakhmatullin 2020-02-06 22:39:00 +05:00
parent 7687564c73
commit 489ffcda51

39
tests/test_signals.py Normal file
View File

@ -0,0 +1,39 @@
from twisted.internet import defer
from twisted.trial import unittest
from scrapy import signals, Request, Spider
from scrapy.utils.test import get_crawler
from tests.mockserver import MockServer
class ItemSpider(Spider):
name = 'itemspider'
def start_requests(self):
for _ in range(10):
yield Request(self.mockserver.url('/status?n=200'),
dont_filter=True)
def parse(self, response):
return {'field': 42}
class AsyncSignalTestCase(unittest.TestCase):
def setUp(self):
self.mockserver = MockServer()
self.mockserver.__enter__()
self.items = []
def tearDown(self):
self.mockserver.__exit__(None, None, None)
async def _on_item_scraped(self, item):
self.items.append(item)
@defer.inlineCallbacks
def test_simple_pipeline(self):
crawler = get_crawler(ItemSpider)
crawler.signals.connect(self._on_item_scraped, signals.item_scraped)
yield crawler.crawl(mockserver=self.mockserver)
self.assertEqual(len(self.items), 10)