1
0
mirror of https://github.com/scrapy/scrapy.git synced 2025-02-22 14:32:58 +00:00

ported code to use queuelib

This commit is contained in:
Pablo Hoffman 2013-04-23 17:48:09 -03:00
parent 5531290d53
commit d02da2f31f
10 changed files with 7 additions and 640 deletions

2
debian/control vendored
View File

@ -8,7 +8,7 @@ Homepage: http://scrapy.org/
Package: scrapy-SUFFIX
Architecture: all
Depends: ${python:Depends}, python-lxml, python-twisted, python-openssl, python-w3lib (>= 1.2)
Depends: ${python:Depends}, python-lxml, python-twisted, python-openssl, python-w3lib (>= 1.2), python-queuelib
Recommends: python-setuptools
Conflicts: python-scrapy, scrapy, scrapy-0.11
Provides: python-scrapy, scrapy

View File

@ -6,6 +6,7 @@ Release notes
0.18 (unreleased)
-----------------
- moved persistent (on disk) queues to a separate project (queuelib_) which scrapy now depends on
- add scrapy commands using external libraries (:issue:`260`)
- added ``--pdb`` option to ``scrapy`` command line tool
- added :meth:`XPathSelector.remove_namespaces` which allows to remove all namespaces from XML documents for convenience (to work with namespace-less XPaths). Documented in :ref:`topics-selectors`.
@ -446,3 +447,4 @@ First release of Scrapy.
.. _lxml: http://lxml.de/
.. _ClientForm: http://wwwsearch.sourceforge.net/old/ClientForm/
.. _resource: http://docs.python.org/library/resource.html
.. _queuelib: https://github.com/scrapy/queuelib

View File

@ -2,7 +2,7 @@ import os
import json
from os.path import join, exists
from scrapy.utils.pqueue import PriorityQueue
from queuelib import PriorityQueue
from scrapy.utils.reqser import request_to_dict, request_from_dict
from scrapy.utils.misc import load_object
from scrapy.utils.job import job_dir

View File

@ -4,7 +4,7 @@ Scheduler queues
import marshal, cPickle as pickle
from scrapy.utils import queue
from queuelib import queue
def _serializable_queue(queue_class, serialize, deserialize):

View File

@ -1,4 +1,4 @@
from scrapy.tests import test_utils_queue as t
from queuelib.tests import test_queue as t
from scrapy.squeue import MarshalFifoDiskQueue, MarshalLifoDiskQueue, PickleFifoDiskQueue, PickleLifoDiskQueue
from scrapy.item import Item, Field
from scrapy.http import Request

View File

