1
0
mirror of https://github.com/scrapy/scrapy.git synced 2025-02-06 11:00:46 +00:00
scrapy/tests/test_loader_deprecated.py
2025-01-02 12:45:04 +01:00

718 lines
25 KiB
Python

"""
These tests are kept as references from the ones that were ported to the itemloaders library.
Once we remove the references from scrapy, we can remove these tests.
"""
import unittest
from functools import partial
from itemloaders.processors import (
Compose,
Identity,
Join,
MapCompose,
SelectJmes,
TakeFirst,
)
from scrapy.item import Field, Item
from scrapy.loader import ItemLoader
# test items
class NameItem(Item):
    """Item declaring a single ``name`` field."""

    name = Field()
class TestItem(NameItem):
    """Item adding ``url`` and ``summary`` fields on top of ``NameItem``."""

    url = Field()
    summary = Field()
# test item loaders
class NameItemLoader(ItemLoader):
    """Loader whose default item class is ``TestItem``."""

    default_item_class = TestItem
class TestItemLoader(NameItemLoader):
    """Loader that title-cases every value collected for ``name``."""

    name_in = MapCompose(lambda value: value.title())
class DefaultedItemLoader(NameItemLoader):
    """Loader whose default input processor drops the last element of each value."""

    default_input_processor = MapCompose(lambda value: value[:-1])
# test processors
def processor_with_args(value, other=None, loader_context=None):
    """Return ``loader_context["key"]`` when available, otherwise ``value``.

    Args:
        value: The value handed to the processor.
        other: Unused; kept so the signature carries an extra argument.
        loader_context: Optional mapping. When it contains ``"key"``, the
            mapped value is returned instead of ``value``.

    Returns:
        ``loader_context["key"]`` if that key exists, else ``value``.
    """
    # The default is None, so guard before the membership test; otherwise
    # calling without a context raises TypeError.
    if loader_context is not None and "key" in loader_context:
        return loader_context["key"]
    return value
