Mirror of https://github.com/scrapy/scrapy.git
- added basic statistics (needs to be improved)
- node removal when the connection-lost event fires

--HG--
extra : convert_revision : svn%3Ab85faa78-f9eb-468e-a121-7cced6da292c%40103
commit f3f34d0816
parent e86ee64727
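For orientation, the bookkeeping this commit adds works as follows: a domain moves from ClusterMaster.loading into statistics["domains"]["running"] when a worker reports it running, into statistics["domains"]["scraped"] when the worker process finishes, and into statistics["domains"]["lost"] when it disappears without reporting "scraped". The sketch below is not part of the commit; it is a minimal stand-alone model of that flow, where the FakeMaster class and the driver calls at the bottom are hypothetical and the method bodies mirror the diff.

import datetime

class FakeMaster:
    """Stand-in for ClusterMaster, reduced to the statistics bookkeeping."""

    def __init__(self):
        self.loading = []        # domains queued on a worker but not yet running
        self.running = {}        # domain -> process info reported by workers
        self.start_time = datetime.datetime.utcnow()
        self.statistics = {
            "domains": {"running": set(), "scraped": {}, "lost_count": {}, "lost": set()},
            "scraped_count": 0,
        }

    def remote_update(self, domain, domain_status):
        # mirrors Node.remote_update() in the diff (status argument omitted here)
        if domain in self.loading and domain_status == "running":
            self.loading.remove(domain)
            self.statistics["domains"]["running"].add(domain)
        elif domain_status == "scraped":
            self.statistics["domains"]["running"].remove(domain)
            self.statistics["domains"]["scraped"][domain] = \
                self.statistics["domains"]["scraped"].get(domain, 0) + 1
            self.statistics["scraped_count"] += 1
            self.statistics["domains"]["lost"].discard(domain)

    def update_nodes(self):
        # mirrors the lost-domain check appended to ClusterMaster.update_nodes()
        real_running = set(self.running.keys())
        lost = self.statistics["domains"]["running"].difference(real_running)
        for domain in lost:
            self.statistics["domains"]["lost_count"][domain] = \
                self.statistics["domains"]["lost_count"].get(domain, 0) + 1
        self.statistics["domains"]["lost"] |= lost


master = FakeMaster()
master.loading.append("example.com")
master.running["example.com"] = {"pid": 1234}
master.remote_update("example.com", "running")             # worker reports "running"
del master.running["example.com"]
master.remote_update("example.com", "scraped")             # worker reports "scraped"
print(master.statistics["scraped_count"])                  # -> 1

master.statistics["domains"]["running"].add("other.org")   # worker died without reporting
master.update_nodes()
print(sorted(master.statistics["domains"]["lost"]))        # -> ['other.org']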
@@ -68,9 +68,6 @@ class Node(pb.Referenceable):
         else:
             self.alive = True
             self.running = status['running']
-            for proc in self.running:
-                if proc["domain"] in self.master.loading:
-                    self.master.loading.remove(proc["domain"])
             self.maxproc = status['maxproc']
             self.starttime = status['starttime']
             self.timestamp = status['timestamp']
@@ -137,8 +134,28 @@ class Node(pb.Referenceable):
         else:
             deferred.addCallbacks(callback=_run_callback, errback=_run_errback)

-    def remote_update(self, status):
+    def remote_update(self, status, domain, domain_status):
         self._set_status(status)
+        if domain in self.master.loading and domain_status == "running":
+            self.master.loading.remove(domain)
+            self.master.statistics["domains"]["running"].add(domain)
+        elif domain_status == "scraped":
+            self.master.statistics["domains"]["running"].remove(domain)
+            self.master.statistics["domains"]["scraped"][domain] = self.master.statistics["domains"]["scraped"].get(domain, 0) + 1
+            self.master.statistics["scraped_count"] = self.master.statistics.get("scraped_count", 0) + 1
+            if domain in self.master.statistics["domains"]["lost"]:
+                self.master.statistics["domains"]["lost"].remove(domain)
+
+class ScrapyPBClientFactory(pb.PBClientFactory):
+    def __init__(self, master, nodename):
+        pb.PBClientFactory.__init__(self)
+        self.master = master
+        self.nodename = nodename
+
+    def clientConnectionLost(self, *args, **kargs):
+        pb.PBClientFactory.clientConnectionLost(self, *args, **kargs)
+        del self.master.nodes[self.nodename]
+        log.msg("Removed node %s." % self.nodename )

 class ClusterMaster:

@@ -159,7 +176,9 @@ class ClusterMaster:
         self.pending = []
         self.loading = []
         self.nodes = {}
-        self.statistics = {"domains": {}, "scraped_total": 0, "start_time": datetime.datetime.utcnow()}
+        self.start_time = datetime.datetime.utcnow()
+        #on how statistics works, see self.update_nodes() and Nodes.remote_update()
+        self.statistics = {"domains": {"running": set(), "scraped": {}, "lost_count": {}, "lost": set()}, "scraped_count": 0 }
         self.global_settings = {}
         #load cluster global settings
         for sname in settings.getlist('GLOBAL_CLUSTER_SETTINGS'):
@@ -187,7 +206,7 @@ class ClusterMaster:
         port = eval(port)
         log.msg("Connecting to cluster worker %s..." % name)
         log.msg("Server: %s, Port: %s" % (server, port))
-        factory = pb.PBClientFactory()
+        factory = ScrapyPBClientFactory(self, name)
         try:
             reactor.connectTCP(server, port, factory)
         except Exception, err:
@@ -203,7 +222,13 @@ class ClusterMaster:
             else:
                 log.msg("Reloading node. name: %s, url: %s" % (name, url) )
                 self.load_node(name, url)

+        real_running = set(self.running.keys())
+        lost = self.statistics["domains"]["running"].difference(real_running)
+        for domain in lost:
+            self.statistics["domains"]["lost_count"][domain] = self.statistics["domains"]["lost_count"].get(domain, 0) + 1
+        self.statistics["domains"]["lost"] = self.statistics["domains"]["lost"].union(lost)
+
     def add_node(self, cworker, name):
         """Add node given its node"""
         node = Node(cworker, name, self)
@@ -35,12 +35,13 @@ class ScrapyProcessProtocol(protocol.ProcessProtocol):
         log.msg("ClusterWorker: started domain=%s, pid=%d, log=%s" % (self.domain, self.pid, self.logfile))
         self.transport.closeStdin()
         self.status = "running"
-        self.procman.update_master()
+        self.procman.update_master(self.domain, "running")

-    def processEnded(self, status_object):
+    def processEnded(self, reason):
         log.msg("ClusterWorker: finished domain=%s, pid=%d, log=%s" % (self.domain, self.pid, self.logfile))
+        log.msg("Reason type: %s. value: %s" % (reason.type, reason.value) )
         del self.procman.running[self.domain]
-        self.procman.update_master()
+        self.procman.update_master(self.domain, "scraped")

 class ClusterWorker(pb.Root):

@@ -67,9 +68,9 @@ class ClusterWorker(pb.Root):
         status["callresponse"] = (rcode, rstring) if rstring else (0, "Status Response.")
         return status

-    def update_master(self):
+    def update_master(self, domain, domain_status):
         try:
-            deferred = self.__master.callRemote("update", self.status())
+            deferred = self.__master.callRemote("update", self.status(), domain, domain_status)
         except pb.DeadReferenceError:
             self.__master = None
             log.msg("Lost connection to node %s." % (self.name), log.ERROR)