# -*- coding: utf-8 -*-
"""Tests for the feed iterators in ``scrapy.utils.iterators``.

Covers ``xmliter`` / ``xmliter_lxml`` (node-by-node XML iteration),
``csviter`` (row-by-row CSV iteration) and the ``_body_or_str`` helper.
"""
import os
import six
from twisted.trial import unittest

from scrapy.utils.iterators import csviter, xmliter, _body_or_str, xmliter_lxml
from scrapy.http import XmlResponse, TextResponse, Response
from tests import get_testdata

# Multi-line CSV field as it is expected to come back from the sample feeds.
FOOBAR_NL = u"foo" + os.linesep + u"bar"


def _sample_rows(multiline_value=FOOBAR_NL):
    """Return the four rows the sample CSV feeds are expected to parse to.

    ``multiline_value`` is the expected content of the multi-line field of
    row 3; it differs when a test re-joins the feed lines itself.
    A fresh list is built on every call so tests never share mutable state.
    """
    return [
        {u'id': u'1', u'name': u'alpha', u'value': u'foobar'},
        {u'id': u'2', u'name': u'unicode', u'value': u'\xfan\xedc\xf3d\xe9\u203d'},
        {u'id': u'3', u'name': u'multi', u'value': multiline_value},
        {u'id': u'4', u'name': u'empty', u'value': u''},
    ]


