scrapy/scrapyd/launcher.py
Pablo Hoffman df54ed0041 Some Scrapyd enhancements:
* added a minimal web UI
* return a unique id per job (i.e. per scheduled spider run)
* store one log per spider run (job) and rotate them, keeping the last N logs (N is configurable through settings)
2010-11-30 02:26:31 -02:00
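Note: later scrapyd releases expose the "keep the last N logs" knob as the logs_to_keep option in scrapyd.conf; assuming this early revision used the same name, the relevant fragment would be:

    [scrapyd]
    logs_to_keep = 5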

import sys, os
from shutil import copyfileobj
from tempfile import mkstemp
from datetime import datetime

from twisted.internet import reactor, defer, protocol, error
from twisted.application.service import Service
from twisted.python import log

from scrapy.utils.py26 import cpu_count
from scrapy.utils.python import stringify_dict
from scrapyd.utils import get_crawl_args

from .interfaces import IPoller, IEggStorage, IEnvironment

class Launcher(Service):

    name = 'launcher'

    def __init__(self, config, app):
        self.processes = {}
        self.max_proc = config.getint('max_proc', 0)
        if not self.max_proc:
            # max_proc=0 means "derive the limit from the CPU count"
            self.max_proc = cpu_count() * config.getint('max_proc_per_cpu', 4)
        self.egg_runner = config.get('egg_runner', 'scrapyd.eggrunner')
        self.app = app
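
    # For reference, a sketch of the config options read by __init__ above,
    # assuming the [scrapyd] section name used by later scrapyd releases:
    #
    #   [scrapyd]
    #   max_proc = 0              # 0 = scale with CPU count
    #   max_proc_per_cpu = 4
    #   egg_runner = scrapyd.eggrunner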

    def startService(self):
        # fill every slot; each slot waits for the next queued message
        for slot in range(self.max_proc):
            self._wait_for_project(slot)
        log.msg("%s started: max_proc=%r, egg_runner=%r" % (self.parent.name,
            self.max_proc, self.egg_runner), system="Launcher")

    def _wait_for_project(self, slot):
        poller = self.app.getComponent(IPoller)
        # poller.next() returns a Deferred that fires with the next queued
        # message once one becomes available
        poller.next().addCallback(self._spawn_process, slot)

    def _get_eggpath(self, project):
        eggstorage = self.app.getComponent(IEggStorage)
        version, eggf = eggstorage.get(project)
        if eggf is None:
            return
        # copy the stored egg to a temp file (e.g. /tmp/myproject-r23-XXXXXX.egg);
        # it is removed in _process_finished()
        prefix = '%s-%s-' % (project, version)
        fd, eggpath = mkstemp(prefix=prefix, suffix='.egg')
        lf = os.fdopen(fd, 'wb')
        copyfileobj(eggf, lf)
        lf.close()
        return eggpath

    def _spawn_process(self, message, slot):
        msg = stringify_dict(message, keys_only=False)
        project = msg['project']
        eggpath = self._get_eggpath(project)
        args = [sys.executable, '-m', self.egg_runner, 'crawl']
        args += get_crawl_args(msg)
        e = self.app.getComponent(IEnvironment)
        env = e.get_environment(msg, slot, eggpath)
        pp = ScrapyProcessProtocol(eggpath, slot, project, msg['spider'], msg['_id'])
        pp.deferred.addBoth(self._process_finished, eggpath, slot)
        reactor.spawnProcess(pp, sys.executable, args=args, env=env)
        self.processes[slot] = pp

    def _process_finished(self, _, eggpath, slot):
        if eggpath:
            os.remove(eggpath)
        self.processes.pop(slot)
        # free the slot and go back to waiting for the next job
        self._wait_for_project(slot)
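
    # For illustration: given a message like {'project': 'myproject',
    # 'spider': 'myspider', '_id': 'x1y2z3'}, the command spawned by
    # _spawn_process() is roughly (assuming get_crawl_args() yields the
    # spider name plus optional extra arguments):
    #
    #   /usr/bin/python -m scrapyd.eggrunner crawl myspider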

class ScrapyProcessProtocol(protocol.ProcessProtocol):

    def __init__(self, eggfile, slot, project, spider, job):
        self.eggfile = eggfile
        self.slot = slot
        self.pid = None
        self.project = project
        self.spider = spider
        self.job = job
        self.start_time = datetime.now()
        # fired (with this protocol instance) when the process ends
        self.deferred = defer.Deferred()

    def outReceived(self, data):
        log.msg(data.rstrip(), system="Launcher,%d/stdout" % self.pid)

    def errReceived(self, data):
        log.msg(data.rstrip(), system="Launcher,%d/stderr" % self.pid)

    def connectionMade(self):
        self.pid = self.transport.pid
        self.log("Process started: ")

    def processEnded(self, status):
        if isinstance(status.value, error.ProcessDone):
            self.log("Process finished: ")
        else:
            self.log("Process died: exitstatus=%r " % status.value.exitCode)
        self.deferred.callback(self)

    def log(self, msg):
        msg += "project=%r spider=%r job=%r pid=%r egg=%r" % (self.project,
            self.spider, self.job, self.pid, self.eggfile)
        log.msg(msg, system="Launcher")
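
    # For reference, a self.log("Process finished: ") call above emits a line
    # like the following (values are illustrative):
    #
    #   Process finished: project='myproject' spider='myspider' job='x1y2z3' pid=1234 egg='/tmp/myproject-r23-a1b2c3.egg'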