from __future__ import annotations

import bz2
import csv
import gzip
import json
import lzma
import random
import shutil
import string
import sys
import tempfile
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from io import BytesIO
from logging import getLogger
from pathlib import Path
from string import ascii_letters, digits
from typing import TYPE_CHECKING
from unittest import mock
from urllib.parse import quote, urljoin
from urllib.request import pathname2url

import lxml.etree
import pytest
from testfixtures import LogCapture
from twisted.internet import defer
from twisted.trial import unittest
from w3lib.url import file_uri_to_path, path_to_file_uri
from zope.interface import implementer
from zope.interface.verify import verifyObject

import scrapy
from scrapy import signals
from scrapy.exceptions import NotConfigured, ScrapyDeprecationWarning
from scrapy.exporters import CsvItemExporter, JsonItemExporter
from scrapy.extensions.feedexport import (
    BlockingFeedStorage,
    FeedExporter,
    FeedSlot,
    FileFeedStorage,
    FTPFeedStorage,
    GCSFeedStorage,
    IFeedStorage,
    S3FeedStorage,
    StdoutFeedStorage,
)
from scrapy.settings import Settings
from scrapy.utils.python import to_unicode
from scrapy.utils.test import get_crawler, mock_google_cloud_storage
from tests.mockserver import MockFTPServer, MockServer
from tests.spiders import ItemSpider

if TYPE_CHECKING:
    from os import PathLike


def path_to_url(path):
    return urljoin("file:", pathname2url(str(path)))


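# Scrapy feed URIs are %-interpolated (placeholders such as %(name)s or
# %(batch_time)s), so any literal "%" in a temporary file path has to be
# doubled before it is used as a FEEDS key; that is what the helper below does.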
def printf_escape(string):
    return string.replace("%", "%%")


def build_url(path: str | PathLike) -> str:
    path_str = str(path)
    if path_str[0] != "/":
        path_str = "/" + path_str
    return urljoin("file:", path_str)


class FileFeedStorageTest(unittest.TestCase):
    def test_store_file_uri(self):
        path = Path(self.mktemp()).resolve()
        uri = path_to_file_uri(str(path))
        return self._assert_stores(FileFeedStorage(uri), path)

    def test_store_file_uri_makedirs(self):
        path = Path(self.mktemp()).resolve() / "more" / "paths" / "file.txt"
        uri = path_to_file_uri(str(path))
        return self._assert_stores(FileFeedStorage(uri), path)

    def test_store_direct_path(self):
        path = Path(self.mktemp()).resolve()
        return self._assert_stores(FileFeedStorage(str(path)), path)

    def test_store_direct_path_relative(self):
        path = Path(self.mktemp())
        return self._assert_stores(FileFeedStorage(str(path)), path)

    def test_interface(self):
        path = self.mktemp()
        st = FileFeedStorage(path)
        verifyObject(IFeedStorage, st)

    def _store(self, feed_options=None) -> Path:
        path = Path(self.mktemp()).resolve()
        storage = FileFeedStorage(str(path), feed_options=feed_options)
        spider = scrapy.Spider("default")
        file = storage.open(spider)
        file.write(b"content")
        storage.store(file)
        return path

    def test_append(self):
        path = self._store()
        return self._assert_stores(FileFeedStorage(str(path)), path, b"contentcontent")

    def test_overwrite(self):
        path = self._store({"overwrite": True})
        return self._assert_stores(
            FileFeedStorage(str(path), feed_options={"overwrite": True}), path
        )

    @defer.inlineCallbacks
    def _assert_stores(self, storage, path: Path, expected_content=b"content"):
        spider = scrapy.Spider("default")
        file = storage.open(spider)
        file.write(b"content")
        yield storage.store(file)
        self.assertTrue(path.exists())
        try:
            self.assertEqual(path.read_bytes(), expected_content)
        finally:
            path.unlink()


class FTPFeedStorageTest(unittest.TestCase):
    def get_test_spider(self, settings=None):
        class TestSpider(scrapy.Spider):
            name = "test_spider"

        crawler = get_crawler(settings_dict=settings)
        return TestSpider.from_crawler(crawler)

    def _store(self, uri, content, feed_options=None, settings=None):
        crawler = get_crawler(settings_dict=settings or {})
        storage = FTPFeedStorage.from_crawler(
            crawler,
            uri,
            feed_options=feed_options,
        )
        verifyObject(IFeedStorage, storage)
        spider = self.get_test_spider()
        file = storage.open(spider)
        file.write(content)
        return storage.store(file)

    def _assert_stored(self, path: Path, content):
        self.assertTrue(path.exists())
        try:
            self.assertEqual(path.read_bytes(), content)
        finally:
            path.unlink()

    @defer.inlineCallbacks
    def test_append(self):
        with MockFTPServer() as ftp_server:
            filename = "file"
            url = ftp_server.url(filename)
            feed_options = {"overwrite": False}
            yield self._store(url, b"foo", feed_options=feed_options)
            yield self._store(url, b"bar", feed_options=feed_options)
            self._assert_stored(ftp_server.path / filename, b"foobar")

    @defer.inlineCallbacks
    def test_overwrite(self):
        with MockFTPServer() as ftp_server:
            filename = "file"
            url = ftp_server.url(filename)
            yield self._store(url, b"foo")
            yield self._store(url, b"bar")
            self._assert_stored(ftp_server.path / filename, b"bar")

    @defer.inlineCallbacks
    def test_append_active_mode(self):
        with MockFTPServer() as ftp_server:
            settings = {"FEED_STORAGE_FTP_ACTIVE": True}
            filename = "file"
            url = ftp_server.url(filename)
            feed_options = {"overwrite": False}
            yield self._store(url, b"foo", feed_options=feed_options, settings=settings)
            yield self._store(url, b"bar", feed_options=feed_options, settings=settings)
            self._assert_stored(ftp_server.path / filename, b"foobar")

    @defer.inlineCallbacks
    def test_overwrite_active_mode(self):
        with MockFTPServer() as ftp_server:
            settings = {"FEED_STORAGE_FTP_ACTIVE": True}
            filename = "file"
            url = ftp_server.url(filename)
            yield self._store(url, b"foo", settings=settings)
            yield self._store(url, b"bar", settings=settings)
            self._assert_stored(ftp_server.path / filename, b"bar")

    def test_uri_auth_quote(self):
        # RFC3986: 3.2.1. User Information
        pw_quoted = quote(string.punctuation, safe="")
        st = FTPFeedStorage(f"ftp://foo:{pw_quoted}@example.com/some_path", {})
        self.assertEqual(st.password, string.punctuation)


class BlockingFeedStorageTest(unittest.TestCase):
    def get_test_spider(self, settings=None):
        class TestSpider(scrapy.Spider):
            name = "test_spider"

        crawler = get_crawler(settings_dict=settings)
        return TestSpider.from_crawler(crawler)

    def test_default_temp_dir(self):
        b = BlockingFeedStorage()

        tmp = b.open(self.get_test_spider())
        tmp_path = Path(tmp.name).parent
        self.assertEqual(str(tmp_path), tempfile.gettempdir())

    def test_temp_file(self):
        b = BlockingFeedStorage()

        tests_path = Path(__file__).resolve().parent
        spider = self.get_test_spider({"FEED_TEMPDIR": str(tests_path)})
        tmp = b.open(spider)
        tmp_path = Path(tmp.name).parent
        self.assertEqual(tmp_path, tests_path)

    def test_invalid_folder(self):
        b = BlockingFeedStorage()

        tests_path = Path(__file__).resolve().parent
        invalid_path = tests_path / "invalid_path"
        spider = self.get_test_spider({"FEED_TEMPDIR": str(invalid_path)})

        self.assertRaises(OSError, b.open, spider=spider)


@pytest.mark.requires_boto3
class S3FeedStorageTest(unittest.TestCase):
    def test_parse_credentials(self):
        aws_credentials = {
            "AWS_ACCESS_KEY_ID": "settings_key",
            "AWS_SECRET_ACCESS_KEY": "settings_secret",
            "AWS_SESSION_TOKEN": "settings_token",
        }
        crawler = get_crawler(settings_dict=aws_credentials)
        # Instantiate with crawler
        storage = S3FeedStorage.from_crawler(
            crawler,
            "s3://mybucket/export.csv",
        )
        self.assertEqual(storage.access_key, "settings_key")
        self.assertEqual(storage.secret_key, "settings_secret")
        self.assertEqual(storage.session_token, "settings_token")
        # Instantiate directly
        storage = S3FeedStorage(
            "s3://mybucket/export.csv",
            aws_credentials["AWS_ACCESS_KEY_ID"],
            aws_credentials["AWS_SECRET_ACCESS_KEY"],
            session_token=aws_credentials["AWS_SESSION_TOKEN"],
        )
        self.assertEqual(storage.access_key, "settings_key")
        self.assertEqual(storage.secret_key, "settings_secret")
        self.assertEqual(storage.session_token, "settings_token")
        # URI priority > settings priority
        storage = S3FeedStorage(
            "s3://uri_key:uri_secret@mybucket/export.csv",
            aws_credentials["AWS_ACCESS_KEY_ID"],
            aws_credentials["AWS_SECRET_ACCESS_KEY"],
        )
        self.assertEqual(storage.access_key, "uri_key")
        self.assertEqual(storage.secret_key, "uri_secret")

    @defer.inlineCallbacks
    def test_store(self):
        settings = {
            "AWS_ACCESS_KEY_ID": "access_key",
            "AWS_SECRET_ACCESS_KEY": "secret_key",
        }
        crawler = get_crawler(settings_dict=settings)
        bucket = "mybucket"
        key = "export.csv"
        storage = S3FeedStorage.from_crawler(crawler, f"s3://{bucket}/{key}")
        verifyObject(IFeedStorage, storage)

        file = mock.MagicMock()

        storage.s3_client = mock.MagicMock()
        yield storage.store(file)
        self.assertEqual(
            storage.s3_client.upload_fileobj.call_args,
            mock.call(Bucket=bucket, Key=key, Fileobj=file),
        )

    def test_init_without_acl(self):
        storage = S3FeedStorage("s3://mybucket/export.csv", "access_key", "secret_key")
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.acl, None)

    def test_init_with_acl(self):
        storage = S3FeedStorage(
            "s3://mybucket/export.csv", "access_key", "secret_key", "custom-acl"
        )
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.acl, "custom-acl")

    def test_init_with_endpoint_url(self):
        storage = S3FeedStorage(
            "s3://mybucket/export.csv",
            "access_key",
            "secret_key",
            endpoint_url="https://example.com",
        )
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.endpoint_url, "https://example.com")

    def test_init_with_region_name(self):
        region_name = "ap-east-1"
        storage = S3FeedStorage(
            "s3://mybucket/export.csv",
            "access_key",
            "secret_key",
            region_name=region_name,
        )
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.region_name, region_name)
        self.assertEqual(storage.s3_client._client_config.region_name, region_name)

    def test_from_crawler_without_acl(self):
        settings = {
            "AWS_ACCESS_KEY_ID": "access_key",
            "AWS_SECRET_ACCESS_KEY": "secret_key",
        }
        crawler = get_crawler(settings_dict=settings)
        storage = S3FeedStorage.from_crawler(
            crawler,
            "s3://mybucket/export.csv",
        )
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.acl, None)

    def test_without_endpoint_url(self):
        settings = {
            "AWS_ACCESS_KEY_ID": "access_key",
            "AWS_SECRET_ACCESS_KEY": "secret_key",
        }
        crawler = get_crawler(settings_dict=settings)
        storage = S3FeedStorage.from_crawler(
            crawler,
            "s3://mybucket/export.csv",
        )
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.endpoint_url, None)

    def test_without_region_name(self):
        settings = {
            "AWS_ACCESS_KEY_ID": "access_key",
            "AWS_SECRET_ACCESS_KEY": "secret_key",
        }
        crawler = get_crawler(settings_dict=settings)
        storage = S3FeedStorage.from_crawler(
            crawler,
            "s3://mybucket/export.csv",
        )
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.s3_client._client_config.region_name, "us-east-1")

    def test_from_crawler_with_acl(self):
        settings = {
            "AWS_ACCESS_KEY_ID": "access_key",
            "AWS_SECRET_ACCESS_KEY": "secret_key",
            "FEED_STORAGE_S3_ACL": "custom-acl",
        }
        crawler = get_crawler(settings_dict=settings)
        storage = S3FeedStorage.from_crawler(
            crawler,
            "s3://mybucket/export.csv",
        )
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.acl, "custom-acl")

    def test_from_crawler_with_endpoint_url(self):
        settings = {
            "AWS_ACCESS_KEY_ID": "access_key",
            "AWS_SECRET_ACCESS_KEY": "secret_key",
            "AWS_ENDPOINT_URL": "https://example.com",
        }
        crawler = get_crawler(settings_dict=settings)
        storage = S3FeedStorage.from_crawler(crawler, "s3://mybucket/export.csv")
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.endpoint_url, "https://example.com")

    def test_from_crawler_with_region_name(self):
        region_name = "ap-east-1"
        settings = {
            "AWS_ACCESS_KEY_ID": "access_key",
            "AWS_SECRET_ACCESS_KEY": "secret_key",
            "AWS_REGION_NAME": region_name,
        }
        crawler = get_crawler(settings_dict=settings)
        storage = S3FeedStorage.from_crawler(crawler, "s3://mybucket/export.csv")
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.region_name, region_name)
        self.assertEqual(storage.s3_client._client_config.region_name, region_name)

    @defer.inlineCallbacks
    def test_store_without_acl(self):
        storage = S3FeedStorage(
            "s3://mybucket/export.csv",
            "access_key",
            "secret_key",
        )
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.acl, None)

        storage.s3_client = mock.MagicMock()
        yield storage.store(BytesIO(b"test file"))
        acl = (
            storage.s3_client.upload_fileobj.call_args[1]
            .get("ExtraArgs", {})
            .get("ACL")
        )
        self.assertIsNone(acl)

    @defer.inlineCallbacks
    def test_store_with_acl(self):
        storage = S3FeedStorage(
            "s3://mybucket/export.csv", "access_key", "secret_key", "custom-acl"
        )
        self.assertEqual(storage.access_key, "access_key")
        self.assertEqual(storage.secret_key, "secret_key")
        self.assertEqual(storage.acl, "custom-acl")

        storage.s3_client = mock.MagicMock()
        yield storage.store(BytesIO(b"test file"))
        acl = storage.s3_client.upload_fileobj.call_args[1]["ExtraArgs"]["ACL"]
        self.assertEqual(acl, "custom-acl")

    def test_overwrite_default(self):
        with LogCapture() as log:
            S3FeedStorage(
                "s3://mybucket/export.csv", "access_key", "secret_key", "custom-acl"
            )
        self.assertNotIn("S3 does not support appending to files", str(log))

    def test_overwrite_false(self):
        with LogCapture() as log:
            S3FeedStorage(
                "s3://mybucket/export.csv",
                "access_key",
                "secret_key",
                "custom-acl",
                feed_options={"overwrite": False},
            )
        self.assertIn("S3 does not support appending to files", str(log))


class GCSFeedStorageTest(unittest.TestCase):
    def test_parse_settings(self):
        try:
            from google.cloud.storage import Client  # noqa: F401
        except ImportError:
            raise unittest.SkipTest("GCSFeedStorage requires google-cloud-storage")

        settings = {"GCS_PROJECT_ID": "123", "FEED_STORAGE_GCS_ACL": "publicRead"}
        crawler = get_crawler(settings_dict=settings)
        storage = GCSFeedStorage.from_crawler(crawler, "gs://mybucket/export.csv")
        assert storage.project_id == "123"
        assert storage.acl == "publicRead"
        assert storage.bucket_name == "mybucket"
        assert storage.blob_name == "export.csv"

    def test_parse_empty_acl(self):
        try:
            from google.cloud.storage import Client  # noqa: F401
        except ImportError:
            raise unittest.SkipTest("GCSFeedStorage requires google-cloud-storage")

        settings = {"GCS_PROJECT_ID": "123", "FEED_STORAGE_GCS_ACL": ""}
        crawler = get_crawler(settings_dict=settings)
        storage = GCSFeedStorage.from_crawler(crawler, "gs://mybucket/export.csv")
        assert storage.acl is None

        settings = {"GCS_PROJECT_ID": "123", "FEED_STORAGE_GCS_ACL": None}
        crawler = get_crawler(settings_dict=settings)
        storage = GCSFeedStorage.from_crawler(crawler, "gs://mybucket/export.csv")
        assert storage.acl is None

    @defer.inlineCallbacks
    def test_store(self):
        try:
            from google.cloud.storage import Client  # noqa: F401
        except ImportError:
            raise unittest.SkipTest("GCSFeedStorage requires google-cloud-storage")

        uri = "gs://mybucket/export.csv"
        project_id = "myproject-123"
        acl = "publicRead"
        (client_mock, bucket_mock, blob_mock) = mock_google_cloud_storage()
        with mock.patch("google.cloud.storage.Client") as m:
            m.return_value = client_mock

            f = mock.Mock()
            storage = GCSFeedStorage(uri, project_id, acl)
            yield storage.store(f)

            f.seek.assert_called_once_with(0)
            m.assert_called_once_with(project=project_id)
            client_mock.get_bucket.assert_called_once_with("mybucket")
            bucket_mock.blob.assert_called_once_with("export.csv")
            blob_mock.upload_from_file.assert_called_once_with(f, predefined_acl=acl)

    def test_overwrite_default(self):
        with LogCapture() as log:
            GCSFeedStorage("gs://mybucket/export.csv", "myproject-123", "custom-acl")
        self.assertNotIn("GCS does not support appending to files", str(log))

    def test_overwrite_false(self):
        with LogCapture() as log:
            GCSFeedStorage(
                "gs://mybucket/export.csv",
                "myproject-123",
                "custom-acl",
                feed_options={"overwrite": False},
            )
        self.assertIn("GCS does not support appending to files", str(log))


class StdoutFeedStorageTest(unittest.TestCase):
    @defer.inlineCallbacks
    def test_store(self):
        out = BytesIO()
        storage = StdoutFeedStorage("stdout:", _stdout=out)
        file = storage.open(scrapy.Spider("default"))
        file.write(b"content")
        yield storage.store(file)
        self.assertEqual(out.getvalue(), b"content")

    def test_overwrite_default(self):
        with LogCapture() as log:
            StdoutFeedStorage("stdout:")
        self.assertNotIn(
            "Standard output (stdout) storage does not support overwriting", str(log)
        )

    def test_overwrite_true(self):
        with LogCapture() as log:
            StdoutFeedStorage("stdout:", feed_options={"overwrite": True})
        self.assertIn(
            "Standard output (stdout) storage does not support overwriting", str(log)
        )


class FromCrawlerMixin:
    init_with_crawler = False

    @classmethod
    def from_crawler(cls, crawler, *args, feed_options=None, **kwargs):
        cls.init_with_crawler = True
        return cls(*args, **kwargs)


class FromCrawlerCsvItemExporter(CsvItemExporter, FromCrawlerMixin):
    pass


class FromCrawlerFileFeedStorage(FileFeedStorage, FromCrawlerMixin):
    @classmethod
    def from_crawler(cls, crawler, *args, feed_options=None, **kwargs):
        cls.init_with_crawler = True
        return cls(*args, feed_options=feed_options, **kwargs)


class DummyBlockingFeedStorage(BlockingFeedStorage):
    def __init__(self, uri, *args, feed_options=None):
        self.path = Path(file_uri_to_path(uri))

    def _store_in_thread(self, file):
        dirname = self.path.parent
        if dirname and not dirname.exists():
            dirname.mkdir(parents=True)
        with self.path.open("ab") as output_file:
            output_file.write(file.read())

        file.close()


class FailingBlockingFeedStorage(DummyBlockingFeedStorage):
    def _store_in_thread(self, file):
        raise OSError("Cannot store")


@implementer(IFeedStorage)
class LogOnStoreFileStorage:
    """
    This storage logs inside `store` method.
    It can be used to make sure `store` method is invoked.
    """

    def __init__(self, uri, feed_options=None):
        self.path = file_uri_to_path(uri)
        self.logger = getLogger()

    def open(self, spider):
        return tempfile.NamedTemporaryFile(prefix="feed-")

    def store(self, file):
        self.logger.info("Storage.store is called")
        file.close()


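# The storage doubles above are plugged into test crawls through the
# FEED_STORAGES setting, which maps a URI scheme to a storage class. A minimal
# sketch of the kind of settings dict the tests below build (the temporary
# output path here is purely illustrative):
#
#     settings = {
#         "FEEDS": {"/tmp/output.jsonl": {"format": "jsonlines"}},
#         "FEED_STORAGES": {"file": DummyBlockingFeedStorage},
#     }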
class FeedExportTestBase(ABC, unittest.TestCase):
    class MyItem(scrapy.Item):
        foo = scrapy.Field()
        egg = scrapy.Field()
        baz = scrapy.Field()

    class MyItem2(scrapy.Item):
        foo = scrapy.Field()
        hello = scrapy.Field()

    def _random_temp_filename(self, inter_dir="") -> Path:
        chars = [random.choice(ascii_letters + digits) for _ in range(15)]
        filename = "".join(chars)
        return Path(self.temp_dir, inter_dir, filename)

    @classmethod
    def setUpClass(cls):
        cls.mockserver = MockServer()
        cls.mockserver.__enter__()

    @classmethod
    def tearDownClass(cls):
        cls.mockserver.__exit__(None, None, None)

    def setUp(self):
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    @defer.inlineCallbacks
    def exported_data(self, items, settings):
        """
        Return exported data which a spider yielding ``items`` would return.
        """

        class TestSpider(scrapy.Spider):
            name = "testspider"

            def parse(self, response):
                yield from items

        data = yield self.run_and_export(TestSpider, settings)
        return data

    @defer.inlineCallbacks
    def exported_no_data(self, settings):
        """
        Return exported data which a spider yielding no ``items`` would return.
        """

        class TestSpider(scrapy.Spider):
            name = "testspider"

            def parse(self, response):
                pass

        data = yield self.run_and_export(TestSpider, settings)
        return data

    @defer.inlineCallbacks
    def assertExported(self, items, header, rows, settings=None):
        yield self.assertExportedCsv(items, header, rows, settings)
        yield self.assertExportedJsonLines(items, rows, settings)
        yield self.assertExportedXml(items, rows, settings)
        yield self.assertExportedPickle(items, rows, settings)
        yield self.assertExportedMarshal(items, rows, settings)
        yield self.assertExportedMultiple(items, rows, settings)

    @abstractmethod
    def run_and_export(self, spider_cls, settings):
        pass

    def _load_until_eof(self, data, load_func):
        result = []
        with tempfile.TemporaryFile() as temp:
            temp.write(data)
            temp.seek(0)
            while True:
                try:
                    result.append(load_func(temp))
                except EOFError:
                    break
        return result


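# run_and_export() is deliberately abstract above; the concrete test classes
# below supply it, with FeedExportTest keying the returned content by feed
# format and FeedPostProcessedExportsTest keying it by output filename.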
class InstrumentedFeedSlot(FeedSlot):
    """Instrumented FeedSlot subclass for keeping track of calls to
    start_exporting and finish_exporting."""

    def start_exporting(self):
        self.update_listener("start")
        super().start_exporting()

    def finish_exporting(self):
        self.update_listener("finish")
        super().finish_exporting()

    @classmethod
    def subscribe__listener(cls, listener):
        cls.update_listener = listener.update


class IsExportingListener:
    """When subscribed to InstrumentedFeedSlot, keeps track of when
    a call to start_exporting has been made without a closing call to
    finish_exporting and when a call to finish_exporting has been made
    before a call to start_exporting."""

    def __init__(self):
        self.start_without_finish = False
        self.finish_without_start = False

    def update(self, method):
        if method == "start":
            self.start_without_finish = True
        elif method == "finish":
            if self.start_without_finish:
                self.start_without_finish = False
            else:
                self.finish_without_start = True


class ExceptionJsonItemExporter(JsonItemExporter):
    """JsonItemExporter that throws an exception every time export_item is called."""

    def export_item(self, _):
        raise RuntimeError("foo")


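# The two helpers above are wired together in the start/finish_exporting tests
# below, roughly as follows (a minimal sketch of the pattern used there):
#
#     listener = IsExportingListener()
#     InstrumentedFeedSlot.subscribe__listener(listener)
#     with mock.patch("scrapy.extensions.feedexport.FeedSlot", InstrumentedFeedSlot):
#         data = yield self.exported_data(items, settings)
#     self.assertFalse(listener.start_without_finish)
#     self.assertFalse(listener.finish_without_start)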
class FeedExportTest(FeedExportTestBase):
    @defer.inlineCallbacks
    def run_and_export(self, spider_cls, settings):
        """Run spider with specified settings; return exported data."""

        FEEDS = settings.get("FEEDS") or {}
        settings["FEEDS"] = {
            printf_escape(path_to_url(file_path)): feed_options
            for file_path, feed_options in FEEDS.items()
        }

        content = {}
        try:
            spider_cls.start_urls = [self.mockserver.url("/")]
            crawler = get_crawler(spider_cls, settings)
            yield crawler.crawl()

            for file_path, feed_options in FEEDS.items():
                content[feed_options["format"]] = (
                    Path(file_path).read_bytes() if Path(file_path).exists() else None
                )

        finally:
            for file_path in FEEDS:
                if not Path(file_path).exists():
                    continue

                Path(file_path).unlink()

        return content

    @defer.inlineCallbacks
    def assertExportedCsv(self, items, header, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename(): {"format": "csv"},
                },
            }
        )
        data = yield self.exported_data(items, settings)
        reader = csv.DictReader(to_unicode(data["csv"]).splitlines())
        self.assertEqual(reader.fieldnames, list(header))
        self.assertEqual(rows, list(reader))

    @defer.inlineCallbacks
    def assertExportedJsonLines(self, items, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename(): {"format": "jl"},
                },
            }
        )
        data = yield self.exported_data(items, settings)
        parsed = [json.loads(to_unicode(line)) for line in data["jl"].splitlines()]
        rows = [{k: v for k, v in row.items() if v} for row in rows]
        self.assertEqual(rows, parsed)

    @defer.inlineCallbacks
    def assertExportedXml(self, items, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename(): {"format": "xml"},
                },
            }
        )
        data = yield self.exported_data(items, settings)
        rows = [{k: v for k, v in row.items() if v} for row in rows]
        root = lxml.etree.fromstring(data["xml"])
        got_rows = [{e.tag: e.text for e in it} for it in root.findall("item")]
        self.assertEqual(rows, got_rows)

    @defer.inlineCallbacks
    def assertExportedMultiple(self, items, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename(): {"format": "xml"},
                    self._random_temp_filename(): {"format": "json"},
                },
            }
        )
        data = yield self.exported_data(items, settings)
        rows = [{k: v for k, v in row.items() if v} for row in rows]
        # XML
        root = lxml.etree.fromstring(data["xml"])
        xml_rows = [{e.tag: e.text for e in it} for it in root.findall("item")]
        self.assertEqual(rows, xml_rows)
        # JSON
        json_rows = json.loads(to_unicode(data["json"]))
        self.assertEqual(rows, json_rows)

    @defer.inlineCallbacks
    def assertExportedPickle(self, items, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename(): {"format": "pickle"},
                },
            }
        )
        data = yield self.exported_data(items, settings)
        expected = [{k: v for k, v in row.items() if v} for row in rows]
        import pickle

        result = self._load_until_eof(data["pickle"], load_func=pickle.load)
        self.assertEqual(expected, result)

    @defer.inlineCallbacks
    def assertExportedMarshal(self, items, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename(): {"format": "marshal"},
                },
            }
        )
        data = yield self.exported_data(items, settings)
        expected = [{k: v for k, v in row.items() if v} for row in rows]
        import marshal

        result = self._load_until_eof(data["marshal"], load_func=marshal.load)
        self.assertEqual(expected, result)

    @defer.inlineCallbacks
    def test_stats_file_success(self):
        settings = {
            "FEEDS": {
                printf_escape(path_to_url(str(self._random_temp_filename()))): {
                    "format": "json",
                }
            },
        }
        crawler = get_crawler(ItemSpider, settings)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertIn(
            "feedexport/success_count/FileFeedStorage", crawler.stats.get_stats()
        )
        self.assertEqual(
            crawler.stats.get_value("feedexport/success_count/FileFeedStorage"), 1
        )

    @defer.inlineCallbacks
    def test_stats_file_failed(self):
        settings = {
            "FEEDS": {
                printf_escape(path_to_url(str(self._random_temp_filename()))): {
                    "format": "json",
                }
            },
        }
        crawler = get_crawler(ItemSpider, settings)
        with mock.patch(
            "scrapy.extensions.feedexport.FileFeedStorage.store",
            side_effect=KeyError("foo"),
        ):
            yield crawler.crawl(mockserver=self.mockserver)
        self.assertIn(
            "feedexport/failed_count/FileFeedStorage", crawler.stats.get_stats()
        )
        self.assertEqual(
            crawler.stats.get_value("feedexport/failed_count/FileFeedStorage"), 1
        )

    @defer.inlineCallbacks
    def test_stats_multiple_file(self):
        settings = {
            "FEEDS": {
                printf_escape(path_to_url(str(self._random_temp_filename()))): {
                    "format": "json",
                },
                "stdout:": {
                    "format": "xml",
                },
            },
        }
        crawler = get_crawler(ItemSpider, settings)
        with mock.patch.object(S3FeedStorage, "store"):
            yield crawler.crawl(mockserver=self.mockserver)
        self.assertIn(
            "feedexport/success_count/FileFeedStorage", crawler.stats.get_stats()
        )
        self.assertIn(
            "feedexport/success_count/StdoutFeedStorage", crawler.stats.get_stats()
        )
        self.assertEqual(
            crawler.stats.get_value("feedexport/success_count/FileFeedStorage"), 1
        )
        self.assertEqual(
            crawler.stats.get_value("feedexport/success_count/StdoutFeedStorage"), 1
        )

@defer.inlineCallbacks
|
|
def test_export_items(self):
|
|
# feed exporters use field names from Item
|
|
items = [
|
|
self.MyItem({"foo": "bar1", "egg": "spam1"}),
|
|
self.MyItem({"foo": "bar2", "egg": "spam2", "baz": "quux2"}),
|
|
]
|
|
rows = [
|
|
{"egg": "spam1", "foo": "bar1", "baz": ""},
|
|
{"egg": "spam2", "foo": "bar2", "baz": "quux2"},
|
|
]
|
|
header = self.MyItem.fields.keys()
|
|
yield self.assertExported(items, header, rows)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_no_items_not_store_empty(self):
|
|
for fmt in ("json", "jsonlines", "xml", "csv"):
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": fmt},
|
|
},
|
|
"FEED_STORE_EMPTY": False,
|
|
}
|
|
data = yield self.exported_no_data(settings)
|
|
self.assertEqual(None, data[fmt])
|
|
|
|
@defer.inlineCallbacks
|
|
def test_start_finish_exporting_items(self):
|
|
items = [
|
|
self.MyItem({"foo": "bar1", "egg": "spam1"}),
|
|
]
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": "json"},
|
|
},
|
|
"FEED_EXPORT_INDENT": None,
|
|
}
|
|
|
|
listener = IsExportingListener()
|
|
InstrumentedFeedSlot.subscribe__listener(listener)
|
|
|
|
with mock.patch("scrapy.extensions.feedexport.FeedSlot", InstrumentedFeedSlot):
|
|
_ = yield self.exported_data(items, settings)
|
|
self.assertFalse(listener.start_without_finish)
|
|
self.assertFalse(listener.finish_without_start)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_start_finish_exporting_no_items(self):
|
|
items = []
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": "json"},
|
|
},
|
|
"FEED_EXPORT_INDENT": None,
|
|
}
|
|
|
|
listener = IsExportingListener()
|
|
InstrumentedFeedSlot.subscribe__listener(listener)
|
|
|
|
with mock.patch("scrapy.extensions.feedexport.FeedSlot", InstrumentedFeedSlot):
|
|
_ = yield self.exported_data(items, settings)
|
|
self.assertFalse(listener.start_without_finish)
|
|
self.assertFalse(listener.finish_without_start)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_start_finish_exporting_items_exception(self):
|
|
items = [
|
|
self.MyItem({"foo": "bar1", "egg": "spam1"}),
|
|
]
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": "json"},
|
|
},
|
|
"FEED_EXPORTERS": {"json": ExceptionJsonItemExporter},
|
|
"FEED_EXPORT_INDENT": None,
|
|
}
|
|
|
|
listener = IsExportingListener()
|
|
InstrumentedFeedSlot.subscribe__listener(listener)
|
|
|
|
with mock.patch("scrapy.extensions.feedexport.FeedSlot", InstrumentedFeedSlot):
|
|
_ = yield self.exported_data(items, settings)
|
|
self.assertFalse(listener.start_without_finish)
|
|
self.assertFalse(listener.finish_without_start)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_start_finish_exporting_no_items_exception(self):
|
|
items = []
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": "json"},
|
|
},
|
|
"FEED_EXPORTERS": {"json": ExceptionJsonItemExporter},
|
|
"FEED_EXPORT_INDENT": None,
|
|
}
|
|
|
|
listener = IsExportingListener()
|
|
InstrumentedFeedSlot.subscribe__listener(listener)
|
|
|
|
with mock.patch("scrapy.extensions.feedexport.FeedSlot", InstrumentedFeedSlot):
|
|
_ = yield self.exported_data(items, settings)
|
|
self.assertFalse(listener.start_without_finish)
|
|
self.assertFalse(listener.finish_without_start)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_no_items_store_empty(self):
|
|
formats = (
|
|
("json", b"[]"),
|
|
("jsonlines", b""),
|
|
("xml", b'<?xml version="1.0" encoding="utf-8"?>\n<items></items>'),
|
|
("csv", b""),
|
|
)
|
|
|
|
for fmt, expctd in formats:
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": fmt},
|
|
},
|
|
"FEED_STORE_EMPTY": True,
|
|
"FEED_EXPORT_INDENT": None,
|
|
}
|
|
data = yield self.exported_no_data(settings)
|
|
self.assertEqual(expctd, data[fmt])
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_no_items_multiple_feeds(self):
|
|
"""Make sure that `storage.store` is called for every feed."""
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": "json"},
|
|
self._random_temp_filename(): {"format": "xml"},
|
|
self._random_temp_filename(): {"format": "csv"},
|
|
},
|
|
"FEED_STORAGES": {"file": LogOnStoreFileStorage},
|
|
"FEED_STORE_EMPTY": False,
|
|
}
|
|
|
|
with LogCapture() as log:
|
|
yield self.exported_no_data(settings)
|
|
|
|
self.assertEqual(str(log).count("Storage.store is called"), 0)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_multiple_item_classes(self):
|
|
items = [
|
|
self.MyItem({"foo": "bar1", "egg": "spam1"}),
|
|
self.MyItem2({"hello": "world2", "foo": "bar2"}),
|
|
self.MyItem({"foo": "bar3", "egg": "spam3", "baz": "quux3"}),
|
|
{"hello": "world4", "egg": "spam4"},
|
|
]
|
|
|
|
# by default, Scrapy uses fields of the first Item for CSV and
|
|
# all fields for JSON Lines
|
|
header = self.MyItem.fields.keys()
|
|
rows_csv = [
|
|
{"egg": "spam1", "foo": "bar1", "baz": ""},
|
|
{"egg": "", "foo": "bar2", "baz": ""},
|
|
{"egg": "spam3", "foo": "bar3", "baz": "quux3"},
|
|
{"egg": "spam4", "foo": "", "baz": ""},
|
|
]
|
|
rows_jl = [dict(row) for row in items]
|
|
yield self.assertExportedCsv(items, header, rows_csv)
|
|
yield self.assertExportedJsonLines(items, rows_jl)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_items_empty_field_list(self):
|
|
# FEED_EXPORT_FIELDS==[] means the same as default None
|
|
items = [{"foo": "bar"}]
|
|
header = ["foo"]
|
|
rows = [{"foo": "bar"}]
|
|
settings = {"FEED_EXPORT_FIELDS": []}
|
|
yield self.assertExportedCsv(items, header, rows)
|
|
yield self.assertExportedJsonLines(items, rows, settings)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_items_field_list(self):
|
|
items = [{"foo": "bar"}]
|
|
header = ["foo", "baz"]
|
|
rows = [{"foo": "bar", "baz": ""}]
|
|
settings = {"FEED_EXPORT_FIELDS": header}
|
|
yield self.assertExported(items, header, rows, settings=settings)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_items_comma_separated_field_list(self):
|
|
items = [{"foo": "bar"}]
|
|
header = ["foo", "baz"]
|
|
rows = [{"foo": "bar", "baz": ""}]
|
|
settings = {"FEED_EXPORT_FIELDS": ",".join(header)}
|
|
yield self.assertExported(items, header, rows, settings=settings)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_items_json_field_list(self):
|
|
items = [{"foo": "bar"}]
|
|
header = ["foo", "baz"]
|
|
rows = [{"foo": "bar", "baz": ""}]
|
|
settings = {"FEED_EXPORT_FIELDS": json.dumps(header)}
|
|
yield self.assertExported(items, header, rows, settings=settings)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_items_field_names(self):
|
|
items = [{"foo": "bar"}]
|
|
header = {"foo": "Foo"}
|
|
rows = [{"Foo": "bar"}]
|
|
settings = {"FEED_EXPORT_FIELDS": header}
|
|
yield self.assertExported(items, list(header.values()), rows, settings=settings)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_items_dict_field_names(self):
|
|
items = [{"foo": "bar"}]
|
|
header = {
|
|
"baz": "Baz",
|
|
"foo": "Foo",
|
|
}
|
|
rows = [{"Baz": "", "Foo": "bar"}]
|
|
settings = {"FEED_EXPORT_FIELDS": header}
|
|
yield self.assertExported(items, ["Baz", "Foo"], rows, settings=settings)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_items_json_field_names(self):
|
|
items = [{"foo": "bar"}]
|
|
header = {"foo": "Foo"}
|
|
rows = [{"Foo": "bar"}]
|
|
settings = {"FEED_EXPORT_FIELDS": json.dumps(header)}
|
|
yield self.assertExported(items, list(header.values()), rows, settings=settings)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_based_on_item_classes(self):
|
|
items = [
|
|
self.MyItem({"foo": "bar1", "egg": "spam1"}),
|
|
self.MyItem2({"hello": "world2", "foo": "bar2"}),
|
|
{"hello": "world3", "egg": "spam3"},
|
|
]
|
|
|
|
formats = {
|
|
"csv": b"baz,egg,foo\r\n,spam1,bar1\r\n",
|
|
"json": b'[\n{"hello": "world2", "foo": "bar2"}\n]',
|
|
"jsonlines": (
|
|
b'{"foo": "bar1", "egg": "spam1"}\n{"hello": "world2", "foo": "bar2"}\n'
|
|
),
|
|
"xml": (
|
|
b'<?xml version="1.0" encoding="utf-8"?>\n<items>\n<item>'
|
|
b"<foo>bar1</foo><egg>spam1</egg></item>\n<item><hello>"
|
|
b"world2</hello><foo>bar2</foo></item>\n<item><hello>world3"
|
|
b"</hello><egg>spam3</egg></item>\n</items>"
|
|
),
|
|
}
|
|
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {
|
|
"format": "csv",
|
|
"item_classes": [self.MyItem],
|
|
},
|
|
self._random_temp_filename(): {
|
|
"format": "json",
|
|
"item_classes": [self.MyItem2],
|
|
},
|
|
self._random_temp_filename(): {
|
|
"format": "jsonlines",
|
|
"item_classes": [self.MyItem, self.MyItem2],
|
|
},
|
|
self._random_temp_filename(): {
|
|
"format": "xml",
|
|
},
|
|
},
|
|
}
|
|
|
|
data = yield self.exported_data(items, settings)
|
|
for fmt, expected in formats.items():
|
|
self.assertEqual(expected, data[fmt])
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_based_on_custom_filters(self):
|
|
items = [
|
|
self.MyItem({"foo": "bar1", "egg": "spam1"}),
|
|
self.MyItem2({"hello": "world2", "foo": "bar2"}),
|
|
{"hello": "world3", "egg": "spam3"},
|
|
]
|
|
|
|
MyItem = self.MyItem
|
|
|
|
class CustomFilter1:
|
|
def __init__(self, feed_options):
|
|
pass
|
|
|
|
def accepts(self, item):
|
|
return isinstance(item, MyItem)
|
|
|
|
class CustomFilter2(scrapy.extensions.feedexport.ItemFilter):
|
|
def accepts(self, item):
|
|
return "foo" in item.fields
|
|
|
|
class CustomFilter3(scrapy.extensions.feedexport.ItemFilter):
|
|
def accepts(self, item):
|
|
return (
|
|
isinstance(item, tuple(self.item_classes)) and item["foo"] == "bar1"
|
|
)
|
|
|
|
formats = {
|
|
"json": b'[\n{"foo": "bar1", "egg": "spam1"}\n]',
|
|
"xml": (
|
|
b'<?xml version="1.0" encoding="utf-8"?>\n<items>\n<item>'
|
|
b"<foo>bar1</foo><egg>spam1</egg></item>\n<item><hello>"
|
|
b"world2</hello><foo>bar2</foo></item>\n</items>"
|
|
),
|
|
"jsonlines": b'{"foo": "bar1", "egg": "spam1"}\n',
|
|
}
|
|
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {
|
|
"format": "json",
|
|
"item_filter": CustomFilter1,
|
|
},
|
|
self._random_temp_filename(): {
|
|
"format": "xml",
|
|
"item_filter": CustomFilter2,
|
|
},
|
|
self._random_temp_filename(): {
|
|
"format": "jsonlines",
|
|
"item_classes": [self.MyItem, self.MyItem2],
|
|
"item_filter": CustomFilter3,
|
|
},
|
|
},
|
|
}
|
|
|
|
data = yield self.exported_data(items, settings)
|
|
for fmt, expected in formats.items():
|
|
self.assertEqual(expected, data[fmt])
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_dicts(self):
|
|
# When dicts are used, only keys from the first row are used as
|
|
# a header for CSV, and all fields are used for JSON Lines.
|
|
items = [
|
|
{"foo": "bar", "egg": "spam"},
|
|
{"foo": "bar", "egg": "spam", "baz": "quux"},
|
|
]
|
|
rows_csv = [{"egg": "spam", "foo": "bar"}, {"egg": "spam", "foo": "bar"}]
|
|
rows_jl = items
|
|
yield self.assertExportedCsv(items, ["foo", "egg"], rows_csv)
|
|
yield self.assertExportedJsonLines(items, rows_jl)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_tuple(self):
|
|
items = [
|
|
{"foo": "bar1", "egg": "spam1"},
|
|
{"foo": "bar2", "egg": "spam2", "baz": "quux"},
|
|
]
|
|
|
|
settings = {"FEED_EXPORT_FIELDS": ("foo", "baz")}
|
|
rows = [{"foo": "bar1", "baz": ""}, {"foo": "bar2", "baz": "quux"}]
|
|
yield self.assertExported(items, ["foo", "baz"], rows, settings=settings)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_feed_export_fields(self):
|
|
# FEED_EXPORT_FIELDS option allows to order export fields
|
|
# and to select a subset of fields to export, both for Items and dicts.
|
|
|
|
for item_cls in [self.MyItem, dict]:
|
|
items = [
|
|
item_cls({"foo": "bar1", "egg": "spam1"}),
|
|
item_cls({"foo": "bar2", "egg": "spam2", "baz": "quux2"}),
|
|
]
|
|
|
|
# export all columns
|
|
settings = {"FEED_EXPORT_FIELDS": "foo,baz,egg"}
|
|
rows = [
|
|
{"egg": "spam1", "foo": "bar1", "baz": ""},
|
|
{"egg": "spam2", "foo": "bar2", "baz": "quux2"},
|
|
]
|
|
yield self.assertExported(
|
|
items, ["foo", "baz", "egg"], rows, settings=settings
|
|
)
|
|
|
|
# export a subset of columns
|
|
settings = {"FEED_EXPORT_FIELDS": "egg,baz"}
|
|
rows = [{"egg": "spam1", "baz": ""}, {"egg": "spam2", "baz": "quux2"}]
|
|
yield self.assertExported(items, ["egg", "baz"], rows, settings=settings)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_encoding(self):
|
|
items = [{"foo": "Test\xd6"}]
|
|
|
|
formats = {
|
|
"json": b'[{"foo": "Test\\u00d6"}]',
|
|
"jsonlines": b'{"foo": "Test\\u00d6"}\n',
|
|
"xml": (
|
|
'<?xml version="1.0" encoding="utf-8"?>\n'
|
|
"<items><item><foo>Test\xd6</foo></item></items>"
|
|
).encode(),
|
|
"csv": "foo\r\nTest\xd6\r\n".encode(),
|
|
}
|
|
|
|
for fmt, expected in formats.items():
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": fmt},
|
|
},
|
|
"FEED_EXPORT_INDENT": None,
|
|
}
|
|
data = yield self.exported_data(items, settings)
|
|
self.assertEqual(expected, data[fmt])
|
|
|
|
formats = {
|
|
"json": b'[{"foo": "Test\xd6"}]',
|
|
"jsonlines": b'{"foo": "Test\xd6"}\n',
|
|
"xml": (
|
|
b'<?xml version="1.0" encoding="latin-1"?>\n'
|
|
b"<items><item><foo>Test\xd6</foo></item></items>"
|
|
),
|
|
"csv": b"foo\r\nTest\xd6\r\n",
|
|
}
|
|
|
|
for fmt, expected in formats.items():
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": fmt},
|
|
},
|
|
"FEED_EXPORT_INDENT": None,
|
|
"FEED_EXPORT_ENCODING": "latin-1",
|
|
}
|
|
data = yield self.exported_data(items, settings)
|
|
self.assertEqual(expected, data[fmt])
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_multiple_configs(self):
|
|
items = [{"foo": "FOO", "bar": "BAR"}]
|
|
|
|
formats = {
|
|
"json": b'[\n{"bar": "BAR"}\n]',
|
|
"xml": (
|
|
b'<?xml version="1.0" encoding="latin-1"?>\n'
|
|
b"<items>\n <item>\n <foo>FOO</foo>\n </item>\n</items>"
|
|
),
|
|
"csv": b"bar,foo\r\nBAR,FOO\r\n",
|
|
}
|
|
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {
|
|
"format": "json",
|
|
"indent": 0,
|
|
"fields": ["bar"],
|
|
"encoding": "utf-8",
|
|
},
|
|
self._random_temp_filename(): {
|
|
"format": "xml",
|
|
"indent": 2,
|
|
"fields": ["foo"],
|
|
"encoding": "latin-1",
|
|
},
|
|
self._random_temp_filename(): {
|
|
"format": "csv",
|
|
"indent": None,
|
|
"fields": ["bar", "foo"],
|
|
"encoding": "utf-8",
|
|
},
|
|
},
|
|
}
|
|
|
|
data = yield self.exported_data(items, settings)
|
|
for fmt, expected in formats.items():
|
|
self.assertEqual(expected, data[fmt])
|
|
|
|
@defer.inlineCallbacks
|
|
def test_export_indentation(self):
|
|
items = [
|
|
{"foo": ["bar"]},
|
|
{"key": "value"},
|
|
]
|
|
|
|
test_cases = [
|
|
# JSON
|
|
{
|
|
"format": "json",
|
|
"indent": None,
|
|
"expected": b'[{"foo": ["bar"]},{"key": "value"}]',
|
|
},
|
|
{
|
|
"format": "json",
|
|
"indent": -1,
|
|
"expected": b"""[
|
|
{"foo": ["bar"]},
|
|
{"key": "value"}
|
|
]""",
|
|
},
|
|
{
|
|
"format": "json",
|
|
"indent": 0,
|
|
"expected": b"""[
|
|
{"foo": ["bar"]},
|
|
{"key": "value"}
|
|
]""",
|
|
},
|
|
{
|
|
"format": "json",
|
|
"indent": 2,
|
|
"expected": b"""[
|
|
{
|
|
"foo": [
|
|
"bar"
|
|
]
|
|
},
|
|
{
|
|
"key": "value"
|
|
}
|
|
]""",
|
|
},
|
|
{
|
|
"format": "json",
|
|
"indent": 4,
|
|
"expected": b"""[
|
|
{
|
|
"foo": [
|
|
"bar"
|
|
]
|
|
},
|
|
{
|
|
"key": "value"
|
|
}
|
|
]""",
|
|
},
|
|
{
|
|
"format": "json",
|
|
"indent": 5,
|
|
"expected": b"""[
|
|
{
|
|
"foo": [
|
|
"bar"
|
|
]
|
|
},
|
|
{
|
|
"key": "value"
|
|
}
|
|
]""",
|
|
},
|
|
# XML
|
|
{
|
|
"format": "xml",
|
|
"indent": None,
|
|
"expected": b"""<?xml version="1.0" encoding="utf-8"?>
|
|
<items><item><foo><value>bar</value></foo></item><item><key>value</key></item></items>""",
|
|
},
|
|
{
|
|
"format": "xml",
|
|
"indent": -1,
|
|
"expected": b"""<?xml version="1.0" encoding="utf-8"?>
|
|
<items>
|
|
<item><foo><value>bar</value></foo></item>
|
|
<item><key>value</key></item>
|
|
</items>""",
|
|
},
|
|
{
|
|
"format": "xml",
|
|
"indent": 0,
|
|
"expected": b"""<?xml version="1.0" encoding="utf-8"?>
|
|
<items>
|
|
<item><foo><value>bar</value></foo></item>
|
|
<item><key>value</key></item>
|
|
</items>""",
|
|
},
|
|
{
|
|
"format": "xml",
|
|
"indent": 2,
|
|
"expected": b"""<?xml version="1.0" encoding="utf-8"?>
|
|
<items>
|
|
<item>
|
|
<foo>
|
|
<value>bar</value>
|
|
</foo>
|
|
</item>
|
|
<item>
|
|
<key>value</key>
|
|
</item>
|
|
</items>""",
|
|
},
|
|
{
|
|
"format": "xml",
|
|
"indent": 4,
|
|
"expected": b"""<?xml version="1.0" encoding="utf-8"?>
|
|
<items>
|
|
<item>
|
|
<foo>
|
|
<value>bar</value>
|
|
</foo>
|
|
</item>
|
|
<item>
|
|
<key>value</key>
|
|
</item>
|
|
</items>""",
|
|
},
|
|
{
|
|
"format": "xml",
|
|
"indent": 5,
|
|
"expected": b"""<?xml version="1.0" encoding="utf-8"?>
|
|
<items>
|
|
<item>
|
|
<foo>
|
|
<value>bar</value>
|
|
</foo>
|
|
</item>
|
|
<item>
|
|
<key>value</key>
|
|
</item>
|
|
</items>""",
|
|
},
|
|
]
|
|
|
|
for row in test_cases:
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {
|
|
"format": row["format"],
|
|
"indent": row["indent"],
|
|
},
|
|
},
|
|
}
|
|
data = yield self.exported_data(items, settings)
|
|
self.assertEqual(row["expected"], data[row["format"]])
|
|
|
|
@defer.inlineCallbacks
|
|
def test_init_exporters_storages_with_crawler(self):
|
|
settings = {
|
|
"FEED_EXPORTERS": {"csv": FromCrawlerCsvItemExporter},
|
|
"FEED_STORAGES": {"file": FromCrawlerFileFeedStorage},
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": "csv"},
|
|
},
|
|
}
|
|
yield self.exported_data(items=[], settings=settings)
|
|
self.assertTrue(FromCrawlerCsvItemExporter.init_with_crawler)
|
|
self.assertTrue(FromCrawlerFileFeedStorage.init_with_crawler)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_str_uri(self):
|
|
settings = {
|
|
"FEED_STORE_EMPTY": True,
|
|
"FEEDS": {str(self._random_temp_filename()): {"format": "csv"}},
|
|
}
|
|
data = yield self.exported_no_data(settings)
|
|
self.assertEqual(data["csv"], b"")
|
|
|
|
@defer.inlineCallbacks
|
|
def test_multiple_feeds_success_logs_blocking_feed_storage(self):
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": "json"},
|
|
self._random_temp_filename(): {"format": "xml"},
|
|
self._random_temp_filename(): {"format": "csv"},
|
|
},
|
|
"FEED_STORAGES": {"file": DummyBlockingFeedStorage},
|
|
}
|
|
items = [
|
|
{"foo": "bar1", "baz": ""},
|
|
{"foo": "bar2", "baz": "quux"},
|
|
]
|
|
with LogCapture() as log:
|
|
yield self.exported_data(items, settings)
|
|
|
|
print(log)
|
|
for fmt in ["json", "xml", "csv"]:
|
|
self.assertIn(f"Stored {fmt} feed (2 items)", str(log))
|
|
|
|
@defer.inlineCallbacks
|
|
def test_multiple_feeds_failing_logs_blocking_feed_storage(self):
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {"format": "json"},
|
|
self._random_temp_filename(): {"format": "xml"},
|
|
self._random_temp_filename(): {"format": "csv"},
|
|
},
|
|
"FEED_STORAGES": {"file": FailingBlockingFeedStorage},
|
|
}
|
|
items = [
|
|
{"foo": "bar1", "baz": ""},
|
|
{"foo": "bar2", "baz": "quux"},
|
|
]
|
|
with LogCapture() as log:
|
|
yield self.exported_data(items, settings)
|
|
|
|
print(log)
|
|
for fmt in ["json", "xml", "csv"]:
|
|
self.assertIn(f"Error storing {fmt} feed (2 items)", str(log))
|
|
|
|
@defer.inlineCallbacks
|
|
def test_extend_kwargs(self):
|
|
items = [{"foo": "FOO", "bar": "BAR"}]
|
|
|
|
expected_with_title_csv = b"foo,bar\r\nFOO,BAR\r\n"
|
|
expected_without_title_csv = b"FOO,BAR\r\n"
|
|
test_cases = [
|
|
# with title
|
|
{
|
|
"options": {
|
|
"format": "csv",
|
|
"item_export_kwargs": {"include_headers_line": True},
|
|
},
|
|
"expected": expected_with_title_csv,
|
|
},
|
|
# without title
|
|
{
|
|
"options": {
|
|
"format": "csv",
|
|
"item_export_kwargs": {"include_headers_line": False},
|
|
},
|
|
"expected": expected_without_title_csv,
|
|
},
|
|
]
|
|
|
|
for row in test_cases:
|
|
feed_options = row["options"]
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): feed_options,
|
|
},
|
|
"FEED_EXPORT_INDENT": None,
|
|
}
|
|
|
|
data = yield self.exported_data(items, settings)
|
|
self.assertEqual(row["expected"], data[feed_options["format"]])
|
|
|
|
@defer.inlineCallbacks
|
|
def test_storage_file_no_postprocessing(self):
|
|
@implementer(IFeedStorage)
|
|
class Storage:
|
|
def __init__(self, uri, *, feed_options=None):
|
|
pass
|
|
|
|
def open(self, spider):
|
|
Storage.open_file = tempfile.NamedTemporaryFile(prefix="feed-")
|
|
return Storage.open_file
|
|
|
|
def store(self, file):
|
|
Storage.store_file = file
|
|
file.close()
|
|
|
|
settings = {
|
|
"FEEDS": {self._random_temp_filename(): {"format": "jsonlines"}},
|
|
"FEED_STORAGES": {"file": Storage},
|
|
}
|
|
yield self.exported_no_data(settings)
|
|
self.assertIs(Storage.open_file, Storage.store_file)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_storage_file_postprocessing(self):
|
|
@implementer(IFeedStorage)
|
|
class Storage:
|
|
def __init__(self, uri, *, feed_options=None):
|
|
pass
|
|
|
|
def open(self, spider):
|
|
Storage.open_file = tempfile.NamedTemporaryFile(prefix="feed-")
|
|
return Storage.open_file
|
|
|
|
def store(self, file):
|
|
Storage.store_file = file
|
|
Storage.file_was_closed = file.closed
|
|
file.close()
|
|
|
|
settings = {
|
|
"FEEDS": {
|
|
self._random_temp_filename(): {
|
|
"format": "jsonlines",
|
|
"postprocessing": [
|
|
"scrapy.extensions.postprocessing.GzipPlugin",
|
|
],
|
|
},
|
|
},
|
|
"FEED_STORAGES": {"file": Storage},
|
|
}
|
|
yield self.exported_no_data(settings)
|
|
self.assertIs(Storage.open_file, Storage.store_file)
|
|
self.assertFalse(Storage.file_was_closed)
|
|
|
|
|
|
class FeedPostProcessedExportsTest(FeedExportTestBase):
|
|
items = [{"foo": "bar"}]
|
|
expected = b"foo\r\nbar\r\n"
|
|
|
|
class MyPlugin1:
|
|
def __init__(self, file, feed_options):
|
|
self.file = file
|
|
self.feed_options = feed_options
|
|
self.char = self.feed_options.get("plugin1_char", b"")
|
|
|
|
def write(self, data):
|
|
written_count = self.file.write(data)
|
|
written_count += self.file.write(self.char)
|
|
return written_count
|
|
|
|
def close(self):
|
|
self.file.close()
|
|
|
|
def _named_tempfile(self, name) -> str:
|
|
return str(Path(self.temp_dir, name))
|
|
|
|
@defer.inlineCallbacks
|
|
def run_and_export(self, spider_cls, settings):
|
|
"""Run spider with specified settings; return exported data with filename."""
|
|
|
|
FEEDS = settings.get("FEEDS") or {}
|
|
settings["FEEDS"] = {
|
|
printf_escape(path_to_url(file_path)): feed_options
|
|
for file_path, feed_options in FEEDS.items()
|
|
}
|
|
|
|
content = {}
|
|
try:
|
|
spider_cls.start_urls = [self.mockserver.url("/")]
|
|
crawler = get_crawler(spider_cls, settings)
|
|
yield crawler.crawl()
|
|
|
|
for file_path in FEEDS:
|
|
content[str(file_path)] = (
|
|
Path(file_path).read_bytes() if Path(file_path).exists() else None
|
|
)
|
|
|
|
finally:
|
|
for file_path in FEEDS:
|
|
if not Path(file_path).exists():
|
|
continue
|
|
|
|
Path(file_path).unlink()
|
|
|
|
return content
|
|
|
|
def get_gzip_compressed(self, data, compresslevel=9, mtime=0, filename=""):
|
|
data_stream = BytesIO()
|
|
gzipf = gzip.GzipFile(
|
|
fileobj=data_stream,
|
|
filename=filename,
|
|
mtime=mtime,
|
|
compresslevel=compresslevel,
|
|
mode="wb",
|
|
)
|
|
gzipf.write(data)
|
|
gzipf.close()
|
|
data_stream.seek(0)
|
|
return data_stream.read()

    @defer.inlineCallbacks
    def test_gzip_plugin(self):
        filename = self._named_tempfile("gzip_file")

        settings = {
            "FEEDS": {
                filename: {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.GzipPlugin"],
                },
            },
        }

        data = yield self.exported_data(self.items, settings)
        try:
            gzip.decompress(data[filename])
        except OSError:
            self.fail("Received invalid gzip data.")

    @defer.inlineCallbacks
    def test_gzip_plugin_compresslevel(self):
        filename_to_compressed = {
            self._named_tempfile("compresslevel_0"): self.get_gzip_compressed(
                self.expected, compresslevel=0
            ),
            self._named_tempfile("compresslevel_9"): self.get_gzip_compressed(
                self.expected, compresslevel=9
            ),
        }

        settings = {
            "FEEDS": {
                self._named_tempfile("compresslevel_0"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.GzipPlugin"],
                    "gzip_compresslevel": 0,
                    "gzip_mtime": 0,
                    "gzip_filename": "",
                },
                self._named_tempfile("compresslevel_9"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.GzipPlugin"],
                    "gzip_compresslevel": 9,
                    "gzip_mtime": 0,
                    "gzip_filename": "",
                },
            },
        }

        data = yield self.exported_data(self.items, settings)

        for filename, compressed in filename_to_compressed.items():
            result = gzip.decompress(data[filename])
            self.assertEqual(compressed, data[filename])
            self.assertEqual(self.expected, result)

    @defer.inlineCallbacks
    def test_gzip_plugin_mtime(self):
        filename_to_compressed = {
            self._named_tempfile("mtime_123"): self.get_gzip_compressed(
                self.expected, mtime=123
            ),
            self._named_tempfile("mtime_123456789"): self.get_gzip_compressed(
                self.expected, mtime=123456789
            ),
        }

        settings = {
            "FEEDS": {
                self._named_tempfile("mtime_123"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.GzipPlugin"],
                    "gzip_mtime": 123,
                    "gzip_filename": "",
                },
                self._named_tempfile("mtime_123456789"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.GzipPlugin"],
                    "gzip_mtime": 123456789,
                    "gzip_filename": "",
                },
            },
        }

        data = yield self.exported_data(self.items, settings)

        for filename, compressed in filename_to_compressed.items():
            result = gzip.decompress(data[filename])
            self.assertEqual(compressed, data[filename])
            self.assertEqual(self.expected, result)

    @defer.inlineCallbacks
    def test_gzip_plugin_filename(self):
        filename_to_compressed = {
            self._named_tempfile("filename_FILE1"): self.get_gzip_compressed(
                self.expected, filename="FILE1"
            ),
            self._named_tempfile("filename_FILE2"): self.get_gzip_compressed(
                self.expected, filename="FILE2"
            ),
        }

        settings = {
            "FEEDS": {
                self._named_tempfile("filename_FILE1"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.GzipPlugin"],
                    "gzip_mtime": 0,
                    "gzip_filename": "FILE1",
                },
                self._named_tempfile("filename_FILE2"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.GzipPlugin"],
                    "gzip_mtime": 0,
                    "gzip_filename": "FILE2",
                },
            },
        }

        data = yield self.exported_data(self.items, settings)

        for filename, compressed in filename_to_compressed.items():
            result = gzip.decompress(data[filename])
            self.assertEqual(compressed, data[filename])
            self.assertEqual(self.expected, result)

    @defer.inlineCallbacks
    def test_lzma_plugin(self):
        filename = self._named_tempfile("lzma_file")

        settings = {
            "FEEDS": {
                filename: {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.LZMAPlugin"],
                },
            },
        }

        data = yield self.exported_data(self.items, settings)
        try:
            lzma.decompress(data[filename])
        except lzma.LZMAError:
            self.fail("Received invalid lzma data.")

    @defer.inlineCallbacks
    def test_lzma_plugin_format(self):
        filename_to_compressed = {
            self._named_tempfile("format_FORMAT_XZ"): lzma.compress(
                self.expected, format=lzma.FORMAT_XZ
            ),
            self._named_tempfile("format_FORMAT_ALONE"): lzma.compress(
                self.expected, format=lzma.FORMAT_ALONE
            ),
        }

        settings = {
            "FEEDS": {
                self._named_tempfile("format_FORMAT_XZ"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.LZMAPlugin"],
                    "lzma_format": lzma.FORMAT_XZ,
                },
                self._named_tempfile("format_FORMAT_ALONE"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.LZMAPlugin"],
                    "lzma_format": lzma.FORMAT_ALONE,
                },
            },
        }

        data = yield self.exported_data(self.items, settings)

        for filename, compressed in filename_to_compressed.items():
            result = lzma.decompress(data[filename])
            self.assertEqual(compressed, data[filename])
            self.assertEqual(self.expected, result)

    @defer.inlineCallbacks
    def test_lzma_plugin_check(self):
        filename_to_compressed = {
            self._named_tempfile("check_CHECK_NONE"): lzma.compress(
                self.expected, check=lzma.CHECK_NONE
            ),
            self._named_tempfile("check_CHECK_SHA256"): lzma.compress(
                self.expected, check=lzma.CHECK_SHA256
            ),
        }

        settings = {
            "FEEDS": {
                self._named_tempfile("check_CHECK_NONE"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.LZMAPlugin"],
                    "lzma_check": lzma.CHECK_NONE,
                },
                self._named_tempfile("check_CHECK_SHA256"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.LZMAPlugin"],
                    "lzma_check": lzma.CHECK_SHA256,
                },
            },
        }

        data = yield self.exported_data(self.items, settings)

        for filename, compressed in filename_to_compressed.items():
            result = lzma.decompress(data[filename])
            self.assertEqual(compressed, data[filename])
            self.assertEqual(self.expected, result)

    @defer.inlineCallbacks
    def test_lzma_plugin_preset(self):
        filename_to_compressed = {
            self._named_tempfile("preset_PRESET_0"): lzma.compress(
                self.expected, preset=0
            ),
            self._named_tempfile("preset_PRESET_9"): lzma.compress(
                self.expected, preset=9
            ),
        }

        settings = {
            "FEEDS": {
                self._named_tempfile("preset_PRESET_0"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.LZMAPlugin"],
                    "lzma_preset": 0,
                },
                self._named_tempfile("preset_PRESET_9"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.LZMAPlugin"],
                    "lzma_preset": 9,
                },
            },
        }

        data = yield self.exported_data(self.items, settings)

        for filename, compressed in filename_to_compressed.items():
            result = lzma.decompress(data[filename])
            self.assertEqual(compressed, data[filename])
            self.assertEqual(self.expected, result)

    @defer.inlineCallbacks
    def test_lzma_plugin_filters(self):
        if "PyPy" in sys.version:
            # https://foss.heptapod.net/pypy/pypy/-/issues/3527
            raise unittest.SkipTest("lzma filters don't work in PyPy")

        filters = [{"id": lzma.FILTER_LZMA2}]
        compressed = lzma.compress(self.expected, filters=filters)
        filename = self._named_tempfile("filters")

        settings = {
            "FEEDS": {
                filename: {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.LZMAPlugin"],
                    "lzma_filters": filters,
                },
            },
        }

        data = yield self.exported_data(self.items, settings)
        self.assertEqual(compressed, data[filename])
        result = lzma.decompress(data[filename])
        self.assertEqual(self.expected, result)

    @defer.inlineCallbacks
    def test_bz2_plugin(self):
        filename = self._named_tempfile("bz2_file")

        settings = {
            "FEEDS": {
                filename: {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.Bz2Plugin"],
                },
            },
        }

        data = yield self.exported_data(self.items, settings)
        try:
            bz2.decompress(data[filename])
        except OSError:
            self.fail("Received invalid bz2 data.")

    @defer.inlineCallbacks
    def test_bz2_plugin_compresslevel(self):
        filename_to_compressed = {
            self._named_tempfile("compresslevel_1"): bz2.compress(
                self.expected, compresslevel=1
            ),
            self._named_tempfile("compresslevel_9"): bz2.compress(
                self.expected, compresslevel=9
            ),
        }

        settings = {
            "FEEDS": {
                self._named_tempfile("compresslevel_1"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.Bz2Plugin"],
                    "bz2_compresslevel": 1,
                },
                self._named_tempfile("compresslevel_9"): {
                    "format": "csv",
                    "postprocessing": ["scrapy.extensions.postprocessing.Bz2Plugin"],
                    "bz2_compresslevel": 9,
                },
            },
        }

        data = yield self.exported_data(self.items, settings)

        for filename, compressed in filename_to_compressed.items():
            result = bz2.decompress(data[filename])
            self.assertEqual(compressed, data[filename])
            self.assertEqual(self.expected, result)

    @defer.inlineCallbacks
    def test_custom_plugin(self):
        filename = self._named_tempfile("csv_file")

        settings = {
            "FEEDS": {
                filename: {
                    "format": "csv",
                    "postprocessing": [self.MyPlugin1],
                },
            },
        }

        data = yield self.exported_data(self.items, settings)
        self.assertEqual(self.expected, data[filename])
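
    # The custom plugins used above and below (MyPlugin1 and friends) are defined
    # earlier in this file. As a rough, hypothetical sketch, a post-processing
    # plugin of this kind is a small file-like wrapper along these lines (the
    # class and attribute names below are illustrative, not the actual MyPlugin1
    # implementation):
    #
    #     class AppendCharPlugin:
    #         def __init__(self, file, feed_options):
    #             self.file = file
    #             self.char = feed_options.get("plugin1_char", b"")
    #
    #         def write(self, data):
    #             return self.file.write(data + self.char)
    #
    #         def close(self):
    #             self.file.close()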

    @defer.inlineCallbacks
    def test_custom_plugin_with_parameter(self):
        expected = b"foo\r\n\nbar\r\n\n"
        filename = self._named_tempfile("newline")

        settings = {
            "FEEDS": {
                filename: {
                    "format": "csv",
                    "postprocessing": [self.MyPlugin1],
                    "plugin1_char": b"\n",
                },
            },
        }

        data = yield self.exported_data(self.items, settings)
        self.assertEqual(expected, data[filename])

    @defer.inlineCallbacks
    def test_custom_plugin_with_compression(self):
        expected = b"foo\r\n\nbar\r\n\n"

        filename_to_decompressor = {
            self._named_tempfile("bz2"): bz2.decompress,
            self._named_tempfile("lzma"): lzma.decompress,
            self._named_tempfile("gzip"): gzip.decompress,
        }

        settings = {
            "FEEDS": {
                self._named_tempfile("bz2"): {
                    "format": "csv",
                    "postprocessing": [
                        self.MyPlugin1,
                        "scrapy.extensions.postprocessing.Bz2Plugin",
                    ],
                    "plugin1_char": b"\n",
                },
                self._named_tempfile("lzma"): {
                    "format": "csv",
                    "postprocessing": [
                        self.MyPlugin1,
                        "scrapy.extensions.postprocessing.LZMAPlugin",
                    ],
                    "plugin1_char": b"\n",
                },
                self._named_tempfile("gzip"): {
                    "format": "csv",
                    "postprocessing": [
                        self.MyPlugin1,
                        "scrapy.extensions.postprocessing.GzipPlugin",
                    ],
                    "plugin1_char": b"\n",
                },
            },
        }

        data = yield self.exported_data(self.items, settings)

        for filename, decompressor in filename_to_decompressor.items():
            result = decompressor(data[filename])
            self.assertEqual(expected, result)
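
    # The expected value above implies that the declared plugin order is the
    # processing order: MyPlugin1 transforms the exported rows first, and the
    # compression plugin listed after it compresses the already-transformed
    # bytes, which is why decompressing each feed yields the plugin-modified CSV.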

    @defer.inlineCallbacks
    def test_exports_compatibility_with_postproc(self):
        import marshal
        import pickle

        filename_to_expected = {
            self._named_tempfile("csv"): b"foo\r\nbar\r\n",
            self._named_tempfile("json"): b'[\n{"foo": "bar"}\n]',
            self._named_tempfile("jsonlines"): b'{"foo": "bar"}\n',
            self._named_tempfile("xml"): b'<?xml version="1.0" encoding="utf-8"?>\n'
            b"<items>\n<item><foo>bar</foo></item>\n</items>",
        }

        settings = {
            "FEEDS": {
                self._named_tempfile("csv"): {
                    "format": "csv",
                    "postprocessing": [self.MyPlugin1],
                    # empty plugin to activate postprocessing.PostProcessingManager
                },
                self._named_tempfile("json"): {
                    "format": "json",
                    "postprocessing": [self.MyPlugin1],
                },
                self._named_tempfile("jsonlines"): {
                    "format": "jsonlines",
                    "postprocessing": [self.MyPlugin1],
                },
                self._named_tempfile("xml"): {
                    "format": "xml",
                    "postprocessing": [self.MyPlugin1],
                },
                self._named_tempfile("marshal"): {
                    "format": "marshal",
                    "postprocessing": [self.MyPlugin1],
                },
                self._named_tempfile("pickle"): {
                    "format": "pickle",
                    "postprocessing": [self.MyPlugin1],
                },
            },
        }

        data = yield self.exported_data(self.items, settings)

        for filename, result in data.items():
            if "pickle" in filename:
                expected, result = self.items[0], pickle.loads(result)
            elif "marshal" in filename:
                expected, result = self.items[0], marshal.loads(result)
            else:
                expected = filename_to_expected[filename]
            self.assertEqual(expected, result)
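
    # pickle and marshal produce binary records rather than stable text, so the
    # loop above round-trips them with loads() and compares against the original
    # item, instead of comparing raw bytes as is done for the text formats.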


class BatchDeliveriesTest(FeedExportTestBase):
    _file_mark = "_%(batch_time)s_#%(batch_id)02d_"
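
    # _file_mark relies on the batch placeholders that Scrapy substitutes into
    # feed URIs when batch delivery is enabled: %(batch_time)s and %(batch_id)d.
    # A minimal sketch of the kind of configuration these tests build (paths are
    # illustrative only):
    #
    #     FEED_EXPORT_BATCH_ITEM_COUNT = 10
    #     FEEDS = {
    #         "/tmp/output/%(batch_id)02d_%(batch_time)s.json": {"format": "json"},
    #     }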

    @defer.inlineCallbacks
    def run_and_export(self, spider_cls, settings):
        """Run spider with specified settings; return exported data."""

        FEEDS = settings.get("FEEDS") or {}
        settings["FEEDS"] = {
            build_url(file_path): feed for file_path, feed in FEEDS.items()
        }
        content = defaultdict(list)
        spider_cls.start_urls = [self.mockserver.url("/")]
        crawler = get_crawler(spider_cls, settings)
        yield crawler.crawl()

        for path, feed in FEEDS.items():
            dir_name = Path(path).parent
            if not dir_name.exists():
                content[feed["format"]] = []
                continue
            for file in sorted(dir_name.iterdir()):
                content[feed["format"]].append(file.read_bytes())
        return content

    @defer.inlineCallbacks
    def assertExportedJsonLines(self, items, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename() / "jl" / self._file_mark: {
                        "format": "jl"
                    },
                },
            }
        )
        batch_size = Settings(settings).getint("FEED_EXPORT_BATCH_ITEM_COUNT")
        rows = [{k: v for k, v in row.items() if v} for row in rows]
        data = yield self.exported_data(items, settings)
        for batch in data["jl"]:
            got_batch = [
                json.loads(to_unicode(batch_item)) for batch_item in batch.splitlines()
            ]
            expected_batch, rows = rows[:batch_size], rows[batch_size:]
            self.assertEqual(expected_batch, got_batch)

    @defer.inlineCallbacks
    def assertExportedCsv(self, items, header, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename() / "csv" / self._file_mark: {
                        "format": "csv"
                    },
                },
            }
        )
        batch_size = Settings(settings).getint("FEED_EXPORT_BATCH_ITEM_COUNT")
        data = yield self.exported_data(items, settings)
        for batch in data["csv"]:
            got_batch = csv.DictReader(to_unicode(batch).splitlines())
            self.assertEqual(list(header), got_batch.fieldnames)
            expected_batch, rows = rows[:batch_size], rows[batch_size:]
            self.assertEqual(expected_batch, list(got_batch))

    @defer.inlineCallbacks
    def assertExportedXml(self, items, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename() / "xml" / self._file_mark: {
                        "format": "xml"
                    },
                },
            }
        )
        batch_size = Settings(settings).getint("FEED_EXPORT_BATCH_ITEM_COUNT")
        rows = [{k: v for k, v in row.items() if v} for row in rows]
        data = yield self.exported_data(items, settings)
        for batch in data["xml"]:
            root = lxml.etree.fromstring(batch)
            got_batch = [{e.tag: e.text for e in it} for it in root.findall("item")]
            expected_batch, rows = rows[:batch_size], rows[batch_size:]
            self.assertEqual(expected_batch, got_batch)

    @defer.inlineCallbacks
    def assertExportedMultiple(self, items, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename() / "xml" / self._file_mark: {
                        "format": "xml"
                    },
                    self._random_temp_filename() / "json" / self._file_mark: {
                        "format": "json"
                    },
                },
            }
        )
        batch_size = Settings(settings).getint("FEED_EXPORT_BATCH_ITEM_COUNT")
        rows = [{k: v for k, v in row.items() if v} for row in rows]
        data = yield self.exported_data(items, settings)
        # XML
        xml_rows = rows.copy()
        for batch in data["xml"]:
            root = lxml.etree.fromstring(batch)
            got_batch = [{e.tag: e.text for e in it} for it in root.findall("item")]
            expected_batch, xml_rows = xml_rows[:batch_size], xml_rows[batch_size:]
            self.assertEqual(expected_batch, got_batch)
        # JSON
        json_rows = rows.copy()
        for batch in data["json"]:
            got_batch = json.loads(batch.decode("utf-8"))
            expected_batch, json_rows = json_rows[:batch_size], json_rows[batch_size:]
            self.assertEqual(expected_batch, got_batch)

    @defer.inlineCallbacks
    def assertExportedPickle(self, items, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename() / "pickle" / self._file_mark: {
                        "format": "pickle"
                    },
                },
            }
        )
        batch_size = Settings(settings).getint("FEED_EXPORT_BATCH_ITEM_COUNT")
        rows = [{k: v for k, v in row.items() if v} for row in rows]
        data = yield self.exported_data(items, settings)
        import pickle

        for batch in data["pickle"]:
            got_batch = self._load_until_eof(batch, load_func=pickle.load)
            expected_batch, rows = rows[:batch_size], rows[batch_size:]
            self.assertEqual(expected_batch, got_batch)

    @defer.inlineCallbacks
    def assertExportedMarshal(self, items, rows, settings=None):
        settings = settings or {}
        settings.update(
            {
                "FEEDS": {
                    self._random_temp_filename() / "marshal" / self._file_mark: {
                        "format": "marshal"
                    },
                },
            }
        )
        batch_size = Settings(settings).getint("FEED_EXPORT_BATCH_ITEM_COUNT")
        rows = [{k: v for k, v in row.items() if v} for row in rows]
        data = yield self.exported_data(items, settings)
        import marshal

        for batch in data["marshal"]:
            got_batch = self._load_until_eof(batch, load_func=marshal.load)
            expected_batch, rows = rows[:batch_size], rows[batch_size:]
            self.assertEqual(expected_batch, got_batch)

    @defer.inlineCallbacks
    def test_export_items(self):
        """Test partial deliveries in all supported formats"""
        items = [
            self.MyItem({"foo": "bar1", "egg": "spam1"}),
            self.MyItem({"foo": "bar2", "egg": "spam2", "baz": "quux2"}),
            self.MyItem({"foo": "bar3", "baz": "quux3"}),
        ]
        rows = [
            {"egg": "spam1", "foo": "bar1", "baz": ""},
            {"egg": "spam2", "foo": "bar2", "baz": "quux2"},
            {"foo": "bar3", "baz": "quux3", "egg": ""},
        ]
        settings = {"FEED_EXPORT_BATCH_ITEM_COUNT": 2}
        header = self.MyItem.fields.keys()
        yield self.assertExported(items, header, rows, settings=settings)

    def test_wrong_path(self):
        """If the path contains neither %(batch_time)s nor %(batch_id)d, an exception must be raised"""
        settings = {
            "FEEDS": {
                self._random_temp_filename(): {"format": "xml"},
            },
            "FEED_EXPORT_BATCH_ITEM_COUNT": 1,
        }
        crawler = get_crawler(settings_dict=settings)
        self.assertRaises(NotConfigured, FeedExporter, crawler)

    @defer.inlineCallbacks
    def test_export_no_items_not_store_empty(self):
        for fmt in ("json", "jsonlines", "xml", "csv"):
            settings = {
                "FEEDS": {
                    self._random_temp_filename() / fmt / self._file_mark: {
                        "format": fmt
                    },
                },
                "FEED_EXPORT_BATCH_ITEM_COUNT": 1,
                "FEED_STORE_EMPTY": False,
            }
            data = yield self.exported_no_data(settings)
            data = dict(data)
            self.assertEqual(0, len(data[fmt]))

    @defer.inlineCallbacks
    def test_export_no_items_store_empty(self):
        formats = (
            ("json", b"[]"),
            ("jsonlines", b""),
            ("xml", b'<?xml version="1.0" encoding="utf-8"?>\n<items></items>'),
            ("csv", b""),
        )

        for fmt, expctd in formats:
            settings = {
                "FEEDS": {
                    self._random_temp_filename() / fmt / self._file_mark: {
                        "format": fmt
                    },
                },
                "FEED_STORE_EMPTY": True,
                "FEED_EXPORT_INDENT": None,
                "FEED_EXPORT_BATCH_ITEM_COUNT": 1,
            }
            data = yield self.exported_no_data(settings)
            data = dict(data)
            self.assertEqual(expctd, data[fmt][0])

    @defer.inlineCallbacks
    def test_export_multiple_configs(self):
        items = [
            {"foo": "FOO", "bar": "BAR"},
            {"foo": "FOO1", "bar": "BAR1"},
        ]

        formats = {
            "json": [
                b'[\n{"bar": "BAR"}\n]',
                b'[\n{"bar": "BAR1"}\n]',
            ],
            "xml": [
                (
                    b'<?xml version="1.0" encoding="latin-1"?>\n'
                    b"<items>\n  <item>\n    <foo>FOO</foo>\n  </item>\n</items>"
                ),
                (
                    b'<?xml version="1.0" encoding="latin-1"?>\n'
                    b"<items>\n  <item>\n    <foo>FOO1</foo>\n  </item>\n</items>"
                ),
            ],
            "csv": [
                b"foo,bar\r\nFOO,BAR\r\n",
                b"foo,bar\r\nFOO1,BAR1\r\n",
            ],
        }

        settings = {
            "FEEDS": {
                self._random_temp_filename() / "json" / self._file_mark: {
                    "format": "json",
                    "indent": 0,
                    "fields": ["bar"],
                    "encoding": "utf-8",
                },
                self._random_temp_filename() / "xml" / self._file_mark: {
                    "format": "xml",
                    "indent": 2,
                    "fields": ["foo"],
                    "encoding": "latin-1",
                },
                self._random_temp_filename() / "csv" / self._file_mark: {
                    "format": "csv",
                    "indent": None,
                    "fields": ["foo", "bar"],
                    "encoding": "utf-8",
                },
            },
            "FEED_EXPORT_BATCH_ITEM_COUNT": 1,
        }
        data = yield self.exported_data(items, settings)
        for fmt, expected in formats.items():
            for expected_batch, got_batch in zip(expected, data[fmt]):
                self.assertEqual(expected_batch, got_batch)
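
    # Each entry under FEEDS carries its own format, indent, fields and encoding,
    # so a single run can emit differently shaped exports per URI; the per-feed
    # options used above take precedence over the corresponding global settings.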

    @defer.inlineCallbacks
    def test_batch_item_count_feeds_setting(self):
        items = [{"foo": "FOO"}, {"foo": "FOO1"}]
        formats = {
            "json": [
                b'[{"foo": "FOO"}]',
                b'[{"foo": "FOO1"}]',
            ],
        }
        settings = {
            "FEEDS": {
                self._random_temp_filename() / "json" / self._file_mark: {
                    "format": "json",
                    "indent": None,
                    "encoding": "utf-8",
                    "batch_item_count": 1,
                },
            },
        }
        data = yield self.exported_data(items, settings)
        for fmt, expected in formats.items():
            for expected_batch, got_batch in zip(expected, data[fmt]):
                self.assertEqual(expected_batch, got_batch)

    @defer.inlineCallbacks
    def test_batch_path_differ(self):
        """
        Test that the names of all batch files differ from each other,
        i.e. %(batch_id)d is replaced with the current batch id.
        """
        items = [
            self.MyItem({"foo": "bar1", "egg": "spam1"}),
            self.MyItem({"foo": "bar2", "egg": "spam2", "baz": "quux2"}),
            self.MyItem({"foo": "bar3", "baz": "quux3"}),
        ]
        settings = {
            "FEEDS": {
                self._random_temp_filename() / "%(batch_id)d": {
                    "format": "json",
                },
            },
            "FEED_EXPORT_BATCH_ITEM_COUNT": 1,
        }
        data = yield self.exported_data(items, settings)
        self.assertEqual(len(items), len(data["json"]))

    @defer.inlineCallbacks
    def test_stats_batch_file_success(self):
        settings = {
            "FEEDS": {
                build_url(
                    str(self._random_temp_filename() / "json" / self._file_mark)
                ): {
                    "format": "json",
                }
            },
            "FEED_EXPORT_BATCH_ITEM_COUNT": 1,
        }
        crawler = get_crawler(ItemSpider, settings)
        yield crawler.crawl(total=2, mockserver=self.mockserver)
        self.assertIn(
            "feedexport/success_count/FileFeedStorage", crawler.stats.get_stats()
        )
        self.assertEqual(
            crawler.stats.get_value("feedexport/success_count/FileFeedStorage"), 12
        )

    @pytest.mark.requires_boto3
    @defer.inlineCallbacks
    def test_s3_export(self):
        bucket = "mybucket"
        items = [
            self.MyItem({"foo": "bar1", "egg": "spam1"}),
            self.MyItem({"foo": "bar2", "egg": "spam2", "baz": "quux2"}),
            self.MyItem({"foo": "bar3", "baz": "quux3"}),
        ]

        class CustomS3FeedStorage(S3FeedStorage):
            stubs = []

            def open(self, *args, **kwargs):
                from botocore import __version__ as botocore_version
                from botocore.stub import ANY, Stubber
                from packaging.version import Version

                expected_params = {
                    "Body": ANY,
                    "Bucket": bucket,
                    "Key": ANY,
                }
                if Version(botocore_version) >= Version("1.36.0"):
                    expected_params["ChecksumAlgorithm"] = ANY

                stub = Stubber(self.s3_client)
                stub.activate()
                CustomS3FeedStorage.stubs.append(stub)
                stub.add_response(
                    "put_object",
                    expected_params=expected_params,
                    service_response={},
                )
                return super().open(*args, **kwargs)

        key = "export.csv"
        uri = f"s3://{bucket}/{key}/%(batch_id)d.json"
        batch_item_count = 1
        settings = {
            "AWS_ACCESS_KEY_ID": "access_key",
            "AWS_SECRET_ACCESS_KEY": "secret_key",
            "FEED_EXPORT_BATCH_ITEM_COUNT": batch_item_count,
            "FEED_STORAGES": {
                "s3": CustomS3FeedStorage,
            },
            "FEEDS": {
                uri: {
                    "format": "json",
                },
            },
        }
        crawler = get_crawler(settings_dict=settings)
        storage = S3FeedStorage.from_crawler(crawler, uri)
        verifyObject(IFeedStorage, storage)

        class TestSpider(scrapy.Spider):
            name = "testspider"

            def parse(self, response):
                yield from items

        TestSpider.start_urls = [self.mockserver.url("/")]
        crawler = get_crawler(TestSpider, settings)
        yield crawler.crawl()

        self.assertEqual(len(CustomS3FeedStorage.stubs), len(items))
        for stub in CustomS3FeedStorage.stubs[:-1]:
            stub.assert_no_pending_responses()
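
    # CustomS3FeedStorage intercepts open() to queue one botocore Stubber
    # response per expected put_object call, so the crawl above runs against a
    # stubbed S3 client instead of the network. A bare-bones sketch of that
    # stubbing pattern (the client variable is illustrative):
    #
    #     from botocore.stub import ANY, Stubber
    #     stubber = Stubber(s3_client)
    #     stubber.add_response(
    #         "put_object", {}, {"Body": ANY, "Bucket": "mybucket", "Key": ANY}
    #     )
    #     stubber.activate()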


# Test that the FeedExporter sends the feed_exporter_closed and feed_slot_closed signals
class FeedExporterSignalsTest(unittest.TestCase):
    items = [
        {"foo": "bar1", "egg": "spam1"},
        {"foo": "bar2", "egg": "spam2", "baz": "quux2"},
        {"foo": "bar3", "baz": "quux3"},
    ]

    with tempfile.NamedTemporaryFile(suffix="json") as tmp:
        settings = {
            "FEEDS": {
                f"file:///{tmp.name}": {
                    "format": "json",
                },
            },
        }

    def feed_exporter_closed_signal_handler(self):
        self.feed_exporter_closed_received = True

    def feed_slot_closed_signal_handler(self, slot):
        self.feed_slot_closed_received = True

    def feed_exporter_closed_signal_handler_deferred(self):
        d = defer.Deferred()
        d.addCallback(lambda _: setattr(self, "feed_exporter_closed_received", True))
        d.callback(None)
        return d

    def feed_slot_closed_signal_handler_deferred(self, slot):
        d = defer.Deferred()
        d.addCallback(lambda _: setattr(self, "feed_slot_closed_received", True))
        d.callback(None)
        return d

    def run_signaled_feed_exporter(
        self, feed_exporter_signal_handler, feed_slot_signal_handler
    ):
        crawler = get_crawler(settings_dict=self.settings)
        feed_exporter = FeedExporter.from_crawler(crawler)
        spider = scrapy.Spider("default")
        spider.crawler = crawler
        crawler.signals.connect(
            feed_exporter_signal_handler,
            signal=signals.feed_exporter_closed,
        )
        crawler.signals.connect(
            feed_slot_signal_handler, signal=signals.feed_slot_closed
        )
        feed_exporter.open_spider(spider)
        for item in self.items:
            feed_exporter.item_scraped(item, spider)
        defer.ensureDeferred(feed_exporter.close_spider(spider))

    def test_feed_exporter_signals_sent(self):
        self.feed_exporter_closed_received = False
        self.feed_slot_closed_received = False

        self.run_signaled_feed_exporter(
            self.feed_exporter_closed_signal_handler,
            self.feed_slot_closed_signal_handler,
        )
        self.assertTrue(self.feed_slot_closed_received)
        self.assertTrue(self.feed_exporter_closed_received)

    def test_feed_exporter_signals_sent_deferred(self):
        self.feed_exporter_closed_received = False
        self.feed_slot_closed_received = False

        self.run_signaled_feed_exporter(
            self.feed_exporter_closed_signal_handler_deferred,
            self.feed_slot_closed_signal_handler_deferred,
        )
        self.assertTrue(self.feed_slot_closed_received)
        self.assertTrue(self.feed_exporter_closed_received)
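
    # The *_deferred variants above return a Deferred from the signal handler;
    # the tests only check that handlers of both shapes actually run and set the
    # *_received flags, i.e. returning a Deferred from a feed signal handler is
    # supported as well.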


class FeedExportInitTest(unittest.TestCase):
    def test_unsupported_storage(self):
        settings = {
            "FEEDS": {
                "unsupported://uri": {},
            },
        }
        crawler = get_crawler(settings_dict=settings)
        with self.assertRaises(NotConfigured):
            FeedExporter.from_crawler(crawler)

    def test_unsupported_format(self):
        settings = {
            "FEEDS": {
                "file://path": {
                    "format": "unsupported_format",
                },
            },
        }
        crawler = get_crawler(settings_dict=settings)
        with self.assertRaises(NotConfigured):
            FeedExporter.from_crawler(crawler)

    def test_absolute_pathlib_as_uri(self):
        with tempfile.NamedTemporaryFile(suffix="json") as tmp:
            settings = {
                "FEEDS": {
                    Path(tmp.name).resolve(): {
                        "format": "json",
                    },
                },
            }
            crawler = get_crawler(settings_dict=settings)
            exporter = FeedExporter.from_crawler(crawler)
            self.assertIsInstance(exporter, FeedExporter)

    def test_relative_pathlib_as_uri(self):
        settings = {
            "FEEDS": {
                Path("./items.json"): {
                    "format": "json",
                },
            },
        }
        crawler = get_crawler(settings_dict=settings)
        exporter = FeedExporter.from_crawler(crawler)
        self.assertIsInstance(exporter, FeedExporter)


class URIParamsTest:
    spider_name = "uri_params_spider"
    deprecated_options = False

    def build_settings(self, uri="file:///tmp/foobar", uri_params=None):
        raise NotImplementedError

    def _crawler_feed_exporter(self, settings):
        if self.deprecated_options:
            with pytest.warns(
                ScrapyDeprecationWarning,
                match="The `FEED_URI` and `FEED_FORMAT` settings have been deprecated",
            ):
                crawler = get_crawler(settings_dict=settings)
                feed_exporter = FeedExporter.from_crawler(crawler)
        else:
            crawler = get_crawler(settings_dict=settings)
            feed_exporter = FeedExporter.from_crawler(crawler)
        return crawler, feed_exporter

    def test_default(self):
        settings = self.build_settings(
            uri="file:///tmp/%(name)s",
        )
        crawler, feed_exporter = self._crawler_feed_exporter(settings)
        spider = scrapy.Spider(self.spider_name)
        spider.crawler = crawler

        with warnings.catch_warnings():
            warnings.simplefilter("error", ScrapyDeprecationWarning)
            feed_exporter.open_spider(spider)

        self.assertEqual(feed_exporter.slots[0].uri, f"file:///tmp/{self.spider_name}")

    def test_none(self):
        def uri_params(params, spider):
            pass

        settings = self.build_settings(
            uri="file:///tmp/%(name)s",
            uri_params=uri_params,
        )
        crawler, feed_exporter = self._crawler_feed_exporter(settings)
        spider = scrapy.Spider(self.spider_name)
        spider.crawler = crawler

        feed_exporter.open_spider(spider)

        self.assertEqual(feed_exporter.slots[0].uri, f"file:///tmp/{self.spider_name}")

    def test_empty_dict(self):
        def uri_params(params, spider):
            return {}

        settings = self.build_settings(
            uri="file:///tmp/%(name)s",
            uri_params=uri_params,
        )
        crawler, feed_exporter = self._crawler_feed_exporter(settings)
        spider = scrapy.Spider(self.spider_name)
        spider.crawler = crawler

        with warnings.catch_warnings():
            warnings.simplefilter("error", ScrapyDeprecationWarning)
            with self.assertRaises(KeyError):
                feed_exporter.open_spider(spider)

    def test_params_as_is(self):
        def uri_params(params, spider):
            return params

        settings = self.build_settings(
            uri="file:///tmp/%(name)s",
            uri_params=uri_params,
        )
        crawler, feed_exporter = self._crawler_feed_exporter(settings)
        spider = scrapy.Spider(self.spider_name)
        spider.crawler = crawler
        with warnings.catch_warnings():
            warnings.simplefilter("error", ScrapyDeprecationWarning)
            feed_exporter.open_spider(spider)

        self.assertEqual(feed_exporter.slots[0].uri, f"file:///tmp/{self.spider_name}")

    def test_custom_param(self):
        def uri_params(params, spider):
            return {**params, "foo": self.spider_name}

        settings = self.build_settings(
            uri="file:///tmp/%(foo)s",
            uri_params=uri_params,
        )
        crawler, feed_exporter = self._crawler_feed_exporter(settings)
        spider = scrapy.Spider(self.spider_name)
        spider.crawler = crawler
        with warnings.catch_warnings():
            warnings.simplefilter("error", ScrapyDeprecationWarning)
            feed_exporter.open_spider(spider)

        self.assertEqual(feed_exporter.slots[0].uri, f"file:///tmp/{self.spider_name}")
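
    # In user code the same mechanism is driven either by the FEED_URI_PARAMS
    # setting or by the per-feed "uri_params" option, e.g. (illustrative sketch,
    # bucket and key names are made up):
    #
    #     def uri_params(params, spider):
    #         return {**params, "region": getattr(spider, "region", "all")}
    #
    #     FEEDS = {
    #         "s3://example-bucket/%(name)s/%(region)s.jl": {
    #             "format": "jl",
    #             "uri_params": uri_params,
    #         },
    #     }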


class URIParamsSettingTest(URIParamsTest, unittest.TestCase):
    deprecated_options = True

    def build_settings(self, uri="file:///tmp/foobar", uri_params=None):
        extra_settings = {}
        if uri_params:
            extra_settings["FEED_URI_PARAMS"] = uri_params
        return {
            "FEED_URI": uri,
            **extra_settings,
        }


class URIParamsFeedOptionTest(URIParamsTest, unittest.TestCase):
    deprecated_options = False

    def build_settings(self, uri="file:///tmp/foobar", uri_params=None):
        options = {
            "format": "jl",
        }
        if uri_params:
            options["uri_params"] = uri_params
        return {
            "FEEDS": {
                uri: options,
            },
        }