Mirror of https://github.com/scrapy/scrapy.git, synced 2025-02-23 16:44:17 +00:00
some more fixes to cluster worker
--HG-- extra : convert_revision : svn%3Ab85faa78-f9eb-468e-a121-7cced6da292c%40333
This commit is contained in:
parent 3e1ad8d653
commit 52596f350c
@@ -16,8 +16,8 @@ from scrapy.conf import settings
 
 class ScrapyProcessProtocol(protocol.ProcessProtocol):
 
-    def __init__(self, procman, domain, logfile=None, spider_settings=None):
-        self.procman = procman
+    def __init__(self, worker, domain, logfile=None, spider_settings=None):
+        self.worker = worker
         self.domain = domain
         self.logfile = logfile
         self.start_time = datetime.datetime.utcnow()
@@ -70,7 +70,7 @@ class ScrapyProcessProtocol(protocol.ProcessProtocol):
         log.msg("ClusterWorker: started domain=%s pid=%d log=%s" % (self.domain, self.pid, self.logfile))
         self.transport.closeStdin()
         self.status = "running"
-        self.procman.update_master(self.domain, "running")
+        self.worker.update_master(self.domain, "running")
 
     def processEnded(self, status):
         if isinstance(status.value, ProcessDone):
@@ -80,9 +80,9 @@ class ScrapyProcessProtocol(protocol.ProcessProtocol):
             st = "terminated"
             er = ", error=%s" % str(status.value)
         log.msg("ClusterWorker: finished domain=%s status=%s pid=%d log=%s%s" % (self.domain, st, self.pid, self.logfile, er))
-        del self.procman.running[self.domain]
-        del self.procman.crawlers[self.pid]
-        self.procman.update_master(self.domain, "scraped")
+        del self.worker.running[self.domain]
+        del self.worker.crawlers[self.pid]
+        self.worker.update_master(self.domain, st)
 
 class ClusterWorker(pb.Root):
 
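For readers unfamiliar with Twisted: processEnded above is invoked by the reactor with a Failure whose value is ProcessDone on a clean exit (and ProcessTerminated otherwise), which is what the isinstance check distinguishes. A minimal, self-contained sketch of that lifecycle, not part of this commit (the echo command and protocol name are illustrative):

from twisted.internet import protocol, reactor
from twisted.internet.error import ProcessDone

class EchoProtocol(protocol.ProcessProtocol):
    """Illustrative protocol mirroring the lifecycle ScrapyProcessProtocol relies on."""

    def connectionMade(self):
        # Called once the child process is spawned; its pid is available here.
        print("started pid=%d" % self.transport.pid)
        self.transport.closeStdin()

    def processEnded(self, status):
        # status.value is ProcessDone on exit code 0, ProcessTerminated otherwise.
        st = "done" if isinstance(status.value, ProcessDone) else "terminated"
        print("finished status=%s" % st)
        reactor.stop()

# POSIX path used for the example; env=None inherits the parent environment.
reactor.spawnProcess(EchoProtocol(), "/bin/echo", args=["/bin/echo", "hello"], env=None)
reactor.run()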
@@ -135,9 +135,9 @@ class ClusterWorker(pb.Root):
 
     def update_master(self, domain, domain_status):
         try:
-            deferred = self.__master.callRemote("update", self.status(), domain, domain_status)
+            deferred = self._master.callRemote("update", self.status(), domain, domain_status)
         except pb.DeadReferenceError:
-            self.__master = None
+            self._master = None
             log.msg("Lost connection to master", log.ERROR)
         else:
             deferred.addCallbacks(callback=lambda x: x, errback=lambda reason: log.msg(reason, log.ERROR))
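The __master to _master rename here is not just cosmetic: a leading double underscore triggers Python name mangling, so the attribute is actually stored as _ClusterWorker__master, which gets in the way of subclassing and interactive debugging. A quick standalone illustration (the Worker class below is hypothetical):

class Worker:
    def __init__(self):
        self.__master = None   # mangled to _Worker__master
        self._master = None    # single underscore: plain convention, no mangling

w = Worker()
print(hasattr(w, "_Worker__master"))  # True: the mangled name is the real one
print(w._master)                      # None: directly accessible
# print(w.__master)                   # AttributeError outside the class body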
@@ -145,7 +145,7 @@ class ClusterWorker(pb.Root):
     def remote_set_master(self, master):
         """Set the master for this worker"""
         log.msg("ClusterWorker: ClusterMaster connected from %s:%s" % master.broker.transport.client)
-        self.__master = master
+        self._master = master
         return self.status()
 
     def remote_stop(self, domain):
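remote_set_master is reachable over Twisted's Perspective Broker: any remote_-prefixed method on a pb.Root is exposed to connected peers, and the master argument arrives as a pb.RemoteReference that later calls (such as update_master above) go through. A rough sketch of that wiring, with hypothetical host and port values:

from twisted.spread import pb
from twisted.internet import reactor

class Worker(pb.Root):
    def remote_set_master(self, master):
        # 'master' is a pb.RemoteReference; callRemote on it goes back over the wire.
        self._master = master
        return {"status": "idle"}

# Worker side: expose the root object on a (hypothetical) port.
reactor.listenTCP(8789, pb.PBServerFactory(Worker()))

# Master side (in another process) would do roughly:
#   factory = pb.PBClientFactory()
#   reactor.connectTCP("worker-host", 8789, factory)
#   d = factory.getRootObject()
#   d.addCallback(lambda worker: worker.callRemote("set_master", master_ref))
reactor.run()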
@@ -173,13 +173,13 @@ class ClusterWorker(pb.Root):
             logfile = os.path.join(self.logdir, domain, time.strftime("%FT%T.log"))
             if not os.path.exists(os.path.dirname(logfile)):
                 os.makedirs(os.path.dirname(logfile))
-            scrapy_proc = ScrapyProcessProtocol(self, domain, logfile, spider_settings)
-            args = [sys.executable, sys.argv[0], 'crawl', domain]
-            self.running[domain] = scrapy_proc
 
             for prerun_hook in self.prerun_hooks:
                 prerun_hook(domain, spider_settings)
 
+            scrapy_proc = ScrapyProcessProtocol(self, domain, logfile, spider_settings)
+            args = [sys.executable, sys.argv[0], 'crawl', domain]
+            self.running[domain] = scrapy_proc
             reactor.spawnProcess(scrapy_proc, sys.executable, args=args, env=scrapy_proc.env)
             return self.status(ResponseCode.DOMAIN_STARTED, "Started process %s" % scrapy_proc)
         else:
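The reordering in this last hunk appears deliberate: constructing ScrapyProcessProtocol before the prerun hooks ran meant a hook that mutated spider_settings acted too late, since the protocol snapshots settings into the child environment at construction time. A toy demonstration of that hazard, with a hypothetical Proc class standing in for the protocol:

import os

class Proc:
    def __init__(self, settings):
        # Settings are copied at construction time, like
        # ScrapyProcessProtocol building its child env.
        self.env = dict(os.environ, SETTINGS=str(settings))

settings = {"depth": 1}

# Old order: protocol built before hooks run, so hook edits are lost.
stale = Proc(settings)
settings["depth"] = 5            # a prerun hook mutating the settings
print(stale.env["SETTINGS"])     # {'depth': 1} -- stale snapshot

# New order: hooks first, then build the protocol.
fresh = Proc(settings)
print(fresh.env["SETTINGS"])     # {'depth': 5}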