1
0
mirror of https://github.com/scrapy/scrapy.git synced 2025-02-23 13:04:20 +00:00
scrapy/tests/test_contrib_loader.py

584 lines
22 KiB
Python
Raw Normal View History

import unittest
from functools import partial
from scrapy.contrib.loader import ItemLoader
from scrapy.contrib.loader.processor import Join, Identity, TakeFirst, \
Compose, MapCompose
from scrapy.item import Item, Field
from scrapy.selector import Selector
from scrapy.http import HtmlResponse
# test items
class NameItem(Item):
    """Minimal item that declares a single ``name`` field."""

    name = Field()
class TestItem(NameItem):
    """Item used by the loader tests: ``name`` (inherited), ``url`` and ``summary``."""

    url = Field()
    summary = Field()
# test item loaders
class NameItemLoader(ItemLoader):
    """Loader with no custom processors that builds ``TestItem`` instances."""

    default_item_class = TestItem
class TestItemLoader(NameItemLoader):
    """Loader whose ``name`` input processor title-cases every value."""

    name_in = MapCompose(lambda v: v.title())
class DefaultedItemLoader(NameItemLoader):
    """Loader whose default input processor drops the last character of each value."""

    default_input_processor = MapCompose(lambda v: v[:-1])
# test processors
def processor_with_args(value, other=None, loader_context=None):
    """Return ``loader_context['key']`` when present, otherwise ``value``.

    ``other`` is accepted but never read.  ``loader_context`` may be left as
    ``None`` (its default); in that case ``value`` is returned unchanged
    instead of failing on the membership test.
    """
    if loader_context is not None and 'key' in loader_context:
        return loader_context['key']
    return value
