1
0
mirror of https://github.com/scrapy/scrapy.git synced 2025-02-26 20:23:53 +00:00

Merge pull request #1384 from scrapy/tmp-py3

In-progress Python 3 port
This commit is contained in:
Daniel Graña 2015-07-30 12:33:54 -03:00
commit 8177387415
39 changed files with 387 additions and 1154 deletions

View File

@ -5,3 +5,4 @@ cssselect>=0.9
w3lib>=1.8.0 w3lib>=1.8.0
queuelib queuelib
six>=1.5.2 six>=1.5.2
PyDispatcher>=2.0.5

View File

@ -65,5 +65,17 @@ class RobotsTxtMiddleware(object):
def _parse_robots(self, response): def _parse_robots(self, response):
rp = robotparser.RobotFileParser(response.url) rp = robotparser.RobotFileParser(response.url)
rp.parse(response.body.splitlines()) body = ''
if hasattr(response, 'body_as_unicode'):
body = response.body_as_unicode()
else: # last effort try
try:
body = response.body.decode('utf-8')
except UnicodeDecodeError:
# If we found garbage, disregard it,
# but keep the lookup cached (in self._parsers)
# Running rp.parse() will set rp state from
# 'disallow all' to 'allow any'.
pass
rp.parse(body.splitlines())
self._parsers[urlparse_cached(response).netloc] = rp self._parsers[urlparse_cached(response).netloc] = rp

View File

@ -8,6 +8,7 @@ import six
from w3lib.url import safe_url_string from w3lib.url import safe_url_string
from scrapy.http.headers import Headers from scrapy.http.headers import Headers
from scrapy.utils.python import to_native_str, to_bytes
from scrapy.utils.trackref import object_ref from scrapy.utils.trackref import object_ref
from scrapy.utils.url import escape_ajax from scrapy.utils.url import escape_ajax
from scrapy.http.common import obsolete_setter from scrapy.http.common import obsolete_setter
@ -46,15 +47,12 @@ class Request(object_ref):
return self._url return self._url
def _set_url(self, url): def _set_url(self, url):
if isinstance(url, str): if not isinstance(url, six.string_types):
self._url = escape_ajax(safe_url_string(url))
elif isinstance(url, six.text_type):
if self.encoding is None:
raise TypeError('Cannot convert unicode url - %s has no encoding' %
type(self).__name__)
self._set_url(url.encode(self.encoding))
else:
raise TypeError('Request url must be str or unicode, got %s:' % type(url).__name__) raise TypeError('Request url must be str or unicode, got %s:' % type(url).__name__)
url = to_native_str(url, self.encoding)
self._url = escape_ajax(safe_url_string(url))
if ':' not in self._url: if ':' not in self._url:
raise ValueError('Missing scheme in request url: %s' % self._url) raise ValueError('Missing scheme in request url: %s' % self._url)
@ -64,17 +62,10 @@ class Request(object_ref):
return self._body return self._body
def _set_body(self, body): def _set_body(self, body):
if isinstance(body, str): if body is None:
self._body = body self._body = b''
elif isinstance(body, six.text_type):
if self.encoding is None:
raise TypeError('Cannot convert unicode body - %s has no encoding' %
type(self).__name__)
self._body = body.encode(self.encoding)
elif body is None:
self._body = ''
else: else:
raise TypeError("Request body must either str or unicode. Got: '%s'" % type(body).__name__) self._body = to_bytes(body, self.encoding)
body = property(_get_body, obsolete_setter(_set_body, 'body')) body = property(_get_body, obsolete_setter(_set_body, 'body'))

View File

@ -9,7 +9,7 @@ from six.moves.urllib.parse import urljoin, urlencode
import lxml.html import lxml.html
import six import six
from scrapy.http.request import Request from scrapy.http.request import Request
from scrapy.utils.python import to_bytes from scrapy.utils.python import to_bytes, is_listlike
class FormRequest(Request): class FormRequest(Request):
@ -25,7 +25,7 @@ class FormRequest(Request):
items = formdata.items() if isinstance(formdata, dict) else formdata items = formdata.items() if isinstance(formdata, dict) else formdata
querystr = _urlencode(items, self.encoding) querystr = _urlencode(items, self.encoding)
if self.method == 'POST': if self.method == 'POST':
self.headers.setdefault('Content-Type', 'application/x-www-form-urlencoded') self.headers.setdefault(b'Content-Type', b'application/x-www-form-urlencoded')
self._set_body(querystr) self._set_body(querystr)
else: else:
self._set_url(self.url + ('&' if '?' in self.url else '?') + querystr) self._set_url(self.url + ('&' if '?' in self.url else '?') + querystr)
@ -50,7 +50,7 @@ def _get_form_url(form, url):
def _urlencode(seq, enc): def _urlencode(seq, enc):
values = [(to_bytes(k, enc), to_bytes(v, enc)) values = [(to_bytes(k, enc), to_bytes(v, enc))
for k, vs in seq for k, vs in seq
for v in (vs if hasattr(vs, '__iter__') else [vs])] for v in (vs if is_listlike(vs) else [vs])]
return urlencode(values, doseq=1) return urlencode(values, doseq=1)

View File

@ -4,9 +4,6 @@ responses in Scrapy.
See documentation in docs/topics/request-response.rst See documentation in docs/topics/request-response.rst
""" """
import copy
from six.moves.urllib.parse import urljoin from six.moves.urllib.parse import urljoin
from scrapy.http.headers import Headers from scrapy.http.headers import Headers
@ -15,7 +12,7 @@ from scrapy.http.common import obsolete_setter
class Response(object_ref): class Response(object_ref):
def __init__(self, url, status=200, headers=None, body='', flags=None, request=None): def __init__(self, url, status=200, headers=None, body=b'', flags=None, request=None):
self.headers = Headers(headers or {}) self.headers = Headers(headers or {})
self.status = int(status) self.status = int(status)
self._set_body(body) self._set_body(body)
@ -28,8 +25,10 @@ class Response(object_ref):
try: try:
return self.request.meta return self.request.meta
except AttributeError: except AttributeError:
raise AttributeError("Response.meta not available, this response " \ raise AttributeError(
"is not tied to any request") "Response.meta not available, this response "
"is not tied to any request"
)
def _get_url(self): def _get_url(self):
return self._url return self._url
@ -38,7 +37,7 @@ class Response(object_ref):
if isinstance(url, str): if isinstance(url, str):
self._url = url self._url = url
else: else:
raise TypeError('%s url must be str, got %s:' % (type(self).__name__, \ raise TypeError('%s url must be str, got %s:' % (type(self).__name__,
type(url).__name__)) type(url).__name__))
url = property(_get_url, obsolete_setter(_set_url, 'url')) url = property(_get_url, obsolete_setter(_set_url, 'url'))
@ -47,16 +46,15 @@ class Response(object_ref):
return self._body return self._body
def _set_body(self, body): def _set_body(self, body):
if isinstance(body, str): if body is None:
self._body = body self._body = b''
elif isinstance(body, unicode): elif not isinstance(body, bytes):
raise TypeError("Cannot assign a unicode body to a raw Response. " \ raise TypeError(
"Use TextResponse, HtmlResponse, etc") "Response body must be bytes. "
elif body is None: "If you want to pass unicode body use TextResponse "
self._body = '' "or HtmlResponse.")
else: else:
raise TypeError("Response body must either be str or unicode. Got: '%s'" \ self._body = body
% type(body).__name__)
body = property(_get_body, obsolete_setter(_set_body, 'body')) body = property(_get_body, obsolete_setter(_set_body, 'body'))

View File

@ -5,13 +5,14 @@ discovering (through HTTP headers) to base Response class.
See documentation in docs/topics/request-response.rst See documentation in docs/topics/request-response.rst
""" """
import six
from six.moves.urllib.parse import urljoin from six.moves.urllib.parse import urljoin
from w3lib.encoding import html_to_unicode, resolve_encoding, \ from w3lib.encoding import html_to_unicode, resolve_encoding, \
html_body_declared_encoding, http_content_type_encoding html_body_declared_encoding, http_content_type_encoding
from scrapy.http.response import Response from scrapy.http.response import Response
from scrapy.utils.response import get_base_url from scrapy.utils.response import get_base_url
from scrapy.utils.python import memoizemethod_noargs from scrapy.utils.python import memoizemethod_noargs, to_native_str
class TextResponse(Response): class TextResponse(Response):
@ -26,18 +27,18 @@ class TextResponse(Response):
super(TextResponse, self).__init__(*args, **kwargs) super(TextResponse, self).__init__(*args, **kwargs)
def _set_url(self, url): def _set_url(self, url):
if isinstance(url, unicode): if isinstance(url, six.text_type):
if self.encoding is None: if six.PY2 and self.encoding is None:
raise TypeError('Cannot convert unicode url - %s has no encoding' % raise TypeError("Cannot convert unicode url - %s "
type(self).__name__) "has no encoding" % type(self).__name__)
self._url = url.encode(self.encoding) self._url = to_native_str(url, self.encoding)
else: else:
super(TextResponse, self)._set_url(url) super(TextResponse, self)._set_url(url)
def _set_body(self, body): def _set_body(self, body):
self._body = '' self._body = b'' # used by encoding detection
if isinstance(body, unicode): if isinstance(body, six.text_type):
if self.encoding is None: if self._encoding is None:
raise TypeError('Cannot convert unicode body - %s has no encoding' % raise TypeError('Cannot convert unicode body - %s has no encoding' %
type(self).__name__) type(self).__name__)
self._body = body.encode(self._encoding) self._body = body.encode(self._encoding)
@ -73,14 +74,14 @@ class TextResponse(Response):
@memoizemethod_noargs @memoizemethod_noargs
def _headers_encoding(self): def _headers_encoding(self):
content_type = self.headers.get('Content-Type') content_type = self.headers.get(b'Content-Type', b'')
return http_content_type_encoding(content_type) return http_content_type_encoding(to_native_str(content_type))
def _body_inferred_encoding(self): def _body_inferred_encoding(self):
if self._cached_benc is None: if self._cached_benc is None:
content_type = self.headers.get('Content-Type') content_type = to_native_str(self.headers.get(b'Content-Type', b''))
benc, ubody = html_to_unicode(content_type, self.body, \ benc, ubody = html_to_unicode(content_type, self.body,
auto_detect_fun=self._auto_detect_fun, \ auto_detect_fun=self._auto_detect_fun,
default_encoding=self._DEFAULT_ENCODING) default_encoding=self._DEFAULT_ENCODING)
self._cached_benc = benc self._cached_benc = benc
self._cached_ubody = ubody self._cached_ubody = ubody

View File

@ -7,9 +7,9 @@ See documentation in topics/media-pipeline.rst
import hashlib import hashlib
import os import os
import os.path import os.path
import rfc822
import time import time
import logging import logging
from email.utils import parsedate_tz, mktime_tz
from six.moves.urllib.parse import urlparse from six.moves.urllib.parse import urlparse
from collections import defaultdict from collections import defaultdict
import six import six
@ -91,8 +91,8 @@ class S3FilesStore(object):
def _onsuccess(boto_key): def _onsuccess(boto_key):
checksum = boto_key.etag.strip('"') checksum = boto_key.etag.strip('"')
last_modified = boto_key.last_modified last_modified = boto_key.last_modified
modified_tuple = rfc822.parsedate_tz(last_modified) modified_tuple = parsedate_tz(last_modified)
modified_stamp = int(rfc822.mktime_tz(modified_tuple)) modified_stamp = int(mktime_tz(modified_tuple))
return {'checksum': checksum, 'last_modified': modified_stamp} return {'checksum': checksum, 'last_modified': modified_stamp}
return self._get_boto_key(path).addCallback(_onsuccess) return self._get_boto_key(path).addCallback(_onsuccess)

View File

@ -92,9 +92,9 @@ class ResponseTypes(object):
chunk = body[:5000] chunk = body[:5000]
if isbinarytext(chunk): if isbinarytext(chunk):
return self.from_mimetype('application/octet-stream') return self.from_mimetype('application/octet-stream')
elif "<html>" in chunk.lower(): elif b"<html>" in chunk.lower():
return self.from_mimetype('text/html') return self.from_mimetype('text/html')
elif "<?xml" in chunk.lower(): elif b"<?xml" in chunk.lower():
return self.from_mimetype('text/xml') return self.from_mimetype('text/xml')
else: else:
return self.from_mimetype('text') return self.from_mimetype('text')

View File

@ -121,7 +121,7 @@ class Selector(object_ref):
try: try:
return etree.tostring(self._root, return etree.tostring(self._root,
method=self._tostring_method, method=self._tostring_method,
encoding=unicode, encoding="unicode",
with_tail=False) with_tail=False)
except (AttributeError, TypeError): except (AttributeError, TypeError):
if self._root is True: if self._root is True:
@ -129,7 +129,7 @@ class Selector(object_ref):
elif self._root is False: elif self._root is False:
return u'0' return u'0'
else: else:
return unicode(self._root) return six.text_type(self._root)
def register_namespace(self, prefix, uri): def register_namespace(self, prefix, uri):
if self.namespaces is None: if self.namespaces is None:

View File

@ -1,5 +1,5 @@
from __future__ import absolute_import from __future__ import absolute_import
from scrapy.xlib.pydispatch import dispatcher from pydispatch import dispatcher
from scrapy.utils import signal as _signal from scrapy.utils import signal as _signal

View File

@ -61,7 +61,7 @@ def parallel(iterable, count, callable, *args, **named):
""" """
coop = task.Cooperator() coop = task.Cooperator()
work = (callable(elem, *args, **named) for elem in iterable) work = (callable(elem, *args, **named) for elem in iterable)
return defer.DeferredList([coop.coiterate(work) for i in xrange(count)]) return defer.DeferredList([coop.coiterate(work) for i in range(count)])
def process_chain(callbacks, input, *a, **kw): def process_chain(callbacks, input, *a, **kw):
"""Return a Deferred built by chaining the given callbacks""" """Return a Deferred built by chaining the given callbacks"""
@ -97,7 +97,7 @@ def iter_errback(iterable, errback, *a, **kw):
iterating it. iterating it.
""" """
it = iter(iterable) it = iter(iterable)
while 1: while True:
try: try:
yield next(it) yield next(it)
except StopIteration: except StopIteration:

View File

@ -7,7 +7,7 @@ from pkgutil import iter_modules
import six import six
from w3lib.html import replace_entities from w3lib.html import replace_entities
from scrapy.utils.python import flatten from scrapy.utils.python import flatten, to_unicode
from scrapy.item import BaseItem from scrapy.item import BaseItem
@ -81,7 +81,7 @@ def extract_regex(regex, text, encoding='utf-8'):
* if the regex doesn't contain any group the entire regex matching is returned * if the regex doesn't contain any group the entire regex matching is returned
""" """
if isinstance(regex, basestring): if isinstance(regex, six.string_types):
regex = re.compile(regex, re.UNICODE) regex = re.compile(regex, re.UNICODE)
try: try:
@ -90,10 +90,11 @@ def extract_regex(regex, text, encoding='utf-8'):
strings = regex.findall(text) # full regex or numbered groups strings = regex.findall(text) # full regex or numbered groups
strings = flatten(strings) strings = flatten(strings)
if isinstance(text, unicode): if isinstance(text, six.text_type):
return [replace_entities(s, keep=['lt', 'amp']) for s in strings] return [replace_entities(s, keep=['lt', 'amp']) for s in strings]
else: else:
return [replace_entities(unicode(s, encoding), keep=['lt', 'amp']) for s in strings] return [replace_entities(to_unicode(s, encoding), keep=['lt', 'amp'])
for s in strings]
def md5sum(file): def md5sum(file):
@ -105,7 +106,7 @@ def md5sum(file):
'784406af91dd5a54fbb9c84c2236595a' '784406af91dd5a54fbb9c84c2236595a'
""" """
m = hashlib.md5() m = hashlib.md5()
while 1: while True:
d = file.read(8096) d = file.read(8096)
if not d: if not d:
break break

View File