class BasicItemLoaderTest(unittest.TestCase):
    """Exercise ``ItemLoader`` value collection and input/output processors."""

    def test_load_item_using_default_loader(self):
        """A plain ``ItemLoader`` fills and returns the very item it was given."""
        i = TestItem()
        i["summary"] = "lala"
        il = ItemLoader(item=i)
        il.add_value("name", "marta")
        item = il.load_item()
        # assertIs instead of a bare assert so the check survives `python -O`.
        self.assertIs(item, i)
        self.assertEqual(item["summary"], ["lala"])
        self.assertEqual(item["name"], ["marta"])

    def test_load_item_using_custom_loader(self):
        """``TestItemLoader`` applies its ``name_in`` processor on load."""
        il = TestItemLoader()
        il.add_value("name", "marta")
        item = il.load_item()
        self.assertEqual(item["name"], ["Marta"])

    def test_load_item_ignore_none_field_values(self):
        """Fields whose output is ``None`` are omitted; empty values are kept."""

        def validate_sku(value):
            # Let's assume a SKU is only digits.
            return value if value.isdigit() else None

        class MyLoader(ItemLoader):
            name_out = Compose(lambda vs: vs[0])  # take first which allows empty values
            price_out = Compose(TakeFirst(), float)
            sku_out = Compose(TakeFirst(), validate_sku)

        valid_fragment = "SKU: 1234"
        invalid_fragment = "SKU: not available"
        sku_re = "SKU: (.+)"

        il = MyLoader(item={})
        # Should not return "sku: None".
        il.add_value("sku", [invalid_fragment], re=sku_re)
        # Should not ignore empty values.
        il.add_value("name", "")
        il.add_value("price", ["0"])
        self.assertEqual(
            il.load_item(),
            {
                "name": "",
                "price": 0.0,
            },
        )

        il.replace_value("sku", [valid_fragment], re=sku_re)
        self.assertEqual(il.load_item()["sku"], "1234")

    def test_self_referencing_loader(self):
        """An output processor method may read another field's output value."""

        class MyLoader(ItemLoader):
            url_out = TakeFirst()

            def img_url_out(self, values):
                return (self.get_output_value("url") or "") + values[0]

        il = MyLoader(item={})
        il.add_value("url", "http://example.com/")
        il.add_value("img_url", "1234.png")
        self.assertEqual(
            il.load_item(),
            {
                "url": "http://example.com/",
                "img_url": "http://example.com/1234.png",
            },
        )

        il = MyLoader(item={})
        il.add_value("img_url", "1234.png")
        self.assertEqual(
            il.load_item(),
            {
                "img_url": "1234.png",
            },
        )

    def test_add_value(self):
        """``add_value`` appends processed values to those already collected."""
        il = TestItemLoader()
        il.add_value("name", "marta")
        self.assertEqual(il.get_collected_values("name"), ["Marta"])
        self.assertEqual(il.get_output_value("name"), ["Marta"])
        il.add_value("name", "pepe")
        self.assertEqual(il.get_collected_values("name"), ["Marta", "Pepe"])
        self.assertEqual(il.get_output_value("name"), ["Marta", "Pepe"])

        # test add object value
        il.add_value("summary", {"key": 1})
        self.assertEqual(il.get_collected_values("summary"), [{"key": 1}])

        il.add_value(None, "Jim", lambda x: {"name": x})
        self.assertEqual(il.get_collected_values("name"), ["Marta", "Pepe", "Jim"])

    def test_add_zero(self):
        """The falsy value ``0`` is still collected."""
        il = NameItemLoader()
        il.add_value("name", 0)
        self.assertEqual(il.get_collected_values("name"), [0])

    def test_replace_value(self):
        """``replace_value`` discards previously collected values for the field."""
        il = TestItemLoader()
        il.replace_value("name", "marta")
        self.assertEqual(il.get_collected_values("name"), ["Marta"])
        self.assertEqual(il.get_output_value("name"), ["Marta"])
        il.replace_value("name", "pepe")
        self.assertEqual(il.get_collected_values("name"), ["Pepe"])
        self.assertEqual(il.get_output_value("name"), ["Pepe"])

        il.replace_value(None, "Jim", lambda x: {"name": x})
        self.assertEqual(il.get_collected_values("name"), ["Jim"])

    def test_get_value(self):
        """Processors and the ``re`` argument are applied by the value methods."""
        il = NameItemLoader()
        self.assertEqual("FOO", il.get_value(["foo", "bar"], TakeFirst(), str.upper))
        self.assertEqual(
            ["foo", "bar"], il.get_value(["name:foo", "name:bar"], re="name:(.*)$")
        )
        self.assertEqual(
            "foo", il.get_value(["name:foo", "name:bar"], TakeFirst(), re="name:(.*)$")
        )

        il.add_value("name", ["name:foo", "name:bar"], TakeFirst(), re="name:(.*)$")
        self.assertEqual(["foo"], il.get_collected_values("name"))
        il.replace_value("name", "name:bar", re="name:(.*)$")
        self.assertEqual(["bar"], il.get_collected_values("name"))

    def test_iter_on_input_processor_input(self):
        """With ``TakeFirst`` as input processor each call collects one value."""

        class NameFirstItemLoader(NameItemLoader):
            name_in = TakeFirst()

        il = NameFirstItemLoader()
        il.add_value("name", "marta")
        self.assertEqual(il.get_collected_values("name"), ["marta"])
        il = NameFirstItemLoader()
        il.add_value("name", ["marta", "jose"])
        self.assertEqual(il.get_collected_values("name"), ["marta"])

        il = NameFirstItemLoader()
        il.replace_value("name", "marta")
        self.assertEqual(il.get_collected_values("name"), ["marta"])
        il = NameFirstItemLoader()
        il.replace_value("name", ["marta", "jose"])
        self.assertEqual(il.get_collected_values("name"), ["marta"])

        il = NameFirstItemLoader()
        il.add_value("name", "marta")
        il.add_value("name", ["jose", "pedro"])
        self.assertEqual(il.get_collected_values("name"), ["marta", "jose"])

    def test_map_compose_filter(self):
        """``MapCompose`` drops values for which a function returns ``None``."""

        def filter_world(x):
            return None if x == "world" else x

        proc = MapCompose(filter_world, str.upper)
        self.assertEqual(
            proc(["hello", "world", "this", "is", "scrapy"]),
            ["HELLO", "THIS", "IS", "SCRAPY"],
        )

    def test_map_compose_filter_multil(self):
        """Functions given to ``MapCompose`` are applied in order to each value."""

        class TestItemLoader(NameItemLoader):
            name_in = MapCompose(lambda v: v.title(), lambda v: v[:-1])

        il = TestItemLoader()
        il.add_value("name", "marta")
        self.assertEqual(il.get_output_value("name"), ["Mart"])
        item = il.load_item()
        self.assertEqual(item["name"], ["Mart"])

    def test_default_input_processor(self):
        """``default_input_processor`` applies to fields without their own."""
        il = DefaultedItemLoader()
        il.add_value("name", "marta")
        self.assertEqual(il.get_output_value("name"), ["mart"])

    def test_inherited_default_input_processor(self):
        """A subclass inherits the parent's ``default_input_processor``."""

        class InheritDefaultedItemLoader(DefaultedItemLoader):
            pass

        il = InheritDefaultedItemLoader()
        il.add_value("name", "marta")
        self.assertEqual(il.get_output_value("name"), ["mart"])

    def test_input_processor_inheritance(self):
        """Field input processors are inherited and can be overridden."""

        class ChildItemLoader(TestItemLoader):
            url_in = MapCompose(lambda v: v.lower())

        il = ChildItemLoader()
        il.add_value("url", "HTTP://scrapy.ORG")
        self.assertEqual(il.get_output_value("url"), ["http://scrapy.org"])
        il.add_value("name", "marta")
        self.assertEqual(il.get_output_value("name"), ["Marta"])

        class ChildChildItemLoader(ChildItemLoader):
            url_in = MapCompose(lambda v: v.upper())
            summary_in = MapCompose(lambda v: v)

        il = ChildChildItemLoader()
        il.add_value("url", "http://scrapy.org")
        self.assertEqual(il.get_output_value("url"), ["HTTP://SCRAPY.ORG"])
        il.add_value("name", "marta")
        self.assertEqual(il.get_output_value("name"), ["Marta"])

    def test_empty_map_compose(self):
        """An empty ``MapCompose`` as ``name_in`` leaves the value unchanged."""

        class IdentityDefaultedItemLoader(DefaultedItemLoader):
            name_in = MapCompose()

        il = IdentityDefaultedItemLoader()
        il.add_value("name", "marta")
        self.assertEqual(il.get_output_value("name"), ["marta"])

    def test_identity_input_processor(self):
        """``Identity`` as ``name_in`` leaves the value unchanged."""

        class IdentityDefaultedItemLoader(DefaultedItemLoader):
            name_in = Identity()

        il = IdentityDefaultedItemLoader()
        il.add_value("name", "marta")
        self.assertEqual(il.get_output_value("name"), ["marta"])

    def test_extend_custom_input_processors(self):
        """A subclass can chain the parent's ``name_in`` with more functions."""

        class ChildItemLoader(TestItemLoader):
            name_in = MapCompose(TestItemLoader.name_in, str.swapcase)

        il = ChildItemLoader()
        il.add_value("name", "marta")
        self.assertEqual(il.get_output_value("name"), ["mARTA"])

    def test_extend_default_input_processors(self):
        """A subclass can chain the parent's default input processor."""

        class ChildDefaultedItemLoader(DefaultedItemLoader):
            name_in = MapCompose(
                DefaultedItemLoader.default_input_processor, str.swapcase
            )

        il = ChildDefaultedItemLoader()
        il.add_value("name", "marta")
        self.assertEqual(il.get_output_value("name"), ["MART"])

    def test_output_processor_using_function(self):
        """A plain callable (``" ".join``) can serve as an output processor."""
        il = TestItemLoader()
        il.add_value("name", ["mar", "ta"])
        self.assertEqual(il.get_output_value("name"), ["Mar", "Ta"])

        class TakeFirstItemLoader(TestItemLoader):
            name_out = " ".join

        il = TakeFirstItemLoader()
        il.add_value("name", ["mar", "ta"])
        self.assertEqual(il.get_output_value("name"), "Mar Ta")

    def test_output_processor_error(self):
        """A failing output processor raises ``ValueError`` describing the failure."""

        class TestItemLoader(ItemLoader):
            default_item_class = TestItem
            name_out = MapCompose(float)

        il = TestItemLoader()
        il.add_value("name", ["$10"])

        # Capture the message float() itself produces rather than hard-coding
        # interpreter-specific wording.
        with self.assertRaises(ValueError) as float_cm:
            float("$10")
        expected_exc_str = str(float_cm.exception)

        with self.assertRaises(ValueError) as cm:
            il.load_item()

        s = str(cm.exception)
        self.assertIn("name", s)
        self.assertIn("$10", s)
        self.assertIn("ValueError", s)
        self.assertIn(expected_exc_str, s)

    def test_output_processor_using_classes(self):
        """``Join`` instances work as output processors, with any separator."""
        il = TestItemLoader()
        il.add_value("name", ["mar", "ta"])
        self.assertEqual(il.get_output_value("name"), ["Mar", "Ta"])

        class TakeFirstItemLoader(TestItemLoader):
            name_out = Join()

        il = TakeFirstItemLoader()
        il.add_value("name", ["mar", "ta"])
        self.assertEqual(il.get_output_value("name"), "Mar Ta")

        class TakeFirstItemLoader2(TestItemLoader):
            name_out = Join("<br>")

        il = TakeFirstItemLoader2()
        il.add_value("name", ["mar", "ta"])
        self.assertEqual(il.get_output_value("name"), "Mar<br>Ta")

    def test_default_output_processor(self):
        """An explicit ``Identity`` default output processor yields the list."""
        il = TestItemLoader()
        il.add_value("name", ["mar", "ta"])
        self.assertEqual(il.get_output_value("name"), ["Mar", "Ta"])

        class LalaItemLoader(TestItemLoader):
            default_output_processor = Identity()

        il = LalaItemLoader()
        il.add_value("name", ["mar", "ta"])
        self.assertEqual(il.get_output_value("name"), ["Mar", "Ta"])

    def test_loader_context_on_declaration(self):
        """Keyword arguments given to ``MapCompose`` reach ``loader_context``."""

        class ChildItemLoader(TestItemLoader):
            url_in = MapCompose(processor_with_args, key="val")

        il = ChildItemLoader()
        il.add_value("url", "text")
        self.assertEqual(il.get_output_value("url"), ["val"])
        il.replace_value("url", "text2")
        self.assertEqual(il.get_output_value("url"), ["val"])

    def test_loader_context_on_instantiation(self):
        """Keyword arguments given to the loader reach ``loader_context``."""

        class ChildItemLoader(TestItemLoader):
            url_in = MapCompose(processor_with_args)

        il = ChildItemLoader(key="val")
        il.add_value("url", "text")
        self.assertEqual(il.get_output_value("url"), ["val"])
        il.replace_value("url", "text2")
        self.assertEqual(il.get_output_value("url"), ["val"])

    def test_loader_context_on_assign(self):
        """Entries assigned to ``il.context`` reach ``loader_context``."""

        class ChildItemLoader(TestItemLoader):
            url_in = MapCompose(processor_with_args)

        il = ChildItemLoader()
        il.context["key"] = "val"
        il.add_value("url", "text")
        self.assertEqual(il.get_output_value("url"), ["val"])
        il.replace_value("url", "text2")
        self.assertEqual(il.get_output_value("url"), ["val"])

    def test_item_passed_to_input_processor_functions(self):
        """The loader's item is exposed as ``loader_context["item"]``."""

        def processor(value, loader_context):
            return loader_context["item"]["name"]

        class ChildItemLoader(TestItemLoader):
            url_in = MapCompose(processor)

        it = TestItem(name="marta")
        il = ChildItemLoader(item=it)
        il.add_value("url", "text")
        self.assertEqual(il.get_output_value("url"), ["marta"])
        il.replace_value("url", "text2")
        self.assertEqual(il.get_output_value("url"), ["marta"])

    def test_compose_processor(self):
        """``Compose`` feeds the collected list through each function in turn."""

        class TestItemLoader(NameItemLoader):
            name_out = Compose(lambda v: v[0], lambda v: v.title(), lambda v: v[:-1])

        il = TestItemLoader()
        il.add_value("name", ["marta", "other"])
        self.assertEqual(il.get_output_value("name"), "Mart")
        item = il.load_item()
        self.assertEqual(item["name"], "Mart")

    def test_partial_processor(self):
        """``functools.partial`` objects can be used inside ``Compose``."""

        def join(values, sep=None, loader_context=None, ignored=None):
            if sep is not None:
                return sep.join(values)
            if loader_context and "sep" in loader_context:
                return loader_context["sep"].join(values)
            return "".join(values)

        class TestItemLoader(NameItemLoader):
            name_out = Compose(partial(join, sep="+"))
            url_out = Compose(partial(join, loader_context={"sep": "."}))
            summary_out = Compose(partial(join, ignored="foo"))

        il = TestItemLoader()
        il.add_value("name", ["rabbit", "hole"])
        il.add_value("url", ["rabbit", "hole"])
        il.add_value("summary", ["rabbit", "hole"])
        item = il.load_item()
        self.assertEqual(item["name"], "rabbit+hole")
        self.assertEqual(item["url"], "rabbit.hole")
        self.assertEqual(item["summary"], "rabbithole")

    def test_error_input_processor(self):
        """A failing input processor makes ``add_value`` raise ``ValueError``."""

        class TestItem(Item):
            name = Field()

        class TestItemLoader(ItemLoader):
            default_item_class = TestItem
            name_in = MapCompose(float)

        il = TestItemLoader()
        with self.assertRaises(ValueError):
            il.add_value("name", ["marta", "other"])

    def test_error_output_processor(self):
        """A failing output processor makes ``load_item`` raise ``ValueError``."""

        class TestItem(Item):
            name = Field()

        class TestItemLoader(ItemLoader):
            default_item_class = TestItem
            name_out = Compose(Join(), float)

        il = TestItemLoader()
        il.add_value("name", "marta")
        with self.assertRaises(ValueError):
            il.load_item()

    def test_error_processor_as_argument(self):
        """A failing processor passed to ``add_value`` raises ``ValueError``."""

        class TestItem(Item):
            name = Field()

        class TestItemLoader(ItemLoader):
            default_item_class = TestItem

        il = TestItemLoader()
        with self.assertRaises(ValueError):
            il.add_value("name", ["marta", "other"], Compose(float))