class BasicItemLoaderTest(unittest.TestCase):
    """Tests for ItemLoader value collection and input/output processors."""

    def test_load_item_using_default_loader(self):
        # Loading into an existing item returns that same object and keeps
        # the fields that were already set on it.
        i = TestItem()
        i['summary'] = u'lala'
        il = ItemLoader(item=i)
        il.add_value('name', u'marta')
        item = il.load_item()
        self.assertTrue(item is i)
        self.assertEqual(item['summary'], u'lala')
        self.assertEqual(item['name'], [u'marta'])

    def test_load_item_using_custom_loader(self):
        # TestItemLoader.name_in title-cases the added value.
        il = TestItemLoader()
        il.add_value('name', u'marta')
        item = il.load_item()
        self.assertEqual(item['name'], [u'Marta'])

    def test_load_item_ignore_none_field_values(self):
        def validate_sku(value):
            # Let's assume a SKU is only digits.
            if value.isdigit():
                return value

        class MyLoader(ItemLoader):
            name_out = Compose(lambda vs: vs[0])  # take first which allows empty values
            price_out = Compose(TakeFirst(), float)
            sku_out = Compose(TakeFirst(), validate_sku)

        valid_fragment = u'SKU: 1234'
        invalid_fragment = u'SKU: not available'
        sku_re = 'SKU: (.+)'

        il = MyLoader(item={})
        # Should not return "sku: None".
        il.add_value('sku', [invalid_fragment], re=sku_re)
        # Should not ignore empty values.
        il.add_value('name', u'')
        il.add_value('price', [u'0'])
        self.assertEqual(il.load_item(), {
            'name': u'',
            'price': 0.0,
        })

        il.replace_value('sku', [valid_fragment], re=sku_re)
        self.assertEqual(il.load_item()['sku'], u'1234')

    def test_add_value(self):
        # Values accumulate across successive add_value calls.
        il = TestItemLoader()
        il.add_value('name', u'marta')
        self.assertEqual(il.get_collected_values('name'), [u'Marta'])
        self.assertEqual(il.get_output_value('name'), [u'Marta'])
        il.add_value('name', u'pepe')
        self.assertEqual(il.get_collected_values('name'), [u'Marta', u'Pepe'])
        self.assertEqual(il.get_output_value('name'), [u'Marta', u'Pepe'])

        # test add object value
        il.add_value('summary', {'key': 1})
        self.assertEqual(il.get_collected_values('summary'), [{'key': 1}])

        # With a None field name, the processed value is a dict keyed by field.
        il.add_value(None, u'Jim', lambda x: {'name': x})
        self.assertEqual(il.get_collected_values('name'), [u'Marta', u'Pepe', u'Jim'])

    def test_add_zero(self):
        # A falsy value such as 0 must still be collected.
        il = NameItemLoader()
        il.add_value('name', 0)
        self.assertEqual(il.get_collected_values('name'), [0])

    def test_replace_value(self):
        # replace_value discards what was previously collected for the field.
        il = TestItemLoader()
        il.replace_value('name', u'marta')
        self.assertEqual(il.get_collected_values('name'), [u'Marta'])
        self.assertEqual(il.get_output_value('name'), [u'Marta'])
        il.replace_value('name', u'pepe')
        self.assertEqual(il.get_collected_values('name'), [u'Pepe'])
        self.assertEqual(il.get_output_value('name'), [u'Pepe'])

        il.replace_value(None, u'Jim', lambda x: {'name': x})
        self.assertEqual(il.get_collected_values('name'), [u'Jim'])

    def test_get_value(self):
        # get_value returns the value after the given processors / `re`;
        # add_value and replace_value accept the same arguments.
        il = NameItemLoader()
        self.assertEqual(u'FOO', il.get_value([u'foo', u'bar'], TakeFirst(), unicode.upper))
        self.assertEqual([u'foo', u'bar'], il.get_value([u'name:foo', u'name:bar'], re=u'name:(.*)$'))
        self.assertEqual(u'foo', il.get_value([u'name:foo', u'name:bar'], TakeFirst(), re=u'name:(.*)$'))

        il.add_value('name', [u'name:foo', u'name:bar'], TakeFirst(), re=u'name:(.*)$')
        self.assertEqual([u'foo'], il.get_collected_values('name'))
        il.replace_value('name', u'name:bar', re=u'name:(.*)$')
        self.assertEqual([u'bar'], il.get_collected_values('name'))

    def test_iter_on_input_processor_input(self):
        # With TakeFirst as input processor, each add/replace call contributes
        # one value whether it was given a scalar or a list.
        class NameFirstItemLoader(NameItemLoader):
            name_in = TakeFirst()

        il = NameFirstItemLoader()
        il.add_value('name', u'marta')
        self.assertEqual(il.get_collected_values('name'), [u'marta'])
        il = NameFirstItemLoader()
        il.add_value('name', [u'marta', u'jose'])
        self.assertEqual(il.get_collected_values('name'), [u'marta'])

        il = NameFirstItemLoader()
        il.replace_value('name', u'marta')
        self.assertEqual(il.get_collected_values('name'), [u'marta'])
        il = NameFirstItemLoader()
        il.replace_value('name', [u'marta', u'jose'])
        self.assertEqual(il.get_collected_values('name'), [u'marta'])

        il = NameFirstItemLoader()
        il.add_value('name', u'marta')
        il.add_value('name', [u'jose', u'pedro'])
        self.assertEqual(il.get_collected_values('name'), [u'marta', u'jose'])

    def test_map_compose_filter(self):
        # A function returning None removes that value from the result.
        def filter_world(x):
            return None if x == 'world' else x

        proc = MapCompose(filter_world, str.upper)
        self.assertEqual(proc(['hello', 'world', 'this', 'is', 'scrapy']),
                         ['HELLO', 'THIS', 'IS', 'SCRAPY'])

    def test_map_compose_filter_multil(self):
        # Functions given to MapCompose are applied in order: title(), then [:-1].
        class TestItemLoader(NameItemLoader):
            name_in = MapCompose(lambda v: v.title(), lambda v: v[:-1])

        il = TestItemLoader()
        il.add_value('name', u'marta')
        self.assertEqual(il.get_output_value('name'), [u'Mart'])
        item = il.load_item()
        self.assertEqual(item['name'], [u'Mart'])

    def test_default_input_processor(self):
        # No name_in is declared, so default_input_processor is used.
        il = DefaultedItemLoader()
        il.add_value('name', u'marta')
        self.assertEqual(il.get_output_value('name'), [u'mart'])

    def test_inherited_default_input_processor(self):
        # A subclass inherits default_input_processor from its parent.
        class InheritDefaultedItemLoader(DefaultedItemLoader):
            pass

        il = InheritDefaultedItemLoader()
        il.add_value('name', u'marta')
        self.assertEqual(il.get_output_value('name'), [u'mart'])

    def test_input_processor_inheritance(self):
        # Per-field input processors are inherited and can be overridden
        # at any level of the loader hierarchy.
        class ChildItemLoader(TestItemLoader):
            url_in = MapCompose(lambda v: v.lower())

        il = ChildItemLoader()
        il.add_value('url', u'HTTP://scrapy.ORG')
        self.assertEqual(il.get_output_value('url'), [u'http://scrapy.org'])
        il.add_value('name', u'marta')
        self.assertEqual(il.get_output_value('name'), [u'Marta'])

        class ChildChildItemLoader(ChildItemLoader):
            url_in = MapCompose(lambda v: v.upper())
            summary_in = MapCompose(lambda v: v)

        il = ChildChildItemLoader()
        il.add_value('url', u'http://scrapy.org')
        self.assertEqual(il.get_output_value('url'), [u'HTTP://SCRAPY.ORG'])
        il.add_value('name', u'marta')
        self.assertEqual(il.get_output_value('name'), [u'Marta'])

    def test_empty_map_compose(self):
        # An empty MapCompose on the field overrides the default processor
        # and leaves the value untouched.
        class IdentityDefaultedItemLoader(DefaultedItemLoader):
            name_in = MapCompose()

        il = IdentityDefaultedItemLoader()
        il.add_value('name', u'marta')
        self.assertEqual(il.get_output_value('name'), [u'marta'])

    def test_identity_input_processor(self):
        # Identity on the field overrides the default processor as well.
        class IdentityDefaultedItemLoader(DefaultedItemLoader):
            name_in = Identity()

        il = IdentityDefaultedItemLoader()
        il.add_value('name', u'marta')
        self.assertEqual(il.get_output_value('name'), [u'marta'])

    def test_extend_custom_input_processors(self):
        # A parent's field processor can be reused as a step of a new one.
        class ChildItemLoader(TestItemLoader):
            name_in = MapCompose(TestItemLoader.name_in, unicode.swapcase)

        il = ChildItemLoader()
        il.add_value('name', u'marta')
        self.assertEqual(il.get_output_value('name'), [u'mARTA'])

    def test_extend_default_input_processors(self):
        # A parent's default processor can be reused the same way.
        class ChildDefaultedItemLoader(DefaultedItemLoader):
            name_in = MapCompose(DefaultedItemLoader.default_input_processor, unicode.swapcase)

        il = ChildDefaultedItemLoader()
        il.add_value('name', u'marta')
        self.assertEqual(il.get_output_value('name'), [u'MART'])

    def test_output_processor_using_function(self):
        # A plain callable (here a bound str.join) works as output processor.
        il = TestItemLoader()
        il.add_value('name', [u'mar', u'ta'])
        self.assertEqual(il.get_output_value('name'), [u'Mar', u'Ta'])

        class TakeFirstItemLoader(TestItemLoader):
            name_out = u" ".join

        il = TakeFirstItemLoader()
        il.add_value('name', [u'mar', u'ta'])
        self.assertEqual(il.get_output_value('name'), u'Mar Ta')

    def test_output_processor_error(self):
        # An output processor failure surfaces from load_item() as a
        # ValueError whose message names the field, the offending value and
        # the original error.
        class TestItemLoader(ItemLoader):
            default_item_class = TestItem
            name_out = MapCompose(float)

        il = TestItemLoader()
        il.add_value('name', [u'$10'])
        # Capture the interpreter's own message for float('$10') so the
        # comparison below does not hard-code its wording.
        try:
            float('$10')
        except Exception as e:
            expected_exc_str = str(e)

        exc = None
        try:
            il.load_item()
        except Exception as e:
            exc = e
        self.assertTrue(isinstance(exc, ValueError))
        s = str(exc)
        self.assertTrue('name' in s, s)
        self.assertTrue('$10' in s, s)
        self.assertTrue('ValueError' in s, s)
        self.assertTrue(expected_exc_str in s, s)

    def test_output_processor_using_classes(self):
        # Join() joins with a space by default and accepts a custom separator.
        il = TestItemLoader()
        il.add_value('name', [u'mar', u'ta'])
        self.assertEqual(il.get_output_value('name'), [u'Mar', u'Ta'])

        class TakeFirstItemLoader(TestItemLoader):
            name_out = Join()

        il = TakeFirstItemLoader()
        il.add_value('name', [u'mar', u'ta'])
        self.assertEqual(il.get_output_value('name'), u'Mar Ta')

        class TakeFirstItemLoader(TestItemLoader):
            name_out = Join("<br>")

        il = TakeFirstItemLoader()
        il.add_value('name', [u'mar', u'ta'])
        self.assertEqual(il.get_output_value('name'), u'Mar<br>Ta')

    def test_default_output_processor(self):
        # Setting default_output_processor = Identity() gives the same output
        # as the loader without it.
        il = TestItemLoader()
        il.add_value('name', [u'mar', u'ta'])
        self.assertEqual(il.get_output_value('name'), [u'Mar', u'Ta'])

        class LalaItemLoader(TestItemLoader):
            default_output_processor = Identity()

        il = LalaItemLoader()
        il.add_value('name', [u'mar', u'ta'])
        self.assertEqual(il.get_output_value('name'), [u'Mar', u'Ta'])

    def test_loader_context_on_declaration(self):
        # Context supplied as keyword arguments to MapCompose.
        class ChildItemLoader(TestItemLoader):
            url_in = MapCompose(processor_with_args, key=u'val')

        il = ChildItemLoader()
        il.add_value('url', u'text')
        self.assertEqual(il.get_output_value('url'), ['val'])
        il.replace_value('url', u'text2')
        self.assertEqual(il.get_output_value('url'), ['val'])

    def test_loader_context_on_instantiation(self):
        # Context supplied as keyword arguments to the loader constructor.
        class ChildItemLoader(TestItemLoader):
            url_in = MapCompose(processor_with_args)

        il = ChildItemLoader(key=u'val')
        il.add_value('url', u'text')
        self.assertEqual(il.get_output_value('url'), ['val'])
        il.replace_value('url', u'text2')
        self.assertEqual(il.get_output_value('url'), ['val'])

    def test_loader_context_on_assign(self):
        # Context supplied by assigning into il.context after construction.
        class ChildItemLoader(TestItemLoader):
            url_in = MapCompose(processor_with_args)

        il = ChildItemLoader()
        il.context['key'] = u'val'
        il.add_value('url', u'text')
        self.assertEqual(il.get_output_value('url'), ['val'])
        il.replace_value('url', u'text2')
        self.assertEqual(il.get_output_value('url'), ['val'])

    def test_item_passed_to_input_processor_functions(self):
        # The item being loaded is reachable as loader_context['item'].
        def processor(value, loader_context):
            return loader_context['item']['name']

        class ChildItemLoader(TestItemLoader):
            url_in = MapCompose(processor)

        it = TestItem(name='marta')
        il = ChildItemLoader(item=it)
        il.add_value('url', u'text')
        self.assertEqual(il.get_output_value('url'), ['marta'])
        il.replace_value('url', u'text2')
        self.assertEqual(il.get_output_value('url'), ['marta'])

    def test_add_value_on_unknown_field(self):
        # Adding to a field the item does not declare raises KeyError.
        il = TestItemLoader()
        self.assertRaises(KeyError, il.add_value, 'wrong_field', [u'lala', u'lolo'])

    def test_compose_processor(self):
        # Compose feeds each function the previous function's result:
        # first element -> title() -> drop last character.
        class TestItemLoader(NameItemLoader):
            name_out = Compose(lambda v: v[0], lambda v: v.title(), lambda v: v[:-1])

        il = TestItemLoader()
        il.add_value('name', [u'marta', u'other'])
        self.assertEqual(il.get_output_value('name'), u'Mart')
        item = il.load_item()
        self.assertEqual(item['name'], u'Mart')

    def test_partial_processor(self):
        # functools.partial objects are usable as processors, including ones
        # that pre-bind loader_context or an unrelated keyword.
        def join(values, sep=None, loader_context=None, ignored=None):
            if sep is not None:
                return sep.join(values)
            elif loader_context and 'sep' in loader_context:
                return loader_context['sep'].join(values)
            else:
                return ''.join(values)

        class TestItemLoader(NameItemLoader):
            name_out = Compose(partial(join, sep='+'))
            url_out = Compose(partial(join, loader_context={'sep': '.'}))
            summary_out = Compose(partial(join, ignored='foo'))

        il = TestItemLoader()
        il.add_value('name', [u'rabbit', u'hole'])
        il.add_value('url', [u'rabbit', u'hole'])
        il.add_value('summary', [u'rabbit', u'hole'])
        item = il.load_item()
        self.assertEqual(item['name'], u'rabbit+hole')
        self.assertEqual(item['url'], u'rabbit.hole')
        self.assertEqual(item['summary'], u'rabbithole')