@ -120,6 +120,15 @@ def to_bytes(text, encoding=None, errors='strict'):
return text.encode(encoding, errors) return text.encode(encoding, errors)
def to_native_str(text, encoding=None, errors='strict'):
""" Return str representation of `text`
(bytes in Python 2.x and unicode in Python 3.x). """
if six.PY2:
return to_bytes(text, encoding, errors)
else:
return to_unicode(text, encoding, errors)
def re_rsearch(pattern, text, chunk_size=1024): def re_rsearch(pattern, text, chunk_size=1024):
""" """
This function does a reverse search in a text using a regular expression This function does a reverse search in a text using a regular expression

View File

@ -10,6 +10,7 @@ from six.moves.urllib.parse import urlunparse
from twisted.internet.defer import Deferred from twisted.internet.defer import Deferred
from w3lib.http import basic_auth_header from w3lib.http import basic_auth_header
from scrapy.utils.python import to_bytes, to_native_str
from scrapy.utils.url import canonicalize_url from scrapy.utils.url import canonicalize_url
from scrapy.utils.httpobj import urlparse_cached from scrapy.utils.httpobj import urlparse_cached
@ -44,13 +45,14 @@ def request_fingerprint(request, include_headers=None):
""" """
if include_headers: if include_headers:
include_headers = tuple([h.lower() for h in sorted(include_headers)]) include_headers = tuple([to_bytes(h.lower())
for h in sorted(include_headers)])
cache = _fingerprint_cache.setdefault(request, {}) cache = _fingerprint_cache.setdefault(request, {})
if include_headers not in cache: if include_headers not in cache:
fp = hashlib.sha1() fp = hashlib.sha1()
fp.update(request.method) fp.update(to_bytes(request.method))
fp.update(canonicalize_url(request.url)) fp.update(to_bytes(canonicalize_url(request.url)))
fp.update(request.body or '') fp.update(request.body or b'')
if include_headers: if include_headers:
for hdr in include_headers: for hdr in include_headers:
if hdr in request.headers: if hdr in request.headers:
@ -60,12 +62,14 @@ def request_fingerprint(request, include_headers=None):
cache[include_headers] = fp.hexdigest() cache[include_headers] = fp.hexdigest()
return cache[include_headers] return cache[include_headers]
def request_authenticate(request, username, password): def request_authenticate(request, username, password):
"""Autenticate the given request (in place) using the HTTP basic access """Autenticate the given request (in place) using the HTTP basic access
authentication mechanism (RFC 2617) and the given username and password authentication mechanism (RFC 2617) and the given username and password
""" """
request.headers['Authorization'] = basic_auth_header(username, password) request.headers['Authorization'] = basic_auth_header(username, password)
def request_httprepr(request): def request_httprepr(request):
"""Return the raw HTTP representation (as string) of the given request. """Return the raw HTTP representation (as string) of the given request.
This is provided only for reference since it's not the actual stream of This is provided only for reference since it's not the actual stream of
@ -74,11 +78,11 @@ def request_httprepr(request):
""" """
parsed = urlparse_cached(request) parsed = urlparse_cached(request)
path = urlunparse(('', '', parsed.path or '/', parsed.params, parsed.query, '')) path = urlunparse(('', '', parsed.path or '/', parsed.params, parsed.query, ''))
s = "%s %s HTTP/1.1\r\n" % (request.method, path) s = to_bytes(request.method) + b" " + to_bytes(path) + b" HTTP/1.1\r\n"
s += "Host: %s\r\n" % parsed.hostname s += b"Host: " + to_bytes(parsed.hostname) + b"\r\n"
if request.headers: if request.headers:
s += request.headers.to_string() + "\r\n" s += request.headers.to_string() + b"\r\n"
s += "\r\n" s += b"\r\n"
s += request.body s += request.body
return s return s

View File

@ -5,19 +5,23 @@ import logging
from twisted.internet.defer import maybeDeferred, DeferredList, Deferred from twisted.internet.defer import maybeDeferred, DeferredList, Deferred
from twisted.python.failure import Failure from twisted.python.failure import Failure
from scrapy.xlib.pydispatch.dispatcher import Any, Anonymous, liveReceivers, \ from pydispatch.dispatcher import Any, Anonymous, liveReceivers, \
getAllReceivers, disconnect getAllReceivers, disconnect
from scrapy.xlib.pydispatch.robustapply import robustApply from pydispatch.robustapply import robustApply
from scrapy.utils.log import failure_to_exc_info from scrapy.utils.log import failure_to_exc_info
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class _IgnoredException(Exception):
pass
def send_catch_log(signal=Any, sender=Anonymous, *arguments, **named): def send_catch_log(signal=Any, sender=Anonymous, *arguments, **named):
"""Like pydispatcher.robust.sendRobust but it also logs errors and returns """Like pydispatcher.robust.sendRobust but it also logs errors and returns
Failures instead of exceptions. Failures instead of exceptions.
""" """
dont_log = named.pop('dont_log', None) dont_log = named.pop('dont_log', _IgnoredException)
spider = named.get('spider', None) spider = named.get('spider', None)
responses = [] responses = []
for receiver in liveReceivers(getAllReceivers(sender, signal)): for receiver in liveReceivers(getAllReceivers(sender, signal)):
@ -39,6 +43,7 @@ def send_catch_log(signal=Any, sender=Anonymous, *arguments, **named):
responses.append((receiver, result)) responses.append((receiver, result))
return responses return responses
def send_catch_log_deferred(signal=Any, sender=Anonymous, *arguments, **named): def send_catch_log_deferred(signal=Any, sender=Anonymous, *arguments, **named):
"""Like send_catch_log but supports returning deferreds on signal handlers. """Like send_catch_log but supports returning deferreds on signal handlers.
Returns a deferred that gets fired once all signal handlers deferreds were Returns a deferred that gets fired once all signal handlers deferreds were
@ -65,6 +70,7 @@ def send_catch_log_deferred(signal=Any, sender=Anonymous, *arguments, **named):
d.addCallback(lambda out: [x[1] for x in out]) d.addCallback(lambda out: [x[1] for x in out])
return d return d
def disconnect_all(signal=Any, sender=Any): def disconnect_all(signal=Any, sender=Any):
"""Disconnect all signal handlers. Useful for cleaning up after running """Disconnect all signal handlers. Useful for cleaning up after running
tests tests

View File

