from __future__ import absolute_import
import os
import csv
import json
from io import BytesIO
import tempfile
import shutil
import six
from six.moves.urllib.parse import urlparse

from zope.interface.verify import verifyObject
from twisted.trial import unittest
from twisted.internet import defer
from scrapy.crawler import CrawlerRunner
from scrapy.settings import Settings
from tests.mockserver import MockServer
from w3lib.url import path_to_file_uri

import scrapy
from scrapy.extensions.feedexport import (
    IFeedStorage, FileFeedStorage, FTPFeedStorage,
    S3FeedStorage, StdoutFeedStorage
)
from scrapy.utils.test import assert_aws_environ
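
# These tests cover the individual feed storages (local file, FTP, S3, stdout)
# and the end-to-end feed export pipeline driven by the FEED_URI, FEED_FORMAT
# and FEED_EXPORT_FIELDS settings.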


class FileFeedStorageTest(unittest.TestCase):

    def test_store_file_uri(self):
        path = os.path.abspath(self.mktemp())
        uri = path_to_file_uri(path)
        return self._assert_stores(FileFeedStorage(uri), path)

    def test_store_file_uri_makedirs(self):
        path = os.path.abspath(self.mktemp())
        path = os.path.join(path, 'more', 'paths', 'file.txt')
        uri = path_to_file_uri(path)
        return self._assert_stores(FileFeedStorage(uri), path)

    def test_store_direct_path(self):
        path = os.path.abspath(self.mktemp())
        return self._assert_stores(FileFeedStorage(path), path)

    def test_store_direct_path_relative(self):
        path = self.mktemp()
        return self._assert_stores(FileFeedStorage(path), path)

    def test_interface(self):
        path = self.mktemp()
        st = FileFeedStorage(path)
        verifyObject(IFeedStorage, st)

    @defer.inlineCallbacks
    def _assert_stores(self, storage, path):
        spider = scrapy.Spider("default")
        file = storage.open(spider)
        file.write(b"content")
        yield storage.store(file)
        self.assertTrue(os.path.exists(path))
        with open(path, 'rb') as fp:
            self.assertEqual(fp.read(), b"content")
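

# The FTP storage test is opt-in: it only runs when a test FTP server is made
# available through the FEEDTEST_FTP_URI and FEEDTEST_FTP_PATH environment
# variables; otherwise it is skipped.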
class FTPFeedStorageTest(unittest.TestCase):

    def test_store(self):
        uri = os.environ.get('FEEDTEST_FTP_URI')
        path = os.environ.get('FEEDTEST_FTP_PATH')
        if not (uri and path):
            raise unittest.SkipTest("No FTP server available for testing")
        st = FTPFeedStorage(uri)
        verifyObject(IFeedStorage, st)
        return self._assert_stores(st, path)

    @defer.inlineCallbacks
    def _assert_stores(self, storage, path):
        spider = scrapy.Spider("default")
        file = storage.open(spider)
        file.write(b"content")
        yield storage.store(file)
        self.assertTrue(os.path.exists(path))
        with open(path, 'rb') as fp:
            self.assertEqual(fp.read(), b"content")
        # store again, to check that uploaded files are overwritten
        yield storage.store(BytesIO(b"new content"))
        with open(path, 'rb') as fp:
            self.assertEqual(fp.read(), b"new content")
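

# The S3 storage test requires AWS credentials in the environment (checked by
# assert_aws_environ) plus a FEEDTEST_S3_URI pointing at a writable bucket;
# otherwise it is skipped.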
class S3FeedStorageTest(unittest.TestCase):

    @defer.inlineCallbacks
    def test_store(self):
        assert_aws_environ()
        uri = os.environ.get('FEEDTEST_S3_URI')
        if not uri:
            raise unittest.SkipTest("No S3 URI available for testing")
        from boto import connect_s3
        storage = S3FeedStorage(uri)
        verifyObject(IFeedStorage, storage)
        file = storage.open(scrapy.Spider("default"))
        file.write(b"content")
        yield storage.store(file)
        u = urlparse(uri)
        key = connect_s3().get_bucket(u.hostname, validate=False).get_key(u.path)
        self.assertEqual(key.get_contents_as_string(), b"content")


class StdoutFeedStorageTest(unittest.TestCase):

    @defer.inlineCallbacks
    def test_store(self):
        out = BytesIO()
        storage = StdoutFeedStorage('stdout:', _stdout=out)
        file = storage.open(scrapy.Spider("default"))
        file.write(b"content")
        yield storage.store(file)
        self.assertEqual(out.getvalue(), b"content")
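

# FeedExportTest drives the export pipeline end to end: it runs a spider
# against MockServer through CrawlerRunner, with FEED_URI and FEED_FORMAT
# pointing at a temporary file, and then inspects the CSV / JSON Lines output.
# For reference, an equivalent project configuration (a sketch only, not used
# by these tests) would live in settings.py:
#
#     FEED_FORMAT = 'csv'
#     FEED_URI = 'file:///tmp/items.csv'
#     FEED_EXPORT_FIELDS = ['foo', 'baz', 'egg']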
class FeedExportTest(unittest.TestCase):

    # end-to-end export tests currently run on Python 2 only
    skip = not six.PY2

    class MyItem(scrapy.Item):
        foo = scrapy.Field()
        egg = scrapy.Field()
        baz = scrapy.Field()

    @defer.inlineCallbacks
    def run_and_export(self, spider_cls, settings=None):
        """ Run spider with specified settings; return exported data. """
        tmpdir = tempfile.mkdtemp()
        res_name = tmpdir + '/res'
        defaults = {
            'FEED_URI': 'file://' + res_name,
            'FEED_FORMAT': 'csv',
        }
        defaults.update(settings or {})
        try:
            with MockServer() as s:
                runner = CrawlerRunner(Settings(defaults))
                yield runner.crawl(spider_cls)

            with open(res_name, 'rb') as f:
                defer.returnValue(f.read())

        finally:
            shutil.rmtree(tmpdir)

    @defer.inlineCallbacks
    def exported_data(self, items, settings):
        """
        Return the data which a spider yielding ``items`` would export.
        """
        class TestSpider(scrapy.Spider):
            name = 'testspider'
            start_urls = ['http://localhost:8998/']

            def parse(self, response):
                for item in items:
                    yield item

        data = yield self.run_and_export(TestSpider, settings)
        defer.returnValue(data)

    @defer.inlineCallbacks
    def assertExportedCsv(self, items, header, rows, settings=None, ordered=True):
        settings = settings or {}
        settings.update({'FEED_FORMAT': 'csv'})
        data = yield self.exported_data(items, settings)

        reader = csv.DictReader(data.splitlines())
        got_rows = list(reader)
        if ordered:
            self.assertEqual(reader.fieldnames, header)
        else:
            self.assertEqual(set(reader.fieldnames), set(header))

        self.assertEqual(rows, got_rows)

    @defer.inlineCallbacks
    def assertExportedJsonLines(self, items, rows, settings=None):
        settings = settings or {}
        settings.update({'FEED_FORMAT': 'jl'})
        data = yield self.exported_data(items, settings)
        parsed = [json.loads(line) for line in data.splitlines()]
        rows = [{k: v for k, v in row.items() if v} for row in rows]
        self.assertEqual(rows, parsed)

    @defer.inlineCallbacks
    def assertExported(self, items, header, rows, settings=None, ordered=True):
        yield self.assertExportedCsv(items, header, rows, settings, ordered)
        yield self.assertExportedJsonLines(items, rows, settings)

    @defer.inlineCallbacks
    def test_export_items(self):
        # feed exporters use field names from Item
        items = [
            self.MyItem({'foo': 'bar1', 'egg': 'spam1'}),
            self.MyItem({'foo': 'bar2', 'egg': 'spam2', 'baz': 'quux2'}),
        ]
        rows = [
            {'egg': 'spam1', 'foo': 'bar1', 'baz': ''},
            {'egg': 'spam2', 'foo': 'bar2', 'baz': 'quux2'}
        ]
        header = self.MyItem.fields.keys()
        yield self.assertExported(items, header, rows, ordered=False)

    @defer.inlineCallbacks
    def test_export_multiple_item_classes(self):

        class MyItem2(scrapy.Item):
            foo = scrapy.Field()
            hello = scrapy.Field()

        items = [
            self.MyItem({'foo': 'bar1', 'egg': 'spam1'}),
            MyItem2({'hello': 'world2', 'foo': 'bar2'}),
            self.MyItem({'foo': 'bar3', 'egg': 'spam3', 'baz': 'quux3'}),
            {'hello': 'world4', 'egg': 'spam4'},
        ]

        # by default, Scrapy uses the fields of the first Item for CSV and
        # all fields for JSON Lines
        header = self.MyItem.fields.keys()
        rows_csv = [
            {'egg': 'spam1', 'foo': 'bar1', 'baz': ''},
            {'egg': '', 'foo': 'bar2', 'baz': ''},
            {'egg': 'spam3', 'foo': 'bar3', 'baz': 'quux3'},
            {'egg': 'spam4', 'foo': '', 'baz': ''},
        ]
        rows_jl = [dict(row) for row in items]
        yield self.assertExportedCsv(items, header, rows_csv, ordered=False)
        yield self.assertExportedJsonLines(items, rows_jl)

        # edge case: FEED_EXPORT_FIELDS == [] means the same as the default None
        settings = {'FEED_EXPORT_FIELDS': []}
        yield self.assertExportedCsv(items, header, rows_csv,
                                     settings=settings, ordered=False)
        yield self.assertExportedJsonLines(items, rows_jl, settings)

        # it is possible to override fields using FEED_EXPORT_FIELDS
        header = ["foo", "baz", "hello"]
        settings = {'FEED_EXPORT_FIELDS': header}
        rows = [
            {'foo': 'bar1', 'baz': '', 'hello': ''},
            {'foo': 'bar2', 'baz': '', 'hello': 'world2'},
            {'foo': 'bar3', 'baz': 'quux3', 'hello': ''},
            {'foo': '', 'baz': '', 'hello': 'world4'},
        ]
        yield self.assertExported(items, header, rows,
                                  settings=settings, ordered=True)

    @defer.inlineCallbacks
    def test_export_dicts(self):
        # When dicts are used, only keys from the first row are used as
        # a header for CSV, and all fields are used for JSON Lines.
        items = [
            {'foo': 'bar', 'egg': 'spam'},
            {'foo': 'bar', 'egg': 'spam', 'baz': 'quux'},
        ]
        rows_csv = [
            {'egg': 'spam', 'foo': 'bar'},
            {'egg': 'spam', 'foo': 'bar'}
        ]
        rows_jl = items
        yield self.assertExportedCsv(items, ['egg', 'foo'], rows_csv, ordered=False)
        yield self.assertExportedJsonLines(items, rows_jl)

    @defer.inlineCallbacks
    def test_export_feed_export_fields(self):
        # The FEED_EXPORT_FIELDS option allows ordering the exported fields
        # and selecting a subset of fields to export, both for Items and dicts.

        for item_cls in [self.MyItem, dict]:
            items = [
                item_cls({'foo': 'bar1', 'egg': 'spam1'}),
                item_cls({'foo': 'bar2', 'egg': 'spam2', 'baz': 'quux2'}),
            ]

            # export all columns
            settings = {'FEED_EXPORT_FIELDS': 'foo,baz,egg'}
            rows = [
                {'egg': 'spam1', 'foo': 'bar1', 'baz': ''},
                {'egg': 'spam2', 'foo': 'bar2', 'baz': 'quux2'}
            ]
            yield self.assertExported(items, ['foo', 'baz', 'egg'], rows,
                                      settings=settings, ordered=True)

            # export a subset of columns
            settings = {'FEED_EXPORT_FIELDS': 'egg,baz'}
            rows = [
                {'egg': 'spam1', 'baz': ''},
                {'egg': 'spam2', 'baz': 'quux2'}
            ]
            yield self.assertExported(items, ['egg', 'baz'], rows,
                                      settings=settings, ordered=True)