class XmliterTestCase(unittest.TestCase):
    """Behaviour checks for ``xmliter``.

    Subclasses swap the ``xmliter`` attribute to run the same checks against
    another implementation.
    """

    xmliter = staticmethod(xmliter)

    def test_xmliter(self):
        """Each ``product`` node is yielded with its attributes and children."""
        body = b"""<?xml version="1.0" encoding="UTF-8"?>\
            <products xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="someschmea.xsd">\
              <product id="001">\
                <type>Type 1</type>\
                <name>Name 1</name>\
              </product>\
              <product id="002">\
                <type>Type 2</type>\
                <name>Name 2</name>\
              </product>\
            </products>"""

        response = XmlResponse(url="http://example.com", body=body)
        attrs = [
            (node.xpath("@id").extract(),
             node.xpath("name/text()").extract(),
             node.xpath("./type/text()").extract())
            for node in self.xmliter(response, 'product')
        ]

        self.assertEqual(attrs,
                         [(['001'], ['Name 1'], ['Type 1']),
                          (['002'], ['Name 2'], ['Type 2'])])

    def test_xmliter_unusual_node(self):
        """A node name containing dots matches literally, not as a pattern."""
        body = b"""<?xml version="1.0" encoding="UTF-8"?>
            <root>
                <matchme...></matchme...>
                <matchmenot></matchmenot>
            </root>
        """
        response = XmlResponse(url="http://example.com", body=body)
        nodenames = [e.xpath('name()').extract()
                     for e in self.xmliter(response, 'matchme...')]
        self.assertEqual(nodenames, [['matchme...']])

    def test_xmliter_unicode(self):
        """Non-ASCII node names work for both bytes and unicode bodies."""
        # example taken from https://github.com/scrapy/scrapy/issues/1665
        body = u"""<?xml version="1.0" encoding="UTF-8"?>
            <þingflokkar>
               <þingflokkur id="26">
                  <heiti />
                  <skammstafanir>
                     <stuttskammstöfun>-</stuttskammstöfun>
                     <löngskammstöfun />
                  </skammstafanir>
                  <tímabil>
                     <fyrstaþing>80</fyrstaþing>
                  </tímabil>
               </þingflokkur>
               <þingflokkur id="21">
                  <heiti>Alþýðubandalag</heiti>
                  <skammstafanir>
                     <stuttskammstöfun>Ab</stuttskammstöfun>
                     <löngskammstöfun>Alþb.</löngskammstöfun>
                  </skammstafanir>
                  <tímabil>
                     <fyrstaþing>76</fyrstaþing>
                     <síðastaþing>123</síðastaþing>
                  </tímabil>
               </þingflokkur>
               <þingflokkur id="27">
                  <heiti>Alþýðuflokkur</heiti>
                  <skammstafanir>
                     <stuttskammstöfun>A</stuttskammstöfun>
                     <löngskammstöfun>Alþfl.</löngskammstöfun>
                  </skammstafanir>
                  <tímabil>
                     <fyrstaþing>27</fyrstaþing>
                     <síðastaþing>120</síðastaþing>
                  </tímabil>
               </þingflokkur>
            </þingflokkar>"""

        responses = (
            # with bytes
            XmlResponse(url="http://example.com", body=body.encode('utf-8')),
            # Unicode body needs encoding information
            XmlResponse(url="http://example.com", body=body, encoding='utf-8'),
        )
        for response in responses:
            attrs = [
                (node.xpath('@id').extract(),
                 node.xpath(u'./skammstafanir/stuttskammstöfun/text()').extract(),
                 node.xpath(u'./tímabil/fyrstaþing/text()').extract())
                for node in self.xmliter(response, u'þingflokkur')
            ]

            self.assertEqual(attrs,
                             [([u'26'], [u'-'], [u'80']),
                              ([u'21'], [u'Ab'], [u'76']),
                              ([u'27'], [u'A'], [u'27'])])

    def test_xmliter_text(self):
        """A plain unicode string is accepted in place of a response."""
        body = u"""<?xml version="1.0" encoding="UTF-8"?><products><product>one</product><product>two</product></products>"""

        self.assertEqual([x.xpath("text()").extract() for x in self.xmliter(body, 'product')],
                         [[u'one'], [u'two']])

    def test_xmliter_namespaces(self):
        """Prefixed children are reachable only after registering the prefix."""
        body = b"""\
            <?xml version="1.0" encoding="UTF-8"?>
            <rss version="2.0" xmlns:g="http://base.google.com/ns/1.0">
                <channel>
                <title>My Dummy Company</title>
                <link>http://www.mydummycompany.com</link>
                <description>This is a dummy company. We do nothing.</description>
                <item>
                    <title>Item 1</title>
                    <description>This is item 1</description>
                    <link>http://www.mydummycompany.com/items/1</link>
                    <g:image_link>http://www.mydummycompany.com/images/item1.jpg</g:image_link>
                    <g:id>ITEM_1</g:id>
                    <g:price>400</g:price>
                </item>
                </channel>
            </rss>
        """
        response = XmlResponse(url='http://mydummycompany.com', body=body)
        my_iter = self.xmliter(response, 'item')
        node = next(my_iter)
        node.register_namespace('g', 'http://base.google.com/ns/1.0')
        self.assertEqual(node.xpath('title/text()').extract(), ['Item 1'])
        self.assertEqual(node.xpath('description/text()').extract(), ['This is item 1'])
        self.assertEqual(node.xpath('link/text()').extract(),
                         ['http://www.mydummycompany.com/items/1'])
        self.assertEqual(node.xpath('g:image_link/text()').extract(),
                         ['http://www.mydummycompany.com/images/item1.jpg'])
        self.assertEqual(node.xpath('g:id/text()').extract(), ['ITEM_1'])
        self.assertEqual(node.xpath('g:price/text()').extract(), ['400'])
        # Without the prefix the namespaced elements must not match.
        self.assertEqual(node.xpath('image_link/text()').extract(), [])
        self.assertEqual(node.xpath('id/text()').extract(), [])
        self.assertEqual(node.xpath('price/text()').extract(), [])

    def test_xmliter_exception(self):
        """The iterator raises ``StopIteration`` once all nodes are consumed."""
        body = u"""<?xml version="1.0" encoding="UTF-8"?><products><product>one</product><product>two</product></products>"""

        nodes = self.xmliter(body, 'product')
        next(nodes)
        next(nodes)

        self.assertRaises(StopIteration, next, nodes)

    def test_xmliter_objtype_exception(self):
        """An unsupported input object fails with ``AssertionError`` on first use."""
        i = self.xmliter(42, 'product')
        self.assertRaises(AssertionError, next, i)

    def test_xmliter_encoding(self):
        """The encoding declared in the XML prolog is used to decode the node."""
        body = b'<?xml version="1.0" encoding="ISO-8859-9"?>\n<xml>\n    <item>Some Turkish Characters \xd6\xc7\xde\xdd\xd0\xdc \xfc\xf0\xfd\xfe\xe7\xf6</item>\n</xml>\n\n'
        response = XmlResponse('http://www.example.com', body=body)
        self.assertEqual(
            next(self.xmliter(response, 'item')).extract(),
            u'<item>Some Turkish Characters \xd6\xc7\u015e\u0130\u011e\xdc \xfc\u011f\u0131\u015f\xe7\xf6</item>'
        )