@ -10,19 +10,20 @@ from six.moves.urllib.parse import (ParseResult, urlunparse, urldefrag,
urlparse, parse_qsl, urlencode, urlparse, parse_qsl, urlencode,
unquote) unquote)
# scrapy.utils.url was moved to w3lib.url and import * ensures this move doesn't break old code # scrapy.utils.url was moved to w3lib.url and import * ensures this
# move doesn't break old code
from w3lib.url import * from w3lib.url import *
from scrapy.utils.python import to_bytes from w3lib.url import _safe_chars
from scrapy.utils.python import to_native_str
def url_is_from_any_domain(url, domains): def url_is_from_any_domain(url, domains):
"""Return True if the url belongs to any of the given domains""" """Return True if the url belongs to any of the given domains"""
host = parse_url(url).netloc.lower() host = parse_url(url).netloc.lower()
if not host:
if host:
return any(((host == d.lower()) or (host.endswith('.%s' % d.lower())) for d in domains))
else:
return False return False
domains = [d.lower() for d in domains]
return any((host == d) or (host.endswith('.%s' % d)) for d in domains)
def url_is_from_spider(url, spider): def url_is_from_spider(url, spider):
@ -36,7 +37,7 @@ def url_has_any_extension(url, extensions):
def canonicalize_url(url, keep_blank_values=True, keep_fragments=False, def canonicalize_url(url, keep_blank_values=True, keep_fragments=False,
encoding=None): encoding=None):
"""Canonicalize the given url by applying the following procedures: """Canonicalize the given url by applying the following procedures:
- sort query arguments, first by key, then by value - sort query arguments, first by key, then by value
@ -57,6 +58,11 @@ def canonicalize_url(url, keep_blank_values=True, keep_fragments=False,
keyvals = parse_qsl(query, keep_blank_values) keyvals = parse_qsl(query, keep_blank_values)
keyvals.sort() keyvals.sort()
query = urlencode(keyvals) query = urlencode(keyvals)
# XXX: copied from w3lib.url.safe_url_string to add encoding argument
# path = to_native_str(path, encoding)
# path = moves.urllib.parse.quote(path, _safe_chars, encoding='latin1') or '/'
path = safe_url_string(_unquotepath(path)) or '/' path = safe_url_string(_unquotepath(path)) or '/'
fragment = '' if not keep_fragments else fragment fragment = '' if not keep_fragments else fragment
return urlunparse((scheme, netloc.lower(), path, params, query, fragment)) return urlunparse((scheme, netloc.lower(), path, params, query, fragment))
@ -74,7 +80,7 @@ def parse_url(url, encoding=None):
""" """
if isinstance(url, ParseResult): if isinstance(url, ParseResult):
return url return url
return urlparse(to_bytes(url, encoding)) return urlparse(to_native_str(url, encoding))
def escape_ajax(url): def escape_ajax(url):

View File

@ -1,6 +0,0 @@
"""Multi-consumer multi-producer dispatching mechanism
"""
__version__ = "2.0.0"
__author__ = "Patrick K. O'Brien"
__license__ = "BSD-style, see license.txt for details"

View File

@ -1,511 +0,0 @@
"""Multiple-producer-multiple-consumer signal-dispatching
dispatcher is the core of the PyDispatcher system,
providing the primary API and the core logic for the
system.
Module attributes of note:
Any -- Singleton used to signal either "Any Sender" or
"Any Signal". See documentation of the _Any class.
Anonymous -- Singleton used to signal "Anonymous Sender"
See documentation of the _Anonymous class.
Internal attributes:
WEAKREF_TYPES -- tuple of types/classes which represent
weak references to receivers, and thus must be de-
referenced on retrieval to retrieve the callable
object
connections -- { senderkey (id) : { signal : [receivers...]}}
senders -- { senderkey (id) : weakref(sender) }
used for cleaning up sender references on sender
deletion
sendersBack -- { receiverkey (id) : [senderkey (id)...] }
used for cleaning up receiver references on receiver
deletion, (considerably speeds up the cleanup process
vs. the original code.)
"""
from __future__ import generators
import types, weakref, six
from scrapy.xlib.pydispatch import saferef, robustapply, errors
__author__ = "Patrick K. O'Brien <pobrien@orbtech.com>"
__cvsid__ = "$Id: dispatcher.py,v 1.1.1.1 2006/07/07 15:59:38 mcfletch Exp $"
__version__ = "$Revision: 1.1.1.1 $"[11:-2]
class _Parameter:
"""Used to represent default parameter values."""
def __repr__(self):
return self.__class__.__name__
class _Any(_Parameter):
"""Singleton used to signal either "Any Sender" or "Any Signal"
The Any object can be used with connect, disconnect,
send, or sendExact to signal that the parameter given
Any should react to all senders/signals, not just
a particular sender/signal.
"""
Any = _Any()
class _Anonymous(_Parameter):
"""Singleton used to signal "Anonymous Sender"
The Anonymous object is used to signal that the sender
of a message is not specified (as distinct from being
"any sender"). Registering callbacks for Anonymous
will only receive messages sent without senders. Sending
with anonymous will only send messages to those receivers
registered for Any or Anonymous.
Note:
The default sender for connect is Any, while the
default sender for send is Anonymous. This has
the effect that if you do not specify any senders
in either function then all messages are routed
as though there was a single sender (Anonymous)
being used everywhere.
"""
Anonymous = _Anonymous()
# Weak-reference wrapper types: receivers stored as one of these must be
# dereferenced (called) on retrieval to obtain the actual callable.
WEAKREF_TYPES = (weakref.ReferenceType, saferef.BoundMethodWeakref)

# connections -- {senderkey (id): {signal: [receivers...]}}
connections = {}
# senders -- {senderkey (id): weakref(sender)}; used for cleaning up
# sender references on sender deletion.
senders = {}
# sendersBack -- {receiverkey (id): [senderkey (id)...]}; reverse map
# used for cleaning up receiver references on receiver deletion.
sendersBack = {}
def connect(receiver, signal=Any, sender=Any, weak=True):
    """Connect receiver to sender for signal

    receiver -- a callable Python object which is to receive
        messages/signals/events.  Receivers must be hashable objects.
        If weak is True, then receiver must be weak-referencable (more
        precisely saferef.safeRef() must be able to create a reference
        to the receiver).  Note: if receiver is itself a weak reference
        (a callable), it will be de-referenced by the system's
        machinery, so *generally* weak references are not suitable as
        receivers.
    signal -- the signal to which the receiver should respond.
        If Any, receiver will receive any signal from the indicated
        sender (which might also be Any).  Otherwise must be a hashable
        Python object other than None (DispatcherTypeError raised on
        None).
    sender -- the sender to which the receiver should respond.
        If Any, receiver will receive the indicated signals from any
        sender.  If Anonymous, receiver will only receive indicated
        signals from send/sendExact which do not specify a sender, or
        specify Anonymous explicitly as the sender.  Otherwise can be
        any python object.
    weak -- whether to use weak references to the receiver.  By
        default, the module will attempt to use weak references to the
        receiver objects.  If this parameter is false, then strong
        references will be used.

    returns None, may raise DispatcherTypeError
    """
    if signal is None:
        raise errors.DispatcherTypeError(
            'Signal cannot be None (receiver=%r sender=%r)' % (
                receiver, sender)
        )
    if weak:
        receiver = saferef.safeRef(receiver, onDelete=_removeReceiver)
    senderkey = id(sender)
    if senderkey in connections:
        signals = connections[senderkey]
    else:
        connections[senderkey] = signals = {}
    # Keep track of senders for cleanup.
    # Is Anonymous something we want to clean up?
    if sender not in (None, Anonymous, Any):
        def remove(object, senderkey=senderkey):
            _removeSender(senderkey=senderkey)
        # Skip objects that can not be weakly referenced, which means
        # they won't be automatically cleaned up, but that's too bad.
        try:
            weakSender = weakref.ref(sender, remove)
            senders[senderkey] = weakSender
        except TypeError:
            # weakref.ref raises TypeError for objects that do not
            # support weak references.  Narrowed from a bare "except:"
            # so that KeyboardInterrupt/SystemExit propagate.
            pass
    receiverID = id(receiver)
    # get current set, remove any current references to
    # this receiver in the set, including back-references
    if signal in signals:
        receivers = signals[signal]
        _removeOldBackRefs(senderkey, signal, receiver, receivers)
    else:
        receivers = signals[signal] = []
    try:
        current = sendersBack.get(receiverID)
        if current is None:
            sendersBack[receiverID] = current = []
        if senderkey not in current:
            current.append(senderkey)
    except Exception:
        # Best-effort back-reference bookkeeping; narrowed from a bare
        # "except:" so that system-exiting exceptions propagate.
        pass
    receivers.append(receiver)
def disconnect(receiver, signal=Any, sender=Any, weak=True):
    """Disconnect receiver from sender for signal

    receiver -- the registered receiver to disconnect
    signal -- the registered signal to disconnect
    sender -- the registered sender to disconnect
    weak -- the weakref state to disconnect

    disconnect reverses the process of connect, the semantics for the
    individual elements are logically equivalent to a tuple of
    (receiver, signal, sender, weak) used as a key to be deleted from
    the internal routing tables.  (The actual process is slightly more
    complex but the semantics are basically the same).

    Note:
        Using disconnect is not required to cleanup routing when an
        object is deleted, the framework will remove routes for deleted
        objects automatically.  It's only necessary to disconnect if
        you want to stop routing to a live object.

    returns None, may raise DispatcherTypeError or DispatcherKeyError
    """
    if signal is None:
        raise errors.DispatcherTypeError(
            'Signal cannot be None (receiver=%r sender=%r)' % (
                receiver, sender)
        )
    # Re-wrap the receiver the same way connect() did, so the lookup in
    # the routing tables compares equal to the stored reference.
    if weak: receiver = saferef.safeRef(receiver)
    senderkey = id(sender)
    try:
        signals = connections[senderkey]
        receivers = signals[signal]
    except KeyError:
        # No routing entry at all for this sender/signal pair.
        raise errors.DispatcherKeyError(
            """No receivers found for signal %r from sender %r""" % (
                signal,
                sender
            )
        )
    try:
        # also removes from receivers
        _removeOldBackRefs(senderkey, signal, receiver, receivers)
    except ValueError:
        # The receiver was not registered for this sender/signal.
        raise errors.DispatcherKeyError(
            """No connection to receiver %s for signal %s from sender %s""" % (
                receiver,
                signal,
                sender
            )
        )
    # Drop now-empty signal/sender entries from the routing tables.
    _cleanupConnections(senderkey, signal)
def getReceivers(sender=Any, signal=Any):
    """Return the raw receiver list registered for (sender, signal).

    The result should be treated as a simple iterable/truth value; there
    is no guarantee it is the very list stored in the routing tables, so
    do not append to it.  Use liveReceivers(getReceivers(...)) to obtain
    resolved receiver objects.  Returns [] when nothing is registered.
    """
    per_sender = connections.get(id(sender))
    if per_sender is None:
        return []
    return per_sender.get(signal, [])
def liveReceivers(receivers):
    """Yield the resolved, still-alive receivers from *receivers*.

    Weak references in the input sequence are dereferenced; dead ones
    are silently dropped, everything else is passed through unchanged.
    """
    for candidate in receivers:
        if not isinstance(candidate, WEAKREF_TYPES):
            yield candidate
            continue
        # Dereference the weak reference; None means the target is gone.
        resolved = candidate()
        if resolved is not None:
            yield resolved
def getAllReceivers(sender=Any, signal=Any):
    """Yield every receiver that should see *signal* from *sender*.

    Combines the exact registration with the Any-sender / Any-signal
    wildcard registrations.  Each receiver is produced at most once.
    (Fix: the loop variable no longer shadows the builtin ``set``.)
    """
    seen = {}
    for group in (
        # Get receivers that receive *this* signal from *this* sender.
        getReceivers(sender, signal),
        # Add receivers that receive *any* signal from *this* sender.
        getReceivers(sender, Any),
        # Add receivers that receive *this* signal from *any* sender.
        getReceivers(Any, signal),
        # Add receivers that receive *any* signal from *any* sender.
        getReceivers(Any, Any),
    ):
        for receiver in group:
            if receiver:  # filter out dead instance-method weakrefs
                try:
                    if receiver not in seen:
                        seen[receiver] = 1
                        yield receiver
                except TypeError:
                    # dead weakrefs raise TypeError on hash...
                    pass
def send(signal=Any, sender=Anonymous, *arguments, **named):
    """Deliver *signal* from *sender* to every connected receiver.

    signal -- (hashable) signal value, see connect() for details
    sender -- the sender of the signal; Any restricts delivery to
        Any-registered receivers, Anonymous to Anonymous/Any-registered
        receivers; otherwise any python object registered via connect().
    arguments -- positional arguments forwarded to *all* receivers
        (may raise TypeError if a receiver cannot accept them).
    named -- keyword arguments, filtered per receiver so that each
        receiver only sees the keywords it accepts.

    Returns [(receiver, response), ...].  A receiver that raises
    terminates the dispatch loop, so later receivers may not be called.
    """
    # Call each receiver with whatever arguments it can accept.
    return [
        (receiver,
         robustapply.robustApply(
             receiver, signal=signal, sender=sender, *arguments, **named))
        for receiver in liveReceivers(getAllReceivers(sender, signal))
    ]
def sendExact(signal=Any, sender=Anonymous, *arguments, **named):
    """Deliver *signal* only to exactly-matching registrations.

    Unlike send(), Any/Anonymous wildcard registrations are skipped:
    only receivers explicitly connected for this particular signal on
    this particular sender are called.

    Returns [(receiver, response), ...].
    """
    results = []
    for listener in liveReceivers(getReceivers(sender, signal)):
        reply = robustapply.robustApply(
            listener,
            signal=signal,
            sender=sender,
            *arguments,
            **named
        )
        results.append((listener, reply))
    return results
def _removeReceiver(receiver):
    """Remove *receiver* from all routing tables.

    Invoked (via weak-reference callbacks) when a receiver is garbage
    collected.  Returns False when there is nothing to clean up.
    """
    if not sendersBack:
        # During module cleanup the mapping will be replaced with None
        return False
    backKey = id(receiver)
    try:
        backSet = sendersBack.pop(backKey)
    except KeyError:
        return False
    for senderkey in backSet:
        try:
            # Snapshot the keys: _cleanupConnections() below may delete
            # entries from connections[senderkey] while this loop runs,
            # which would invalidate a live dict view on Python 3
            # (RuntimeError: dictionary changed size during iteration).
            signals = list(connections[senderkey].keys())
        except KeyError:
            continue
        for signal in signals:
            try:
                receivers = connections[senderkey][signal]
            except KeyError:
                continue
            try:
                # Best effort: the receiver may already have been removed.
                receivers.remove(receiver)
            except Exception:
                pass
            _cleanupConnections(senderkey, signal)
def _cleanupConnections(senderkey, signal):
    """Delete any empty signals for senderkey. Delete senderkey if empty.

    Fix: the bare ``except:`` (which also swallowed KeyboardInterrupt
    and SystemExit) is narrowed to Exception; it still tolerates missing
    keys and a torn-down ``connections`` mapping during interpreter exit.
    """
    try:
        receivers = connections[senderkey][signal]
    except Exception:
        pass
    else:
        if not receivers:
            # No more connected receivers. Therefore, remove the signal.
            try:
                signals = connections[senderkey]
            except KeyError:
                pass
            else:
                del signals[signal]
                if not signals:
                    # No more signal connections. Therefore, remove the sender.
                    _removeSender(senderkey)
def _removeSender(senderkey):
    """Remove senderkey from connections and the senders registry.

    Fix: the bare ``except:`` is narrowed to Exception so that
    KeyboardInterrupt/SystemExit are no longer swallowed, while still
    tolerating a missing key or a torn-down mapping during shutdown.
    """
    _removeBackrefs(senderkey)
    try:
        del connections[senderkey]
    except KeyError:
        pass
    # Senderkey will only be in senders dictionary if sender
    # could be weakly referenced.
    try:
        del senders[senderkey]
    except Exception:
        pass
def _removeBackrefs(senderkey):
    """Remove all back-references to this senderkey.

    Fixes: an unknown senderkey is now a clean no-op (as rendered, the
    old code still ran its receiver loop with ``items`` unbound, raising
    NameError), and the inner loop variable no longer shadows the
    builtin ``set``.
    """
    try:
        signals = connections[senderkey]
    except KeyError:
        return
    # Snapshot before iterating; _killBackref only mutates sendersBack,
    # but the snapshot keeps iteration safe regardless.
    for signal, receiver_group in list(signals.items()):
        for receiver in receiver_group:
            _killBackref(receiver, senderkey)
def _removeOldBackRefs(senderkey, signal, receiver, receivers):
    """Kill old sendersBack references from receiver

    This guards against multiple registration of the same
    receiver for a given signal and sender leaking memory
    as old back reference records build up.

    Also removes old receiver instance from receivers.

    Returns True when an old registration was found and removed,
    False otherwise.  Raises ValueError only indirectly via callers'
    expectations (list.index raising is handled here).
    """
    try:
        index = receivers.index(receiver)
        # need to scan back references here and remove senderkey
    except ValueError:
        # receiver was not registered for this (sender, signal) pair
        return False
    else:
        oldReceiver = receivers[index]
        del receivers[index]
        found = 0
        # NOTE(review): ``connections`` is keyed by *sender id*, yet it
        # is looked up with ``signal`` here, so this scan almost always
        # finds nothing and _killBackref() is normally invoked.  This
        # matches upstream PyDispatcher; confirm intent before changing.
        signals = connections.get(signal)
        if signals is not None:
            for sig, recs in six.iteritems(connections.get(signal, {})):
                if sig != signal:
                    for rec in recs:
                        if rec is oldReceiver:
                            found = 1
                            break
        if not found:
            _killBackref(oldReceiver, senderkey)
            return True
        return False
def _killBackref(receiver, senderkey):
    """Remove the back reference from *receiver* to *senderkey*.

    Fixes: the local no longer shadows the builtin ``set``, and the
    bare ``except:`` around the removal is narrowed to Exception (it
    exists only to break out if concurrent mutation makes the removal
    fail).  Always returns True.
    """
    receiverkey = id(receiver)
    backrefs = sendersBack.get(receiverkey, ())
    while senderkey in backrefs:
        try:
            backrefs.remove(senderkey)
        except Exception:
            break
    if not backrefs:
        try:
            del sendersBack[receiverkey]
        except KeyError:
            pass
    return True

View File

@ -1,15 +0,0 @@
"""Error types for dispatcher mechanism
"""
class DispatcherError(Exception):
    """Base class for all Dispatcher errors"""
class DispatcherKeyError(KeyError, DispatcherError):
    """Error raised when an unknown (sender, signal) pair is specified"""
class DispatcherTypeError(TypeError, DispatcherError):
    """Error raised when an inappropriate signal type is specified (e.g. None)"""

View File

@ -1,34 +0,0 @@
PyDispatcher License
Copyright (c) 2001-2006, Patrick K. O'Brien and Contributors
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials
provided with the distribution.
The name of Patrick K. O'Brien, or the name of any Contributor,
may not be used to endorse or promote products derived from this
software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
COPYRIGHT HOLDERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,54 +0,0 @@
"""Module implementing error-catching version of send (sendRobust)"""
from scrapy.xlib.pydispatch.dispatcher import Any, Anonymous, liveReceivers, getAllReceivers
from scrapy.xlib.pydispatch.robustapply import robustApply
def sendRobust(signal=Any, sender=Anonymous, *arguments, **named):
    """Deliver *signal* from *sender* to all connected receivers, catching errors.

    signal -- (hashable) signal value, see connect for details
    sender -- the sender of the signal; Any restricts delivery to
        Any-registered receivers, Anonymous to Anonymous/Any-registered
        receivers; otherwise any python object registered via connect().
    arguments -- positional arguments forwarded to *all* receivers
        (may raise TypeError if a receiver cannot accept them).
    named -- keyword arguments, filtered per receiver so that each
        receiver only sees the keywords it accepts.

    Returns [(receiver, response), ...]; when a receiver raises any
    Exception subclass, the exception instance is recorded as that
    receiver's response instead of aborting the dispatch loop.
    """
    responses = []
    for receiver in liveReceivers(getAllReceivers(sender, signal)):
        try:
            result = robustApply(
                receiver,
                signal=signal,
                sender=sender,
                *arguments,
                **named
            )
        except Exception as err:
            # Record the failure in place of a normal response.
            result = err
        responses.append((receiver, result))
    return responses

View File

@ -1,58 +0,0 @@
"""Robust apply mechanism
Provides a function "call", which can sort out
what arguments a given callable object can take,
and subset the given arguments to match only
those which are acceptable.
"""
import inspect
def function(receiver):
    """Resolve *receiver* to ``(callable, code_object, fromMethod)``.

    If fromMethod is true, the callable already has its first
    argument (self) bound.

    Fix: alongside the Python 2 attribute names (``im_func`` /
    ``func_code``) the Python 3 spellings (``__func__`` / ``__code__``)
    are now recognised, so plain functions and bound methods work on
    both interpreters.

    Raises ValueError for receivers whose code object cannot be found.
    """
    if inspect.isclass(receiver) and hasattr(receiver, '__call__'):
        # receiver is a class instance; assume it is callable.
        # Reassign receiver to the actual method that will be called.
        if hasattr(receiver.__call__, 'im_func') or \
                hasattr(receiver.__call__, 'im_code'):
            receiver = receiver.__call__
    if hasattr(receiver, 'im_func'):
        # Python 2 bound method
        return receiver, receiver.im_func.func_code, 1
    elif hasattr(receiver, '__func__'):
        # Python 3 bound method
        return receiver, receiver.__func__.__code__, 1
    elif hasattr(receiver, 'func_code'):
        # Python 2 plain function/lambda
        return receiver, receiver.func_code, 0
    elif hasattr(receiver, '__code__'):
        # Python 3 plain function/lambda
        return receiver, receiver.__code__, 0
    raise ValueError(
        'unknown receiver type %s %s' % (receiver, type(receiver)))


def robustApply(receiver, *arguments, **named):
    """Call *receiver* with *arguments* and an acceptable subset of *named*.

    Keyword arguments the receiver cannot accept are silently dropped,
    unless the receiver declares a ``**kwargs`` parameter.

    Raises TypeError when a parameter is supplied both positionally and
    as a keyword.
    """
    receiver, codeObject, startIndex = function(receiver)
    acceptable = codeObject.co_varnames[
        startIndex + len(arguments):codeObject.co_argcount]
    for name in codeObject.co_varnames[startIndex:startIndex + len(arguments)]:
        if name in named:
            raise TypeError(
                """Argument %r specified both positionally and as a keyword for calling %r""" % (
                    name, receiver,
                )
            )
    if not (codeObject.co_flags & 8):
        # fc does not have a **kwds type parameter, therefore
        # remove unacceptable arguments.
        # Fix: iterate a snapshot -- deleting from `named` while
        # iterating its live keys() view raises RuntimeError on Python 3.
        for arg in list(named.keys()):
            if arg not in acceptable:
                del named[arg]
    return receiver(*arguments, **named)

View File

@ -1,180 +0,0 @@
"""Refactored "safe reference" from dispatcher.py"""
from __future__ import print_function
import weakref, traceback
def safeRef(target, onDelete=None):
    """Return a *safe* weak reference to a callable *target*.

    target -- the object to weakly reference; a bound method (detected
        via its im_self attribute) is wrapped in a BoundMethodWeakref,
        anything else gets a plain weakref.ref.
    onDelete -- optional callable invoked with the reference object
        (weakref or BoundMethodWeakref) once the reference dies.
    """
    if getattr(target, 'im_self', None) is not None:
        # Turn a bound method into a BoundMethodWeakref instance.
        # Keep track of these instances for lookup by disconnect().
        assert hasattr(target, 'im_func'), (
            """safeRef target %r has im_self, but no im_func, """
            """don't know how to create reference""" % (target,))
        return BoundMethodWeakref(target=target, onDelete=onDelete)
    if onDelete is not None:
        return weakref.ref(target, onDelete)
    return weakref.ref(target)
class BoundMethodWeakref(object):
    """'Safe' and reusable weak references to instance methods.

    BoundMethodWeakref objects provide a mechanism for
    referencing a bound method without requiring that the
    method object itself (which is normally a transient
    object) is kept alive. Instead, the BoundMethodWeakref
    object keeps weak references to both the object and the
    function which together define the instance method.

    Attributes:
        key -- the identity key for the reference, calculated
            by the class's calculateKey method applied to the
            target instance method
        deletionMethods -- sequence of callable objects taking
            single argument, a reference to this object which
            will be called when *either* the target object or
            target function is garbage collected (i.e. when
            this object becomes invalid). These are specified
            as the onDelete parameters of safeRef calls.
        weakSelf -- weak reference to the target object
        weakFunc -- weak reference to the target function

    Class Attributes:
        _allInstances -- class attribute pointing to all live
            BoundMethodWeakref objects indexed by the class's
            calculateKey(target) method applied to the target
            objects. This weak value dictionary is used to
            short-circuit creation so that multiple references
            to the same (object, function) pair produce the
            same BoundMethodWeakref instance.

    NOTE(review): this class relies on the Python 2-only bound-method
    attributes im_self/im_func and the Python 2 protocols
    __nonzero__/__cmp__ (and the cmp() builtin); it will not work
    unchanged on Python 3 -- confirm the target interpreter.
    """
    # Registry of live instances, keyed by calculateKey(target);
    # values are weak so entries vanish when a reference itself dies.
    _allInstances = weakref.WeakValueDictionary()

    def __new__(cls, target, onDelete=None, *arguments, **named):
        """Create new instance or return current instance.

        Basically this method of construction allows us to
        short-circuit creation of references to already-
        referenced instance methods. The key corresponding
        to the target is calculated, and if there is already
        an existing reference, that is returned, with its
        deletionMethods attribute updated. Otherwise the
        new instance is created and registered in the table
        of already-referenced methods.
        """
        key = cls.calculateKey(target)
        current = cls._allInstances.get(key)
        if current is not None:
            current.deletionMethods.append(onDelete)
            return current
        else:
            base = super(BoundMethodWeakref, cls).__new__(cls)
            cls._allInstances[key] = base
            # NOTE(review): __init__ is invoked explicitly here, and the
            # interpreter will call __init__ again on the returned
            # instance (both branches); matches upstream PyDispatcher --
            # confirm before changing.
            base.__init__(target, onDelete, *arguments, **named)
            return base

    def __init__(self, target, onDelete=None):
        """Return a weak-reference-like instance for a bound method.

        target -- the instance-method target for the weak
            reference, must have im_self and im_func attributes
            and be reconstructable via:
                target.im_func.__get__(target.im_self)
            which is true of built-in instance methods.
        onDelete -- optional callback which will be called
            when this weak reference ceases to be valid
            (i.e. either the object or the function is garbage
            collected). Should take a single argument,
            which will be passed a pointer to this object.
        """
        def remove(weak, self=self):
            """Set self.isDead to true when method or instance is destroyed"""
            # `self=self` captures the instance via a default argument.
            methods = self.deletionMethods[:]
            del self.deletionMethods[:]
            try:
                del self.__class__._allInstances[self.key]
            except KeyError:
                pass
            for function in methods:
                try:
                    if callable(function):
                        function(self)
                except Exception as e:
                    # Best-effort error reporting during teardown.
                    try:
                        traceback.print_exc()
                    except AttributeError as err:
                        print(
                            '''Exception during saferef %s cleanup function %s: %s''' % (
                                self, function, e
                            ))
        self.deletionMethods = [onDelete]
        self.key = self.calculateKey(target)
        # Either weakref dying triggers remove() above.
        self.weakSelf = weakref.ref(target.im_self, remove)
        self.weakFunc = weakref.ref(target.im_func, remove)
        # Cached names used only for __str__/__repr__.
        self.selfName = target.im_self.__class__.__name__
        self.funcName = str(target.im_func.__name__)

    def calculateKey(cls, target):
        """Calculate the reference key for this reference.

        Currently this is a two-tuple of the id()'s of the
        target object and the target function respectively.
        """
        return (id(target.im_self), id(target.im_func))
    # Pre-decorator-syntax spelling of @classmethod.
    calculateKey = classmethod(calculateKey)

    def __str__(self):
        """Give a friendly representation of the object"""
        return """%s( %s.%s )""" % (
            self.__class__.__name__,
            self.selfName,
            self.funcName,
        )
    __repr__ = __str__

    def __nonzero__(self):
        """Whether we are still a valid reference (Python 2 truth protocol)"""
        return self() is not None

    def __cmp__(self, other):
        """Compare with another reference (Python 2 ordering protocol)"""
        if not isinstance(other, self.__class__):
            return cmp(self.__class__, type(other))
        return cmp(self.key, other.key)

    def __call__(self):
        """Return a strong reference to the bound method.

        If the target cannot be retrieved, then will
        return None, otherwise returns a bound instance
        method for our object and function.

        Note:
            You may call this method any number of times,
            as it does not invalidate the reference.
        """
        target = self.weakSelf()
        if target is not None:
            function = self.weakFunc()
            if function is not None:
                # Rebind the raw function to the live instance.
                return function.__get__(target)
        return None

View File

@ -44,5 +44,6 @@ setup(
'pyOpenSSL', 'pyOpenSSL',
'cssselect>=0.9', 'cssselect>=0.9',
'six>=1.5.2', 'six>=1.5.2',
'PyDispatcher>=2.0.5',
], ],
) )