class InitializationFromDictTest(unittest.TestCase):
    """Check how ``ItemLoader`` treats values already present in its initial item."""

    item_class = dict

    def test_keep_single_value(self):
        """A single initial value is loaded back as a one-element list."""
        loader = ItemLoader(item=self.item_class(name="foo"))
        result = loader.load_item()
        self.assertIsInstance(result, self.item_class)
        self.assertEqual(dict(result), {"name": ["foo"]})

    def test_keep_list(self):
        """An initial list value is loaded back unchanged."""
        loader = ItemLoader(item=self.item_class(name=["foo", "bar"]))
        result = loader.load_item()
        self.assertIsInstance(result, self.item_class)
        self.assertEqual(dict(result), {"name": ["foo", "bar"]})

    def test_add_value_singlevalue_singlevalue(self):
        """A single added value is appended after a single initial value."""
        loader = ItemLoader(item=self.item_class(name="foo"))
        loader.add_value("name", "bar")
        result = loader.load_item()
        self.assertIsInstance(result, self.item_class)
        self.assertEqual(dict(result), {"name": ["foo", "bar"]})

    def test_add_value_singlevalue_list(self):
        """An added list is appended after a single initial value."""
        loader = ItemLoader(item=self.item_class(name="foo"))
        loader.add_value("name", ["item", "loader"])
        result = loader.load_item()
        self.assertIsInstance(result, self.item_class)
        self.assertEqual(dict(result), {"name": ["foo", "item", "loader"]})

    def test_add_value_list_singlevalue(self):
        """A single added value is appended after an initial list."""
        loader = ItemLoader(item=self.item_class(name=["foo", "bar"]))
        loader.add_value("name", "qwerty")
        result = loader.load_item()
        self.assertIsInstance(result, self.item_class)
        self.assertEqual(dict(result), {"name": ["foo", "bar", "qwerty"]})

    def test_add_value_list_list(self):
        """An added list is appended after an initial list."""
        loader = ItemLoader(item=self.item_class(name=["foo", "bar"]))
        loader.add_value("name", ["item", "loader"])
        result = loader.load_item()
        self.assertIsInstance(result, self.item_class)
        self.assertEqual(dict(result), {"name": ["foo", "bar", "item", "loader"]})

    def test_get_output_value_singlevalue(self):
        """Reading the output value leaves a single initial value in the item."""
        loader = ItemLoader(item=self.item_class(name="foo"))
        self.assertEqual(loader.get_output_value("name"), ["foo"])
        result = loader.load_item()
        self.assertIsInstance(result, self.item_class)
        self.assertEqual(result, {"name": ["foo"]})

    def test_get_output_value_list(self):
        """Reading the output value leaves an initial list in the item."""
        loader = ItemLoader(item=self.item_class(name=["foo", "bar"]))
        self.assertEqual(loader.get_output_value("name"), ["foo", "bar"])
        result = loader.load_item()
        self.assertIsInstance(result, self.item_class)
        self.assertEqual(result, {"name": ["foo", "bar"]})

    def test_values_single(self):
        """A single initial value shows up in ``loader._values`` as a list."""
        loader = ItemLoader(item=self.item_class(name="foo"))
        self.assertEqual(loader._values.get("name"), ["foo"])

    def test_values_list(self):
        """An initial list shows up in ``loader._values`` as given."""
        loader = ItemLoader(item=self.item_class(name=["foo", "bar"]))
        self.assertEqual(loader._values.get("name"), ["foo", "bar"])