class LxmlXmliterTestCase(XmliterTestCase):
    """Re-run the ``xmliter`` checks against ``xmliter_lxml``.

    Adds the namespace-aware cases, which pass extra positional arguments
    (namespace URI and prefix) that only this implementation is called with
    in this module.
    """

    xmliter = staticmethod(xmliter_lxml)

    def test_xmliter_iterate_namespace(self):
        """Nodes in a default namespace are found only when the URI is given."""
        body = b"""\
            <?xml version="1.0" encoding="UTF-8"?>
            <rss version="2.0" xmlns="http://base.google.com/ns/1.0">
                <channel>
                <title>My Dummy Company</title>
                <link>http://www.mydummycompany.com</link>
                <description>This is a dummy company. We do nothing.</description>
                <item>
                    <title>Item 1</title>
                    <description>This is item 1</description>
                    <link>http://www.mydummycompany.com/items/1</link>
                    <image_link>http://www.mydummycompany.com/images/item1.jpg</image_link>
                    <image_link>http://www.mydummycompany.com/images/item2.jpg</image_link>
                </item>
                </channel>
            </rss>
        """
        response = XmlResponse(url='http://mydummycompany.com', body=body)

        no_namespace_iter = self.xmliter(response, 'image_link')
        self.assertEqual(len(list(no_namespace_iter)), 0)

        namespace_iter = self.xmliter(response, 'image_link', 'http://base.google.com/ns/1.0')
        node = next(namespace_iter)
        self.assertEqual(node.xpath('text()').extract(),
                         ['http://www.mydummycompany.com/images/item1.jpg'])
        node = next(namespace_iter)
        self.assertEqual(node.xpath('text()').extract(),
                         ['http://www.mydummycompany.com/images/item2.jpg'])

    def test_xmliter_namespaces_prefix(self):
        """The prefix passed to the iterator is usable in XPath on each node."""
        body = b"""\
        <?xml version="1.0" encoding="UTF-8"?>
        <root>
            <h:table xmlns:h="http://www.w3.org/TR/html4/">
              <h:tr>
                <h:td>Apples</h:td>
                <h:td>Bananas</h:td>
              </h:tr>
            </h:table>

            <f:table xmlns:f="http://www.w3schools.com/furniture">
              <f:name>African Coffee Table</f:name>
              <f:width>80</f:width>
              <f:length>120</f:length>
            </f:table>

        </root>
        """
        response = XmlResponse(url='http://mydummycompany.com', body=body)
        my_iter = self.xmliter(response, 'table', 'http://www.w3.org/TR/html4/', 'h')

        node = next(my_iter)
        self.assertEqual(len(node.xpath('h:tr/h:td').extract()), 2)
        self.assertEqual(node.xpath('h:tr/h:td[1]/text()').extract(), ['Apples'])
        self.assertEqual(node.xpath('h:tr/h:td[2]/text()').extract(), ['Bananas'])

        my_iter = self.xmliter(response, 'table', 'http://www.w3schools.com/furniture', 'f')

        node = next(my_iter)
        self.assertEqual(node.xpath('f:name/text()').extract(), ['African Coffee Table'])

    def test_xmliter_objtype_exception(self):
        """Overrides the base check: this implementation is expected to raise ``TypeError``."""
        i = self.xmliter(42, 'product')
        self.assertRaises(TypeError, next, i)