View File

@ -12,7 +12,6 @@ tests/test_crawler.py
tests/test_downloader_handlers.py tests/test_downloader_handlers.py
tests/test_downloadermiddleware_ajaxcrawlable.py tests/test_downloadermiddleware_ajaxcrawlable.py
tests/test_downloadermiddleware_cookies.py tests/test_downloadermiddleware_cookies.py
tests/test_downloadermiddleware_decompression.py
tests/test_downloadermiddleware_defaultheaders.py tests/test_downloadermiddleware_defaultheaders.py
tests/test_downloadermiddleware_downloadtimeout.py tests/test_downloadermiddleware_downloadtimeout.py
tests/test_downloadermiddleware_httpauth.py tests/test_downloadermiddleware_httpauth.py
@ -22,50 +21,30 @@ tests/test_downloadermiddleware_httpproxy.py
tests/test_downloadermiddleware.py tests/test_downloadermiddleware.py
tests/test_downloadermiddleware_redirect.py tests/test_downloadermiddleware_redirect.py
tests/test_downloadermiddleware_retry.py tests/test_downloadermiddleware_retry.py
tests/test_downloadermiddleware_robotstxt.py
tests/test_downloadermiddleware_stats.py tests/test_downloadermiddleware_stats.py
tests/test_downloadermiddleware_useragent.py tests/test_downloadermiddleware_useragent.py
tests/test_dupefilters.py
tests/test_engine.py tests/test_engine.py
tests/test_http_cookies.py tests/test_http_cookies.py
tests/test_http_request.py
tests/test_http_response.py
tests/test_logformatter.py tests/test_logformatter.py
tests/test_mail.py tests/test_mail.py
tests/test_middleware.py
tests/test_pipeline_files.py tests/test_pipeline_files.py
tests/test_pipeline_images.py tests/test_pipeline_images.py
tests/test_pipeline_media.py
tests/test_proxy_connect.py tests/test_proxy_connect.py
tests/test_responsetypes.py tests/test_responsetypes.py
tests/test_selector_csstranslator.py tests/test_selector_csstranslator.py
tests/test_selector_lxmldocument.py tests/test_selector_lxmldocument.py
tests/test_selector.py tests/test_selector.py
tests/test_settings/__init__.py
tests/test_spiderloader/__init__.py
tests/test_spiderloader/test_spiders/__init__.py
tests/test_spiderloader/test_spiders/spider0.py
tests/test_spiderloader/test_spiders/spider1.py
tests/test_spiderloader/test_spiders/spider2.py
tests/test_spiderloader/test_spiders/spider3.py
tests/test_spiderloader/test_spiders/spider4.py
tests/test_spidermiddleware_depth.py tests/test_spidermiddleware_depth.py
tests/test_spidermiddleware_httperror.py tests/test_spidermiddleware_httperror.py
tests/test_spidermiddleware_offsite.py tests/test_spidermiddleware_offsite.py
tests/test_spidermiddleware_referer.py tests/test_spidermiddleware_referer.py
tests/test_spider.py tests/test_spider.py
tests/test_stats.py tests/test_stats.py
tests/test_utils_defer.py
tests/test_utils_iterators.py tests/test_utils_iterators.py
tests/test_utils_jsonrpc.py
tests/test_utils_log.py tests/test_utils_log.py
tests/test_utils_reqser.py tests/test_utils_reqser.py
tests/test_utils_request.py
tests/test_utils_response.py tests/test_utils_response.py
tests/test_utils_serialize.py
tests/test_utils_signal.py
tests/test_utils_template.py tests/test_utils_template.py
tests/test_utils_url.py
tests/test_webclient.py tests/test_webclient.py
scrapy/xlib/tx/iweb.py scrapy/xlib/tx/iweb.py
@ -93,6 +72,5 @@ scrapy/downloadermiddlewares/httpproxy.py
scrapy/downloadermiddlewares/cookies.py scrapy/downloadermiddlewares/cookies.py
scrapy/extensions/statsmailer.py scrapy/extensions/statsmailer.py
scrapy/extensions/memusage.py scrapy/extensions/memusage.py
scrapy/commands/deploy.py
scrapy/commands/bench.py scrapy/commands/bench.py
scrapy/mail.py scrapy/mail.py

View File

@ -39,7 +39,7 @@ class DecompressionMiddlewareTest(TestCase):
assert_samelines(self, new.body, rsp.body) assert_samelines(self, new.body, rsp.body)
def test_empty_response(self): def test_empty_response(self):
rsp = Response(url='http://test.com', body='') rsp = Response(url='http://test.com', body=b'')
new = self.mw.process_response(None, rsp, self.spider) new = self.mw.process_response(None, rsp, self.spider)
assert new is rsp assert new is rsp
assert not rsp.body assert not rsp.body

View File

@ -6,15 +6,44 @@ from twisted.python import failure
from twisted.trial import unittest from twisted.trial import unittest
from scrapy.downloadermiddlewares.robotstxt import RobotsTxtMiddleware from scrapy.downloadermiddlewares.robotstxt import RobotsTxtMiddleware
from scrapy.exceptions import IgnoreRequest, NotConfigured from scrapy.exceptions import IgnoreRequest, NotConfigured
from scrapy.http import Request, Response from scrapy.http import Request, Response, TextResponse
from scrapy.settings import Settings from scrapy.settings import Settings
from tests import mock from tests import mock
class RobotsTxtMiddlewareTest(unittest.TestCase): class RobotsTxtMiddlewareTest(unittest.TestCase):
def setUp(self):
self.crawler = mock.MagicMock()
self.crawler.settings = Settings()
self.crawler.engine.download = mock.MagicMock()
def tearDown(self):
del self.crawler
def test_robotstxt_settings(self):
self.crawler.settings = Settings()
self.crawler.settings.set('USER_AGENT', 'CustomAgent')
self.assertRaises(NotConfigured, RobotsTxtMiddleware, self.crawler)
def _get_successful_crawler(self):
crawler = self.crawler
crawler.settings.set('ROBOTSTXT_OBEY', True)
ROBOTS = re.sub(b'^\s+(?m)', b'', b'''
User-Agent: *
Disallow: /admin/
Disallow: /static/
''')
response = TextResponse('http://site.local/robots.txt', body=ROBOTS)
def return_response(request, spider):
deferred = Deferred()
reactor.callFromThread(deferred.callback, response)
return deferred
crawler.engine.download.side_effect = return_response
return crawler
def test_robotstxt(self): def test_robotstxt(self):
middleware = self._get_middleware() middleware = RobotsTxtMiddleware(self._get_successful_crawler())
# There is a bit of neglect in robotstxt.py: robots.txt is fetched asynchronously, # There is a bit of neglect in robotstxt.py: robots.txt is fetched asynchronously,
# and it is actually fetched only *after* first process_request completes. # and it is actually fetched only *after* first process_request completes.
# So, first process_request will always succeed. # So, first process_request will always succeed.
@ -30,8 +59,8 @@ class RobotsTxtMiddlewareTest(unittest.TestCase):
return deferred return deferred
def test_robotstxt_meta(self): def test_robotstxt_meta(self):
middleware = RobotsTxtMiddleware(self._get_successful_crawler())
meta = {'dont_obey_robotstxt': True} meta = {'dont_obey_robotstxt': True}
middleware = self._get_middleware()
self.assertNotIgnored(Request('http://site.local', meta=meta), middleware) self.assertNotIgnored(Request('http://site.local', meta=meta), middleware)
def test(r): def test(r):
self.assertNotIgnored(Request('http://site.local/allowed', meta=meta), middleware) self.assertNotIgnored(Request('http://site.local/allowed', meta=meta), middleware)
@ -42,19 +71,67 @@ class RobotsTxtMiddlewareTest(unittest.TestCase):
reactor.callFromThread(deferred.callback, None) reactor.callFromThread(deferred.callback, None)
return deferred return deferred
def test_robotstxt_error(self): def _get_garbage_crawler(self):
crawler = mock.MagicMock() crawler = self.crawler
crawler.settings = Settings()
crawler.settings.set('ROBOTSTXT_OBEY', True) crawler.settings.set('ROBOTSTXT_OBEY', True)
crawler.engine.download = mock.MagicMock() response = Response('http://site.local/robots.txt', body=b'GIF89a\xd3\x00\xfe\x00\xa2')
def return_response(request, spider):
deferred = Deferred()
reactor.callFromThread(deferred.callback, response)
return deferred
crawler.engine.download.side_effect = return_response
return crawler
def test_robotstxt_garbage(self):
# garbage response should be discarded, equal 'allow all'
middleware = RobotsTxtMiddleware(self._get_garbage_crawler())
middleware._logerror = mock.MagicMock()
middleware.process_request(Request('http://site.local'), None)
self.assertNotIgnored(Request('http://site.local'), middleware)
def test(r):
self.assertNotIgnored(Request('http://site.local/allowed'), middleware)
self.assertNotIgnored(Request('http://site.local/admin/main'), middleware)
self.assertNotIgnored(Request('http://site.local/static/'), middleware)
deferred = Deferred()
deferred.addCallback(test)
deferred.addErrback(lambda _: self.assertIsNone(middleware._logerror.assert_any_call()))
reactor.callFromThread(deferred.callback, None)
return deferred
def _get_emptybody_crawler(self):
crawler = self.crawler
crawler.settings.set('ROBOTSTXT_OBEY', True)
response = Response('http://site.local/robots.txt')
def return_response(request, spider):
deferred = Deferred()
reactor.callFromThread(deferred.callback, response)
return deferred
crawler.engine.download.side_effect = return_response
return crawler
def test_robotstxt_empty_response(self):
# empty response should equal 'allow all'
middleware = RobotsTxtMiddleware(self._get_emptybody_crawler())
self.assertNotIgnored(Request('http://site.local'), middleware)
def test(r):
self.assertNotIgnored(Request('http://site.local/allowed'), middleware)
self.assertNotIgnored(Request('http://site.local/admin/main'), middleware)
self.assertNotIgnored(Request('http://site.local/static/'), middleware)
deferred = Deferred()
deferred.addCallback(test)
reactor.callFromThread(deferred.callback, None)
return deferred
def test_robotstxt_error(self):
self.crawler.settings.set('ROBOTSTXT_OBEY', True)
err = error.DNSLookupError('Robotstxt address not found') err = error.DNSLookupError('Robotstxt address not found')
def return_failure(request, spider): def return_failure(request, spider):
deferred = Deferred() deferred = Deferred()
reactor.callFromThread(deferred.errback, failure.Failure(err)) reactor.callFromThread(deferred.errback, failure.Failure(err))
return deferred return deferred
crawler.engine.download.side_effect = return_failure self.crawler.engine.download.side_effect = return_failure
middleware = RobotsTxtMiddleware(crawler) middleware = RobotsTxtMiddleware(self.crawler)
middleware._logerror = mock.MagicMock() middleware._logerror = mock.MagicMock()
middleware.process_request(Request('http://site.local'), None) middleware.process_request(Request('http://site.local'), None)
deferred = Deferred() deferred = Deferred()
@ -69,27 +146,3 @@ class RobotsTxtMiddlewareTest(unittest.TestCase):
def assertIgnored(self, request, middleware): def assertIgnored(self, request, middleware):
spider = None # not actually used spider = None # not actually used
self.assertRaises(IgnoreRequest, middleware.process_request, request, spider) self.assertRaises(IgnoreRequest, middleware.process_request, request, spider)
def _get_crawler(self):
crawler = mock.MagicMock()
crawler.settings = Settings()
crawler.settings.set('USER_AGENT', 'CustomAgent')
self.assertRaises(NotConfigured, RobotsTxtMiddleware, crawler)
crawler.settings.set('ROBOTSTXT_OBEY', True)
crawler.engine.download = mock.MagicMock()
ROBOTS = re.sub(r'^\s+(?m)', '', '''
User-Agent: *
Disallow: /admin/
Disallow: /static/
''')
response = Response('http://site.local/robots.txt', body=ROBOTS)
def return_response(request, spider):
deferred = Deferred()
reactor.callFromThread(deferred.callback, response)
return deferred
crawler.engine.download.side_effect = return_response
return crawler
def _get_middleware(self):
crawler = self._get_crawler()
return RobotsTxtMiddleware(crawler)

View File