@ -1,183 +0,0 @@
from twisted.trial import unittest
from scrapy.utils.pqueue import PriorityQueue
from scrapy.utils.queue import FifoMemoryQueue, LifoMemoryQueue, FifoDiskQueue, LifoDiskQueue
def track_closed(cls):
"""Wraps a queue class to track down if close() method was called"""
class TrackingClosed(cls):
def __init__(self, *a, **kw):
super(TrackingClosed, self).__init__(*a, **kw)
self.closed = False
def close(self):
super(TrackingClosed, self).close()
self.closed = True
return TrackingClosed
class FifoMemoryPriorityQueueTest(unittest.TestCase):
def setUp(self):
self.q = PriorityQueue(self.qfactory)
def qfactory(self, prio):
return track_closed(FifoMemoryQueue)()
def test_push_pop_noprio(self):
self.q.push('a')
self.q.push('b')
self.q.push('c')
self.assertEqual(self.q.pop(), 'a')
self.assertEqual(self.q.pop(), 'b')
self.assertEqual(self.q.pop(), 'c')
self.assertEqual(self.q.pop(), None)
def test_push_pop_prio(self):
self.q.push('a', 3)
self.q.push('b', 1)
self.q.push('c', 2)
self.q.push('d', 1)
self.assertEqual(self.q.pop(), 'b')
self.assertEqual(self.q.pop(), 'd')
self.assertEqual(self.q.pop(), 'c')
self.assertEqual(self.q.pop(), 'a')
self.assertEqual(self.q.pop(), None)
def test_len_nonzero(self):
assert not self.q
self.assertEqual(len(self.q), 0)
self.q.push('a', 3)
assert self.q
self.q.push('b', 1)
self.q.push('c', 2)
self.q.push('d', 1)
self.assertEqual(len(self.q), 4)
self.q.pop()
self.q.pop()
self.q.pop()
self.q.pop()
assert not self.q
self.assertEqual(len(self.q), 0)
def test_close(self):
self.q.push('a', 3)
self.q.push('b', 1)
self.q.push('c', 2)
self.q.push('d', 1)
iqueues = self.q.queues.values()
self.assertEqual(sorted(self.q.close()), [1, 2, 3])
assert all(q.closed for q in iqueues)
def test_close_return_active(self):
self.q.push('b', 1)
self.q.push('c', 2)
self.q.push('a', 3)
self.q.pop()
self.assertEqual(sorted(self.q.close()), [2, 3])
def test_popped_internal_queues_closed(self):
self.q.push('a', 3)
self.q.push('b', 1)
self.q.push('c', 2)
p1queue = self.q.queues[1]
self.assertEqual(self.q.pop(), 'b')
self.q.close()
assert p1queue.closed
class LifoMemoryPriorityQueueTest(FifoMemoryPriorityQueueTest):
def qfactory(self, prio):
return track_closed(LifoMemoryQueue)()
def test_push_pop_noprio(self):
self.q.push('a')
self.q.push('b')
self.q.push('c')
self.assertEqual(self.q.pop(), 'c')
self.assertEqual(self.q.pop(), 'b')
self.assertEqual(self.q.pop(), 'a')
self.assertEqual(self.q.pop(), None)
def test_push_pop_prio(self):
self.q.push('a', 3)
self.q.push('b', 1)
self.q.push('c', 2)
self.q.push('d', 1)
self.assertEqual(self.q.pop(), 'd')
self.assertEqual(self.q.pop(), 'b')
self.assertEqual(self.q.pop(), 'c')
self.assertEqual(self.q.pop(), 'a')
self.assertEqual(self.q.pop(), None)
class FifoDiskPriorityQueueTest(FifoMemoryPriorityQueueTest):
def setUp(self):
self.q = PriorityQueue(self.qfactory)
def qfactory(self, prio):
return track_closed(FifoDiskQueue)(self.mktemp())
def test_nonserializable_object_one(self):
self.assertRaises(TypeError, self.q.push, lambda x: x, 0)
self.assertEqual(self.q.close(), [])
def test_nonserializable_object_many_close(self):
self.q.push('a', 3)
self.q.push('b', 1)
self.assertRaises(TypeError, self.q.push, lambda x: x, 0)
self.q.push('c', 2)
self.assertEqual(self.q.pop(), 'b')
self.assertEqual(sorted(self.q.close()), [2, 3])
def test_nonserializable_object_many_pop(self):
self.q.push('a', 3)
self.q.push('b', 1)
self.assertRaises(TypeError, self.q.push, lambda x: x, 0)
self.q.push('c', 2)
self.assertEqual(self.q.pop(), 'b')
self.assertEqual(self.q.pop(), 'c')
self.assertEqual(self.q.pop(), 'a')
self.assertEqual(self.q.pop(), None)
self.assertEqual(self.q.close(), [])
class FifoDiskPriorityQueueTest(FifoMemoryPriorityQueueTest):
def qfactory(self, prio):
return track_closed(FifoDiskQueue)(self.mktemp())
def test_nonserializable_object_one(self):
self.assertRaises(TypeError, self.q.push, lambda x: x, 0)
self.assertEqual(self.q.close(), [])
def test_nonserializable_object_many_close(self):
self.q.push('a', 3)
self.q.push('b', 1)
self.assertRaises(TypeError, self.q.push, lambda x: x, 0)
self.q.push('c', 2)
self.assertEqual(self.q.pop(), 'b')
self.assertEqual(sorted(self.q.close()), [2, 3])
def test_nonserializable_object_many_pop(self):
self.q.push('a', 3)
self.q.push('b', 1)
self.assertRaises(TypeError, self.q.push, lambda x: x, 0)
self.q.push('c', 2)
self.assertEqual(self.q.pop(), 'b')
self.assertEqual(self.q.pop(), 'c')
self.assertEqual(self.q.pop(), 'a')
self.assertEqual(self.q.pop(), None)
self.assertEqual(self.q.close(), [])
class LifoDiskPriorityQueueTest(LifoMemoryPriorityQueueTest):
def qfactory(self, prio):
return track_closed(LifoDiskQueue)(self.mktemp())