class UtilsCsvTestCase(unittest.TestCase):
    """Behaviour checks for ``csviter`` against the sample CSV feeds."""

    sample_feeds_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'sample_data', 'feeds')
    sample_feed_path = os.path.join(sample_feeds_dir, 'feed-sample3.csv')
    sample_feed2_path = os.path.join(sample_feeds_dir, 'feed-sample4.csv')
    sample_feed3_path = os.path.join(sample_feeds_dir, 'feed-sample5.csv')

    def test_csviter_defaults(self):
        """Default settings yield one dict per row with text keys and values."""
        body = get_testdata('feeds', 'feed-sample3.csv')
        response = TextResponse(url="http://example.com/", body=body)

        result = list(csviter(response))
        self.assertEqual(result, _sample_rows())

        # Explicit type check: keys and values must be text, never bytes.
        for result_row in result:
            self.assertTrue(all(isinstance(k, six.text_type) for k in result_row))
            self.assertTrue(all(isinstance(v, six.text_type) for v in result_row.values()))

    def test_csviter_delimiter(self):
        """A custom ``delimiter`` is honoured."""
        body = get_testdata('feeds', 'feed-sample3.csv').replace(b',', b'\t')
        response = TextResponse(url="http://example.com/", body=body)

        self.assertEqual(list(csviter(response, delimiter='\t')), _sample_rows())

    def test_csviter_quotechar(self):
        """A custom ``quotechar`` is honoured, alone and with a custom delimiter."""
        body1 = get_testdata('feeds', 'feed-sample6.csv')
        body2 = get_testdata('feeds', 'feed-sample6.csv').replace(b',', b'|')

        response1 = TextResponse(url="http://example.com/", body=body1)
        self.assertEqual(list(csviter(response1, quotechar="'")), _sample_rows())

        response2 = TextResponse(url="http://example.com/", body=body2)
        self.assertEqual(list(csviter(response2, delimiter="|", quotechar="'")),
                         _sample_rows())

    def test_csviter_wrong_quotechar(self):
        """With the default quotechar the single quotes stay in the data."""
        body = get_testdata('feeds', 'feed-sample6.csv')
        response = TextResponse(url="http://example.com/", body=body)

        self.assertEqual(
            list(csviter(response)),
            [{u"'id'": u"1", u"'name'": u"'alpha'", u"'value'": u"'foobar'"},
             {u"'id'": u"2", u"'name'": u"'unicode'", u"'value'": u"'\xfan\xedc\xf3d\xe9\u203d'"},
             {u"'id'": u"'3'", u"'name'": u"'multi'", u"'value'": u"'foo"},
             {u"'id'": u"4", u"'name'": u"'empty'", u"'value'": u""}])

    def test_csviter_delimiter_binary_response_assume_utf8_encoding(self):
        """A plain ``Response`` (no declared encoding) is parsed like the text one."""
        body = get_testdata('feeds', 'feed-sample3.csv').replace(b',', b'\t')
        response = Response(url="http://example.com/", body=body)

        self.assertEqual(list(csviter(response, delimiter='\t')), _sample_rows())

    def test_csviter_headers(self):
        """Explicit ``headers`` are used when the body has no header row."""
        sample = get_testdata('feeds', 'feed-sample3.csv').splitlines()
        headers, body = sample[0].split(b','), b'\n'.join(sample[1:])

        response = TextResponse(url="http://example.com/", body=body)
        rows = csviter(response, headers=[h.decode('utf-8') for h in headers])

        # The body was re-joined with '\n' above, so the multi-line field
        # carries '\n' rather than os.linesep.
        self.assertEqual(list(rows), _sample_rows(u'foo\nbar'))

    def test_csviter_falserow(self):
        """Rows whose field count differs from the header are not yielded."""
        body = get_testdata('feeds', 'feed-sample3.csv')
        body = b'\n'.join((body, b'a,b', b'a,b,c,d'))

        response = TextResponse(url="http://example.com/", body=body)

        self.assertEqual(list(csviter(response)), _sample_rows())

    def test_csviter_exception(self):
        """The iterator raises ``StopIteration`` after the fourth row."""
        body = get_testdata('feeds', 'feed-sample3.csv')

        response = TextResponse(url="http://example.com/", body=body)
        rows = csviter(response)
        for _ in range(4):
            next(rows)

        self.assertRaises(StopIteration, next, rows)

    def test_csviter_encoding(self):
        """The response's declared encoding is used to decode the feed."""
        body1 = get_testdata('feeds', 'feed-sample4.csv')
        body2 = get_testdata('feeds', 'feed-sample5.csv')

        response = TextResponse(url="http://example.com/", body=body1, encoding='latin1')
        self.assertEqual(
            list(csviter(response)),
            [{u'id': u'1', u'name': u'latin1', u'value': u'test'},
             {u'id': u'2', u'name': u'something', u'value': u'\xf1\xe1\xe9\xf3'}])

        response = TextResponse(url="http://example.com/", body=body2, encoding='cp852')
        self.assertEqual(
            list(csviter(response)),
            [{u'id': u'1', u'name': u'cp852', u'value': u'test'},
             {u'id': u'2', u'name': u'something',
              u'value': u'\u255a\u2569\u2569\u2569\u2550\u2550\u2557'}])


class TestHelper(unittest.TestCase):
    """Checks for ``_body_or_str`` across bytes, text and response inputs."""

    bbody = b'utf8-body'
    ubody = bbody.decode('utf8')
    txtresponse = TextResponse(url='http://example.org/', body=bbody, encoding='utf-8')
    response = Response(url='http://example.org/', body=bbody)

    def test_body_or_str(self):
        """Text is returned by default and with ``unicode=True``; bytes otherwise."""
        for obj in (self.bbody, self.ubody, self.txtresponse, self.response):
            r1 = _body_or_str(obj)
            self._assert_type_and_value(r1, self.ubody, obj)
            r2 = _body_or_str(obj, unicode=True)
            self._assert_type_and_value(r2, self.ubody, obj)
            r3 = _body_or_str(obj, unicode=False)
            self._assert_type_and_value(r3, self.bbody, obj)
            self.assertIs(type(r1), type(r2))
            self.assertIsNot(type(r1), type(r3))

    def _assert_type_and_value(self, a, b, obj):
        """Assert that ``a`` has exactly the type and the value of ``b``.

        ``obj`` is the original input, included in the failure message only.
        """
        self.assertIs(type(a), type(b),
                      'Got {}, expected {} for {!r}'.format(type(a), type(b), obj))
        self.assertEqual(a, b)


if __name__ == "__main__":
    unittest.main()