@ -3,6 +3,7 @@ import unittest
from scrapy.dupefilters import RFPDupeFilter from scrapy.dupefilters import RFPDupeFilter
from scrapy.http import Request from scrapy.http import Request
from scrapy.utils.python import to_bytes
class RFPDupeFilterTest(unittest.TestCase): class RFPDupeFilterTest(unittest.TestCase):
@ -43,7 +44,7 @@ class RFPDupeFilterTest(unittest.TestCase):
def request_fingerprint(self, request): def request_fingerprint(self, request):
fp = hashlib.sha1() fp = hashlib.sha1()
fp.update(request.url.lower()) fp.update(to_bytes(request.url.lower()))
return fp.hexdigest() return fp.hexdigest()
case_insensitive_dupefilter = CaseInsensitiveRFPDupeFilter() case_insensitive_dupefilter = CaseInsensitiveRFPDupeFilter()

View File

@ -20,7 +20,7 @@ from twisted.trial import unittest
from scrapy import signals from scrapy import signals
from scrapy.utils.test import get_crawler from scrapy.utils.test import get_crawler
from scrapy.xlib.pydispatch import dispatcher from pydispatch import dispatcher
from tests import tests_datadir from tests import tests_datadir
from scrapy.spiders import Spider from scrapy.spiders import Spider
from scrapy.item import Item, Field from scrapy.item import Item, Field

View File