class ProcessorsTest(unittest.TestCase):
    """Tests that call the built-in processors directly, without a loader."""

    def test_take_first(self):
        # None and '' are skipped, but 0 counts as a value.
        proc = TakeFirst()
        self.assertEqual(proc([None, '', 'hello', 'world']), 'hello')
        self.assertEqual(proc([None, '', 0, 'hello', 'world']), 0)

    def test_identity(self):
        # Identity returns its input unchanged, including None and ''.
        proc = Identity()
        self.assertEqual(proc([None, '', 'hello', 'world']),
                         [None, '', 'hello', 'world'])

    def test_join(self):
        # Join rejects None, keeps empty strings, and returns unicode.
        proc = Join()
        self.assertRaises(TypeError, proc, [None, '', 'hello', 'world'])
        self.assertEqual(proc(['', 'hello', 'world']), u' hello world')
        self.assertEqual(proc(['hello', 'world']), u'hello world')
        self.assertTrue(isinstance(proc(['hello', 'world']), unicode))

    def test_compose(self):
        proc = Compose(lambda v: v[0], str.upper)
        self.assertEqual(proc(['hello', 'world']), 'HELLO')
        # By default a None value is returned as-is, without calling str.upper.
        proc = Compose(str.upper)
        self.assertEqual(proc(None), None)
        # With stop_on_none=False, None reaches str.upper and raises TypeError.
        proc = Compose(str.upper, stop_on_none=False)
        self.assertRaises(TypeError, proc, None)

    def test_mapcompose(self):
        # A function returning None removes that value from the result.
        def filter_world(x):
            return None if x == 'world' else x

        proc = MapCompose(filter_world, unicode.upper)
        self.assertEqual(proc([u'hello', u'world', u'this', u'is', u'scrapy']),
                         [u'HELLO', u'THIS', u'IS', u'SCRAPY'])
