# scrapy/tests/test_feedexport.py
from __future__ import absolute_import
import os
import csv
import json
import shutil
import tempfile
from io import BytesIO

import lxml.etree
from six.moves.urllib.parse import urlparse
from zope.interface.verify import verifyObject
from twisted.trial import unittest
from twisted.internet import defer
from w3lib.url import path_to_file_uri

import scrapy
from scrapy.crawler import CrawlerRunner
from scrapy.settings import Settings
from scrapy.extensions.feedexport import (
    IFeedStorage, FileFeedStorage, FTPFeedStorage,
    S3FeedStorage, StdoutFeedStorage
)
from scrapy.utils.test import assert_aws_environ
from scrapy.utils.python import to_native_str
from tests.mockserver import MockServer


class FileFeedStorageTest(unittest.TestCase):

    def test_store_file_uri(self):
        path = os.path.abspath(self.mktemp())
        uri = path_to_file_uri(path)
        return self._assert_stores(FileFeedStorage(uri), path)

    def test_store_file_uri_makedirs(self):
        path = os.path.abspath(self.mktemp())
        path = os.path.join(path, 'more', 'paths', 'file.txt')
        uri = path_to_file_uri(path)
        return self._assert_stores(FileFeedStorage(uri), path)

    def test_store_direct_path(self):
        path = os.path.abspath(self.mktemp())
        return self._assert_stores(FileFeedStorage(path), path)

    def test_store_direct_path_relative(self):
        path = self.mktemp()
        return self._assert_stores(FileFeedStorage(path), path)

    def test_interface(self):
        path = self.mktemp()
        st = FileFeedStorage(path)
        verifyObject(IFeedStorage, st)

    @defer.inlineCallbacks
    def _assert_stores(self, storage, path):
        spider = scrapy.Spider("default")
        file = storage.open(spider)
        file.write(b"content")
        yield storage.store(file)
        self.assertTrue(os.path.exists(path))
        with open(path, 'rb') as fp:
            self.assertEqual(fp.read(), b"content")
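

# The storage tests above all exercise the IFeedStorage contract: open()
# takes a spider and returns a file-like object, and store() persists what
# was written to it. A minimal sketch of that flow (the path is illustrative):
#
#     storage = FileFeedStorage('/tmp/feed.csv')
#     f = storage.open(scrapy.Spider("default"))
#     f.write(b"exported bytes")
#     storage.store(f)  # may return a Deferred; the tests yield it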


class FTPFeedStorageTest(unittest.TestCase):

    def test_store(self):
        uri = os.environ.get('FEEDTEST_FTP_URI')
        path = os.environ.get('FEEDTEST_FTP_PATH')
        if not (uri and path):
            raise unittest.SkipTest("No FTP server available for testing")
        st = FTPFeedStorage(uri)
        verifyObject(IFeedStorage, st)
        return self._assert_stores(st, path)

    @defer.inlineCallbacks
    def _assert_stores(self, storage, path):
        spider = scrapy.Spider("default")
        file = storage.open(spider)
        file.write(b"content")
        yield storage.store(file)
        self.assertTrue(os.path.exists(path))
        with open(path, 'rb') as fp:
            self.assertEqual(fp.read(), b"content")
        # store again, to check the destination file is overwritten
        yield storage.store(BytesIO(b"new content"))
        with open(path, 'rb') as fp:
            self.assertEqual(fp.read(), b"new content")


class S3FeedStorageTest(unittest.TestCase):

    @defer.inlineCallbacks
    def test_store(self):
        assert_aws_environ()
        uri = os.environ.get('FEEDTEST_S3_URI')
        if not uri:
            raise unittest.SkipTest("No S3 URI available for testing")
        from boto import connect_s3
        storage = S3FeedStorage(uri)
        verifyObject(IFeedStorage, storage)
        file = storage.open(scrapy.Spider("default"))
        file.write(b"content")
        yield storage.store(file)
        u = urlparse(uri)
        key = connect_s3().get_bucket(u.hostname, validate=False).get_key(u.path)
        self.assertEqual(key.get_contents_as_string(), b"content")


class StdoutFeedStorageTest(unittest.TestCase):

    @defer.inlineCallbacks
    def test_store(self):
        out = BytesIO()
        storage = StdoutFeedStorage('stdout:', _stdout=out)
        file = storage.open(scrapy.Spider("default"))
        file.write(b"content")
        yield storage.store(file)
        self.assertEqual(out.getvalue(), b"content")
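

# Each backend above is selected at runtime by the FEED_URI scheme (file://,
# ftp://, s3://, stdout:). FeedExportTest below drives the whole export
# pipeline end to end through CrawlerRunner rather than testing a storage
# class in isolation.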


class FeedExportTest(unittest.TestCase):

    class MyItem(scrapy.Item):
        foo = scrapy.Field()
        egg = scrapy.Field()
        baz = scrapy.Field()

    @defer.inlineCallbacks
    def run_and_export(self, spider_cls, settings=None):
        """Run spider with specified settings; return exported data."""
        tmpdir = tempfile.mkdtemp()
        res_name = tmpdir + '/res'
        defaults = {
            'FEED_URI': 'file://' + res_name,
            'FEED_FORMAT': 'csv',
        }
        defaults.update(settings or {})
        try:
            with MockServer() as s:
                runner = CrawlerRunner(Settings(defaults))
                yield runner.crawl(spider_cls)
            with open(res_name, 'rb') as f:
                defer.returnValue(f.read())
        finally:
            shutil.rmtree(tmpdir)

    @defer.inlineCallbacks
    def exported_data(self, items, settings):
        """
        Return exported data which a spider yielding ``items`` would return.
        """
        class TestSpider(scrapy.Spider):
            name = 'testspider'
            start_urls = ['http://localhost:8998/']

            def parse(self, response):
                for item in items:
                    yield item

        data = yield self.run_and_export(TestSpider, settings)
        defer.returnValue(data)
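
    # With the defaults above, a run is roughly the command-line equivalent
    # of "scrapy crawl testspider -o <tmpdir>/res -t csv"; FEED_URI and
    # FEED_FORMAT are simply set programmatically here instead.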

    @defer.inlineCallbacks
    def assertExportedCsv(self, items, header, rows, settings=None, ordered=True):
        settings = settings or {}
        settings.update({'FEED_FORMAT': 'csv'})
        data = yield self.exported_data(items, settings)
        reader = csv.DictReader(to_native_str(data).splitlines())
        got_rows = list(reader)
        if ordered:
            self.assertEqual(reader.fieldnames, header)
        else:
            self.assertEqual(set(reader.fieldnames), set(header))
        self.assertEqual(rows, got_rows)

    @defer.inlineCallbacks
    def assertExportedJsonLines(self, items, rows, settings=None):
        settings = settings or {}
        settings.update({'FEED_FORMAT': 'jl'})
        data = yield self.exported_data(items, settings)
        parsed = [json.loads(to_native_str(line)) for line in data.splitlines()]
        rows = [{k: v for k, v in row.items() if v} for row in rows]
        self.assertEqual(rows, parsed)

    @defer.inlineCallbacks
    def assertExportedXml(self, items, rows, settings=None):
        settings = settings or {}
        settings.update({'FEED_FORMAT': 'xml'})
        data = yield self.exported_data(items, settings)
        rows = [{k: v for k, v in row.items() if v} for row in rows]
        root = lxml.etree.fromstring(data)
        got_rows = [{e.tag: e.text for e in it} for it in root.findall('item')]
        self.assertEqual(rows, got_rows)
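
    # For reference, the XML feed parsed above is expected to have roughly
    # this shape (inferred from the assertions, not spelled out elsewhere):
    #
    #     <items>
    #       <item><foo>bar1</foo><egg>spam1</egg></item>
    #       ...
    #     </items>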

    @defer.inlineCallbacks
    def assertExported(self, items, header, rows, settings=None, ordered=True):
        yield self.assertExportedCsv(items, header, rows, settings, ordered)
        yield self.assertExportedJsonLines(items, rows, settings)
        yield self.assertExportedXml(items, rows, settings)

    @defer.inlineCallbacks
    def test_export_items(self):
        # feed exporters use field names from Item
        items = [
            self.MyItem({'foo': 'bar1', 'egg': 'spam1'}),
            self.MyItem({'foo': 'bar2', 'egg': 'spam2', 'baz': 'quux2'}),
        ]
        rows = [
            {'egg': 'spam1', 'foo': 'bar1', 'baz': ''},
            {'egg': 'spam2', 'foo': 'bar2', 'baz': 'quux2'}
        ]
        header = self.MyItem.fields.keys()
        yield self.assertExported(items, header, rows, ordered=False)

    @defer.inlineCallbacks
    def test_export_multiple_item_classes(self):

        class MyItem2(scrapy.Item):
            foo = scrapy.Field()
            hello = scrapy.Field()

        items = [
            self.MyItem({'foo': 'bar1', 'egg': 'spam1'}),
            MyItem2({'hello': 'world2', 'foo': 'bar2'}),
            self.MyItem({'foo': 'bar3', 'egg': 'spam3', 'baz': 'quux3'}),
            {'hello': 'world4', 'egg': 'spam4'},
        ]

        # by default, Scrapy uses fields of the first Item for CSV and
        # all fields for JSON Lines
        header = self.MyItem.fields.keys()
        rows_csv = [
            {'egg': 'spam1', 'foo': 'bar1', 'baz': ''},
            {'egg': '', 'foo': 'bar2', 'baz': ''},
            {'egg': 'spam3', 'foo': 'bar3', 'baz': 'quux3'},
            {'egg': 'spam4', 'foo': '', 'baz': ''},
        ]
        rows_jl = [dict(row) for row in items]
        yield self.assertExportedCsv(items, header, rows_csv, ordered=False)
        yield self.assertExportedJsonLines(items, rows_jl)

        # edge case: FEED_EXPORT_FIELDS==[] means the same as default None
        settings = {'FEED_EXPORT_FIELDS': []}
        yield self.assertExportedCsv(items, header, rows_csv,
                                     settings=settings, ordered=False)
        yield self.assertExportedJsonLines(items, rows_jl, settings)

        # it is possible to override fields using FEED_EXPORT_FIELDS
        header = ["foo", "baz", "hello"]
        settings = {'FEED_EXPORT_FIELDS': header}
        rows = [
            {'foo': 'bar1', 'baz': '', 'hello': ''},
            {'foo': 'bar2', 'baz': '', 'hello': 'world2'},
            {'foo': 'bar3', 'baz': 'quux3', 'hello': ''},
            {'foo': '', 'baz': '', 'hello': 'world4'},
        ]
        yield self.assertExported(items, header, rows,
                                  settings=settings, ordered=True)

    @defer.inlineCallbacks
    def test_export_dicts(self):
        # When dicts are used, only keys from the first row are used as
        # a header for CSV, and all fields are used for JSON Lines.
        items = [
            {'foo': 'bar', 'egg': 'spam'},
            {'foo': 'bar', 'egg': 'spam', 'baz': 'quux'},
        ]
        rows_csv = [
            {'egg': 'spam', 'foo': 'bar'},
            {'egg': 'spam', 'foo': 'bar'}
        ]
        rows_jl = items
        yield self.assertExportedCsv(items, ['egg', 'foo'], rows_csv, ordered=False)
        yield self.assertExportedJsonLines(items, rows_jl)

    @defer.inlineCallbacks
    def test_export_feed_export_fields(self):
        # The FEED_EXPORT_FIELDS option allows ordering the exported fields
        # and selecting a subset of fields to export, both for Items and dicts.
        for item_cls in [self.MyItem, dict]:
            items = [
                item_cls({'foo': 'bar1', 'egg': 'spam1'}),
                item_cls({'foo': 'bar2', 'egg': 'spam2', 'baz': 'quux2'}),
            ]

            # export all columns
            settings = {'FEED_EXPORT_FIELDS': 'foo,baz,egg'}
            rows = [
                {'egg': 'spam1', 'foo': 'bar1', 'baz': ''},
                {'egg': 'spam2', 'foo': 'bar2', 'baz': 'quux2'}
            ]
            yield self.assertExported(items, ['foo', 'baz', 'egg'], rows,
                                      settings=settings, ordered=True)

            # export a subset of columns
            settings = {'FEED_EXPORT_FIELDS': 'egg,baz'}
            rows = [
                {'egg': 'spam1', 'baz': ''},
                {'egg': 'spam2', 'baz': 'quux2'}
            ]
            yield self.assertExported(items, ['egg', 'baz'], rows,
                                      settings=settings, ordered=True)
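
    # Note: the tests pass FEED_EXPORT_FIELDS both as a comma-separated
    # string ('foo,baz,egg' above) and as a list ([] in the edge case); a
    # project would typically set it in settings.py, e.g. (sketch):
    #
    #     FEED_EXPORT_FIELDS = ['foo', 'baz', 'egg']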