View File

@ -1,224 +0,0 @@
import os, glob
from twisted.trial import unittest
from scrapy.utils.queue import FifoMemoryQueue, LifoMemoryQueue, FifoDiskQueue, LifoDiskQueue
class FifoMemoryQueueTest(unittest.TestCase):
def queue(self):
return FifoMemoryQueue()
def test_empty(self):
"""Empty queue test"""
q = self.queue()
assert q.pop() is None
def test_push_pop1(self):
"""Basic push/pop test"""
q = self.queue()
q.push('a')
q.push('b')
q.push('c')
self.assertEqual(q.pop(), 'a')
self.assertEqual(q.pop(), 'b')
self.assertEqual(q.pop(), 'c')
self.assertEqual(q.pop(), None)
def test_push_pop2(self):
"""Test interleaved push and pops"""
q = self.queue()
q.push('a')
q.push('b')
q.push('c')
q.push('d')
self.assertEqual(q.pop(), 'a')
self.assertEqual(q.pop(), 'b')
q.push('e')
self.assertEqual(q.pop(), 'c')
self.assertEqual(q.pop(), 'd')
self.assertEqual(q.pop(), 'e')
def test_len(self):
q = self.queue()
self.assertEqual(len(q), 0)
q.push('a')
self.assertEqual(len(q), 1)
q.push('b')
q.push('c')
self.assertEqual(len(q), 3)
q.pop()
q.pop()
q.pop()
self.assertEqual(len(q), 0)
class LifoMemoryQueueTest(unittest.TestCase):
def queue(self):
return LifoMemoryQueue()
def test_empty(self):
"""Empty queue test"""
q = self.queue()
assert q.pop() is None
def test_push_pop1(self):
"""Basic push/pop test"""
q = self.queue()
q.push('a')
q.push('b')
q.push('c')
self.assertEqual(q.pop(), 'c')
self.assertEqual(q.pop(), 'b')
self.assertEqual(q.pop(), 'a')
self.assertEqual(q.pop(), None)
def test_push_pop2(self):
"""Test interleaved push and pops"""
q = self.queue()
q.push('a')
q.push('b')
q.push('c')
q.push('d')
self.assertEqual(q.pop(), 'd')
self.assertEqual(q.pop(), 'c')
q.push('e')
self.assertEqual(q.pop(), 'e')
self.assertEqual(q.pop(), 'b')
self.assertEqual(q.pop(), 'a')
def test_len(self):
q = self.queue()
self.assertEqual(len(q), 0)
q.push('a')
self.assertEqual(len(q), 1)
q.push('b')
q.push('c')
self.assertEqual(len(q), 3)
q.pop()
q.pop()
q.pop()
self.assertEqual(len(q), 0)
class FifoDiskQueueTest(FifoMemoryQueueTest):
chunksize = 100000
def setUp(self):
self.qdir = self.mktemp()
def queue(self):
return FifoDiskQueue(self.qdir, chunksize=self.chunksize)
def test_close_open(self):
"""Test closing and re-opening keeps state"""
q = self.queue()
q.push('a')
q.push('b')
q.push('c')
q.push('d')
self.assertEqual(q.pop(), 'a')
self.assertEqual(q.pop(), 'b')
q.close()
del q
q = self.queue()
self.assertEqual(len(q), 2)
q.push('e')
self.assertEqual(q.pop(), 'c')
self.assertEqual(q.pop(), 'd')
q.close()
del q
q = self.queue()
self.assertEqual(q.pop(), 'e')
self.assertEqual(len(q), 0)
def test_chunks(self):
"""Test chunks are created and removed"""
q = self.queue()
for x in range(5):
q.push(str(x))
chunks = glob.glob(os.path.join(self.qdir, 'q*'))
self.assertEqual(len(chunks), 5/self.chunksize + 1)
for x in range(5):
q.pop()
chunks = glob.glob(os.path.join(self.qdir, 'q*'))
self.assertEqual(len(chunks), 1)
def test_cleanup(self):
"""Test queue dir is removed if queue is empty"""
q = self.queue()
assert os.path.exists(self.qdir)
for x in range(5):
q.push(str(x))
for x in range(5):
q.pop()
q.close()
assert not os.path.exists(self.qdir)
class ChunkSize1FifoDiskQueueTest(FifoDiskQueueTest):
chunksize = 1
class ChunkSize2FifoDiskQueueTest(FifoDiskQueueTest):
chunksize = 2
class ChunkSize3FifoDiskQueueTest(FifoDiskQueueTest):
chunksize = 3
class ChunkSize4FifoDiskQueueTest(FifoDiskQueueTest):
chunksize = 4
class LifoDiskQueueTest(LifoMemoryQueueTest):
def setUp(self):
self.path = self.mktemp()
def queue(self):
return LifoDiskQueue(self.path)
def test_close_open(self):
"""Test closing and re-opening keeps state"""
q = self.queue()
q.push('a')
q.push('b')
q.push('c')
q.push('d')
self.assertEqual(q.pop(), 'd')
self.assertEqual(q.pop(), 'c')
q.close()
del q
q = self.queue()
self.assertEqual(len(q), 2)
q.push('e')
self.assertEqual(q.pop(), 'e')
self.assertEqual(q.pop(), 'b')
q.close()
del q
q = self.queue()
self.assertEqual(q.pop(), 'a')
self.assertEqual(len(q), 0)
def test_cleanup(self):
"""Test queue file is removed if queue is empty"""
q = self.queue()
assert os.path.exists(self.path)
for x in range(5):
q.push(str(x))
for x in range(5):
q.pop()
q.close()
assert not os.path.exists(self.path)
def test_file_size_shrinks(self):
"""Test size of queue file shrinks when popping items"""
q = self.queue()
q.push('a')
q.push('b')
q.close()
size = os.path.getsize(self.path)
q = self.queue()
q.pop()
q.close()
assert os.path.getsize(self.path), size

