# Mirror of https://github.com/scrapy/scrapy.git
# Synced 2025-02-24 16:03:49 +00:00
# 80 lines, 2.3 KiB, Python
import hashlib
|
|
import tempfile
|
|
import unittest
|
|
import shutil
|
|
|
|
from scrapy.dupefilters import RFPDupeFilter
|
|
from scrapy.http import Request
|
|
from scrapy.utils.python import to_bytes
|
|
|
|
|
|
class RFPDupeFilterTest(unittest.TestCase):
    """Tests for duplicate-request detection by ``RFPDupeFilter``."""

    def test_filter(self):
        """Check that ``request_seen`` flags repeated requests.

        The first submission of a request must not be reported as seen;
        any later submission of the same request, or of a different
        ``Request`` object with the same URL, must be.
        """
        dupefilter = RFPDupeFilter()
        dupefilter.open()
        # Registered as a cleanup so the filter is closed even when one of
        # the assertions below fails.
        self.addCleanup(dupefilter.close, 'finished')

        r1 = Request('http://scrapytest.org/1')
        r2 = Request('http://scrapytest.org/2')
        # Separate object, same URL as r2.
        r3 = Request('http://scrapytest.org/2')

        self.assertFalse(dupefilter.request_seen(r1))
        self.assertTrue(dupefilter.request_seen(r1))

        self.assertFalse(dupefilter.request_seen(r2))
        self.assertTrue(dupefilter.request_seen(r3))

    def test_dupefilter_path(self):
        """Check that seen requests carry over between filters sharing a path.

        A first filter created on a temporary directory sees ``r1`` and is
        closed; a second filter created on the same directory must then
        report ``r1`` as already seen while still treating ``r2`` as new.
        """
        r1 = Request('http://scrapytest.org/1')
        r2 = Request('http://scrapytest.org/2')

        path = tempfile.mkdtemp()
        # Cleanups run after the test body (including its ``finally``
        # clauses), so both filters are closed before the directory goes.
        self.addCleanup(shutil.rmtree, path)

        df = RFPDupeFilter(path)
        try:
            df.open()
            self.assertFalse(df.request_seen(r1))
            self.assertTrue(df.request_seen(r1))
        finally:
            # Must be closed before the second filter is created on ``path``.
            df.close('finished')

        df2 = RFPDupeFilter(path)
        try:
            df2.open()
            self.assertTrue(df2.request_seen(r1))
            self.assertFalse(df2.request_seen(r2))
            self.assertTrue(df2.request_seen(r2))
        finally:
            df2.close('finished')

    def test_request_fingerprint(self):
        """Check that overriding ``request_fingerprint`` changes ``request_seen``.

        With the stock filter, two URLs that differ only in letter case are
        both reported as unseen. With a subclass whose fingerprint is the
        SHA-1 of the lower-cased URL, the second one is reported as seen.
        """
        r1 = Request('http://scrapytest.org/index.html')
        r2 = Request('http://scrapytest.org/INDEX.html')

        dupefilter = RFPDupeFilter()
        dupefilter.open()
        try:
            self.assertFalse(dupefilter.request_seen(r1))
            self.assertFalse(dupefilter.request_seen(r2))
        finally:
            dupefilter.close('finished')

        class CaseInsensitiveRFPDupeFilter(RFPDupeFilter):
            """Dupefilter whose fingerprint ignores the letter case of the URL."""

            def request_fingerprint(self, request):
                fp = hashlib.sha1()
                fp.update(to_bytes(request.url.lower()))
                return fp.hexdigest()

        case_insensitive_dupefilter = CaseInsensitiveRFPDupeFilter()
        case_insensitive_dupefilter.open()
        try:
            self.assertFalse(case_insensitive_dupefilter.request_seen(r1))
            self.assertTrue(case_insensitive_dupefilter.request_seen(r2))
        finally:
            case_insensitive_dupefilter.close('finished')