class BaseNoInputReprocessingLoader(ItemLoader):
    """Loader that upper-cases ``title`` on input and takes the first value on output."""

    title_in = MapCompose(str.upper)
    title_out = TakeFirst()
class NoInputReprocessingDictLoader(BaseNoInputReprocessingLoader):
    """Variant of ``BaseNoInputReprocessingLoader`` that builds plain ``dict`` items."""

    default_item_class = dict
class NoInputReprocessingFromDictTest(unittest.TestCase):
    """
    Loaders initialized from loaded items must not reprocess fields (dict instances)
    """

    def test_avoid_reprocessing_with_initial_values_single(self):
        """A single initial value is not passed through the input processor."""
        first_pass = NoInputReprocessingDictLoader(item={"title": "foo"}).load_item()
        self.assertEqual(first_pass, {"title": "foo"})
        second_pass = NoInputReprocessingDictLoader(item=first_pass).load_item()
        self.assertEqual(second_pass, {"title": "foo"})

    def test_avoid_reprocessing_with_initial_values_list(self):
        """An initial list is reduced by the output processor only."""
        first_pass = NoInputReprocessingDictLoader(
            item={"title": ["foo", "bar"]}
        ).load_item()
        self.assertEqual(first_pass, {"title": "foo"})
        second_pass = NoInputReprocessingDictLoader(item=first_pass).load_item()
        self.assertEqual(second_pass, {"title": "foo"})

    def test_avoid_reprocessing_without_initial_values_single(self):
        """An added value is processed once, even when the item is reloaded."""
        loader = NoInputReprocessingDictLoader()
        loader.add_value("title", "foo")
        first_pass = loader.load_item()
        self.assertEqual(first_pass, {"title": "FOO"})
        second_pass = NoInputReprocessingDictLoader(item=first_pass).load_item()
        self.assertEqual(second_pass, {"title": "FOO"})

    def test_avoid_reprocessing_without_initial_values_list(self):
        """An added list is processed once, even when the item is reloaded."""
        loader = NoInputReprocessingDictLoader()
        loader.add_value("title", ["foo", "bar"])
        first_pass = loader.load_item()
        self.assertEqual(first_pass, {"title": "FOO"})
        second_pass = NoInputReprocessingDictLoader(item=first_pass).load_item()
        self.assertEqual(second_pass, {"title": "FOO"})