class SelectortemLoaderTest(unittest.TestCase):
    """Tests for the XPath/CSS methods of ItemLoader against a small HTML page."""

    # Shared fixture: one <div>, one <p>, one <a href> and one <img src>.
    response = HtmlResponse(url="", body="""
<html>
<body>
<div id="id">marta</div>
<p>paragraph</p>
<a href="http://www.scrapy.org">homepage</a>
<img src="/images/logo.png" width="244" height="65" alt="Scrapy">
</body>
</html>
""")

    def test_constructor(self):
        # Without a selector or response argument, the loader has no selector.
        l = TestItemLoader()
        self.assertEqual(l.selector, None)

    def test_constructor_errors(self):
        # Every selector-based method raises RuntimeError on such a loader.
        l = TestItemLoader()
        self.assertRaises(RuntimeError, l.add_xpath, 'url', '//a/@href')
        self.assertRaises(RuntimeError, l.replace_xpath, 'url', '//a/@href')
        self.assertRaises(RuntimeError, l.get_xpath, '//a/@href')
        self.assertRaises(RuntimeError, l.add_css, 'name', '#name::text')
        self.assertRaises(RuntimeError, l.replace_css, 'name', '#name::text')
        self.assertRaises(RuntimeError, l.get_css, '#name::text')

    def test_constructor_with_selector(self):
        # The selector passed in is stored as-is (same object).
        sel = Selector(text=u"<html><body><div>marta</div></body></html>")
        l = TestItemLoader(selector=sel)
        self.assertTrue(l.selector is sel)

        l.add_xpath('name', '//div/text()')
        self.assertEqual(l.get_output_value('name'), [u'Marta'])

    def test_constructor_with_selector_css(self):
        sel = Selector(text=u"<html><body><div>marta</div></body></html>")
        l = TestItemLoader(selector=sel)
        self.assertTrue(l.selector is sel)

        l.add_css('name', 'div::text')
        self.assertEqual(l.get_output_value('name'), [u'Marta'])

    def test_constructor_with_response(self):
        # Passing a response gives the loader a selector.
        l = TestItemLoader(response=self.response)
        self.assertTrue(l.selector)

        l.add_xpath('name', '//div/text()')
        self.assertEqual(l.get_output_value('name'), [u'Marta'])

    def test_constructor_with_response_css(self):
        l = TestItemLoader(response=self.response)
        self.assertTrue(l.selector)

        l.add_css('name', 'div::text')
        self.assertEqual(l.get_output_value('name'), [u'Marta'])

        l.add_css('url', 'a::attr(href)')
        self.assertEqual(l.get_output_value('url'), [u'http://www.scrapy.org'])

        # combining/accumulating CSS selectors and XPath expressions
        l.add_xpath('name', '//div/text()')
        self.assertEqual(l.get_output_value('name'), [u'Marta', u'Marta'])

        l.add_xpath('url', '//img/@src')
        self.assertEqual(l.get_output_value('url'), [u'http://www.scrapy.org', u'/images/logo.png'])

    def test_add_xpath_re(self):
        # `re` narrows the extracted text before name_in title-cases it.
        l = TestItemLoader(response=self.response)
        l.add_xpath('name', '//div/text()', re='ma')
        self.assertEqual(l.get_output_value('name'), [u'Ma'])

    def test_replace_xpath(self):
        # replace_xpath discards earlier values; a list of XPaths is accepted.
        l = TestItemLoader(response=self.response)
        self.assertTrue(l.selector)
        l.add_xpath('name', '//div/text()')
        self.assertEqual(l.get_output_value('name'), [u'Marta'])
        l.replace_xpath('name', '//p/text()')
        self.assertEqual(l.get_output_value('name'), [u'Paragraph'])

        l.replace_xpath('name', ['//p/text()', '//div/text()'])
        self.assertEqual(l.get_output_value('name'), [u'Paragraph', 'Marta'])

    def test_get_xpath(self):
        # get_xpath returns raw extracted values (no name_in title-casing).
        l = TestItemLoader(response=self.response)
        self.assertEqual(l.get_xpath('//p/text()'), [u'paragraph'])
        self.assertEqual(l.get_xpath('//p/text()', TakeFirst()), u'paragraph')
        self.assertEqual(l.get_xpath('//p/text()', TakeFirst(), re='pa'), u'pa')

        self.assertEqual(l.get_xpath(['//p/text()', '//div/text()']), [u'paragraph', 'marta'])

    def test_replace_xpath_multi_fields(self):
        # With a None field name, the processors produce a dict keyed by field.
        l = TestItemLoader(response=self.response)
        l.add_xpath(None, '//div/text()', TakeFirst(), lambda x: {'name': x})
        self.assertEqual(l.get_output_value('name'), [u'Marta'])
        l.replace_xpath(None, '//p/text()', TakeFirst(), lambda x: {'name': x})
        self.assertEqual(l.get_output_value('name'), [u'Paragraph'])

    def test_replace_xpath_re(self):
        l = TestItemLoader(response=self.response)
        self.assertTrue(l.selector)
        l.add_xpath('name', '//div/text()')
        self.assertEqual(l.get_output_value('name'), [u'Marta'])
        l.replace_xpath('name', '//div/text()', re='ma')
        self.assertEqual(l.get_output_value('name'), [u'Ma'])

    def test_add_css_re(self):
        l = TestItemLoader(response=self.response)
        l.add_css('name', 'div::text', re='ma')
        self.assertEqual(l.get_output_value('name'), [u'Ma'])

        l.add_css('url', 'a::attr(href)', re='http://(.+)')
        self.assertEqual(l.get_output_value('url'), [u'www.scrapy.org'])

    def test_replace_css(self):
        # replace_css discards earlier values; a list of selectors is accepted.
        l = TestItemLoader(response=self.response)
        self.assertTrue(l.selector)
        l.add_css('name', 'div::text')
        self.assertEqual(l.get_output_value('name'), [u'Marta'])
        l.replace_css('name', 'p::text')
        self.assertEqual(l.get_output_value('name'), [u'Paragraph'])

        l.replace_css('name', ['p::text', 'div::text'])
        self.assertEqual(l.get_output_value('name'), [u'Paragraph', 'Marta'])

        l.add_css('url', 'a::attr(href)', re='http://(.+)')
        self.assertEqual(l.get_output_value('url'), [u'www.scrapy.org'])
        l.replace_css('url', 'img::attr(src)')
        self.assertEqual(l.get_output_value('url'), [u'/images/logo.png'])

    def test_get_css(self):
        # get_css returns raw extracted values (no name_in title-casing).
        l = TestItemLoader(response=self.response)
        self.assertEqual(l.get_css('p::text'), [u'paragraph'])
        self.assertEqual(l.get_css('p::text', TakeFirst()), u'paragraph')
        self.assertEqual(l.get_css('p::text', TakeFirst(), re='pa'), u'pa')

        self.assertEqual(l.get_css(['p::text', 'div::text']), [u'paragraph', 'marta'])
        self.assertEqual(l.get_css(['a::attr(href)', 'img::attr(src)']),
                         [u'http://www.scrapy.org', u'/images/logo.png'])

    def test_replace_css_multi_fields(self):
        # With a None field name, the processors produce a dict keyed by field.
        l = TestItemLoader(response=self.response)
        l.add_css(None, 'div::text', TakeFirst(), lambda x: {'name': x})
        self.assertEqual(l.get_output_value('name'), [u'Marta'])
        l.replace_css(None, 'p::text', TakeFirst(), lambda x: {'name': x})
        self.assertEqual(l.get_output_value('name'), [u'Paragraph'])

        l.add_css(None, 'a::attr(href)', TakeFirst(), lambda x: {'url': x})
        self.assertEqual(l.get_output_value('url'), [u'http://www.scrapy.org'])
        l.replace_css(None, 'img::attr(src)', TakeFirst(), lambda x: {'url': x})
        self.assertEqual(l.get_output_value('url'), [u'/images/logo.png'])

    def test_replace_css_re(self):
        l = TestItemLoader(response=self.response)
        self.assertTrue(l.selector)
        l.add_css('url', 'a::attr(href)')
        self.assertEqual(l.get_output_value('url'), [u'http://www.scrapy.org'])
        # Raw string so that `\.` reaches the regex engine as an escaped dot.
        l.replace_css('url', 'a::attr(href)', re=r'http://www\.(.+)')
        self.assertEqual(l.get_output_value('url'), [u'scrapy.org'])
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()