@ -1,9 +1,12 @@
import cgi import cgi
import unittest import unittest
import six
from six.moves import xmlrpc_client as xmlrpclib from six.moves import xmlrpc_client as xmlrpclib
from six.moves.urllib.parse import urlparse from six.moves.urllib.parse import urlparse
from scrapy.http import Request, FormRequest, XmlRpcRequest, Headers, HtmlResponse from scrapy.http import Request, FormRequest, XmlRpcRequest, Headers, HtmlResponse
from scrapy.utils.python import to_bytes, to_native_str
class RequestTest(unittest.TestCase): class RequestTest(unittest.TestCase):
@ -31,13 +34,13 @@ class RequestTest(unittest.TestCase):
self.assertEqual(r.meta, self.default_meta) self.assertEqual(r.meta, self.default_meta)
meta = {"lala": "lolo"} meta = {"lala": "lolo"}
headers = {"caca": "coco"} headers = {b"caca": b"coco"}
r = self.request_class("http://www.example.com", meta=meta, headers=headers, body="a body") r = self.request_class("http://www.example.com", meta=meta, headers=headers, body="a body")
assert r.meta is not meta assert r.meta is not meta
self.assertEqual(r.meta, meta) self.assertEqual(r.meta, meta)
assert r.headers is not headers assert r.headers is not headers
self.assertEqual(r.headers["caca"], "coco") self.assertEqual(r.headers[b"caca"], b"coco")
def test_url_no_scheme(self): def test_url_no_scheme(self):
self.assertRaises(ValueError, self.request_class, 'foo') self.assertRaises(ValueError, self.request_class, 'foo')
@ -45,7 +48,7 @@ class RequestTest(unittest.TestCase):
def test_headers(self): def test_headers(self):
# Different ways of setting headers attribute # Different ways of setting headers attribute
url = 'http://www.scrapy.org' url = 'http://www.scrapy.org'
headers = {'Accept':'gzip', 'Custom-Header':'nothing to tell you'} headers = {b'Accept':'gzip', b'Custom-Header':'nothing to tell you'}
r = self.request_class(url=url, headers=headers) r = self.request_class(url=url, headers=headers)
p = self.request_class(url=url, headers=r.headers) p = self.request_class(url=url, headers=r.headers)
@ -57,9 +60,9 @@ class RequestTest(unittest.TestCase):
h = Headers({'key1': u'val1', u'key2': 'val2'}) h = Headers({'key1': u'val1', u'key2': 'val2'})
h[u'newkey'] = u'newval' h[u'newkey'] = u'newval'
for k, v in h.iteritems(): for k, v in h.iteritems():
self.assert_(isinstance(k, str)) self.assert_(isinstance(k, bytes))
for s in v: for s in v:
self.assert_(isinstance(s, str)) self.assert_(isinstance(s, bytes))
def test_eq(self): def test_eq(self):
url = 'http://www.scrapy.org' url = 'http://www.scrapy.org'
@ -73,17 +76,17 @@ class RequestTest(unittest.TestCase):
self.assertEqual(len(set_), 2) self.assertEqual(len(set_), 2)
def test_url(self): def test_url(self):
"""Request url tests"""
r = self.request_class(url="http://www.scrapy.org/path") r = self.request_class(url="http://www.scrapy.org/path")
self.assertEqual(r.url, "http://www.scrapy.org/path") self.assertEqual(r.url, "http://www.scrapy.org/path")
# url quoting on creation def test_url_quoting(self):
r = self.request_class(url="http://www.scrapy.org/blank%20space") r = self.request_class(url="http://www.scrapy.org/blank%20space")
self.assertEqual(r.url, "http://www.scrapy.org/blank%20space") self.assertEqual(r.url, "http://www.scrapy.org/blank%20space")
r = self.request_class(url="http://www.scrapy.org/blank space") r = self.request_class(url="http://www.scrapy.org/blank space")
self.assertEqual(r.url, "http://www.scrapy.org/blank%20space") self.assertEqual(r.url, "http://www.scrapy.org/blank%20space")
# url encoding @unittest.skipUnless(six.PY2, "TODO")
def test_url_encoding(self):
r1 = self.request_class(url=u"http://www.scrapy.org/price/\xa3", encoding="utf-8") r1 = self.request_class(url=u"http://www.scrapy.org/price/\xa3", encoding="utf-8")
r2 = self.request_class(url=u"http://www.scrapy.org/price/\xa3", encoding="latin1") r2 = self.request_class(url=u"http://www.scrapy.org/price/\xa3", encoding="latin1")
self.assertEqual(r1.url, "http://www.scrapy.org/price/%C2%A3") self.assertEqual(r1.url, "http://www.scrapy.org/price/%C2%A3")
@ -91,19 +94,19 @@ class RequestTest(unittest.TestCase):
def test_body(self): def test_body(self):
r1 = self.request_class(url="http://www.example.com/") r1 = self.request_class(url="http://www.example.com/")
assert r1.body == '' assert r1.body == b''
r2 = self.request_class(url="http://www.example.com/", body="") r2 = self.request_class(url="http://www.example.com/", body=b"")
assert isinstance(r2.body, str) assert isinstance(r2.body, bytes)
self.assertEqual(r2.encoding, 'utf-8') # default encoding self.assertEqual(r2.encoding, 'utf-8') # default encoding
r3 = self.request_class(url="http://www.example.com/", body=u"Price: \xa3100", encoding='utf-8') r3 = self.request_class(url="http://www.example.com/", body=u"Price: \xa3100", encoding='utf-8')
assert isinstance(r3.body, str) assert isinstance(r3.body, bytes)
self.assertEqual(r3.body, "Price: \xc2\xa3100") self.assertEqual(r3.body, b"Price: \xc2\xa3100")
r4 = self.request_class(url="http://www.example.com/", body=u"Price: \xa3100", encoding='latin1') r4 = self.request_class(url="http://www.example.com/", body=u"Price: \xa3100", encoding='latin1')
assert isinstance(r4.body, str) assert isinstance(r4.body, bytes)
self.assertEqual(r4.body, "Price: \xa3100") self.assertEqual(r4.body, b"Price: \xa3100")
def test_ajax_url(self): def test_ajax_url(self):
# ascii url # ascii url
@ -155,18 +158,19 @@ class RequestTest(unittest.TestCase):
def test_replace(self): def test_replace(self):
"""Test Request.replace() method""" """Test Request.replace() method"""
r1 = self.request_class("http://www.example.com", method='GET') r1 = self.request_class("http://www.example.com", method='GET')
hdrs = Headers(dict(r1.headers, key='value')) hdrs = Headers(r1.headers)
hdrs[b'key'] = b'value'
r2 = r1.replace(method="POST", body="New body", headers=hdrs) r2 = r1.replace(method="POST", body="New body", headers=hdrs)
self.assertEqual(r1.url, r2.url) self.assertEqual(r1.url, r2.url)
self.assertEqual((r1.method, r2.method), ("GET", "POST")) self.assertEqual((r1.method, r2.method), ("GET", "POST"))
self.assertEqual((r1.body, r2.body), ('', "New body")) self.assertEqual((r1.body, r2.body), (b'', b"New body"))
self.assertEqual((r1.headers, r2.headers), (self.default_headers, hdrs)) self.assertEqual((r1.headers, r2.headers), (self.default_headers, hdrs))
# Empty attributes (which may fail if not compared properly) # Empty attributes (which may fail if not compared properly)
r3 = self.request_class("http://www.example.com", meta={'a': 1}, dont_filter=True) r3 = self.request_class("http://www.example.com", meta={'a': 1}, dont_filter=True)
r4 = r3.replace(url="http://www.example.com/2", body='', meta={}, dont_filter=False) r4 = r3.replace(url="http://www.example.com/2", body=b'', meta={}, dont_filter=False)
self.assertEqual(r4.url, "http://www.example.com/2") self.assertEqual(r4.url, "http://www.example.com/2")
self.assertEqual(r4.body, '') self.assertEqual(r4.body, b'')
self.assertEqual(r4.meta, {}) self.assertEqual(r4.meta, {})
assert r4.dont_filter is False assert r4.dont_filter is False
@ -184,39 +188,41 @@ class FormRequestTest(RequestTest):
request_class = FormRequest request_class = FormRequest
def assertSortedEqual(self, first, second, msg=None): def assertQueryEqual(self, first, second, msg=None):
first = to_native_str(first).split("&")
second = to_native_str(second).split("&")
return self.assertEqual(sorted(first), sorted(second), msg) return self.assertEqual(sorted(first), sorted(second), msg)
def test_empty_formdata(self): def test_empty_formdata(self):
r1 = self.request_class("http://www.example.com", formdata={}) r1 = self.request_class("http://www.example.com", formdata={})
self.assertEqual(r1.body, '') self.assertEqual(r1.body, b'')
@unittest.skipUnless(six.PY2, "TODO")
def test_default_encoding(self): def test_default_encoding(self):
# using default encoding (utf-8) # using default encoding (utf-8)
data = {'one': 'two', 'price': '\xc2\xa3 100'} data = {'one': 'two', 'price': '\xc2\xa3 100'}
r2 = self.request_class("http://www.example.com", formdata=data) r2 = self.request_class("http://www.example.com", formdata=data)
self.assertEqual(r2.method, 'POST') self.assertEqual(r2.method, 'POST')
self.assertEqual(r2.encoding, 'utf-8') self.assertEqual(r2.encoding, 'utf-8')
self.assertSortedEqual(r2.body.split('&'), self.assertQueryEqual(r2.body, b'price=%C2%A3+100&one=two')
'price=%C2%A3+100&one=two'.split('&')) self.assertEqual(r2.headers[b'Content-Type'], b'application/x-www-form-urlencoded')
self.assertEqual(r2.headers['Content-Type'], 'application/x-www-form-urlencoded')
def test_custom_encoding(self): def test_custom_encoding(self):
data = {'price': u'\xa3 100'} data = {'price': u'\xa3 100'}
r3 = self.request_class("http://www.example.com", formdata=data, encoding='latin1') r3 = self.request_class("http://www.example.com", formdata=data, encoding='latin1')
self.assertEqual(r3.encoding, 'latin1') self.assertEqual(r3.encoding, 'latin1')
self.assertEqual(r3.body, 'price=%A3+100') self.assertEqual(r3.body, b'price=%A3+100')
def test_multi_key_values(self): def test_multi_key_values(self):
# using multiples values for a single key # using multiples values for a single key
data = {'price': u'\xa3 100', 'colours': ['red', 'blue', 'green']} data = {'price': u'\xa3 100', 'colours': ['red', 'blue', 'green']}
r3 = self.request_class("http://www.example.com", formdata=data) r3 = self.request_class("http://www.example.com", formdata=data)
self.assertSortedEqual(r3.body.split('&'), self.assertQueryEqual(r3.body,
'colours=red&colours=blue&colours=green&price=%C2%A3+100'.split('&')) b'colours=red&colours=blue&colours=green&price=%C2%A3+100')
def test_from_response_post(self): def test_from_response_post(self):
response = _buildresponse( response = _buildresponse(
"""<form action="post.php" method="POST"> b"""<form action="post.php" method="POST">
<input type="hidden" name="test" value="val1"> <input type="hidden" name="test" value="val1">
<input type="hidden" name="test" value="val2"> <input type="hidden" name="test" value="val2">
<input type="hidden" name="test2" value="xxx"> <input type="hidden" name="test2" value="xxx">
@ -225,13 +231,13 @@ class FormRequestTest(RequestTest):
req = self.request_class.from_response(response, req = self.request_class.from_response(response,
formdata={'one': ['two', 'three'], 'six': 'seven'}) formdata={'one': ['two', 'three'], 'six': 'seven'})
self.assertEqual(req.method, 'POST') self.assertEqual(req.method, 'POST')
self.assertEqual(req.headers['Content-type'], 'application/x-www-form-urlencoded') self.assertEqual(req.headers[b'Content-type'], b'application/x-www-form-urlencoded')
self.assertEqual(req.url, "http://www.example.com/this/post.php") self.assertEqual(req.url, "http://www.example.com/this/post.php")
fs = _qs(req) fs = _qs(req)
self.assertEqual(set(fs["test"]), set(["val1", "val2"])) self.assertEqual(set(fs[b"test"]), {b"val1", b"val2"})
self.assertEqual(set(fs["one"]), set(["two", "three"])) self.assertEqual(set(fs[b"one"]), {b"two", b"three"})
self.assertEqual(fs['test2'], ['xxx']) self.assertEqual(fs[b'test2'], [b'xxx'])
self.assertEqual(fs['six'], ['seven']) self.assertEqual(fs[b'six'], [b'seven'])
def test_from_response_extra_headers(self): def test_from_response_extra_headers(self):
response = _buildresponse( response = _buildresponse(
@ -244,8 +250,8 @@ class FormRequestTest(RequestTest):
formdata={'one': ['two', 'three'], 'six': 'seven'}, formdata={'one': ['two', 'three'], 'six': 'seven'},
headers={"Accept-Encoding": "gzip,deflate"}) headers={"Accept-Encoding": "gzip,deflate"})
self.assertEqual(req.method, 'POST') self.assertEqual(req.method, 'POST')
self.assertEqual(req.headers['Content-type'], 'application/x-www-form-urlencoded') self.assertEqual(req.headers['Content-type'], b'application/x-www-form-urlencoded')
self.assertEqual(req.headers['Accept-Encoding'], 'gzip,deflate') self.assertEqual(req.headers['Accept-Encoding'], b'gzip,deflate')
def test_from_response_get(self): def test_from_response_get(self):
response = _buildresponse( response = _buildresponse(
@ -274,8 +280,8 @@ class FormRequestTest(RequestTest):
</form>""") </form>""")
req = self.request_class.from_response(response, formdata={'two': '2'}) req = self.request_class.from_response(response, formdata={'two': '2'})
fs = _qs(req) fs = _qs(req)
self.assertEqual(fs['one'], ['1']) self.assertEqual(fs[b'one'], [b'1'])
self.assertEqual(fs['two'], ['2']) self.assertEqual(fs[b'two'], [b'2'])
def test_from_response_override_method(self): def test_from_response_override_method(self):
response = _buildresponse( response = _buildresponse(
@ -379,7 +385,7 @@ class FormRequestTest(RequestTest):
req = self.request_class.from_response(response, \ req = self.request_class.from_response(response, \
clickdata={'name': u'price in \u00a3'}) clickdata={'name': u'price in \u00a3'})
fs = _qs(req) fs = _qs(req)
self.assertTrue(fs[u'price in \u00a3'.encode('utf-8')]) self.assertTrue(fs[to_native_str(u'price in \u00a3')])
def test_from_response_multiple_forms_clickdata(self): def test_from_response_multiple_forms_clickdata(self):
response = _buildresponse( response = _buildresponse(
@ -489,9 +495,9 @@ class FormRequestTest(RequestTest):
</form>""") </form>""")
r1 = self.request_class.from_response(response, formdata={'two':'3'}) r1 = self.request_class.from_response(response, formdata={'two':'3'})
self.assertEqual(r1.method, 'POST') self.assertEqual(r1.method, 'POST')
self.assertEqual(r1.headers['Content-type'], 'application/x-www-form-urlencoded') self.assertEqual(r1.headers['Content-type'], b'application/x-www-form-urlencoded')
fs = _qs(r1) fs = _qs(r1)
self.assertEqual(fs, {'one': ['1'], 'two': ['3']}) self.assertEqual(fs, {b'one': [b'1'], b'two': [b'3']})
def test_from_response_formname_exists(self): def test_from_response_formname_exists(self):
response = _buildresponse( response = _buildresponse(
@ -506,7 +512,7 @@ class FormRequestTest(RequestTest):
r1 = self.request_class.from_response(response, formname="form2") r1 = self.request_class.from_response(response, formname="form2")
self.assertEqual(r1.method, 'POST') self.assertEqual(r1.method, 'POST')
fs = _qs(r1) fs = _qs(r1)
self.assertEqual(fs, {'four': ['4'], 'three': ['3']}) self.assertEqual(fs, {b'four': [b'4'], b'three': [b'3']})
def test_from_response_formname_notexist(self): def test_from_response_formname_notexist(self):
response = _buildresponse( response = _buildresponse(
@ -519,7 +525,7 @@ class FormRequestTest(RequestTest):
r1 = self.request_class.from_response(response, formname="form3") r1 = self.request_class.from_response(response, formname="form3")
self.assertEqual(r1.method, 'POST') self.assertEqual(r1.method, 'POST')
fs = _qs(r1) fs = _qs(r1)
self.assertEqual(fs, {'one': ['1']}) self.assertEqual(fs, {b'one': [b'1']})
def test_from_response_formname_errors_formnumber(self): def test_from_response_formname_errors_formnumber(self):
response = _buildresponse( response = _buildresponse(
@ -664,11 +670,11 @@ class FormRequestTest(RequestTest):
</form>""") </form>""")
r1 = self.request_class.from_response(response, formxpath="//form[@action='post.php']") r1 = self.request_class.from_response(response, formxpath="//form[@action='post.php']")
fs = _qs(r1) fs = _qs(r1)
self.assertEqual(fs['one'], ['1']) self.assertEqual(fs[b'one'], [b'1'])
r1 = self.request_class.from_response(response, formxpath="//form/input[@name='four']") r1 = self.request_class.from_response(response, formxpath="//form/input[@name='four']")
fs = _qs(r1) fs = _qs(r1)
self.assertEqual(fs['three'], ['3']) self.assertEqual(fs[b'three'], [b'3'])
self.assertRaises(ValueError, self.request_class.from_response, self.assertRaises(ValueError, self.request_class.from_response,
response, formxpath="//form/input[@name='abc']") response, formxpath="//form/input[@name='abc']")
@ -691,12 +697,12 @@ class XmlRpcRequestTest(RequestTest):
request_class = XmlRpcRequest request_class = XmlRpcRequest
default_method = 'POST' default_method = 'POST'
default_headers = {'Content-Type': ['text/xml']} default_headers = {b'Content-Type': [b'text/xml']}
def _test_request(self, **kwargs): def _test_request(self, **kwargs):
r = self.request_class('http://scrapytest.org/rpc2', **kwargs) r = self.request_class('http://scrapytest.org/rpc2', **kwargs)
self.assertEqual(r.headers['Content-Type'], 'text/xml') self.assertEqual(r.headers[b'Content-Type'], b'text/xml')
self.assertEqual(r.body, xmlrpclib.dumps(**kwargs)) self.assertEqual(r.body, to_bytes(xmlrpclib.dumps(**kwargs)))
self.assertEqual(r.method, 'POST') self.assertEqual(r.method, 'POST')
self.assertEqual(r.encoding, kwargs.get('encoding', 'utf-8')) self.assertEqual(r.encoding, kwargs.get('encoding', 'utf-8'))
self.assertTrue(r.dont_filter, True) self.assertTrue(r.dont_filter, True)
@ -706,11 +712,14 @@ class XmlRpcRequestTest(RequestTest):
self._test_request(params=('username', 'password'), methodname='login') self._test_request(params=('username', 'password'), methodname='login')
self._test_request(params=('response', ), methodresponse='login') self._test_request(params=('response', ), methodresponse='login')
self._test_request(params=(u'pas\xa3',), encoding='utf-8') self._test_request(params=(u'pas\xa3',), encoding='utf-8')
self._test_request(params=(u'pas\xa3',), encoding='latin')
self._test_request(params=(None,), allow_none=1) self._test_request(params=(None,), allow_none=1)
self.assertRaises(TypeError, self._test_request) self.assertRaises(TypeError, self._test_request)
self.assertRaises(TypeError, self._test_request, params=(None,)) self.assertRaises(TypeError, self._test_request, params=(None,))
@unittest.skipUnless(six.PY2, "TODO")
def test_latin1(self):
self._test_request(params=(u'pas\xa3',), encoding='latin')
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -1,8 +1,12 @@
import unittest import unittest
import six
from w3lib.encoding import resolve_encoding from w3lib.encoding import resolve_encoding
from scrapy.http import Request, Response, TextResponse, HtmlResponse, XmlResponse, Headers
from scrapy.http import (Request, Response, TextResponse, HtmlResponse,
XmlResponse, Headers)
from scrapy.selector import Selector from scrapy.selector import Selector
from scrapy.utils.python import to_native_str
class BaseResponseTest(unittest.TestCase): class BaseResponseTest(unittest.TestCase):
@ -14,10 +18,10 @@ class BaseResponseTest(unittest.TestCase):
self.assertRaises(Exception, self.response_class) self.assertRaises(Exception, self.response_class)
self.assertTrue(isinstance(self.response_class('http://example.com/'), self.response_class)) self.assertTrue(isinstance(self.response_class('http://example.com/'), self.response_class))
# body can be str or None # body can be str or None
self.assertTrue(isinstance(self.response_class('http://example.com/', body=''), self.response_class)) self.assertTrue(isinstance(self.response_class('http://example.com/', body=b''), self.response_class))
self.assertTrue(isinstance(self.response_class('http://example.com/', body='body'), self.response_class)) self.assertTrue(isinstance(self.response_class('http://example.com/', body=b'body'), self.response_class))
# test presence of all optional parameters # test presence of all optional parameters
self.assertTrue(isinstance(self.response_class('http://example.com/', headers={}, status=200, body=''), self.response_class)) self.assertTrue(isinstance(self.response_class('http://example.com/', body=b'', headers={}, status=200), self.response_class))
r = self.response_class("http://www.example.com") r = self.response_class("http://www.example.com")
assert isinstance(r.url, str) assert isinstance(r.url, str)
@ -27,12 +31,12 @@ class BaseResponseTest(unittest.TestCase):
assert isinstance(r.headers, Headers) assert isinstance(r.headers, Headers)
self.assertEqual(r.headers, {}) self.assertEqual(r.headers, {})
headers = {"caca": "coco"} headers = {"foo": "bar"}
body = "a body" body = b"a body"
r = self.response_class("http://www.example.com", headers=headers, body=body) r = self.response_class("http://www.example.com", headers=headers, body=body)
assert r.headers is not headers assert r.headers is not headers
self.assertEqual(r.headers["caca"], "coco") self.assertEqual(r.headers[b"foo"], b"bar")
r = self.response_class("http://www.example.com", status=301) r = self.response_class("http://www.example.com", status=301)
self.assertEqual(r.status, 301) self.assertEqual(r.status, 301)
@ -43,7 +47,7 @@ class BaseResponseTest(unittest.TestCase):
def test_copy(self): def test_copy(self):
"""Test Response copy""" """Test Response copy"""
r1 = self.response_class("http://www.example.com", body="Some body") r1 = self.response_class("http://www.example.com", body=b"Some body")
r1.flags.append('cached') r1.flags.append('cached')
r2 = r1.copy() r2 = r1.copy()
@ -61,7 +65,7 @@ class BaseResponseTest(unittest.TestCase):
def test_copy_meta(self): def test_copy_meta(self):
req = Request("http://www.example.com") req = Request("http://www.example.com")
req.meta['foo'] = 'bar' req.meta['foo'] = 'bar'
r1 = self.response_class("http://www.example.com", body="Some body", request=req) r1 = self.response_class("http://www.example.com", body=b"Some body", request=req)
assert r1.meta is req.meta assert r1.meta is req.meta
def test_copy_inherited_classes(self): def test_copy_inherited_classes(self):
@ -79,30 +83,30 @@ class BaseResponseTest(unittest.TestCase):
"""Test Response.replace() method""" """Test Response.replace() method"""
hdrs = Headers({"key": "value"}) hdrs = Headers({"key": "value"})
r1 = self.response_class("http://www.example.com") r1 = self.response_class("http://www.example.com")
r2 = r1.replace(status=301, body="New body", headers=hdrs) r2 = r1.replace(status=301, body=b"New body", headers=hdrs)
assert r1.body == '' assert r1.body == b''
self.assertEqual(r1.url, r2.url) self.assertEqual(r1.url, r2.url)
self.assertEqual((r1.status, r2.status), (200, 301)) self.assertEqual((r1.status, r2.status), (200, 301))
self.assertEqual((r1.body, r2.body), ('', "New body")) self.assertEqual((r1.body, r2.body), (b'', b"New body"))
self.assertEqual((r1.headers, r2.headers), ({}, hdrs)) self.assertEqual((r1.headers, r2.headers), ({}, hdrs))
# Empty attributes (which may fail if not compared properly) # Empty attributes (which may fail if not compared properly)
r3 = self.response_class("http://www.example.com", flags=['cached']) r3 = self.response_class("http://www.example.com", flags=['cached'])
r4 = r3.replace(body='', flags=[]) r4 = r3.replace(body=b'', flags=[])
self.assertEqual(r4.body, '') self.assertEqual(r4.body, b'')
self.assertEqual(r4.flags, []) self.assertEqual(r4.flags, [])
def _assert_response_values(self, response, encoding, body): def _assert_response_values(self, response, encoding, body):
if isinstance(body, unicode): if isinstance(body, six.text_type):
body_unicode = body body_unicode = body
body_str = body.encode(encoding) body_bytes = body.encode(encoding)
else: else:
body_unicode = body.decode(encoding) body_unicode = body.decode(encoding)
body_str = body body_bytes = body
assert isinstance(response.body, str) assert isinstance(response.body, bytes)
self._assert_response_encoding(response, encoding) self._assert_response_encoding(response, encoding)
self.assertEqual(response.body, body_str) self.assertEqual(response.body, body_bytes)
self.assertEqual(response.body_as_unicode(), body_unicode) self.assertEqual(response.body_as_unicode(), body_unicode)
def _assert_response_encoding(self, response, encoding): def _assert_response_encoding(self, response, encoding):
@ -120,12 +124,6 @@ class BaseResponseTest(unittest.TestCase):
self.assertEqual(joined, absolute) self.assertEqual(joined, absolute)
class ResponseText(BaseResponseTest):
def test_no_unicode_url(self):
self.assertRaises(TypeError, self.response_class, u'http://www.example.com')
class TextResponseTest(BaseResponseTest): class TextResponseTest(BaseResponseTest):
response_class = TextResponse response_class = TextResponse
@ -152,11 +150,11 @@ class TextResponseTest(BaseResponseTest):
assert isinstance(resp.url, str) assert isinstance(resp.url, str)
resp = self.response_class(url=u"http://www.example.com/price/\xa3", encoding='utf-8') resp = self.response_class(url=u"http://www.example.com/price/\xa3", encoding='utf-8')
self.assertEqual(resp.url, 'http://www.example.com/price/\xc2\xa3') self.assertEqual(resp.url, to_native_str(b'http://www.example.com/price/\xc2\xa3'))
resp = self.response_class(url=u"http://www.example.com/price/\xa3", encoding='latin-1') resp = self.response_class(url=u"http://www.example.com/price/\xa3", encoding='latin-1')
self.assertEqual(resp.url, 'http://www.example.com/price/\xa3') self.assertEqual(resp.url, 'http://www.example.com/price/\xa3')
resp = self.response_class(u"http://www.example.com/price/\xa3", headers={"Content-type": ["text/html; charset=utf-8"]}) resp = self.response_class(u"http://www.example.com/price/\xa3", headers={"Content-type": ["text/html; charset=utf-8"]})
self.assertEqual(resp.url, 'http://www.example.com/price/\xc2\xa3') self.assertEqual(resp.url, to_native_str(b'http://www.example.com/price/\xc2\xa3'))
resp = self.response_class(u"http://www.example.com/price/\xa3", headers={"Content-type": ["text/html; charset=iso-8859-1"]}) resp = self.response_class(u"http://www.example.com/price/\xa3", headers={"Content-type": ["text/html; charset=iso-8859-1"]})
self.assertEqual(resp.url, 'http://www.example.com/price/\xa3') self.assertEqual(resp.url, 'http://www.example.com/price/\xa3')
@ -168,17 +166,17 @@ class TextResponseTest(BaseResponseTest):
r1 = self.response_class('http://www.example.com', body=original_string, encoding='cp1251') r1 = self.response_class('http://www.example.com', body=original_string, encoding='cp1251')
# check body_as_unicode # check body_as_unicode
self.assertTrue(isinstance(r1.body_as_unicode(), unicode)) self.assertTrue(isinstance(r1.body_as_unicode(), six.text_type))
self.assertEqual(r1.body_as_unicode(), unicode_string) self.assertEqual(r1.body_as_unicode(), unicode_string)
def test_encoding(self): def test_encoding(self):
r1 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=utf-8"]}, body="\xc2\xa3") r1 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=utf-8"]}, body=b"\xc2\xa3")
r2 = self.response_class("http://www.example.com", encoding='utf-8', body=u"\xa3") r2 = self.response_class("http://www.example.com", encoding='utf-8', body=u"\xa3")
r3 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=iso-8859-1"]}, body="\xa3") r3 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=iso-8859-1"]}, body=b"\xa3")
r4 = self.response_class("http://www.example.com", body="\xa2\xa3") r4 = self.response_class("http://www.example.com", body=b"\xa2\xa3")
r5 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=None"]}, body="\xc2\xa3") r5 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=None"]}, body=b"\xc2\xa3")
r6 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=gb2312"]}, body="\xa8D") r6 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=gb2312"]}, body=b"\xa8D")
r7 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=gbk"]}, body="\xa8D") r7 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=gbk"]}, body=b"\xa8D")
self.assertEqual(r1._headers_encoding(), "utf-8") self.assertEqual(r1._headers_encoding(), "utf-8")
self.assertEqual(r2._headers_encoding(), None) self.assertEqual(r2._headers_encoding(), None)
@ -203,21 +201,21 @@ class TextResponseTest(BaseResponseTest):
"""Check that unknown declared encodings are ignored""" """Check that unknown declared encodings are ignored"""
r = self.response_class("http://www.example.com", r = self.response_class("http://www.example.com",
headers={"Content-type": ["text/html; charset=UKNOWN"]}, headers={"Content-type": ["text/html; charset=UKNOWN"]},
body="\xc2\xa3") body=b"\xc2\xa3")
self.assertEqual(r._declared_encoding(), None) self.assertEqual(r._declared_encoding(), None)
self._assert_response_values(r, 'utf-8', u"\xa3") self._assert_response_values(r, 'utf-8', u"\xa3")
def test_utf16(self): def test_utf16(self):
"""Test utf-16 because UnicodeDammit is known to have problems with""" """Test utf-16 because UnicodeDammit is known to have problems with"""
r = self.response_class("http://www.example.com", r = self.response_class("http://www.example.com",
body='\xff\xfeh\x00i\x00', body=b'\xff\xfeh\x00i\x00',
encoding='utf-16') encoding='utf-16')
self._assert_response_values(r, 'utf-16', u"hi") self._assert_response_values(r, 'utf-16', u"hi")
def test_invalid_utf8_encoded_body_with_valid_utf8_BOM(self): def test_invalid_utf8_encoded_body_with_valid_utf8_BOM(self):
r6 = self.response_class("http://www.example.com", r6 = self.response_class("http://www.example.com",
headers={"Content-type": ["text/html; charset=utf-8"]}, headers={"Content-type": ["text/html; charset=utf-8"]},
body="\xef\xbb\xbfWORD\xe3\xab") body=b"\xef\xbb\xbfWORD\xe3\xab")
self.assertEqual(r6.encoding, 'utf-8') self.assertEqual(r6.encoding, 'utf-8')
self.assertEqual(r6.body_as_unicode(), u'WORD\ufffd\ufffd') self.assertEqual(r6.body_as_unicode(), u'WORD\ufffd\ufffd')
@ -227,7 +225,7 @@ class TextResponseTest(BaseResponseTest):
# response.body_as_unicode() in indistint order doesn't affect final # response.body_as_unicode() in indistint order doesn't affect final
# values for encoding and decoded body. # values for encoding and decoded body.
url = 'http://example.com' url = 'http://example.com'
body = "\xef\xbb\xbfWORD" body = b"\xef\xbb\xbfWORD"
headers = {"Content-type": ["text/html; charset=utf-8"]} headers = {"Content-type": ["text/html; charset=utf-8"]}
# Test response without content-type and BOM encoding # Test response without content-type and BOM encoding
@ -250,7 +248,7 @@ class TextResponseTest(BaseResponseTest):
def test_replace_wrong_encoding(self): def test_replace_wrong_encoding(self):
"""Test invalid chars are replaced properly""" """Test invalid chars are replaced properly"""
r = self.response_class("http://www.example.com", encoding='utf-8', body='PREFIX\xe3\xabSUFFIX') r = self.response_class("http://www.example.com", encoding='utf-8', body=b'PREFIX\xe3\xabSUFFIX')
# XXX: Policy for replacing invalid chars may suffer minor variations # XXX: Policy for replacing invalid chars may suffer minor variations
# but it should always contain the unicode replacement char (u'\ufffd') # but it should always contain the unicode replacement char (u'\ufffd')
assert u'\ufffd' in r.body_as_unicode(), repr(r.body_as_unicode()) assert u'\ufffd' in r.body_as_unicode(), repr(r.body_as_unicode())
@ -259,7 +257,7 @@ class TextResponseTest(BaseResponseTest):
# Do not destroy html tags due to encoding bugs # Do not destroy html tags due to encoding bugs
r = self.response_class("http://example.com", encoding='utf-8', \ r = self.response_class("http://example.com", encoding='utf-8', \
body='\xf0<span>value</span>') body=b'\xf0<span>value</span>')
assert u'<span>value</span>' in r.body_as_unicode(), repr(r.body_as_unicode()) assert u'<span>value</span>' in r.body_as_unicode(), repr(r.body_as_unicode())
# FIXME: This test should pass once we stop using BeautifulSoup's UnicodeDammit in TextResponse # FIXME: This test should pass once we stop using BeautifulSoup's UnicodeDammit in TextResponse
@ -267,7 +265,7 @@ class TextResponseTest(BaseResponseTest):
#assert u'\ufffd' in r.body_as_unicode(), repr(r.body_as_unicode()) #assert u'\ufffd' in r.body_as_unicode(), repr(r.body_as_unicode())
def test_selector(self): def test_selector(self):
body = "<html><head><title>Some page</title><body></body></html>" body = b"<html><head><title>Some page</title><body></body></html>"
response = self.response_class("http://www.example.com", body=body) response = self.response_class("http://www.example.com", body=body)
self.assertIsInstance(response.selector, Selector) self.assertIsInstance(response.selector, Selector)
@ -289,7 +287,7 @@ class TextResponseTest(BaseResponseTest):
) )
def test_selector_shortcuts(self): def test_selector_shortcuts(self):
body = "<html><head><title>Some page</title><body></body></html>" body = b"<html><head><title>Some page</title><body></body></html>"
response = self.response_class("http://www.example.com", body=body) response = self.response_class("http://www.example.com", body=body)
self.assertEqual( self.assertEqual(
@ -303,17 +301,17 @@ class TextResponseTest(BaseResponseTest):
def test_urljoin_with_base_url(self): def test_urljoin_with_base_url(self):
"""Test urljoin shortcut which also evaluates base-url through get_base_url().""" """Test urljoin shortcut which also evaluates base-url through get_base_url()."""
body = '<html><body><base href="https://example.net"></body></html>' body = b'<html><body><base href="https://example.net"></body></html>'
joined = self.response_class('http://www.example.com', body=body).urljoin('/test') joined = self.response_class('http://www.example.com', body=body).urljoin('/test')
absolute = 'https://example.net/test' absolute = 'https://example.net/test'
self.assertEqual(joined, absolute) self.assertEqual(joined, absolute)
body = '<html><body><base href="/elsewhere"></body></html>' body = b'<html><body><base href="/elsewhere"></body></html>'
joined = self.response_class('http://www.example.com', body=body).urljoin('test') joined = self.response_class('http://www.example.com', body=body).urljoin('test')
absolute = 'http://www.example.com/test' absolute = 'http://www.example.com/test'
self.assertEqual(joined, absolute) self.assertEqual(joined, absolute)
body = '<html><body><base href="/elsewhere/"></body></html>' body = b'<html><body><base href="/elsewhere/"></body></html>'
joined = self.response_class('http://www.example.com', body=body).urljoin('test') joined = self.response_class('http://www.example.com', body=body).urljoin('test')
absolute = 'http://www.example.com/elsewhere/test' absolute = 'http://www.example.com/elsewhere/test'
self.assertEqual(joined, absolute) self.assertEqual(joined, absolute)
@ -325,13 +323,13 @@ class HtmlResponseTest(TextResponseTest):
def test_html_encoding(self): def test_html_encoding(self):
body = """<html><head><title>Some page</title><meta http-equiv="Content-Type" content="text/html; charset=iso-8859-1"> body = b"""<html><head><title>Some page</title><meta http-equiv="Content-Type" content="text/html; charset=iso-8859-1">
</head><body>Price: \xa3100</body></html>' </head><body>Price: \xa3100</body></html>'
""" """
r1 = self.response_class("http://www.example.com", body=body) r1 = self.response_class("http://www.example.com", body=body)
self._assert_response_values(r1, 'iso-8859-1', body) self._assert_response_values(r1, 'iso-8859-1', body)
body = """<?xml version="1.0" encoding="iso-8859-1"?> body = b"""<?xml version="1.0" encoding="iso-8859-1"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd"> <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
Price: \xa3100 Price: \xa3100
""" """
@ -339,19 +337,19 @@ class HtmlResponseTest(TextResponseTest):
self._assert_response_values(r2, 'iso-8859-1', body) self._assert_response_values(r2, 'iso-8859-1', body)
# for conflicting declarations headers must take precedence # for conflicting declarations headers must take precedence
body = """<html><head><title>Some page</title><meta http-equiv="Content-Type" content="text/html; charset=utf-8"> body = b"""<html><head><title>Some page</title><meta http-equiv="Content-Type" content="text/html; charset=utf-8">
</head><body>Price: \xa3100</body></html>' </head><body>Price: \xa3100</body></html>'
""" """
r3 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=iso-8859-1"]}, body=body) r3 = self.response_class("http://www.example.com", headers={"Content-type": ["text/html; charset=iso-8859-1"]}, body=body)
self._assert_response_values(r3, 'iso-8859-1', body) self._assert_response_values(r3, 'iso-8859-1', body)
# make sure replace() preserves the encoding of the original response # make sure replace() preserves the encoding of the original response
body = "New body \xa3" body = b"New body \xa3"
r4 = r3.replace(body=body) r4 = r3.replace(body=body)
self._assert_response_values(r4, 'iso-8859-1', body) self._assert_response_values(r4, 'iso-8859-1', body)
def test_html5_meta_charset(self): def test_html5_meta_charset(self):
body = """<html><head><meta charset="gb2312" /><title>Some page</title><body>bla bla</body>""" body = b"""<html><head><meta charset="gb2312" /><title>Some page</title><body>bla bla</body>"""
r1 = self.response_class("http://www.example.com", body=body) r1 = self.response_class("http://www.example.com", body=body)
self._assert_response_values(r1, 'gb2312', body) self._assert_response_values(r1, 'gb2312', body)
@ -361,26 +359,25 @@ class XmlResponseTest(TextResponseTest):
response_class = XmlResponse response_class = XmlResponse
def test_xml_encoding(self): def test_xml_encoding(self):
body = b"<xml></xml>"
body = "<xml></xml>"
r1 = self.response_class("http://www.example.com", body=body) r1 = self.response_class("http://www.example.com", body=body)
self._assert_response_values(r1, self.response_class._DEFAULT_ENCODING, body) self._assert_response_values(r1, self.response_class._DEFAULT_ENCODING, body)
body = """<?xml version="1.0" encoding="iso-8859-1"?><xml></xml>""" body = b"""<?xml version="1.0" encoding="iso-8859-1"?><xml></xml>"""
r2 = self.response_class("http://www.example.com", body=body) r2 = self.response_class("http://www.example.com", body=body)
self._assert_response_values(r2, 'iso-8859-1', body) self._assert_response_values(r2, 'iso-8859-1', body)
# make sure replace() preserves the explicit encoding passed in the constructor # make sure replace() preserves the explicit encoding passed in the constructor
body = """<?xml version="1.0" encoding="iso-8859-1"?><xml></xml>""" body = b"""<?xml version="1.0" encoding="iso-8859-1"?><xml></xml>"""
r3 = self.response_class("http://www.example.com", body=body, encoding='utf-8') r3 = self.response_class("http://www.example.com", body=body, encoding='utf-8')
body2 = "New body" body2 = b"New body"
r4 = r3.replace(body=body2) r4 = r3.replace(body=body2)
self._assert_response_values(r4, 'utf-8', body2) self._assert_response_values(r4, 'utf-8', body2)
def test_replace_encoding(self): def test_replace_encoding(self):
# make sure replace() keeps the previous encoding unless overridden explicitly # make sure replace() keeps the previous encoding unless overridden explicitly
body = """<?xml version="1.0" encoding="iso-8859-1"?><xml></xml>""" body = b"""<?xml version="1.0" encoding="iso-8859-1"?><xml></xml>"""
body2 = """<?xml version="1.0" encoding="utf-8"?><xml></xml>""" body2 = b"""<?xml version="1.0" encoding="utf-8"?><xml></xml>"""
r5 = self.response_class("http://www.example.com", body=body) r5 = self.response_class("http://www.example.com", body=body)
r6 = r5.replace(body=body2) r6 = r5.replace(body=body2)
r7 = r5.replace(body=body2, encoding='utf-8') r7 = r5.replace(body=body2, encoding='utf-8')
@ -389,7 +386,7 @@ class XmlResponseTest(TextResponseTest):
self._assert_response_values(r7, 'utf-8', body2) self._assert_response_values(r7, 'utf-8', body2)
def test_selector(self): def test_selector(self):
body = '<?xml version="1.0" encoding="utf-8"?><xml><elem>value</elem></xml>' body = b'<?xml version="1.0" encoding="utf-8"?><xml><elem>value</elem></xml>'
response = self.response_class("http://www.example.com", body=body) response = self.response_class("http://www.example.com", body=body)
self.assertIsInstance(response.selector, Selector) self.assertIsInstance(response.selector, Selector)
@ -403,15 +400,10 @@ class XmlResponseTest(TextResponseTest):
) )
def test_selector_shortcuts(self): def test_selector_shortcuts(self):
body = '<?xml version="1.0" encoding="utf-8"?><xml><elem>value</elem></xml>' body = b'<?xml version="1.0" encoding="utf-8"?><xml><elem>value</elem></xml>'
response = self.response_class("http://www.example.com", body=body) response = self.response_class("http://www.example.com", body=body)
self.assertEqual( self.assertEqual(
response.xpath("//elem/text()").extract(), response.xpath("//elem/text()").extract(),
response.selector.xpath("//elem/text()").extract(), response.selector.xpath("//elem/text()").extract(),
) )
if __name__ == "__main__":
unittest.main()