class TestOutputProcessorDict(unittest.TestCase):
    """Output processing for a loader whose item class is a ``dict`` subclass."""

    def test_output_processor(self):
        """A value preset by the item's constructor is unwrapped by ``TakeFirst``."""

        class TempDict(dict):
            def __init__(self, *args, **kwargs):
                # Do not pass `self` explicitly: super() already binds it, and
                # an extra positional argument makes dict.__init__ raise
                # TypeError as soon as a caller supplies a positional argument.
                super().__init__(*args, **kwargs)
                self.setdefault("temp", 0.3)

        class TempLoader(ItemLoader):
            default_item_class = TempDict
            default_input_processor = Identity()
            default_output_processor = Compose(TakeFirst())

        loader = TempLoader()
        item = loader.load_item()
        self.assertIsInstance(item, TempDict)
        self.assertEqual(dict(item), {"temp": 0.3})
class ProcessorsTest(unittest.TestCase):
    """Direct checks of the built-in processors, without any loader."""

    def test_take_first(self):
        """``TakeFirst`` skips ``None`` and ``""`` but returns ``0``."""
        take_first = TakeFirst()
        self.assertEqual(take_first([None, "", "hello", "world"]), "hello")
        self.assertEqual(take_first([None, "", 0, "hello", "world"]), 0)

    def test_identity(self):
        """``Identity`` returns its input unchanged."""
        values = [None, "", "hello", "world"]
        identity = Identity()
        self.assertEqual(identity(values), [None, "", "hello", "world"])

    def test_join(self):
        """``Join`` joins strings with a space and rejects ``None`` entries."""
        joiner = Join()
        with self.assertRaises(TypeError):
            joiner([None, "", "hello", "world"])
        self.assertEqual(joiner(["", "hello", "world"]), " hello world")
        self.assertEqual(joiner(["hello", "world"]), "hello world")
        self.assertIsInstance(joiner(["hello", "world"]), str)

    def test_compose(self):
        """``Compose`` chains functions, stops on ``None`` and wraps errors."""
        first_upper = Compose(lambda v: v[0], str.upper)
        self.assertEqual(first_upper(["hello", "world"]), "HELLO")

        stops_on_none = Compose(str.upper)
        self.assertEqual(stops_on_none(None), None)

        no_stop = Compose(str.upper, stop_on_none=False)
        with self.assertRaises(ValueError):
            no_stop(None)

        failing = Compose(str.upper, lambda x: x + 1)
        with self.assertRaises(ValueError):
            failing("hello")

    def test_mapcompose(self):
        """``MapCompose`` maps per value, filters ``None`` and wraps errors."""

        def filter_world(x):
            return None if x == "world" else x

        upper_without_world = MapCompose(filter_world, str.upper)
        self.assertEqual(
            upper_without_world(["hello", "world", "this", "is", "scrapy"]),
            ["HELLO", "THIS", "IS", "SCRAPY"],
        )
        self.assertEqual(upper_without_world(None), [])
        with self.assertRaises(ValueError):
            upper_without_world([1])

        failing = MapCompose(filter_world, lambda x: x + 1)
        with self.assertRaises(ValueError):
            failing("hello")
