diff --git a/scrapy/trunk/scrapy/core/engine.py b/scrapy/trunk/scrapy/core/engine.py index 78e7666b5..625b34d8f 100644 --- a/scrapy/trunk/scrapy/core/engine.py +++ b/scrapy/trunk/scrapy/core/engine.py @@ -22,7 +22,7 @@ from scrapy.item import ScrapedItem from scrapy.item.pipeline import ItemPipelineManager from scrapy.spider import spiders from scrapy.spider.middleware import SpiderMiddlewareManager -from scrapy.utils.defer import chain_deferred, defer_succeed, mustbe_deferred, deferred_degenerate +from scrapy.utils.defer import chain_deferred, defer_succeed, mustbe_deferred, deferred_imap from scrapy.utils.request import request_info class ExecutionEngine(object): @@ -255,12 +255,8 @@ class ExecutionEngine(object): else: log.msg("Spider must return Request, ScrapedItem or None, got '%s' while processing %s" % (type(item).__name__, request), log.WARNING, domain=domain) - class _ResultContainer(object): - def append(self, item): - _onsuccess_per_item(item) - def _onsuccess(result): - return deferred_degenerate(result, _ResultContainer()) + return deferred_imap(_onsuccess_per_item, result) def _onerror(_failure): if not isinstance(_failure.value, IgnoreRequest): diff --git a/scrapy/trunk/scrapy/tests/test_utils_defer.py b/scrapy/trunk/scrapy/tests/test_utils_defer.py new file mode 100644 index 000000000..2248c0032 --- /dev/null +++ b/scrapy/trunk/scrapy/tests/test_utils_defer.py @@ -0,0 +1,37 @@ +from itertools import imap +from twisted.trial import unittest + +from scrapy.utils.defer import deferred_imap + + +class DeferTest(unittest.TestCase): + def test_deferred_imap_1(self): + """deferred_imap storing results""" + seq = [1, 2, 3] + output = list(imap(None, seq)) + + dfd = deferred_imap(None, seq) + dfd.addCallback(self.assertEqual, output) + return dfd + + def test_deferred_imap_2(self): + """deferred_imap not storing results""" + seq = [1, 2, 3] + output = list(imap(None, seq, seq)) + + dfd = deferred_imap(None, seq, seq) + dfd.addCallback(self.assertEqual, output) + return dfd + + def test_deferred_imap_3(self): + """deferred_imap not storing results""" + seq = [1, 2, 3] + output = [] + function = lambda v: output.append(v) + + dfd = deferred_imap(function, seq, store_results=False) + dfd.addCallback(self.assertEqual, []) + dfd.addCallback(lambda _: self.assertEqual(output, seq)) + return dfd + + diff --git a/scrapy/trunk/scrapy/utils/defer.py b/scrapy/trunk/scrapy/utils/defer.py index 453d7320d..1a30087f1 100644 --- a/scrapy/trunk/scrapy/utils/defer.py +++ b/scrapy/trunk/scrapy/utils/defer.py @@ -2,6 +2,7 @@ Helper functions for dealing with Twisted deferreds """ +from itertools import imap from twisted.internet import defer, reactor from twisted.python import failure @@ -63,19 +64,38 @@ def lambda_deferred(func): return d return deferred.addCallbacks(_success, _fail) -def deferred_degenerate(generator, container=None, next_delay=0): - generator = iter(generator or []) +def deferred_imap(function, *sequences, **kwargs): + """Analog to itertools.imap python function but friendly iterable evaluation + taking in count cooperative multitasking. + + It returns a Deferred object that is fired when StopIteration is reached or + when any exception is raised when calling function. + + By default the output of the evaluation is collected into a list and + returned as deferred result when iterable finished. But it can be disabled + (to save memory) using `store_results` parameter. + + """ + + next_delay = kwargs.pop('next_delay', 0) + store_results = kwargs.pop('store_results', True) + deferred = defer.Deferred() - container = container or [] + container = [] + + iterator = imap(function, *sequences) + def _next(): try: - container.append(generator.next()) + value = iterator.next() + if store_results: + container.append(value) except StopIteration: reactor.callLater(0, deferred.callback, container) except: reactor.callLater(0, deferred.errback, failure.Failure()) else: reactor.callLater(next_delay, _next) + _next() return deferred -