View File

@ -3,6 +3,7 @@ from twisted.trial import unittest
from scrapy.settings import Settings from scrapy.settings import Settings
from scrapy.exceptions import NotConfigured from scrapy.exceptions import NotConfigured
from scrapy.middleware import MiddlewareManager from scrapy.middleware import MiddlewareManager
import six
class M1(object): class M1(object):
@ -65,12 +66,20 @@ class MiddlewareManagerTest(unittest.TestCase):
def test_methods(self): def test_methods(self):
mwman = TestMiddlewareManager(M1(), M2(), M3()) mwman = TestMiddlewareManager(M1(), M2(), M3())
self.assertEqual([x.im_class for x in mwman.methods['open_spider']], if six.PY2:
[M1, M2]) self.assertEqual([x.im_class for x in mwman.methods['open_spider']],
self.assertEqual([x.im_class for x in mwman.methods['close_spider']], [M1, M2])
[M2, M1]) self.assertEqual([x.im_class for x in mwman.methods['close_spider']],
self.assertEqual([x.im_class for x in mwman.methods['process']], [M2, M1])
[M1, M3]) self.assertEqual([x.im_class for x in mwman.methods['process']],
[M1, M3])
else:
self.assertEqual([x.__self__.__class__ for x in mwman.methods['open_spider']],
[M1, M2])
self.assertEqual([x.__self__.__class__ for x in mwman.methods['close_spider']],
[M2, M1])
self.assertEqual([x.__self__.__class__ for x in mwman.methods['process']],
[M1, M3])
def test_enabled(self): def test_enabled(self):
m1, m2, m3 = M1(), M2(), M3() m1, m2, m3 = M1(), M2(), M3()

View File

@ -192,7 +192,7 @@ def _create_item_with_files(*files):
def _prepare_request_object(item_url): def _prepare_request_object(item_url):
return Request( return Request(
item_url, item_url,
meta={'response': Response(item_url, status=200, body='data')}) meta={'response': Response(item_url, status=200, body=b'data')})
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -44,7 +44,7 @@ class BaseMediaPipelineTestCase(unittest.TestCase):
def test_default_media_downloaded(self): def test_default_media_downloaded(self):
request = Request('http://url') request = Request('http://url')
response = Response('http://url', body='') response = Response('http://url', body=b'')
assert self.pipe.media_downloaded(response, request, self.info) is response assert self.pipe.media_downloaded(response, request, self.info) is response
def test_default_media_failed(self): def test_default_media_failed(self):

View File

