1
0
mirror of https://github.com/scrapy/scrapy.git synced 2025-02-22 22:03:53 +00:00

moved xpathselector_iternodes from scrapy.utils.xml to scrapy.utils.iterators and renamed it to "xmliter"; also renamed csv_iter to csviter and added tests

--HG--
extra : convert_revision : svn%3Ab85faa78-f9eb-468e-a121-7cced6da292c%40360
This commit is contained in:
samus_ 2008-11-03 16:10:43 +00:00
parent 1ef65b97b5
commit 9b46c20da2
6 changed files with 165 additions and 78 deletions

View File

@ -98,7 +98,7 @@ class XMLFeedSpider(BasicSpider):
raise NotConfigured('You must define parse_item method in order to scrape this feed')
if self.iternodes:
nodes = xpathselector_iternodes(response, self.itertag)
nodes = xmliter(response, self.itertag)
else:
nodes = XmlXPathSelector(response).x('//%s' % self.itertag)

View File

@ -0,0 +1,6 @@
id,name,value
1,alpha,foobar
2,unicode,únícódé‽
3,multi,"foo
bar"
4,empty,
1 id name value
2 1 alpha foobar
3 2 unicode únícódé‽
4 3 multi foo bar
5 4 empty

View File

@ -0,0 +1,117 @@
import os
import unittest
from scrapy.utils.iterators import csviter, xmliter
from scrapy.http import Response
class UtilsXmlTestCase(unittest.TestCase):
    # NOTE: a utf-16 test was removed because of encoding issues found
    # with BeautifulSoup when handling utf-16 files.

    def test_iterator(self):
        """xmliter over a Response yields one selector per <product> node."""
        body = """<?xml version="1.0" encoding="UTF-8"?>
<products xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="someschmea.xsd">
<product id="001">
<type>Type 1</type>
<name>Name 1</name>
</product>
<product id="002">
<type>Type 2</type>
<name>Name 2</name>
</product>
</products>
"""
        response = Response(domain="example.com", url="http://example.com", body=body)
        extracted = [(node.x("@id").extract(),
                      node.x("name/text()").extract(),
                      node.x("./type/text()").extract())
                     for node in xmliter(response, 'product')]
        self.assertEqual(extracted,
                         [(['001'], ['Name 1'], ['Type 1']),
                          (['002'], ['Name 2'], ['Type 2'])])

    def test_iterator_text(self):
        """xmliter also accepts a plain unicode string as input."""
        body = u"""<?xml version="1.0" encoding="UTF-8"?><products><product>one</product><product>two</product></products>"""
        texts = [node.x("text()").extract() for node in xmliter(body, 'product')]
        self.assertEqual(texts, [[u'one'], [u'two']])

    def test_iterator_exception(self):
        """The iterator raises StopIteration once all nodes are consumed."""
        body = u"""<?xml version="1.0" encoding="UTF-8"?><products><product>one</product><product>two</product></products>"""
        nodes = xmliter(body, 'product')
        nodes.next()
        nodes.next()
        self.assertRaises(StopIteration, nodes.next)
class UtilsCsvTestCase(unittest.TestCase):
    # Path to the sample feed shipped alongside this test module.
    sample_feed_path = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        'sample_data', 'feeds', 'feed-sample3.csv')

    # Rows that every variant of the sample feed must decode to.
    expected_rows = [
        {u'id': u'1', u'name': u'alpha', u'value': u'foobar'},
        {u'id': u'2', u'name': u'unicode', u'value': u'\xfan\xedc\xf3d\xe9\u203d'},
        {u'id': u'3', u'name': u'multi', u'value': u'foo\nbar'},
        {u'id': u'4', u'name': u'empty', u'value': u''},
    ]

    def test_iterator_defaults(self):
        """No options: first row is the header, comma delimited."""
        body = open(self.sample_feed_path).read()
        response = Response(domain="example.com", url="http://example.com/", body=body)
        rows = list(csviter(response))
        self.assertEqual(rows, self.expected_rows)
        # explicit type check cuz' we no like stinkin' autocasting! yarrr
        for row in rows:
            self.assert_(all(isinstance(key, unicode) for key in row.keys()))
            self.assert_(all(isinstance(value, unicode) for value in row.values()))

    def test_iterator_delimiter(self):
        """A custom delimiter (tab here) must be honoured."""
        body = open(self.sample_feed_path).read().replace(',', '\t')
        response = Response(domain="example.com", url="http://example.com/", body=body)
        self.assertEqual(list(csviter(response, delimiter='\t')), self.expected_rows)

    def test_iterator_headers(self):
        """With explicit headers the body contains only data rows."""
        sample = open(self.sample_feed_path).read().splitlines()
        headers, body = sample[0].split(','), '\n'.join(sample[1:])
        response = Response(domain="example.com", url="http://example.com/", body=body)
        self.assertEqual(list(csviter(response, headers=headers)), self.expected_rows)

    def test_iterator_falserow(self):
        """Rows whose length differs from the header count are skipped."""
        body = open(self.sample_feed_path).read()
        body = '\n'.join((body, 'a,b', 'a,b,c,d'))
        response = Response(domain="example.com", url="http://example.com/", body=body)
        self.assertEqual(list(csviter(response)), self.expected_rows)

    def test_iterator_exception(self):
        """StopIteration must be raised after the last data row."""
        body = open(self.sample_feed_path).read()
        response = Response(domain="example.com", url="http://example.com/", body=body)
        rows = csviter(response)
        for _ in range(4):
            rows.next()
        self.assertRaises(StopIteration, rows.next)
# Run the tests when this module is executed directly.
if __name__ == "__main__":
    unittest.main()

View File