class SelectJmesTestCase(unittest.TestCase):
    """Table-driven checks of ``SelectJmes`` against small inputs."""

    # Each entry maps a case label to (expression, input data, expected result).
    test_list_equals = {
        "simple": ("foo.bar", {"foo": {"bar": "baz"}}, "baz"),
        "invalid": ("foo.bar.baz", {"foo": {"bar": "baz"}}, None),
        "top_level": ("foo", {"foo": {"bar": "baz"}}, {"bar": "baz"}),
        "double_vs_single_quote_string": ("foo.bar", {"foo": {"bar": "baz"}}, "baz"),
        "dict": (
            "foo.bar[*].name",
            {"foo": {"bar": [{"name": "one"}, {"name": "two"}]}},
            ["one", "two"],
        ),
        "list": ("[1]", [1, 2], 2),
    }

    def test_output(self):
        """Every table entry yields its expected result."""
        for k, (expr, data, expected) in self.test_list_equals.items():
            select = SelectJmes(expr)
            test = select(data)
            self.assertEqual(
                test, expected, msg=f'test "{k}" got {test} expected {expected}'
            )
# Functions as processors
def function_processor_strip(iterable):
    """Return a list holding ``item.strip()`` for every item of ``iterable``."""
    return [item.strip() for item in iterable]
def function_processor_upper(iterable):
    """Return a list holding ``item.upper()`` for every item of ``iterable``."""
    return [item.upper() for item in iterable]
class FunctionProcessorItem(Item):
    """Item whose ``foo`` field declares plain functions as its processors."""

    foo = Field(
        input_processor=function_processor_strip,
        output_processor=function_processor_upper,
    )
class FunctionProcessorDictLoader(ItemLoader):
    """Dict-producing loader using plain functions as ``foo`` processors."""

    default_item_class = dict

    # Plain module-level functions, assigned directly as class attributes.
    foo_in = function_processor_strip
    foo_out = function_processor_upper
class FunctionProcessorTestCase(unittest.TestCase):
    """Plain functions used as loader input and output processors."""

    def test_processor_defined_in_item_loader(self):
        """``foo_in`` and ``foo_out`` declared on the loader are both applied."""
        loader = FunctionProcessorDictLoader()
        for raw in (" bar ", [" asdf ", " qwerty "]):
            loader.add_value("foo", raw)
        loaded = dict(loader.load_item())
        self.assertEqual(loaded, {"foo": ["BAR", "ASDF", "QWERTY"]})