import argparse
import os
from pathlib import Path

from twisted.internet import defer

from scrapy.commands import parse
from scrapy.settings import Settings
from scrapy.utils.python import to_unicode
from scrapy.utils.testproc import ProcessTest
from scrapy.utils.testsite import SiteTest
from tests.test_commands import CommandTest


def _textmode(bstr):
    """Normalize input the same as writing to a file
    and reading from it in text mode"""
    return to_unicode(bstr).replace(os.linesep, "\n")
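

# These tests drive the "parse" command end to end: CommandTest.setUp()
# creates a throwaway project, and ProcessTest.execute() runs the command
# and fires its Deferred with (exit status, stdout, stderr).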
class ParseCommandTest(ProcessTest, SiteTest, CommandTest):
    command = "parse"

    def setUp(self):
        super().setUp()
        self.spider_name = "parse_spider"
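        # Write a module of test spiders into the generated project; the
        # doubled braces are f-string escapes for literal braces, so only
        # {self.spider_name} below is interpolated.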
        (self.proj_mod_path / "spiders" / "myspider.py").write_text(
            f"""
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from scrapy.utils.test import get_from_asyncio_queue
import asyncio


class AsyncDefAsyncioReturnSpider(scrapy.Spider):
    name = "asyncdef_asyncio_return"

    async def parse(self, response):
        await asyncio.sleep(0.2)
        status = await get_from_asyncio_queue(response.status)
        self.logger.info(f"Got response {{status}}")
        return [{{'id': 1}}, {{'id': 2}}]

class AsyncDefAsyncioReturnSingleElementSpider(scrapy.Spider):
    name = "asyncdef_asyncio_return_single_element"

    async def parse(self, response):
        await asyncio.sleep(0.1)
        status = await get_from_asyncio_queue(response.status)
        self.logger.info(f"Got response {{status}}")
        return {{'foo': 42}}

class AsyncDefAsyncioGenLoopSpider(scrapy.Spider):
    name = "asyncdef_asyncio_gen_loop"

    async def parse(self, response):
        for i in range(10):
            await asyncio.sleep(0.1)
            yield {{'foo': i}}
        self.logger.info(f"Got response {{response.status}}")

class AsyncDefAsyncioSpider(scrapy.Spider):
    name = "asyncdef_asyncio"

    async def parse(self, response):
        await asyncio.sleep(0.2)
        status = await get_from_asyncio_queue(response.status)
        self.logger.debug(f"Got response {{status}}")

class AsyncDefAsyncioGenExcSpider(scrapy.Spider):
    name = "asyncdef_asyncio_gen_exc"

    async def parse(self, response):
        for i in range(10):
            await asyncio.sleep(0.1)
            yield {{'foo': i}}
            if i > 5:
                raise ValueError("Stopping the processing")

class CallbackSignatureDownloaderMiddleware:
    def process_request(self, request, spider):
        from inspect import signature
        spider.logger.debug(f"request.callback signature: {{signature(request.callback)}}")


class MySpider(scrapy.Spider):
    name = '{self.spider_name}'

    custom_settings = {{
        "DOWNLOADER_MIDDLEWARES": {{
            CallbackSignatureDownloaderMiddleware: 0,
        }}
    }}

    def parse(self, response):
        if getattr(self, 'test_arg', None):
            self.logger.debug('It Works!')
        return [scrapy.Item(), dict(foo='bar')]

    def parse_request_with_meta(self, response):
        foo = response.meta.get('foo', 'bar')

        if foo == 'bar':
            self.logger.debug('It Does Not Work :(')
        else:
            self.logger.debug('It Works!')

    def parse_request_with_cb_kwargs(self, response, foo=None, key=None):
        if foo == 'bar' and key == 'value':
            self.logger.debug('It Works!')
        else:
            self.logger.debug('It Does Not Work :(')

    def parse_request_without_meta(self, response):
        foo = response.meta.get('foo', 'bar')

        if foo == 'bar':
            self.logger.debug('It Works!')
        else:
            self.logger.debug('It Does Not Work :(')

class MyGoodCrawlSpider(CrawlSpider):
    name = 'goodcrawl{self.spider_name}'

    rules = (
        Rule(LinkExtractor(allow=r'/html'), callback='parse_item', follow=True),
        Rule(LinkExtractor(allow=r'/text'), follow=True),
    )

    def parse_item(self, response):
        return [scrapy.Item(), dict(foo='bar')]

    def parse(self, response):
        return [scrapy.Item(), dict(nomatch='default')]


class MyBadCrawlSpider(CrawlSpider):
    '''Spider which doesn't define a parse_item callback while using it in a rule.'''
    name = 'badcrawl{self.spider_name}'

    rules = (
        Rule(LinkExtractor(allow=r'/html'), callback='parse_item', follow=True),
    )

    def parse(self, response):
        return [scrapy.Item(), dict(foo='bar')]
""",
            encoding="utf-8",
        )
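
        # A minimal pipeline that logs a fixed message, so the --pipelines
        # flag can be asserted against the captured log.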
        (self.proj_mod_path / "pipelines.py").write_text(
            """
import logging

class MyPipeline:
    component_name = 'my_pipeline'

    def process_item(self, item, spider):
        logging.info('It Works!')
        return item
""",
            encoding="utf-8",
        )

        with (self.proj_mod_path / "settings.py").open("a", encoding="utf-8") as f:
            f.write(
                f"""
ITEM_PIPELINES = {{'{self.project_name}.pipelines.MyPipeline': 1}}
"""
            )
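
    # Scrapy logs to stderr while the parse command prints scraped items to
    # stdout, so log assertions read stderr and item assertions read stdout.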

    @defer.inlineCallbacks
    def test_spider_arguments(self):
        _, _, stderr = yield self.execute(
            [
                "--spider",
                self.spider_name,
                "-a",
                "test_arg=1",
                "-c",
                "parse",
                "--verbose",
                self.url("/html"),
            ]
        )
        self.assertIn("DEBUG: It Works!", _textmode(stderr))

    @defer.inlineCallbacks
    def test_request_with_meta(self):
        raw_json_string = '{"foo" : "baz"}'
        _, _, stderr = yield self.execute(
            [
                "--spider",
                self.spider_name,
                "--meta",
                raw_json_string,
                "-c",
                "parse_request_with_meta",
                "--verbose",
                self.url("/html"),
            ]
        )
        self.assertIn("DEBUG: It Works!", _textmode(stderr))
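
        # Repeat with the short spelling of the option (-m).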
        _, _, stderr = yield self.execute(
            [
                "--spider",
                self.spider_name,
                "-m",
                raw_json_string,
                "-c",
                "parse_request_with_meta",
                "--verbose",
                self.url("/html"),
            ]
        )
        self.assertIn("DEBUG: It Works!", _textmode(stderr))
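
    # CallbackSignatureDownloaderMiddleware (defined in setUp) logs the
    # signature of request.callback, so the cb_kwargs parameters (foo, key)
    # should appear in that log line.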

    @defer.inlineCallbacks
    def test_request_with_cb_kwargs(self):
        raw_json_string = '{"foo" : "bar", "key": "value"}'
        _, _, stderr = yield self.execute(
            [
                "--spider",
                self.spider_name,
                "--cbkwargs",
                raw_json_string,
                "-c",
                "parse_request_with_cb_kwargs",
                "--verbose",
                self.url("/html"),
            ]
        )
        log = _textmode(stderr)
        self.assertIn("DEBUG: It Works!", log)
        self.assertIn(
            "DEBUG: request.callback signature: (response, foo=None, key=None)", log
        )

    @defer.inlineCallbacks
    def test_request_without_meta(self):
        _, _, stderr = yield self.execute(
            [
                "--spider",
                self.spider_name,
                "-c",
                "parse_request_without_meta",
                "--nolinks",
                self.url("/html"),
            ]
        )
        self.assertIn("DEBUG: It Works!", _textmode(stderr))

    @defer.inlineCallbacks
    def test_pipelines(self):
        _, _, stderr = yield self.execute(
            [
                "--spider",
                self.spider_name,
                "--pipelines",
                "-c",
                "parse",
                "--verbose",
                self.url("/html"),
            ]
        )
        self.assertIn("INFO: It Works!", _textmode(stderr))

    @defer.inlineCallbacks
    def test_async_def_asyncio_parse_items_list(self):
        status, out, stderr = yield self.execute(
            [
                "--spider",
                "asyncdef_asyncio_return",
                "-c",
                "parse",
                self.url("/html"),
            ]
        )
        self.assertIn("INFO: Got response 200", _textmode(stderr))
        self.assertIn("{'id': 1}", _textmode(out))
        self.assertIn("{'id': 2}", _textmode(out))

    @defer.inlineCallbacks
    def test_async_def_asyncio_parse_items_single_element(self):
        status, out, stderr = yield self.execute(
            [
                "--spider",
                "asyncdef_asyncio_return_single_element",
                "-c",
                "parse",
                self.url("/html"),
            ]
        )
        self.assertIn("INFO: Got response 200", _textmode(stderr))
        self.assertIn("{'foo': 42}", _textmode(out))

    @defer.inlineCallbacks
    def test_async_def_asyncgen_parse_loop(self):
        status, out, stderr = yield self.execute(
            [
                "--spider",
                "asyncdef_asyncio_gen_loop",
                "-c",
                "parse",
                self.url("/html"),
            ]
        )
        self.assertIn("INFO: Got response 200", _textmode(stderr))
        for i in range(10):
            self.assertIn(f"{{'foo': {i}}}", _textmode(out))

    @defer.inlineCallbacks
    def test_async_def_asyncgen_parse_exc(self):
        status, out, stderr = yield self.execute(
            [
                "--spider",
                "asyncdef_asyncio_gen_exc",
                "-c",
                "parse",
                self.url("/html"),
            ]
        )
        self.assertIn("ValueError", _textmode(stderr))
        for i in range(7):
            self.assertIn(f"{{'foo': {i}}}", _textmode(out))

    @defer.inlineCallbacks
    def test_async_def_asyncio_parse(self):
        _, _, stderr = yield self.execute(
            [
                "--spider",
                "asyncdef_asyncio",
                "-c",
                "parse",
                self.url("/html"),
            ]
        )
        self.assertIn("DEBUG: Got response 200", _textmode(stderr))

    @defer.inlineCallbacks
    def test_parse_items(self):
        status, out, stderr = yield self.execute(
            ["--spider", self.spider_name, "-c", "parse", self.url("/html")]
        )
        self.assertIn("""[{}, {'foo': 'bar'}]""", _textmode(out))

    @defer.inlineCallbacks
    def test_parse_items_no_callback_passed(self):
        status, out, stderr = yield self.execute(
            ["--spider", self.spider_name, self.url("/html")]
        )
        self.assertIn("""[{}, {'foo': 'bar'}]""", _textmode(out))

    @defer.inlineCallbacks
    def test_wrong_callback_passed(self):
        status, out, stderr = yield self.execute(
            ["--spider", self.spider_name, "-c", "dummy", self.url("/html")]
        )
        self.assertRegex(_textmode(out), r"""# Scraped Items -+\n\[\]""")
        self.assertIn("""Cannot find callback""", _textmode(stderr))
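
    # The next tests pass -r/--rules so the command resolves the callback
    # from the CrawlSpider rules defined in setUp.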

    @defer.inlineCallbacks
    def test_crawlspider_matching_rule_callback_set(self):
        """If a rule matches the URL, use its defined callback."""
        status, out, stderr = yield self.execute(
            ["--spider", "goodcrawl" + self.spider_name, "-r", self.url("/html")]
        )
        self.assertIn("""[{}, {'foo': 'bar'}]""", _textmode(out))

    @defer.inlineCallbacks
    def test_crawlspider_matching_rule_default_callback(self):
        """If a rule matches but has no callback set, use the 'parse' callback."""
        status, out, stderr = yield self.execute(
            ["--spider", "goodcrawl" + self.spider_name, "-r", self.url("/text")]
        )
        self.assertIn("""[{}, {'nomatch': 'default'}]""", _textmode(out))

    @defer.inlineCallbacks
    def test_spider_with_no_rules_attribute(self):
        """Using -r with a spider that has no rules should not produce items."""
        status, out, stderr = yield self.execute(
            ["--spider", self.spider_name, "-r", self.url("/html")]
        )
        self.assertRegex(_textmode(out), r"""# Scraped Items -+\n\[\]""")
        self.assertIn("""No CrawlSpider rules found""", _textmode(stderr))

    @defer.inlineCallbacks
    def test_crawlspider_missing_callback(self):
        status, out, stderr = yield self.execute(
            ["--spider", "badcrawl" + self.spider_name, "-r", self.url("/html")]
        )
        self.assertRegex(_textmode(out), r"""# Scraped Items -+\n\[\]""")

    @defer.inlineCallbacks
    def test_crawlspider_no_matching_rule(self):
        """The requested URL has no matching rule, so no items should be scraped."""
        status, out, stderr = yield self.execute(
            ["--spider", "badcrawl" + self.spider_name, "-r", self.url("/enc-gb18030")]
        )
        self.assertRegex(_textmode(out), r"""# Scraped Items -+\n\[\]""")
        self.assertIn("""Cannot find a rule that matches""", _textmode(stderr))

    @defer.inlineCallbacks
    def test_crawlspider_not_exists_with_not_matched_url(self):
        status, out, stderr = yield self.execute([self.url("/invalid_url")])
        self.assertEqual(status, 0)

    @defer.inlineCallbacks
    def test_output_flag(self):
        """Check that the -o flag creates the output file and that the file
        contains the expected data in the expected format.
        """
        file_name = "data.json"
        file_path = Path(self.proj_path, file_name)
        yield self.execute(
            [
                "--spider",
                self.spider_name,
                "-c",
                "parse",
                "-o",
                file_name,
                self.url("/html"),
            ]
        )

        self.assertTrue(file_path.exists())
        self.assertTrue(file_path.is_file())
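        # parse() returns an empty Item and {'foo': 'bar'}; the JSON output
        # should serialize them as the two-element array asserted below.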

        content = '[\n{},\n{"foo": "bar"}\n]'
        self.assertEqual(file_path.read_text(encoding="utf-8"), content)
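
    # add_options() can be exercised directly against an argparse parser,
    # without running the command.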

    def test_parse_add_options(self):
        command = parse.Command()
        command.settings = Settings()
        parser = argparse.ArgumentParser(
            prog="scrapy",
            formatter_class=argparse.HelpFormatter,
            conflict_handler="resolve",
            prefix_chars="-",
        )
        command.add_options(parser)
        namespace = parser.parse_args(
            ["--verbose", "--nolinks", "-d", "2", "--spider", self.spider_name]
        )
        self.assertTrue(namespace.nolinks)
        self.assertEqual(namespace.depth, 2)
        self.assertEqual(namespace.spider, self.spider_name)
        self.assertTrue(namespace.verbose)