@ -51,7 +51,7 @@ class ResponseTypesTest(unittest.TestCase):
for source, cls in mappings: for source, cls in mappings:
retcls = responsetypes.from_body(source) retcls = responsetypes.from_body(source)
assert retcls is cls, "%s ==> %s != %s" % (source, retcls, cls) assert retcls is cls, "%s ==> %s != %s" % (source, retcls, cls)
def test_from_headers(self): def test_from_headers(self):
mappings = [ mappings = [
({'Content-Type': ['text/html; charset=utf-8']}, HtmlResponse), ({'Content-Type': ['text/html; charset=utf-8']}, HtmlResponse),

View File

@ -5,6 +5,8 @@ from twisted.python.failure import Failure
from scrapy.utils.defer import mustbe_deferred, process_chain, \ from scrapy.utils.defer import mustbe_deferred, process_chain, \
process_chain_both, process_parallel, iter_errback process_chain_both, process_parallel, iter_errback
from six.moves import xrange
class MustbeDeferredTest(unittest.TestCase): class MustbeDeferredTest(unittest.TestCase):
def test_success_function(self): def test_success_function(self):
@ -86,7 +88,7 @@ class IterErrbackTest(unittest.TestCase):
errors = [] errors = []
out = list(iter_errback(itergood(), errors.append)) out = list(iter_errback(itergood(), errors.append))
self.assertEqual(out, range(10)) self.assertEqual(out, list(range(10)))
self.failIf(errors) self.failIf(errors)
def test_iter_errback_bad(self): def test_iter_errback_bad(self):

View File

@ -21,15 +21,15 @@ class UtilsRequestTest(unittest.TestCase):
r1 = Request("http://www.example.com/members/offers.html") r1 = Request("http://www.example.com/members/offers.html")
r2 = Request("http://www.example.com/members/offers.html") r2 = Request("http://www.example.com/members/offers.html")
r2.headers['SESSIONID'] = "somehash" r2.headers['SESSIONID'] = b"somehash"
self.assertEqual(request_fingerprint(r1), request_fingerprint(r2)) self.assertEqual(request_fingerprint(r1), request_fingerprint(r2))
r1 = Request("http://www.example.com/") r1 = Request("http://www.example.com/")
r2 = Request("http://www.example.com/") r2 = Request("http://www.example.com/")
r2.headers['Accept-Language'] = 'en' r2.headers['Accept-Language'] = b'en'
r3 = Request("http://www.example.com/") r3 = Request("http://www.example.com/")
r3.headers['Accept-Language'] = 'en' r3.headers['Accept-Language'] = b'en'
r3.headers['SESSIONID'] = "somehash" r3.headers['SESSIONID'] = b"somehash"
self.assertEqual(request_fingerprint(r1), request_fingerprint(r2), request_fingerprint(r3)) self.assertEqual(request_fingerprint(r1), request_fingerprint(r2), request_fingerprint(r3))
@ -44,7 +44,7 @@ class UtilsRequestTest(unittest.TestCase):
r1 = Request("http://www.example.com") r1 = Request("http://www.example.com")
r2 = Request("http://www.example.com", method='POST') r2 = Request("http://www.example.com", method='POST')
r3 = Request("http://www.example.com", method='POST', body='request body') r3 = Request("http://www.example.com", method='POST', body=b'request body')
self.assertNotEqual(request_fingerprint(r1), request_fingerprint(r2)) self.assertNotEqual(request_fingerprint(r1), request_fingerprint(r2))
self.assertNotEqual(request_fingerprint(r2), request_fingerprint(r3)) self.assertNotEqual(request_fingerprint(r2), request_fingerprint(r3))
@ -52,24 +52,24 @@ class UtilsRequestTest(unittest.TestCase):
# cached fingerprint must be cleared on request copy # cached fingerprint must be cleared on request copy
r1 = Request("http://www.example.com") r1 = Request("http://www.example.com")
fp1 = request_fingerprint(r1) fp1 = request_fingerprint(r1)
r2 = r1.replace(url = "http://www.example.com/other") r2 = r1.replace(url="http://www.example.com/other")
fp2 = request_fingerprint(r2) fp2 = request_fingerprint(r2)
self.assertNotEqual(fp1, fp2) self.assertNotEqual(fp1, fp2)
def test_request_authenticate(self): def test_request_authenticate(self):
r = Request("http://www.example.com") r = Request("http://www.example.com")
request_authenticate(r, 'someuser', 'somepass') request_authenticate(r, 'someuser', 'somepass')
self.assertEqual(r.headers['Authorization'], 'Basic c29tZXVzZXI6c29tZXBhc3M=') self.assertEqual(r.headers['Authorization'], b'Basic c29tZXVzZXI6c29tZXBhc3M=')
def test_request_httprepr(self): def test_request_httprepr(self):
r1 = Request("http://www.example.com") r1 = Request("http://www.example.com")
self.assertEqual(request_httprepr(r1), 'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n') self.assertEqual(request_httprepr(r1), b'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n')
r1 = Request("http://www.example.com/some/page.html?arg=1") r1 = Request("http://www.example.com/some/page.html?arg=1")
self.assertEqual(request_httprepr(r1), 'GET /some/page.html?arg=1 HTTP/1.1\r\nHost: www.example.com\r\n\r\n') self.assertEqual(request_httprepr(r1), b'GET /some/page.html?arg=1 HTTP/1.1\r\nHost: www.example.com\r\n\r\n')
r1 = Request("http://www.example.com", method='POST', headers={"Content-type": "text/html"}, body="Some body") r1 = Request("http://www.example.com", method='POST', headers={"Content-type": b"text/html"}, body=b"Some body")
self.assertEqual(request_httprepr(r1), 'POST / HTTP/1.1\r\nHost: www.example.com\r\nContent-Type: text/html\r\n\r\nSome body') self.assertEqual(request_httprepr(r1), b'POST / HTTP/1.1\r\nHost: www.example.com\r\nContent-Type: text/html\r\n\r\nSome body')
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -2,8 +2,8 @@ from testfixtures import LogCapture
from twisted.trial import unittest from twisted.trial import unittest
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from pydispatch import dispatcher
from scrapy.xlib.pydispatch import dispatcher
from scrapy.utils.signal import send_catch_log, send_catch_log_deferred from scrapy.utils.signal import send_catch_log, send_catch_log_deferred

View File

@ -1,7 +1,10 @@
# -*- coding: utf-8 -*-
import unittest import unittest
import six
from scrapy.spiders import Spider from scrapy.spiders import Spider
from scrapy.utils.url import url_is_from_any_domain, url_is_from_spider, canonicalize_url from scrapy.utils.url import (url_is_from_any_domain, url_is_from_spider,
canonicalize_url)
__doctests__ = ['scrapy.utils.url'] __doctests__ = ['scrapy.utils.url']
@ -70,18 +73,23 @@ class UrlUtilsTest(unittest.TestCase):
self.assertTrue(url_is_from_spider('http://www.example.net/some/page.html', MySpider)) self.assertTrue(url_is_from_spider('http://www.example.net/some/page.html', MySpider))
self.assertFalse(url_is_from_spider('http://www.example.us/some/page.html', MySpider)) self.assertFalse(url_is_from_spider('http://www.example.us/some/page.html', MySpider))
class CanonicalizeUrlTest(unittest.TestCase):
def test_canonicalize_url(self): def test_canonicalize_url(self):
# simplest case # simplest case
self.assertEqual(canonicalize_url("http://www.example.com/"), self.assertEqual(canonicalize_url("http://www.example.com/"),
"http://www.example.com/") "http://www.example.com/")
# always return a str def test_return_str(self):
assert isinstance(canonicalize_url(u"http://www.example.com"), str) assert isinstance(canonicalize_url(u"http://www.example.com"), str)
assert isinstance(canonicalize_url(b"http://www.example.com"), str)
# append missing path def test_append_missing_path(self):
self.assertEqual(canonicalize_url("http://www.example.com"), self.assertEqual(canonicalize_url("http://www.example.com"),
"http://www.example.com/") "http://www.example.com/")
# typical usage
def test_typical_usage(self):
self.assertEqual(canonicalize_url("http://www.example.com/do?a=1&b=2&c=3"), self.assertEqual(canonicalize_url("http://www.example.com/do?a=1&b=2&c=3"),
"http://www.example.com/do?a=1&b=2&c=3") "http://www.example.com/do?a=1&b=2&c=3")
self.assertEqual(canonicalize_url("http://www.example.com/do?c=1&b=2&a=3"), self.assertEqual(canonicalize_url("http://www.example.com/do?c=1&b=2&a=3"),
@ -89,11 +97,11 @@ class UrlUtilsTest(unittest.TestCase):
self.assertEqual(canonicalize_url("http://www.example.com/do?&a=1"), self.assertEqual(canonicalize_url("http://www.example.com/do?&a=1"),
"http://www.example.com/do?a=1") "http://www.example.com/do?a=1")
# sorting by argument values def test_sorting(self):
self.assertEqual(canonicalize_url("http://www.example.com/do?c=3&b=5&b=2&a=50"), self.assertEqual(canonicalize_url("http://www.example.com/do?c=3&b=5&b=2&a=50"),
"http://www.example.com/do?a=50&b=2&b=5&c=3") "http://www.example.com/do?a=50&b=2&b=5&c=3")
# using keep_blank_values def test_keep_blank_values(self):
self.assertEqual(canonicalize_url("http://www.example.com/do?b=&a=2", keep_blank_values=False), self.assertEqual(canonicalize_url("http://www.example.com/do?b=&a=2", keep_blank_values=False),
"http://www.example.com/do?a=2") "http://www.example.com/do?a=2")
self.assertEqual(canonicalize_url("http://www.example.com/do?b=&a=2"), self.assertEqual(canonicalize_url("http://www.example.com/do?b=&a=2"),
@ -106,7 +114,7 @@ class UrlUtilsTest(unittest.TestCase):
self.assertEqual(canonicalize_url(u'http://www.example.com/do?1750,4'), self.assertEqual(canonicalize_url(u'http://www.example.com/do?1750,4'),
'http://www.example.com/do?1750%2C4=') 'http://www.example.com/do?1750%2C4=')
# spaces def test_spaces(self):
self.assertEqual(canonicalize_url("http://www.example.com/do?q=a space&a=1"), self.assertEqual(canonicalize_url("http://www.example.com/do?q=a space&a=1"),
"http://www.example.com/do?a=1&q=a+space") "http://www.example.com/do?a=1&q=a+space")
self.assertEqual(canonicalize_url("http://www.example.com/do?q=a+space&a=1"), self.assertEqual(canonicalize_url("http://www.example.com/do?q=a+space&a=1"),
@ -114,43 +122,52 @@ class UrlUtilsTest(unittest.TestCase):
self.assertEqual(canonicalize_url("http://www.example.com/do?q=a%20space&a=1"), self.assertEqual(canonicalize_url("http://www.example.com/do?q=a%20space&a=1"),
"http://www.example.com/do?a=1&q=a+space") "http://www.example.com/do?a=1&q=a+space")
# normalize percent-encoding case (in paths) @unittest.skipUnless(six.PY2, "TODO")
def test_normalize_percent_encoding_in_paths(self):
self.assertEqual(canonicalize_url("http://www.example.com/a%a3do"), self.assertEqual(canonicalize_url("http://www.example.com/a%a3do"),
"http://www.example.com/a%A3do"), "http://www.example.com/a%A3do"),
# normalize percent-encoding case (in query arguments)
@unittest.skipUnless(six.PY2, "TODO")
def test_normalize_percent_encoding_in_query_arguments(self):
self.assertEqual(canonicalize_url("http://www.example.com/do?k=b%a3"), self.assertEqual(canonicalize_url("http://www.example.com/do?k=b%a3"),
"http://www.example.com/do?k=b%A3") "http://www.example.com/do?k=b%A3")
# non-ASCII percent-encoding in paths def test_non_ascii_percent_encoding_in_paths(self):
self.assertEqual(canonicalize_url("http://www.example.com/a do?a=1"), self.assertEqual(canonicalize_url("http://www.example.com/a do?a=1"),
"http://www.example.com/a%20do?a=1"), "http://www.example.com/a%20do?a=1"),
self.assertEqual(canonicalize_url("http://www.example.com/a %20do?a=1"), self.assertEqual(canonicalize_url("http://www.example.com/a %20do?a=1"),
"http://www.example.com/a%20%20do?a=1"), "http://www.example.com/a%20%20do?a=1"),
self.assertEqual(canonicalize_url("http://www.example.com/a do\xc2\xa3.html?a=1"), self.assertEqual(canonicalize_url(u"http://www.example.com/a do£.html?a=1"),
"http://www.example.com/a%20do%C2%A3.html?a=1") "http://www.example.com/a%20do%C2%A3.html?a=1")
# non-ASCII percent-encoding in query arguments self.assertEqual(canonicalize_url(b"http://www.example.com/a do\xc2\xa3.html?a=1"),
"http://www.example.com/a%20do%C2%A3.html?a=1")
def test_non_ascii_percent_encoding_in_query_arguments(self):
self.assertEqual(canonicalize_url(u"http://www.example.com/do?price=\xa3500&a=5&z=3"), self.assertEqual(canonicalize_url(u"http://www.example.com/do?price=\xa3500&a=5&z=3"),
u"http://www.example.com/do?a=5&price=%C2%A3500&z=3") u"http://www.example.com/do?a=5&price=%C2%A3500&z=3")
self.assertEqual(canonicalize_url("http://www.example.com/do?price=\xc2\xa3500&a=5&z=3"), self.assertEqual(canonicalize_url(b"http://www.example.com/do?price=\xc2\xa3500&a=5&z=3"),
"http://www.example.com/do?a=5&price=%C2%A3500&z=3") "http://www.example.com/do?a=5&price=%C2%A3500&z=3")
self.assertEqual(canonicalize_url("http://www.example.com/do?price(\xc2\xa3)=500&a=1"), self.assertEqual(canonicalize_url(b"http://www.example.com/do?price(\xc2\xa3)=500&a=1"),
"http://www.example.com/do?a=1&price%28%C2%A3%29=500") "http://www.example.com/do?a=1&price%28%C2%A3%29=500")
# urls containing auth and ports def test_urls_with_auth_and_ports(self):
self.assertEqual(canonicalize_url(u"http://user:pass@www.example.com:81/do?now=1"), self.assertEqual(canonicalize_url(u"http://user:pass@www.example.com:81/do?now=1"),
u"http://user:pass@www.example.com:81/do?now=1") u"http://user:pass@www.example.com:81/do?now=1")
# remove fragments def test_remove_fragments(self):
self.assertEqual(canonicalize_url(u"http://user:pass@www.example.com/do?a=1#frag"), self.assertEqual(canonicalize_url(u"http://user:pass@www.example.com/do?a=1#frag"),
u"http://user:pass@www.example.com/do?a=1") u"http://user:pass@www.example.com/do?a=1")
self.assertEqual(canonicalize_url(u"http://user:pass@www.example.com/do?a=1#frag", keep_fragments=True), self.assertEqual(canonicalize_url(u"http://user:pass@www.example.com/do?a=1#frag", keep_fragments=True),
u"http://user:pass@www.example.com/do?a=1#frag") u"http://user:pass@www.example.com/do?a=1#frag")
def test_dont_convert_safe_characters(self):
# dont convert safe characters to percent encoding representation # dont convert safe characters to percent encoding representation
self.assertEqual(canonicalize_url( self.assertEqual(canonicalize_url(
"http://www.simplybedrooms.com/White-Bedroom-Furniture/Bedroom-Mirror:-Josephine-Cheval-Mirror.html"), "http://www.simplybedrooms.com/White-Bedroom-Furniture/Bedroom-Mirror:-Josephine-Cheval-Mirror.html"),
"http://www.simplybedrooms.com/White-Bedroom-Furniture/Bedroom-Mirror:-Josephine-Cheval-Mirror.html") "http://www.simplybedrooms.com/White-Bedroom-Furniture/Bedroom-Mirror:-Josephine-Cheval-Mirror.html")
@unittest.skipUnless(six.PY2, "TODO")
def test_safe_characters_unicode(self):
# urllib.quote uses a mapping cache of encoded characters. when parsing # urllib.quote uses a mapping cache of encoded characters. when parsing
# an already percent-encoded url, it will fail if that url was not # an already percent-encoded url, it will fail if that url was not
# percent-encoded as utf-8, that's why canonicalize_url must always # percent-encoded as utf-8, that's why canonicalize_url must always
@ -159,11 +176,11 @@ class UrlUtilsTest(unittest.TestCase):
self.assertEqual(canonicalize_url(u'http://www.example.com/caf%E9-con-leche.htm'), self.assertEqual(canonicalize_url(u'http://www.example.com/caf%E9-con-leche.htm'),
'http://www.example.com/caf%E9-con-leche.htm') 'http://www.example.com/caf%E9-con-leche.htm')
# domains are case insensitive def test_domains_are_case_insensitive(self):
self.assertEqual(canonicalize_url("http://www.EXAMPLE.com/"), self.assertEqual(canonicalize_url("http://www.EXAMPLE.com/"),
"http://www.example.com/") "http://www.example.com/")
# quoted slash and question sign def test_quoted_slash_and_question_sign(self):
self.assertEqual(canonicalize_url("http://foo.com/AC%2FDC+rocks%3f/?yeah=1"), self.assertEqual(canonicalize_url("http://foo.com/AC%2FDC+rocks%3f/?yeah=1"),
"http://foo.com/AC%2FDC+rocks%3F/?yeah=1") "http://foo.com/AC%2FDC+rocks%3F/?yeah=1")
self.assertEqual(canonicalize_url("http://foo.com/AC%2FDC/"), self.assertEqual(canonicalize_url("http://foo.com/AC%2FDC/"),