View File

@ -1,55 +0,0 @@
class PriorityQueue(object):
"""A priority queue implemented using multiple internal queues (typically,
FIFO queues). The internal queue must implement the following methods:
* push(obj)
* pop()
* close()
* __len__()
The constructor receives a qfactory argument, which is a callable used to
instantiate a new (internal) queue when a new priority is allocated. The
qfactory function is called with the priority number as first and only
argument.
Only integer priorities should be used. Lower numbers are higher
priorities.
"""
def __init__(self, qfactory, startprios=()):
self.queues = {}
self.qfactory = qfactory
for p in startprios:
self.queues[p] = self.qfactory(p)
self.curprio = min(startprios) if startprios else None
def push(self, obj, priority=0):
if priority not in self.queues:
self.queues[priority] = self.qfactory(priority)
q = self.queues[priority]
q.push(obj) # this may fail (eg. serialization error)
if priority < self.curprio or self.curprio is None:
self.curprio = priority
def pop(self):
if self.curprio is None:
return
q = self.queues[self.curprio]
m = q.pop()
if len(q) == 0:
del self.queues[self.curprio]
q.close()
prios = [p for p, q in self.queues.items() if len(q) > 0]
self.curprio = min(prios) if prios else None
return m
def close(self):
active = []
for p, q in self.queues.items():
if len(q):
active.append(p)
q.close()
return active
def __len__(self):
return sum(len(x) for x in self.queues.values()) if self.queues else 0

View File

