diff --git a/scrapy/commands/queue.py b/scrapy/commands/queue.py
index fb6ee594e..b2105f779 100644
--- a/scrapy/commands/queue.py
+++ b/scrapy/commands/queue.py
@@ -1,47 +1,64 @@
+from twisted.internet import reactor, threads
+
 from scrapy.command import ScrapyCommand
 from scrapy.commands import runserver
 from scrapy.exceptions import UsageError
-from scrapy.conf import settings
+from scrapy.utils.conf import arglist_to_dict
 
 class Command(runserver.Command):
 
     requires_project = True
-    default_settings = {'LOG_ENABLED': False}
+    default_settings = {'LOG_LEVEL': 'WARNING'}
 
     def syntax(self):
-        return "[options] "
+        return "[options] "
 
     def short_desc(self):
         return "Control execution queue"
 
     def add_options(self, parser):
         ScrapyCommand.add_options(self, parser)
-        parser.add_option("--priority", dest="priority", type="float", default=0.0, \
-            help="priority to use for adding spiders")
         parser.add_option("-a", "--arg", dest="spargs", action="append", default=[], \
-            help="spider arguments to use for adding spiders")
+            help="set spider argument (may be repeated)")
+
+    def process_options(self, args, opts):
+        ScrapyCommand.process_options(self, args, opts)
+        try:
+            opts.spargs = arglist_to_dict(opts.spargs)
+        except ValueError:
+            raise UsageError("Invalid -a value, use -a NAME=VALUE", print_help=False)
 
     def run(self, args, opts):
         if len(args) < 1:
             raise UsageError()
         cmd = args[0]
-        botname = settings['BOT_NAME']
-        queue = self.crawler.queue.queue
+        q = self.crawler.queue._queue
+        if cmd == 'add' and len(args) < 2:
+            raise UsageError()
+
+        d = threads.deferToThread(self._run_in_thread, args, opts, q, cmd)
+        d.addBoth(lambda _: reactor.stop())
+        from scrapy import log
+        log.start()
+        reactor.run()
+
+    def _run_in_thread(self, args, opts, q, cmd):
         if cmd == 'add':
-            if len(args) < 2:
-                raise UsageError()
-            msg = dict(x for x in [x.split('=', 1) for x in opts.spargs])
             for x in args[1:]:
-                msg.update(name=x)
-                queue.put(msg)
-                print "Added (priority=%s): %s" % (opts.priority, msg)
+                self._call(q.add, x, **opts.spargs)
+                print "Added: name=%s args=%s" % (x, opts.spargs)
         elif cmd == 'list':
-            for x, y in queue:
-                print "(priority=%s) %s" % (y, x)
+            x = self._call(q.list)
+            print "\n".join(map(str, x))
+        elif cmd == 'count':
+            print self._call(q.count)
         elif cmd == 'clear':
-            queue.clear()
-            print "Cleared %s queue" % botname
+            self._call(q.clear)
         else:
             raise UsageError()
+
+    def _call(self, f, *a, **kw):
+        return threads.blockingCallFromThread(reactor, f, *a, **kw)
+
diff --git a/scrapy/commands/runserver.py b/scrapy/commands/runserver.py
index 21a36fe59..7f4dc970b 100644
--- a/scrapy/commands/runserver.py
+++ b/scrapy/commands/runserver.py
@@ -4,13 +4,10 @@ from scrapy.conf import settings
 class Command(ScrapyCommand):
 
     requires_project = True
+    default_settings = {'KEEP_ALIVE': True}
 
     def short_desc(self):
         return "Start Scrapy in server mode"
 
-    def process_options(self, args, opts):
-        super(Command, self).process_options(args, opts)
-        settings.overrides['QUEUE_CLASS'] = settings['SERVER_QUEUE_CLASS']
-
     def run(self, args, opts):
         self.crawler.start()
diff --git a/scrapy/commands/shell.py b/scrapy/commands/shell.py
index bf2e88d65..20a82c6d9 100644
--- a/scrapy/commands/shell.py
+++ b/scrapy/commands/shell.py
@@ -10,7 +10,7 @@ from scrapy.shell import Shell
 class Command(ScrapyCommand):
 
     requires_project = False
-    default_settings = {'QUEUE_CLASS': 'scrapy.queue.KeepAliveExecutionQueue'}
+    default_settings = {'KEEP_ALIVE': True}
 
     def syntax(self):
         return "[url|file]"
diff --git a/scrapy/commands/startproject.py b/scrapy/commands/startproject.py
index 0b39630ac..a4f615f6e 100644
--- a/scrapy/commands/startproject.py
+++ b/scrapy/commands/startproject.py
@@ -24,7 +24,6 @@ IGNORE = ignore_patterns('*.pyc', '.svn')
 class Command(ScrapyCommand):
 
     requires_project = False
-    default_settings = {'LOG_ENABLED': False}
 
     def syntax(self):
         return ""
diff --git a/scrapy/conf/default_settings.py b/scrapy/conf/default_settings.py
index eed71eda9..8a2b180ef 100644
--- a/scrapy/conf/default_settings.py
+++ b/scrapy/conf/default_settings.py
@@ -150,6 +150,8 @@ ITEM_PROCESSOR = 'scrapy.contrib.pipeline.ItemPipelineManager'
 # Item pipelines are typically set in specific commands settings
 ITEM_PIPELINES = []
 
+KEEP_ALIVE = False
+
 LOG_ENABLED = True
 LOG_ENCODING = 'utf-8'
 LOG_FORMATTER = 'scrapy.logformatter.LogFormatter'
@@ -175,8 +177,6 @@ MEMUSAGE_WARNING_MB = 0
 
 NEWSPIDER_MODULE = ''
 
-QUEUE_CLASS = 'scrapy.queue.ExecutionQueue'
-
 RANDOMIZE_DOWNLOAD_DELAY = True
 
 REDIRECT_MAX_METAREFRESH_DELAY = 100
@@ -209,8 +209,6 @@ SCHEDULER_MIDDLEWARES_BASE = {
 
 SCHEDULER_ORDER = 'DFO'
 
-SERVER_QUEUE_CLASS = 'scrapy.contrib.queue.SqliteExecutionQueue'
-
 SPIDER_MANAGER_CLASS = 'scrapy.spidermanager.SpiderManager'
 
 SPIDER_MIDDLEWARES = {}
@@ -228,6 +226,8 @@ SPIDER_MIDDLEWARES_BASE = {
 
 SPIDER_MODULES = []
 
+SPIDER_QUEUE_CLASS = 'scrapy.spiderqueue.SqliteSpiderQueue'
+
 SQLITE_DB = 'scrapy.db'
 
 SQS_QUEUE = 'scrapy'
diff --git a/scrapy/contrib/queue/__init__.py b/scrapy/contrib/queue/__init__.py
deleted file mode 100644
index c98f6e730..000000000
--- a/scrapy/contrib/queue/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from scrapy.queue import ExecutionQueue
-from scrapy.utils.sqlite import JsonSqlitePriorityQueue
-from scrapy.conf import settings
-
-class SqliteExecutionQueue(ExecutionQueue):
-
-    queue_class = JsonSqlitePriorityQueue
-
-    def __init__(self, *a, **kw):
-        super(SqliteExecutionQueue, self).__init__(*a, **kw)
-        self.queue = JsonSqlitePriorityQueue(settings['SQLITE_DB'])
-
-    def _append_next(self):
-        msg = self.queue.pop()
-        if msg:
-            name = msg.pop('name')
-            self.append_spider_name(name, **msg)
-
-    def is_finished(self):
-        return False
diff --git a/scrapy/contrib/queue/sqs.py b/scrapy/contrib/queue/sqs.py
deleted file mode 100644
index f3ccb816d..000000000
--- a/scrapy/contrib/queue/sqs.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import threading
-
-from twisted.internet import threads
-from boto.sqs.connection import SQSConnection
-from boto.sqs import regions
-
-from scrapy.queue import ExecutionQueue
-from scrapy.utils.py26 import json
-from scrapy.conf import settings
-
-class SQSExecutionQueue(ExecutionQueue):
-
-    polling_delay = settings.getint('SQS_POLLING_DELAY')
-    queue_name = settings['SQS_QUEUE']
-    region_name = settings['SQS_REGION']
-    visibility_timeout = settings.getint('SQS_VISIBILITY_TIMEOUT')
-    aws_access_key_id = settings['AWS_ACCESS_KEY_ID']
-    aws_secret_access_key = settings['AWS_SECRET_ACCESS_KEY']
-
-    def __init__(self, *a, **kw):
-        super(SQSExecutionQueue, self).__init__(*a, **kw)
-        self.region = self._get_region()
-        self._tls = threading.local()
-
-    def _append_next(self):
-        return threads.deferToThread(self._append_next_from_sqs)
-
-    def _append_next_from_sqs(self):
-        q = self._get_sqs_queue()
-        msgs = q.get_messages(1, visibility_timeout=self.visibility_timeout)
-        if msgs:
-            msg = msgs[0]
-            msg.delete()
-            spargs = json.loads(msg.get_body())
-            spname = spargs.pop('name')
-            self.append_spider_name(spname, **spargs)
-
-    def _get_sqs_queue(self):
-        if not hasattr(self._tls, 'queue'):
-            c = SQSConnection(self.aws_access_key_id, self.aws_secret_access_key, \
-                region=self.region)
-            self._tls.queue = c.create_queue(self.queue_name)
-        return self._tls.queue
-
-    def _get_region(self, name=region_name):
-        return [r for r in regions() if r.name == name][0]
-
-    def is_finished(self):
-        return False
diff --git a/scrapy/contrib/spiderqueue.py b/scrapy/contrib/spiderqueue.py
new file mode 100644
index 000000000..3f972b7d9
--- /dev/null
+++ b/scrapy/contrib/spiderqueue.py
@@ -0,0 +1,83 @@
+from zope.interface import implements
+
+from twisted.internet import threads
+from boto.sqs.connection import SQSConnection
+from boto.sqs.message import Message
+from boto.sqs import regions
+
+from scrapy.interfaces import ISpiderQueue
+from scrapy.utils.py26 import json
+
+class SQSSpiderQueue(object):
+
+    implements(ISpiderQueue)
+
+    def __init__(self, *a, **kw):
+        self.polling_delay = kw.pop('polling_delay', 30)
+        self.queue_name = kw.pop('queue_name', 'scrapy')
+        self.region_name = kw.pop('region_name', 'us-east-1')
+        self.visibility_timeout = kw.pop('visibility_timeout', 7200)
+        self.aws_access_key_id = kw.pop('aws_access_key_id', None)
+        self.aws_secret_access_key = kw.pop('aws_secret_access_key', None)
+        self.region = self._get_region(self.region_name)
+        self.conn.create_queue(self.queue_name)
+        super(SQSSpiderQueue, self).__init__(*a, **kw)
+
+    @classmethod
+    def from_settings(cls, settings):
+        return cls(
+            polling_delay=settings.getint('SQS_POLLING_DELAY'),
+            queue_name=settings['SQS_QUEUE'],
+            region_name=settings['SQS_REGION'],
+            visibility_timeout=settings.getint('SQS_VISIBILITY_TIMEOUT'),
+            aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
+            aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY']
+        )
+
+    def _get_region(self, name):
+        return [r for r in regions() if r.name == name][0]
+
+    @property
+    def conn(self):
+        return SQSConnection(self.aws_access_key_id, self.aws_secret_access_key, \
+            region=self.region)
+
+    @property
+    def queue(self):
+        return self.conn.get_queue(self.queue_name)
+
+    def _queue_method(self, method, *a, **kw):
+        return getattr(self.queue, method)(*a, **kw)
+
+    def pop(self):
+        return threads.deferToThread(self._pop)
+
+    def _pop(self):
+        msgs = self.queue.get_messages(1, visibility_timeout=self.visibility_timeout)
+        if msgs:
+            msg = msgs[0]
+            msg.delete()
+            return json.loads(msg.get_body())
+
+    def add(self, name, **spider_args):
+        d = spider_args.copy()
+        d['name'] = name
+        msg = Message(body=json.dumps(d))
+        return threads.deferToThread(self._queue_method, 'write', msg)
+
+    def count(self):
+        return threads.deferToThread(self._queue_method, 'count')
+
+    def list(self):
+        return threads.deferToThread(self._list)
+
+    def _list(self):
+        msgs = []
+        m = self.queue.read(visibility_timeout=100)
+        while m:
+            msgs.append(json.loads(m.get_body()))
+            m = self.queue.read(visibility_timeout=100)
+        return msgs
+
+    def clear(self):
+        return threads.deferToThread(self._queue_method, 'clear')
diff --git a/scrapy/crawler.py b/scrapy/crawler.py
index ccc4f6d33..1d5d17366 100644
--- a/scrapy/crawler.py
+++ b/scrapy/crawler.py
@@ -3,6 +3,7 @@ import signal
 from twisted.internet import reactor, defer
 
 from scrapy.xlib.pydispatch import dispatcher
+from scrapy.queue import ExecutionQueue
 from scrapy.core.engine import ExecutionEngine
 from scrapy.extension import ExtensionManager
 from scrapy.utils.ossignal import install_shutdown_handlers, signal_names
@@ -33,8 +34,10 @@ class Crawler(object):
         self.extensions = ExtensionManager.from_settings(self.settings)
         spman_cls = load_object(self.settings['SPIDER_MANAGER_CLASS'])
         self.spiders = spman_cls.from_settings(self.settings)
-        queue_cls = load_object(self.settings['QUEUE_CLASS'])
-        self.queue = queue_cls(self.spiders)
+        spq_cls = load_object(self.settings['SPIDER_QUEUE_CLASS'])
+        spq = spq_cls.from_settings(self.settings)
+        keepalive = self.settings.getbool('KEEP_ALIVE')
+        self.queue = ExecutionQueue(self.spiders, spq, keepalive)
         self.engine = ExecutionEngine(self.settings, self._spider_closed)
 
     @defer.inlineCallbacks
diff --git a/scrapy/interfaces.py b/scrapy/interfaces.py
new file mode 100644
index 000000000..ddf94932b
--- /dev/null
+++ b/scrapy/interfaces.py
@@ -0,0 +1,35 @@
+from zope.interface import Interface
+
+class ISpiderQueue(Interface):
+
+    def from_settings(settings):
+        """Class method to instantiate from settings"""
+
+    def add(name, **spider_args):
+        """Add a spider to the queue given its name and some spider arguments.
+
+        This method can return a deferred. """
+
+    def pop():
+        """Pop the next message from the queue. The message is a dict
+        containing a key 'name' with the spider name and other keys as spider
+        attributes.
+
+        This method can return a deferred. """
+
+    def list():
+        """Return a list with the messages in the queue. Each message is a dict
+        which must have a 'name' key (with the spider name), and other optional
+        keys that will be used as spider arguments, to create the spider.
+
+        This method can return a deferred. """
+
+    def count():
+        """Return the number of spiders in the queue.
+
+        This method can return a deferred. """
+
+    def clear():
+        """Clear the queue.
+
+        This method can return a deferred. """
diff --git a/scrapy/queue.py b/scrapy/queue.py
index 89a95e381..fbb486038 100644
--- a/scrapy/queue.py
+++ b/scrapy/queue.py
@@ -9,16 +9,21 @@ class ExecutionQueue(object):
 
     polling_delay = 5
 
-    def __init__(self, _spiders):
+    def __init__(self, spiders, queue, keepalive=False):
        self.spider_requests = []
-        self._spiders = _spiders
+        self._spiders = spiders
+        self._queue = queue
+        self._keepalive = keepalive
 
     def _append_next(self):
         """Called when there are no more items left in self.spider_requests.
 
         This method is meant to be overriden in subclasses to add new (spider,
         requests) tuples to self.spider_requests. It can return a Deferred. """
-        pass
+        msg = self._queue.pop()
+        if msg:
+            name = msg.pop('name')
+            self.append_spider_name(name, **msg)
 
     def get_next(self):
         """Return a tuple (spider, requests) containing a list of Requests and
@@ -44,7 +49,7 @@ class ExecutionQueue(object):
         spiders to crawl (this is for one-shot runs). If it returns ``False``
         Scrapy will keep polling this queue for new requests to scrape
         """
-        return not bool(self.spider_requests)
+        return not self._keepalive and not bool(self.spider_requests)
 
     def append_spider(self, spider):
         """Append a Spider to crawl"""
@@ -82,9 +87,3 @@ class ExecutionQueue(object):
             log.msg('Unable to find spider: %s' % name, log.ERROR)
         else:
             self.append_spider(spider)
-
-
-class KeepAliveExecutionQueue(ExecutionQueue):
-
-    def is_finished(self):
-        return False
diff --git a/scrapy/spiderqueue.py b/scrapy/spiderqueue.py
new file mode 100644
index 000000000..780d99e26
--- /dev/null
+++ b/scrapy/spiderqueue.py
@@ -0,0 +1,35 @@
+from zope.interface import implements
+from zope.interface.verify import verifyObject
+
+from scrapy.interfaces import ISpiderQueue
+from scrapy.utils.sqlite import JsonSqlitePriorityQueue
+
+
+class SqliteSpiderQueue(object):
+
+    implements(ISpiderQueue)
+
+    def __init__(self, database=None, table='spider_queue'):
+        self.q = JsonSqlitePriorityQueue(database, table)
+
+    @classmethod
+    def from_settings(cls, settings):
+        return cls(settings['SQLITE_DB'])
+
+    def add(self, name, **spider_args):
+        d = spider_args.copy()
+        d['name'] = name
+        priority = float(d.pop('priority', 0))
+        self.q.put(d, priority)
+
+    def pop(self):
+        return self.q.pop()
+
+    def count(self):
+        return len(self.q)
+
+    def list(self):
+        return [x[0] for x in self.q]
+
+    def clear(self):
+        self.q.clear()
diff --git a/scrapy/tests/test_contrib_spiderqueue.py b/scrapy/tests/test_contrib_spiderqueue.py
new file mode 100644
index 000000000..a91c286fa
--- /dev/null
+++ b/scrapy/tests/test_contrib_spiderqueue.py
@@ -0,0 +1,21 @@
+import os
+
+from twisted.trial import unittest
+from zope.interface.verify import verifyObject
+
+from scrapy.interfaces import ISpiderQueue
+
+class SQSSpiderQueueTest(unittest.TestCase):
+
+    try:
+        import boto
+    except ImportError, e:
+        skip = str(e)
+
+    if 'AWS_ACCESS_KEY_ID' not in os.environ:
+        skip = "AWS keys not found"
+
+    def test_interface(self):
+        from scrapy.contrib.spiderqueue import SQSSpiderQueue
+        verifyObject(ISpiderQueue, SQSSpiderQueue())
+
diff --git a/scrapy/tests/test_queue.py b/scrapy/tests/test_queue.py
index 38a294ad6..bf12c91b3 100644
--- a/scrapy/tests/test_queue.py
+++ b/scrapy/tests/test_queue.py
@@ -1,6 +1,6 @@
 import unittest
 
-from scrapy.queue import ExecutionQueue, KeepAliveExecutionQueue
+from scrapy.queue import ExecutionQueue
 from scrapy.spider import BaseSpider
 from scrapy.http import Request
 
@@ -27,10 +27,10 @@ class TestSpiderManager(object):
 
 class ExecutionQueueTest(unittest.TestCase):
 
-    queue_class = ExecutionQueue
+    keep_alive = False
 
     def setUp(self):
-        self.queue = self.queue_class(_spiders=TestSpiderManager())
+        self.queue = ExecutionQueue(TestSpiderManager(), None, self.keep_alive)
         self.spider = TestSpider()
         self.request = Request('about:none')
 
@@ -106,7 +106,7 @@ class ExecutionQueueTest(unittest.TestCase):
 
 class KeepAliveExecutionQueueTest(ExecutionQueueTest):
 
-    queue_class = KeepAliveExecutionQueue
+    keep_alive = True
 
     def test_is_finished(self):
         self.assert_(not self.queue.is_finished())
diff --git a/scrapy/tests/test_spiderqueue.py b/scrapy/tests/test_spiderqueue.py
new file mode 100644
index 000000000..18cd6eb2d
--- /dev/null
+++ b/scrapy/tests/test_spiderqueue.py
@@ -0,0 +1,12 @@
+import unittest
+
+from zope.interface.verify import verifyObject
+
+from scrapy.interfaces import ISpiderQueue
+from scrapy.spiderqueue import SqliteSpiderQueue
+
+class SpiderQueueTest(unittest.TestCase):
+
+    def test_interface(self):
+        verifyObject(ISpiderQueue, SqliteSpiderQueue())
+
diff --git a/scrapy/utils/sqlite.py b/scrapy/utils/sqlite.py
index 5451550dc..7d68c01a4 100644
--- a/scrapy/utils/sqlite.py
+++ b/scrapy/utils/sqlite.py
@@ -8,10 +8,10 @@ from scrapy.utils.py26 import json
 class SqliteDict(DictMixin):
     """SQLite-backed dictionary"""
 
-    def __init__(self, database=':memory:', table="dict"):
-        self.database = database
+    def __init__(self, database=None, table="dict"):
+        self.database = database or ':memory:'
         self.table = table
-        self.conn = sqlite3.connect(database)
+        self.conn = sqlite3.connect(self.database)
         q = "create table if not exists %s (key text primary key, value blob)" \
             % table
         self.conn.execute(q)
@@ -86,10 +86,10 @@ class SqlitePriorityQueue(object):
     providing atomic inter-process operations.
     """
 
-    def __init__(self, database=':memory:', table="queue"):
-        self.database = database
+    def __init__(self, database=None, table="queue"):
+        self.database = database or ':memory:'
         self.table = table
-        self.conn = sqlite3.connect(database)
+        self.conn = sqlite3.connect(self.database)
         q = "create table if not exists %s (id integer primary key, " \
             "priority real key, message blob)" % table
         self.conn.execute(q)
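
Note (editorial addition, not part of the patch): the ISpiderQueue contract introduced above can be satisfied by a trivial in-memory queue, which may help when reviewing the interface. The sketch below is illustrative only; the MemorySpiderQueue name is hypothetical, and it assumes the same conventions the patch uses (Python 2, zope.interface class advice, messages as dicts carrying a 'name' key, pop() returning a falsy value when the queue is empty, and any method being allowed to return a deferred).

    from zope.interface import implements

    from scrapy.interfaces import ISpiderQueue


    class MemorySpiderQueue(object):
        """Hypothetical in-memory ISpiderQueue implementation (illustration only)."""

        implements(ISpiderQueue)

        def __init__(self):
            self.msgs = []

        @classmethod
        def from_settings(cls, settings):
            # no settings are needed for an in-memory queue
            return cls()

        def add(self, name, **spider_args):
            # each message is a dict with the spider name plus its arguments
            d = spider_args.copy()
            d['name'] = name
            self.msgs.append(d)

        def pop(self):
            # return None when empty, which ExecutionQueue._append_next treats as "nothing to do"
            return self.msgs.pop(0) if self.msgs else None

        def list(self):
            return list(self.msgs)

        def count(self):
            return len(self.msgs)

        def clear(self):
            self.msgs = []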