
Refactored the Execution Queue by extracting the queue backend into a new
Spider Queue API. Also ported the SQS Execution Queue to the Spider Queue API,
and made the scrapy queue command use the Spider Queue directly, with support
for deferreds.

Closes #220.
Pablo Hoffman 2010-09-03 14:29:27 -03:00
parent 37776618a3
commit 7cfc379230
16 changed files with 251 additions and 119 deletions
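In rough terms, the crawler now builds its execution queue around a pluggable Spider Queue backend selected by the SPIDER_QUEUE_CLASS setting. The sketch below condenses the Crawler change further down; it assumes load_object lives in scrapy.utils.misc.

from scrapy.utils.misc import load_object
from scrapy.queue import ExecutionQueue

def build_queue(settings, spiders):
    # load the pluggable backend, e.g. scrapy.spiderqueue.SqliteSpiderQueue
    spq_cls = load_object(settings['SPIDER_QUEUE_CLASS'])
    spq = spq_cls.from_settings(settings)
    # ExecutionQueue wraps the backend instead of being subclassed per backend
    return ExecutionQueue(spiders, spq, settings.getbool('KEEP_ALIVE'))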

View File

@ -1,47 +1,64 @@
from twisted.internet import reactor, threads
from scrapy.command import ScrapyCommand
from scrapy.commands import runserver
from scrapy.exceptions import UsageError
from scrapy.conf import settings
from scrapy.utils.conf import arglist_to_dict
class Command(runserver.Command):
requires_project = True
default_settings = {'LOG_ENABLED': False}
default_settings = {'LOG_LEVEL': 'WARNING'}
def syntax(self):
return "[options] <list|clear|add spider1 ..>"
return "[options] <list|clear|count|add spider1 ..>"
def short_desc(self):
return "Control execution queue"
def add_options(self, parser):
ScrapyCommand.add_options(self, parser)
parser.add_option("--priority", dest="priority", type="float", default=0.0, \
help="priority to use for adding spiders")
parser.add_option("-a", "--arg", dest="spargs", action="append", default=[], \
help="spider arguments to use for adding spiders")
help="set spider argument (may be repeated)")
def process_options(self, args, opts):
ScrapyCommand.process_options(self, args, opts)
try:
opts.spargs = arglist_to_dict(opts.spargs)
except ValueError:
raise UsageError("Invalid -a value, use -a NAME=VALUE", print_help=False)
def run(self, args, opts):
if len(args) < 1:
raise UsageError()
cmd = args[0]
botname = settings['BOT_NAME']
queue = self.crawler.queue.queue
q = self.crawler.queue._queue
if cmd == 'add':
if len(args) < 2:
if cmd == 'add' and len(args) < 2:
raise UsageError()
msg = dict(x for x in [x.split('=', 1) for x in opts.spargs])
d = threads.deferToThread(self._run_in_thread, args, opts, q, cmd)
d.addBoth(lambda _: reactor.stop())
from scrapy import log
log.start()
reactor.run()
def _run_in_thread(self, args, opts, q, cmd):
if cmd == 'add':
for x in args[1:]:
msg.update(name=x)
queue.put(msg)
print "Added (priority=%s): %s" % (opts.priority, msg)
self._call(q.add, x, **opts.spargs)
print "Added: name=%s args=%s" % (x, opts.spargs)
elif cmd == 'list':
for x, y in queue:
print "(priority=%s) %s" % (y, x)
x = self._call(q.list)
print "\n".join(map(str, x))
elif cmd == 'count':
print self._call(q.count)
elif cmd == 'clear':
queue.clear()
print "Cleared %s queue" % botname
self._call(q.clear)
else:
raise UsageError()
def _call(self, f, *a, **kw):
return threads.blockingCallFromThread(reactor, f, *a, **kw)
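The command body runs in a worker thread and bridges each queue call back to the reactor, so backends that return deferreds still work. A minimal standalone sketch of that pattern follows; q and its count method stand in for any ISpiderQueue backend.

from twisted.internet import reactor, threads

def work(q):
    # runs in a worker thread; the call is executed on the reactor thread
    # and blocks here until its (possible) deferred fires
    return threads.blockingCallFromThread(reactor, q.count)

def run(q):
    d = threads.deferToThread(work, q)
    d.addBoth(lambda _: reactor.stop())
    reactor.run()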

View File

@ -4,13 +4,10 @@ from scrapy.conf import settings
class Command(ScrapyCommand):
requires_project = True
default_settings = {'KEEP_ALIVE': True}
def short_desc(self):
return "Start Scrapy in server mode"
def process_options(self, args, opts):
super(Command, self).process_options(args, opts)
settings.overrides['QUEUE_CLASS'] = settings['SERVER_QUEUE_CLASS']
def run(self, args, opts):
self.crawler.start()

View File

@ -10,7 +10,7 @@ from scrapy.shell import Shell
class Command(ScrapyCommand):
requires_project = False
default_settings = {'QUEUE_CLASS': 'scrapy.queue.KeepAliveExecutionQueue'}
default_settings = {'KEEP_ALIVE': True}
def syntax(self):
return "[url|file]"

View File

@ -24,7 +24,6 @@ IGNORE = ignore_patterns('*.pyc', '.svn')
class Command(ScrapyCommand):
requires_project = False
default_settings = {'LOG_ENABLED': False}
def syntax(self):
return "<project_name>"

View File

@ -150,6 +150,8 @@ ITEM_PROCESSOR = 'scrapy.contrib.pipeline.ItemPipelineManager'
# Item pipelines are typically set in specific commands settings
ITEM_PIPELINES = []
KEEP_ALIVE = False
LOG_ENABLED = True
LOG_ENCODING = 'utf-8'
LOG_FORMATTER = 'scrapy.logformatter.LogFormatter'
@ -175,8 +177,6 @@ MEMUSAGE_WARNING_MB = 0
NEWSPIDER_MODULE = ''
QUEUE_CLASS = 'scrapy.queue.ExecutionQueue'
RANDOMIZE_DOWNLOAD_DELAY = True
REDIRECT_MAX_METAREFRESH_DELAY = 100
@ -209,8 +209,6 @@ SCHEDULER_MIDDLEWARES_BASE = {
SCHEDULER_ORDER = 'DFO'
SERVER_QUEUE_CLASS = 'scrapy.contrib.queue.SqliteExecutionQueue'
SPIDER_MANAGER_CLASS = 'scrapy.spidermanager.SpiderManager'
SPIDER_MIDDLEWARES = {}
@ -228,6 +226,8 @@ SPIDER_MIDDLEWARES_BASE = {
SPIDER_MODULES = []
SPIDER_QUEUE_CLASS = 'scrapy.spiderqueue.SqliteSpiderQueue'
SQLITE_DB = 'scrapy.db'
SQS_QUEUE = 'scrapy'

View File

@ -1,20 +0,0 @@
from scrapy.queue import ExecutionQueue
from scrapy.utils.sqlite import JsonSqlitePriorityQueue
from scrapy.conf import settings
class SqliteExecutionQueue(ExecutionQueue):
queue_class = JsonSqlitePriorityQueue
def __init__(self, *a, **kw):
super(SqliteExecutionQueue, self).__init__(*a, **kw)
self.queue = JsonSqlitePriorityQueue(settings['SQLITE_DB'])
def _append_next(self):
msg = self.queue.pop()
if msg:
name = msg.pop('name')
self.append_spider_name(name, **msg)
def is_finished(self):
return False

View File

@ -1,49 +0,0 @@
import threading
from twisted.internet import threads
from boto.sqs.connection import SQSConnection
from boto.sqs import regions
from scrapy.queue import ExecutionQueue
from scrapy.utils.py26 import json
from scrapy.conf import settings
class SQSExecutionQueue(ExecutionQueue):
polling_delay = settings.getint('SQS_POLLING_DELAY')
queue_name = settings['SQS_QUEUE']
region_name = settings['SQS_REGION']
visibility_timeout = settings.getint('SQS_VISIBILITY_TIMEOUT')
aws_access_key_id = settings['AWS_ACCESS_KEY_ID']
aws_secret_access_key = settings['AWS_SECRET_ACCESS_KEY']
def __init__(self, *a, **kw):
super(SQSExecutionQueue, self).__init__(*a, **kw)
self.region = self._get_region()
self._tls = threading.local()
def _append_next(self):
return threads.deferToThread(self._append_next_from_sqs)
def _append_next_from_sqs(self):
q = self._get_sqs_queue()
msgs = q.get_messages(1, visibility_timeout=self.visibility_timeout)
if msgs:
msg = msgs[0]
msg.delete()
spargs = json.loads(msg.get_body())
spname = spargs.pop('name')
self.append_spider_name(spname, **spargs)
def _get_sqs_queue(self):
if not hasattr(self._tls, 'queue'):
c = SQSConnection(self.aws_access_key_id, self.aws_secret_access_key, \
region=self.region)
self._tls.queue = c.create_queue(self.queue_name)
return self._tls.queue
def _get_region(self, name=region_name):
return [r for r in regions() if r.name == name][0]
def is_finished(self):
return False

View File

@ -0,0 +1,83 @@
from zope.interface import implements
from twisted.internet import threads
from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message
from boto.sqs import regions
from scrapy.interfaces import ISpiderQueue
from scrapy.utils.py26 import json
class SQSSpiderQueue(object):
implements(ISpiderQueue)
def __init__(self, *a, **kw):
self.polling_delay = kw.pop('polling_delay', 30)
self.queue_name = kw.pop('queue_name', 'scrapy')
self.region_name = kw.pop('region_name', 'us-east-1')
self.visibility_timeout = kw.pop('visibility_timeout', 7200)
self.aws_access_key_id = kw.pop('aws_access_key_id', None)
self.aws_secret_access_key = kw.pop('aws_secret_access_key', None)
self.region = self._get_region(self.region_name)
self.conn.create_queue(self.queue_name)
super(SQSSpiderQueue, self).__init__(*a, **kw)
@classmethod
def from_settings(cls, settings):
return cls(
polling_delay=settings.getint('SQS_POLLING_DELAY'),
queue_name=settings['SQS_QUEUE'],
region_name=settings['SQS_REGION'],
visibility_timeout=settings.getint('SQS_VISIBILITY_TIMEOUT'),
aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY']
)
def _get_region(self, name):
return [r for r in regions() if r.name == name][0]
@property
def conn(self):
return SQSConnection(self.aws_access_key_id, self.aws_secret_access_key, \
region=self.region)
@property
def queue(self):
return self.conn.get_queue(self.queue_name)
def _queue_method(self, method, *a, **kw):
return getattr(self.queue, method)(*a, **kw)
def pop(self):
return threads.deferToThread(self._pop)
def _pop(self):
msgs = self.queue.get_messages(1, visibility_timeout=self.visibility_timeout)
if msgs:
msg = msgs[0]
msg.delete()
return json.loads(msg.get_body())
def add(self, name, **spider_args):
d = spider_args.copy()
d['name'] = name
msg = Message(body=json.dumps(d))
return threads.deferToThread(self._queue_method, 'write', msg)
def count(self):
return threads.deferToThread(self._queue_method, 'count')
def list(self):
return threads.deferToThread(self._list)
def _list(self):
msgs = []
m = self.queue.read(visibility_timeout=100)
while m:
msgs.append(json.loads(m.get_body()))
m = self.queue.read(visibility_timeout=100)
return msgs
def clear(self):
return threads.deferToThread(self._queue_method, 'clear')
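Every SQSSpiderQueue method defers the boto call to a thread, so callers get deferreds back. A hedged usage sketch follows; it needs boto, real AWS credentials and a running reactor, and the queue name and spider argument are only illustrative.

from twisted.internet import reactor
from scrapy.contrib.spiderqueue import SQSSpiderQueue

q = SQSSpiderQueue(queue_name='scrapy-test',
                   aws_access_key_id='AKIA...',
                   aws_secret_access_key='...')
results = []
d = q.add('somespider', start_url='http://example.com')
d.addCallback(lambda _: q.count())     # chain: add first, then count
d.addCallback(results.append)          # results[0] holds the message count
d.addBoth(lambda _: reactor.stop())
reactor.run()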

View File

@ -3,6 +3,7 @@ import signal
from twisted.internet import reactor, defer
from scrapy.xlib.pydispatch import dispatcher
from scrapy.queue import ExecutionQueue
from scrapy.core.engine import ExecutionEngine
from scrapy.extension import ExtensionManager
from scrapy.utils.ossignal import install_shutdown_handlers, signal_names
@ -33,8 +34,10 @@ class Crawler(object):
self.extensions = ExtensionManager.from_settings(self.settings)
spman_cls = load_object(self.settings['SPIDER_MANAGER_CLASS'])
self.spiders = spman_cls.from_settings(self.settings)
queue_cls = load_object(self.settings['QUEUE_CLASS'])
self.queue = queue_cls(self.spiders)
spq_cls = load_object(self.settings['SPIDER_QUEUE_CLASS'])
spq = spq_cls.from_settings(self.settings)
keepalive = self.settings.getbool('KEEP_ALIVE')
self.queue = ExecutionQueue(self.spiders, spq, keepalive)
self.engine = ExecutionEngine(self.settings, self._spider_closed)
@defer.inlineCallbacks
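With the backend chosen by a setting, a project can switch from the default sqlite queue to SQS without touching the crawler. A hypothetical project settings override (queue name and keys are placeholders):

# hypothetical settings.py override: use the SQS-backed spider queue
SPIDER_QUEUE_CLASS = 'scrapy.contrib.spiderqueue.SQSSpiderQueue'
SQS_QUEUE = 'my-crawl-queue'
SQS_REGION = 'us-east-1'
AWS_ACCESS_KEY_ID = 'AKIA...'
AWS_SECRET_ACCESS_KEY = '...'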

scrapy/interfaces.py (new file, 35 lines)
View File

@ -0,0 +1,35 @@
from zope.interface import Interface
class ISpiderQueue(Interface):
def from_settings(settings):
"""Class method to instantiate from settings"""
def add(name, **spider_args):
"""Add a spider to the queue given its name a some spider arguments.
This method can return a deferred. """
def pop():
"""Pop the next mesasge from the queue. The messages is a dict
conaining a key 'name' with the spider name and other keys as spider
attributes.
This method can return a deferred. """
def list():
"""Return a list with the messages in the queue. Each message is a dict
which must have a 'name' key (with the spider name), and other optional
keys that will be used as spider arguments, to create the spider.
This method can return a deferred. """
def count():
"""Return the number of spiders in the queue.
This method can return a deferred. """
def clear():
"""Clear the queue.
This method can return a deferred. """
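As a point of reference (not part of the commit), a minimal in-memory backend satisfying ISpiderQueue could look like the sketch below; since deferreds are optional, plain return values are enough.

from zope.interface import implements
from scrapy.interfaces import ISpiderQueue

class MemorySpiderQueue(object):
    """Toy ISpiderQueue backend that keeps messages in a list."""
    implements(ISpiderQueue)

    def __init__(self):
        self.msgs = []

    @classmethod
    def from_settings(cls, settings):
        return cls()

    def add(self, name, **spider_args):
        self.msgs.append(dict(spider_args, name=name))

    def pop(self):
        return self.msgs.pop(0) if self.msgs else None

    def list(self):
        return list(self.msgs)

    def count(self):
        return len(self.msgs)

    def clear(self):
        self.msgs = []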

View File

@ -9,16 +9,21 @@ class ExecutionQueue(object):
polling_delay = 5
def __init__(self, _spiders):
def __init__(self, spiders, queue, keepalive=False):
self.spider_requests = []
self._spiders = _spiders
self._spiders = spiders
self._queue = queue
self._keepalive = keepalive
def _append_next(self):
"""Called when there are no more items left in self.spider_requests.
This method is meant to be overridden in subclasses to add new (spider,
requests) tuples to self.spider_requests. It can return a Deferred.
"""
pass
msg = self._queue.pop()
if msg:
name = msg.pop('name')
self.append_spider_name(name, **msg)
def get_next(self):
"""Return a tuple (spider, requests) containing a list of Requests and
@ -44,7 +49,7 @@ class ExecutionQueue(object):
spiders to crawl (this is for one-shot runs). If it returns ``False``
Scrapy will keep polling this queue for new requests to scrape
"""
return not bool(self.spider_requests)
return not self._keepalive and not bool(self.spider_requests)
def append_spider(self, spider):
"""Append a Spider to crawl"""
@ -82,9 +87,3 @@ class ExecutionQueue(object):
log.msg('Unable to find spider: %s' % name, log.ERROR)
else:
self.append_spider(spider)
class KeepAliveExecutionQueue(ExecutionQueue):
def is_finished(self):
return False
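The removed KeepAliveExecutionQueue subclass is now expressed as the keepalive flag. A tiny sketch of the equivalence; as in the tests below, a None backend is enough for this check.

from scrapy.queue import ExecutionQueue

q = ExecutionQueue(None, None, keepalive=True)   # was: KeepAliveExecutionQueue()
assert not q.is_finished()   # keeps polling the spider queue instead of finishing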

scrapy/spiderqueue.py (new file, 35 lines)
View File

@ -0,0 +1,35 @@
from zope.interface import implements
from zope.interface.verify import verifyObject
from scrapy.interfaces import ISpiderQueue
from scrapy.utils.sqlite import JsonSqlitePriorityQueue
class SqliteSpiderQueue(object):
implements(ISpiderQueue)
def __init__(self, database=None, table='spider_queue'):
self.q = JsonSqlitePriorityQueue(database, table)
@classmethod
def from_settings(cls, settings):
return cls(settings['SQLITE_DB'])
def add(self, name, **spider_args):
d = spider_args.copy()
d['name'] = name
priority = float(d.pop('priority', 0))
self.q.put(d, priority)
def pop(self):
return self.q.pop()
def count(self):
return len(self.q)
def list(self):
return [x[0] for x in self.q]
def clear(self):
self.q.clear()
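A quick usage sketch of the sqlite backend (in-memory database when none is given). A 'priority' spider argument is stripped off and used as the queue priority, assuming the underlying priority queue pops higher priorities first.

from scrapy.spiderqueue import SqliteSpiderQueue

q = SqliteSpiderQueue()                              # no database -> ':memory:'
q.add('somespider', start_url='http://example.com')
q.add('otherspider', priority=1.5)                   # jumps ahead of somespider
assert q.count() == 2
assert q.pop()['name'] == 'otherspider'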

View File

@ -0,0 +1,21 @@
import os
from twisted.trial import unittest
from zope.interface.verify import verifyObject
from scrapy.interfaces import ISpiderQueue
class SQSSpiderQueueTest(unittest.TestCase):
try:
import boto
except ImportError, e:
skip = str(e)
if 'AWS_ACCESS_KEY_ID' not in os.environ:
skip = "AWS keys not found"
def test_interface(self):
from scrapy.contrib.spiderqueue import SQSSpiderQueue
verifyObject(ISpiderQueue, SQSSpiderQueue())

View File

@ -1,6 +1,6 @@
import unittest
from scrapy.queue import ExecutionQueue, KeepAliveExecutionQueue
from scrapy.queue import ExecutionQueue
from scrapy.spider import BaseSpider
from scrapy.http import Request
@ -27,10 +27,10 @@ class TestSpiderManager(object):
class ExecutionQueueTest(unittest.TestCase):
queue_class = ExecutionQueue
keep_alive = False
def setUp(self):
self.queue = self.queue_class(_spiders=TestSpiderManager())
self.queue = ExecutionQueue(TestSpiderManager(), None, self.keep_alive)
self.spider = TestSpider()
self.request = Request('about:none')
@ -106,7 +106,7 @@ class ExecutionQueueTest(unittest.TestCase):
class KeepAliveExecutionQueueTest(ExecutionQueueTest):
queue_class = KeepAliveExecutionQueue
keep_alive = True
def test_is_finished(self):
self.assert_(not self.queue.is_finished())

View File

@ -0,0 +1,12 @@
import unittest
from zope.interface.verify import verifyObject
from scrapy.interfaces import ISpiderQueue
from scrapy.spiderqueue import SqliteSpiderQueue
class SpiderQueueTest(unittest.TestCase):
def test_interface(self):
verifyObject(ISpiderQueue, SqliteSpiderQueue())

View File

@ -8,10 +8,10 @@ from scrapy.utils.py26 import json
class SqliteDict(DictMixin):
"""SQLite-backed dictionary"""
def __init__(self, database=':memory:', table="dict"):
self.database = database
def __init__(self, database=None, table="dict"):
self.database = database or ':memory:'
self.table = table
self.conn = sqlite3.connect(database)
self.conn = sqlite3.connect(self.database)
q = "create table if not exists %s (key text primary key, value blob)" \
% table
self.conn.execute(q)
@ -86,10 +86,10 @@ class SqlitePriorityQueue(object):
providing atomic inter-process operations.
"""
def __init__(self, database=':memory:', table="queue"):
self.database = database
def __init__(self, database=None, table="queue"):
self.database = database or ':memory:'
self.table = table
self.conn = sqlite3.connect(database)
self.conn = sqlite3.connect(self.database)
q = "create table if not exists %s (id integer primary key, " \
"priority real key, message blob)" % table
self.conn.execute(q)
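The default-argument change matters because SqliteSpiderQueue passes its database argument straight through, and that may be None; with the new `or ':memory:'` fallback this still yields an in-memory database instead of an error. A small illustration, assuming JsonSqlitePriorityQueue keeps the put/pop signature used by SqliteSpiderQueue above:

from scrapy.utils.sqlite import JsonSqlitePriorityQueue

q1 = JsonSqlitePriorityQueue()        # in-memory, as before
q2 = JsonSqlitePriorityQueue(None)    # now also ':memory:' instead of failing
q2.put({'name': 'somespider'}, 0)
assert q2.pop() == {'name': 'somespider'}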