@ -1,173 +0,0 @@
import os
import struct
import glob
import json
from collections import deque
class FifoMemoryQueue(object):
"""Memory FIFO queue."""
def __init__(self):
self.q = deque()
self.push = self.q.append
def pop(self):
q = self.q
return q.popleft() if q else None
def close(self):
pass
def __len__(self):
return len(self.q)
class LifoMemoryQueue(FifoMemoryQueue):
"""Memory LIFO queue."""
def pop(self):
q = self.q
return q.pop() if q else None
class FifoDiskQueue(object):
"""Persistent FIFO queue."""
szhdr_format = ">L"
szhdr_size = struct.calcsize(szhdr_format)
def __init__(self, path, chunksize=100000):
self.path = path
if not os.path.exists(path):
os.makedirs(path)
self.info = self._loadinfo(chunksize)
self.chunksize = self.info['chunksize']
self.headf = self._openchunk(self.info['head'][0], 'ab+')
self.tailf = self._openchunk(self.info['tail'][0])
os.lseek(self.tailf.fileno(), self.info['tail'][2], os.SEEK_SET)
def push(self, string):
hnum, hpos = self.info['head']
hpos += 1
szhdr = struct.pack(self.szhdr_format, len(string))
os.write(self.headf.fileno(), szhdr + string)
if hpos == self.chunksize:
hpos = 0
hnum += 1
self.headf.close()
self.headf = self._openchunk(hnum, 'ab+')
self.info['size'] += 1
self.info['head'] = hnum, hpos
def _openchunk(self, number, mode='r'):
return open(os.path.join(self.path, 'q%05d' % number), mode)
def pop(self):
tnum, tcnt, toffset = self.info['tail']
if [tnum, tcnt] >= self.info['head']:
return
tfd = self.tailf.fileno()
szhdr = os.read(tfd, self.szhdr_size)
if not szhdr:
return
size, = struct.unpack(self.szhdr_format, szhdr)
data = os.read(tfd, size)
tcnt += 1
toffset += self.szhdr_size + size
if tcnt == self.chunksize and tnum <= self.info['head'][0]:
tcnt = toffset = 0
tnum += 1
self.tailf.close()
os.remove(self.tailf.name)
self.tailf = self._openchunk(tnum)
self.info['size'] -= 1
self.info['tail'] = tnum, tcnt, toffset
return data
def close(self):
self.headf.close()
self.tailf.close()
self._saveinfo(self.info)
if len(self) == 0:
self._cleanup()
def __len__(self):
return self.info['size']
def _loadinfo(self, chunksize):
infopath = self._infopath()
if os.path.exists(infopath):
with open(infopath) as f:
info = json.load(f)
else:
info = {
'chunksize': chunksize,
'size': 0,
'tail': [0, 0, 0],
'head': [0, 0],
}
return info
def _saveinfo(self, info):
with open(self._infopath(), 'w') as f:
json.dump(info, f)
def _infopath(self):
return os.path.join(self.path, 'info.json')
def _cleanup(self):
for x in glob.glob(os.path.join(self.path, 'q*')):
os.remove(x)
os.remove(os.path.join(self.path, 'info.json'))
if not os.listdir(self.path):
os.rmdir(self.path)
class LifoDiskQueue(object):
"""Persistent LIFO queue."""
SIZE_FORMAT = ">L"
SIZE_SIZE = struct.calcsize(SIZE_FORMAT)
def __init__(self, path):
self.path = path
if os.path.exists(path):
self.f = open(path, 'rb+')
qsize = self.f.read(self.SIZE_SIZE)
self.size, = struct.unpack(self.SIZE_FORMAT, qsize)
self.f.seek(0, os.SEEK_END)
else:
self.f = open(path, 'wb+')
self.f.write(struct.pack(self.SIZE_FORMAT, 0))
self.size = 0
def push(self, string):
self.f.write(string)
ssize = struct.pack(self.SIZE_FORMAT, len(string))
self.f.write(ssize)
self.size += 1
def pop(self):
if not self.size:
return
self.f.seek(-self.SIZE_SIZE, os.SEEK_END)
size, = struct.unpack(self.SIZE_FORMAT, self.f.read())
self.f.seek(-size-self.SIZE_SIZE, os.SEEK_END)
data = self.f.read(size)
self.f.seek(-size, os.SEEK_CUR)
self.f.truncate()
self.size -= 1
return data
def close(self):
if self.size:
self.f.seek(0)
self.f.write(struct.pack(self.SIZE_FORMAT, self.size))
self.f.close()
if not self.size:
os.remove(self.path)
def __len__(self):
return self.size

View File

@ -122,6 +122,6 @@ try:
except ImportError:
from distutils.core import setup
else:
setup_args['install_requires'] = ['Twisted>=8.0', 'w3lib>=1.2', 'lxml', 'pyOpenSSL']
setup_args['install_requires'] = ['Twisted>=8.0', 'w3lib>=1.2', 'queuelib', 'lxml', 'pyOpenSSL']
setup(**setup_args)