mirror of https://github.com/scrapy/scrapy.git synced 2025-02-22 05:33:28 +00:00

- added worker to master notifications.

- deleted statistics code. will change approach

--HG--
extra : convert_revision : svn%3Ab85faa78-f9eb-468e-a121-7cced6da292c%4099
olveyra 2008-07-23 17:15:16 +00:00
parent 980369ba60
commit b5b79042ab
2 changed files with 48 additions and 61 deletions
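
The "worker to master notifications" described in the commit message above follow the usual Twisted Perspective Broker handshake: the master-side Node is now a pb.Referenceable that passes itself to the worker via set_master, and the worker later pushes status back by calling update on that reference, which dispatches to Node.remote_update. A minimal sketch of that pattern, with illustrative class names rather than the actual Scrapy cluster classes:

from twisted.python import log
from twisted.spread import pb

class MasterRef(pb.Referenceable):
    """Master-side object; the worker receives a remote reference to it
    and calls back remote_update() whenever its status changes."""
    def remote_update(self, status):
        log.msg("worker status update: %r" % (status,))

class WorkerRoot(pb.Root):
    """Worker-side root object, published with pb.PBServerFactory."""
    def __init__(self):
        self.master = None

    def remote_set_master(self, master):
        # 'master' arrives as a RemoteReference to the MasterRef above;
        # keep it so later status changes can be pushed to the master.
        self.master = master
        return {"running": []}  # initial status snapshot

    def notify_master(self, status):
        if self.master is not None:
            d = self.master.callRemote("update", status)
            d.addErrback(log.err)

The master connects to the worker's PB port, obtains WorkerRoot as the root object, and calls set_master with a MasterRef instance; from then on the worker can notify the master without being polled.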

View File

@@ -27,15 +27,21 @@ def my_import(name):
mod = getattr(mod, comp)
return mod
class Node:
def __init__(self, remote, status, name, master):
class Node(pb.Referenceable):
def __init__(self, remote, name, master):
self.__remote = remote
self._set_status(status)
self.alive = False
self.name = name
self.master = master
self.available = True
self.statistics = {"domains": {}, "scraped_total": 0 }
try:
deferred = self.__remote.callRemote("set_master", self)
except pb.DeadReferenceError:
self._set_status(None)
log.msg("Lost connection to node %s." % (self.name), log.ERROR)
else:
deferred.addCallbacks(callback=self._set_status, errback=lambda reason: log.msg(reason, log.ERROR))
def status_as_dict(self, verbosity=0):
status = {"alive": self.alive}
if self.alive:
@@ -92,19 +98,6 @@ class Node:
else:
deferred.addCallbacks(callback=self._set_status, errback=lambda reason: log.msg(reason, log.ERROR))
def update_statistics(self):
def _set_statistics(statistics):
self.statistics = statistics
try:
deferred = self.__remote.callRemote("statistics")
except pb.DeadReferenceError:
self._set_status(None)
log.msg("Lost connection to node %s." % (self.name), log.ERROR)
else:
deferred.addCallbacks(callback=_set_statistics, errback=lambda reason: log.msg(reason, log.ERROR))
def stop(self, domain):
try:
deferred = self.__remote.callRemote("stop", domain)
@@ -142,8 +135,10 @@ class Node:
else:
deferred.addCallbacks(callback=_run_callback, errback=_run_errback)
def remote_update(self, status):
self._set_status(status)
class ClusterMaster(pb.Root):
class ClusterMaster:
def __init__(self):
@@ -170,18 +165,14 @@ class ClusterMaster(pb.Root):
dispatcher.connect(self._engine_started, signal=signals.engine_started)
dispatcher.connect(self._engine_stopped, signal=signals.engine_stopped)
port = settings.getint('CLUSTER_MASTER_PORT')
scrapyengine.listenTCP(port, pb.PBServerFactory(self))
def load_nodes(self):
"""Loads nodes from the CLUSTER_MASTER_NODES setting"""
"""Loads nodes listed in CLUSTER_MASTER_NODES setting"""
for name, url in settings.get('CLUSTER_MASTER_NODES', {}).iteritems():
self.load_node(name, url)
def load_node(self, name, url):
"""Creates the remote reference for each worker node"""
def _make_callback(_factory, _name, _url):
def _errback(_reason):
@@ -203,25 +194,17 @@ class ClusterMaster(pb.Root):
_make_callback(factory, name, url)
def update_nodes(self):
self.statistics["scraped_total"] = 0
for name, url in settings.get('CLUSTER_MASTER_NODES', {}).iteritems():
if name in self.nodes and self.nodes[name].alive:
log.msg("Updating node. name: %s, url: %s" % (name, url) )
self.nodes[name].update_status()
self.nodes[name].update_statistics()
self.statistics["scraped_total"] += self.nodes[name].statistics["scraped_total"]
self.statistics["timestamp"] = datetime.datetime.utcnow()
for domain in self.nodes[name].statistics["domains"]:
if not domain in self.statistics["domains"] or self.nodes[name].statistics["domains"][domain]["last_start_time"] >= self.statistics["domains"][domain]["last_start_time"]:
self.statistics["domains"][domain] = self.nodes[name].statistics["domains"][domain]
else:
log.msg("Reloading node. name: %s, url: %s" % (name, url) )
self.load_node(name, url)
def add_node(self, cworker, name):
"""Add node given its node"""
node = Node(cworker, None, name, self)
node.update_status()
node = Node(cworker, name, self)
self.nodes[name] = node
log.msg("Added cluster worker %s" % name)
@@ -242,7 +225,7 @@ class ClusterMaster(pb.Root):
else:
break
for domain in domains:
pd = self.find_ifpending(domain)
pd = self.find_inpending(domain)
if pd: #domain already pending, so just change priority if new is higher
if priority < pd['priority']:
self.pending.remove(pd)
@@ -300,7 +283,7 @@ class ClusterMaster(pb.Root):
def available_nodes(self):
return (node for node in self.nodes.itervalues() if node.available)
def find_ifpending(self, domain):
def find_inpending(self, domain):
for p in self.pending:
if domain == p['domain']:
return p
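
The try/except pb.DeadReferenceError plus addCallbacks idiom repeated throughout Node above covers two distinct failure paths: callRemote raises DeadReferenceError synchronously when the broker already knows the connection is gone, while remote exceptions and later disconnects arrive asynchronously through the Deferred's errback. A sketch of the same pattern factored into a helper (the helper name and logging are illustrative, not part of this commit):

from twisted.python import log
from twisted.spread import pb

def safe_call(remote, method, *args, **kwargs):
    """Call a remote method, handling a dead reference synchronously and
    any other failure through the errback chain."""
    try:
        deferred = remote.callRemote(method, *args, **kwargs)
    except pb.DeadReferenceError:
        # The connection is already known to be lost; no Deferred exists.
        log.msg("Lost connection to remote peer, dropping %r call" % (method,))
        return None
    # Remote-side exceptions and connection drops after the call land here.
    deferred.addErrback(log.err)
    return deferred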

View File

@@ -35,13 +35,12 @@ class ScrapyProcessProtocol(protocol.ProcessProtocol):
log.msg("ClusterWorker: started domain=%s, pid=%d, log=%s" % (self.domain, self.pid, self.logfile))
self.transport.closeStdin()
self.status = "running"
self.procman.statistics["domains"][self.domain] = {"status": "running", "last_start_time": self.start_time}
self.procman.update_master()
def processEnded(self, status_object):
log.msg("ClusterWorker: finished domain=%s, pid=%d, log=%s" % (self.domain, self.pid, self.logfile))
del self.procman.running[self.domain]
self.procman.statistics["domains"][self.domain].update({"status": "scraped", "last_end_time": datetime.datetime.utcnow()})
self.procman.statistics["scraped_total"] += 1
self.procman.update_master()
class ClusterWorker(pb.Root):
@@ -53,11 +52,34 @@ class ClusterWorker(pb.Root):
self.logdir = settings['CLUSTER_LOGDIR']
self.running = {}
self.starttime = datetime.datetime.utcnow()
self.statistics = {"domains": {}, "scraped_total": 0}
port = settings.getint('CLUSTER_WORKER_PORT')
scrapyengine.listenTCP(port, pb.PBServerFactory(self))
log.msg("PYTHONPATH: %s" % repr(sys.path))
def status(self, rcode=0, rstring=None):
status = {}
status["running"] = [ self.running[k].as_dict() for k in self.running.keys() ]
status["starttime"] = self.starttime
status["timestamp"] = datetime.datetime.utcnow()
status["maxproc"] = self.maxproc
status["loadavg"] = os.getloadavg()
status["logdir"] = self.logdir
status["callresponse"] = (rcode, rstring) if rstring else (0, "Status Response.")
return status
def update_master(self):
try:
deferred = self.__master.callRemote("update", self.status())
except pb.DeadReferenceError:
self.__master = None
log.msg("Lost connection to node %s." % (self.name), log.ERROR)
else:
deferred.addCallbacks(callback=lambda x: x, errback=lambda reason: log.msg(reason, log.ERROR))
def remote_set_master(self, master):
self.__master = master
return self.status()
def remote_stop(self, domain):
"""Stop running domain."""
if domain in self.running:
@@ -71,26 +93,6 @@ class ClusterWorker(pb.Root):
def remote_status(self):
return self.status()
def remote_statistics(self):
#This can detect processes that were abnormally killed (for example, by the kernel
#because of a memory ran out.)
for domain in self.statistics["domains"]:
if self.statistics["domains"][domain]["status"] == "running" and not domain in self.running:
self.statistics["domains"][domain]["status"] = "lost"
return self.statistics
def status(self, rcode=0, rstring=None):
status = {}
status["running"] = [ self.running[k].as_dict() for k in self.running.keys() ]
status["starttime"] = self.starttime
status["timestamp"] = datetime.datetime.utcnow()
status["maxproc"] = self.maxproc
status["loadavg"] = os.getloadavg()
status["logdir"] = self.logdir
status["callresponse"] = (rcode, rstring) if rstring else (0, "Status Response.")
return status
def remote_run(self, domain, spider_settings=None):
"""Spawn process to run the given domain."""
@@ -108,7 +110,9 @@ class ClusterWorker(pb.Root):
r = c.update(settings.get("CLUSTER_WORKER_SVNWORKDIR", "."))
log.msg("Updated to revision %s." %r[0].number, level=log.DEBUG)
except pysvn.ClientError, e:
log.msg("Unable to svn update: %s" % e)
log.msg("Unable to svn update: %s" % e, level=log.WARNING)
except ImportError:
log.msg("pysvn module not available.", level=log.WARNING)
proc = reactor.spawnProcess(scrapy_proc, sys.executable, args=args, env=scrapy_proc.env)
return self.status(0, "Started process %s." % scrapy_proc)
return self.status(2, "Domain %s already running." % domain )
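
For context, the remote_run path above relies on Twisted's process support: a ProcessProtocol subclass receives lifecycle callbacks for the child it tracks, and reactor.spawnProcess launches the child, much as ScrapyProcessProtocol does for each crawl. A self-contained sketch of that machinery; the command line here is a stand-in, not the worker's actual crawl invocation:

import os
import sys
from twisted.internet import reactor, protocol

class CrawlProcess(protocol.ProcessProtocol):
    """Tracks one spawned child, as ScrapyProcessProtocol does above."""
    def __init__(self, domain):
        self.domain = domain

    def connectionMade(self):
        # Called once the child is running; the pid is now available.
        print "started %s (pid %d)" % (self.domain, self.transport.pid)
        self.transport.closeStdin()

    def processEnded(self, reason):
        print "finished %s: %s" % (self.domain, reason.value)
        reactor.stop()

def start():
    proc = CrawlProcess("example.com")
    # Stand-in child command; the real worker builds its own args and env.
    args = [sys.executable, "-c", "print 'pretend crawl'"]
    reactor.spawnProcess(proc, sys.executable, args=args, env=dict(os.environ))

reactor.callWhenRunning(start)
reactor.run()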