@ -1,37 +0,0 @@
import os
import unittest
from scrapy.utils.xml import xpathselector_iternodes
from scrapy.http import Response
# NOTE(review): this is the pre-move copy of the XML iterator tests; the
# function under test was relocated to scrapy.utils.iterators (as xmliter)
# and these tests are superseded by the new iterator test module.
class UtilsXmlTestCase(unittest.TestCase):
    def test_iterator(self):
        # Iterate the <product> nodes of a small feed and check the
        # attribute, name and type extracted from each yielded selector.
        body = """<?xml version="1.0" encoding="UTF-8"?>
<products xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="someschmea.xsd">
<product id="001">
<type>Type 1</type>
<name>Name 1</name>
</product>
<product id="002">
<type>Type 2</type>
<name>Name 2</name>
</product>
</products>
"""
        response = Response(domain="example.com", url="http://example.com", body=body)
        attrs = []
        for x in xpathselector_iternodes(response, 'product'):
            attrs.append((x.x("@id").extract(), x.x("name/text()").extract(), x.x("./type/text()").extract()))
        self.assertEqual(attrs,
            [(['001'], ['Name 1'], ['Type 1']), (['002'], ['Name 2'], ['Type 2'])])

    def test_iterator_text(self):
        # The iterator must also accept a plain unicode string as input.
        body = u"""<?xml version="1.0" encoding="UTF-8"?><products><product>one</product><product>two</product></products>"""
        self.assertEqual([x.x("text()").extract() for x in xpathselector_iternodes(body, 'product')],
            [[u'one'], [u'two']])
# Run the tests when this module is executed directly.
if __name__ == "__main__":
    unittest.main()

View File

@ -1,23 +1,52 @@
import csv
import re, csv
from scrapy.xpath import XmlXPathSelector
from scrapy.http import Response
from scrapy import log
def _normalize_input(obj):
    """Coerce *obj* to a unicode string.

    Accepts a Response (decoded through body.to_unicode()), an utf-8
    encoded str, or an already-decoded unicode string; any other type
    trips the assertion with a descriptive message.
    """
    assert isinstance(obj, (Response, basestring)), "obj must be Response or basestring, not %s" % type(obj).__name__
    if isinstance(obj, Response):
        return obj.body.to_unicode()
    elif isinstance(obj, str):
        # byte strings are assumed to be utf-8 encoded
        return obj.decode('utf-8')
    else:
        # already unicode — pass through unchanged
        return obj
def xmliter(obj, nodename):
    """Return an iterator of XPathSelector's over all nodes of an XML
    document, given the name of the node to iterate. Useful for parsing
    XML feeds.

    obj can be:
    - a Response object
    - a unicode string
    - a string encoded as utf-8
    """
    text = _normalize_input(obj)
    # re.escape guards against node names containing regex metacharacters
    # (e.g. a dot) corrupting the pattern; DOTALL lets a node span lines.
    # NOTE: the non-greedy match cannot handle nested nodes with the same
    # name — feeds are assumed to be flat.
    r = re.compile(r"<%s[\s>].*?</%s>" % (re.escape(nodename), re.escape(nodename)), re.DOTALL)
    for match in r.finditer(text):
        nodetext = match.group()
        # Re-parse just this node's text and yield the selector for its root.
        yield XmlXPathSelector(text=nodetext).x('/' + nodename)[0]
def csviter(obj, delimiter=None, headers=None):
    """Iterate the rows of a CSV document, yielding one dict per row
    mapping header names to unicode cell values.

    obj       -- a Response object, a unicode string or an utf-8 encoded string
    delimiter -- field separator handed to csv.reader (default: comma)
    headers   -- optional list of column names; when omitted, the first
                 row of the input is consumed as the header row

    Rows whose field count differs from the header count are logged at
    WARNING level and skipped.
    """
    def _getrow(csv_r):
        # decode each field from the reader's byte strings
        # NOTE(review): .decode() uses the default codec — confirm this
        # round-trips non-ascii fields as intended.
        return [field.decode() for field in csv_r.next()]

    # splitlines(True) keeps line endings so newlines embedded inside
    # quoted fields survive the trip through csv.reader.
    lines = _normalize_input(obj).splitlines(True)
    if delimiter:
        csv_r = csv.reader(lines, delimiter=delimiter)
    else:
        csv_r = csv.reader(lines)

    if not headers:
        headers = _getrow(csv_r)

    while True:
        # StopIteration from the exhausted reader ends this generator too.
        row = _getrow(csv_r)
        if len(row) != len(headers):
            log.msg("ignoring row %d (length: %d, should be: %d)" % (csv_r.line_num, len(row), len(headers)), log.WARNING)
            continue
        yield dict(zip(headers, row))

View File

@ -1,28 +0,0 @@
import re
from scrapy.xpath import XmlXPathSelector
from scrapy.http import Response
def xpathselector_iternodes(obj, nodename):
    """Return an iterator of XPathSelector's over all nodes of an XML document,
    given the name of the node to iterate. Useful for parsing XML feeds.

    obj can be:
    - a Response object
    - a unicode string
    - a string encoded as utf-8
    """
    assert isinstance(obj, (Response, basestring)), "obj must be Response or basestring, not %s" % type(obj).__name__
    if isinstance(obj, Response):
        text = obj.body.to_unicode()
    elif isinstance(obj, str):
        # byte strings are assumed to be utf-8 encoded
        text = obj.decode('utf-8')
    else:
        # already unicode — use as-is
        text = obj
    # Non-greedy DOTALL match grabs each <nodename ...>...</nodename> span.
    r = re.compile(r"<%s[\s>].*?</%s>" % (nodename, nodename), re.DOTALL)
    for match in r.finditer(text):
        nodetext = match.group()
        # Re-parse just this node's text and yield the selector for its root.
        yield XmlXPathSelector(text=nodetext).x('/' + nodename)[0]