# scrapy/tests/test_pipelines.py
import asyncio

from pytest import mark
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.trial import unittest

from scrapy import Request, Spider, signals
from scrapy.utils.defer import deferred_to_future, maybe_deferred_to_future
from scrapy.utils.test import get_crawler, get_from_asyncio_queue

from tests.mockserver import MockServer
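

# Every pipeline below sets item["pipeline_passed"] so the test case can verify
# that process_item() was invoked. Each class exercises a different supported
# style: a plain return value, a returned Deferred, and async def coroutines
# that combine Deferreds with asyncio code under one reactor or the other.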


class SimplePipeline:
    def process_item(self, item, spider):
        item["pipeline_passed"] = True
        return item


class DeferredPipeline:
    def cb(self, item):
        item["pipeline_passed"] = True
        return item

    def process_item(self, item, spider):
        # process_item() may also return a Deferred that fires with the item.
        d = Deferred()
        d.addCallback(self.cb)
        d.callback(item)
        return d


class AsyncDefPipeline:
    async def process_item(self, item, spider):
        d = Deferred()
        # Imported here rather than at module level, so that the reactor
        # installed for the test run is the one that gets used.
        from twisted.internet import reactor

        reactor.callLater(0, d.callback, None)
        # maybe_deferred_to_future() makes the Deferred awaitable under either
        # reactor, asyncio or not.
        await maybe_deferred_to_future(d)
        item["pipeline_passed"] = True
        return item


class AsyncDefAsyncioPipeline:
    async def process_item(self, item, spider):
        d = Deferred()
        from twisted.internet import reactor

        reactor.callLater(0, d.callback, None)
        # deferred_to_future() and the asyncio calls below require the asyncio
        # reactor, hence the only_asyncio mark on the corresponding test.
        await deferred_to_future(d)
        await asyncio.sleep(0.2)
        item["pipeline_passed"] = await get_from_asyncio_queue(True)
        return item


class AsyncDefNotAsyncioPipeline:
    async def process_item(self, item, spider):
        d1 = Deferred()
        from twisted.internet import reactor

        reactor.callLater(0, d1.callback, None)
        # Awaiting a Deferred directly only works without the asyncio reactor,
        # hence the only_not_asyncio mark on the corresponding test.
        await d1
        d2 = Deferred()
        reactor.callLater(0, d2.callback, None)
        await maybe_deferred_to_future(d2)
        item["pipeline_passed"] = True
        return item


class ItemSpider(Spider):
    name = "itemspider"

    def start_requests(self):
        yield Request(self.mockserver.url("/status?n=200"))

    def parse(self, response):
        return {"field": 42}


class PipelineTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.mockserver = MockServer()
        cls.mockserver.__enter__()

    @classmethod
    def tearDownClass(cls):
        cls.mockserver.__exit__(None, None, None)

    def _on_item_scraped(self, item):
        self.assertIsInstance(item, dict)
        self.assertTrue(item.get("pipeline_passed"))
        self.items.append(item)

    def _create_crawler(self, pipeline_class):
        settings = {
            "ITEM_PIPELINES": {pipeline_class: 1},
        }
        crawler = get_crawler(ItemSpider, settings)
        crawler.signals.connect(self._on_item_scraped, signals.item_scraped)
        self.items = []
        return crawler

    @defer.inlineCallbacks
    def test_simple_pipeline(self):
        crawler = self._create_crawler(SimplePipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)

    @defer.inlineCallbacks
    def test_deferred_pipeline(self):
        crawler = self._create_crawler(DeferredPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)

    @defer.inlineCallbacks
    def test_asyncdef_pipeline(self):
        crawler = self._create_crawler(AsyncDefPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)

    # only_asyncio/only_not_asyncio are pytest markers from the project's test
    # configuration; they skip the test unless the matching reactor is running.
    @mark.only_asyncio()
    @defer.inlineCallbacks
    def test_asyncdef_asyncio_pipeline(self):
        crawler = self._create_crawler(AsyncDefAsyncioPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)

    @mark.only_not_asyncio()
    @defer.inlineCallbacks
    def test_asyncdef_not_asyncio_pipeline(self):
        crawler = self._create_crawler(AsyncDefNotAsyncioPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)
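

# Usage sketch (hedged: the exact invocation depends on the local test setup):
# covering the full matrix takes two runs, one per reactor. In Scrapy's own
# suite the reactor is selected via a conftest option, e.g.:
#
#   pytest tests/test_pipelines.py
#   pytest tests/test_pipelines.py --reactor=asyncio
#
# The marked tests are skipped in whichever run does not match their reactor.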