From f3f34d0816863ea78d10e9db8d94c14e9d866f3a Mon Sep 17 00:00:00 2001
From: olveyra
Date: Thu, 24 Jul 2008 15:18:29 +0000
Subject: [PATCH] - added basic statistics (needs to be improved)
 - node removal on connection-lost event

--HG--
extra : convert_revision : svn%3Ab85faa78-f9eb-468e-a121-7cced6da292c%40103
---
 .../contrib/pbcluster/master/manager.py | 39 +++++++++++++++----
 .../contrib/pbcluster/worker/manager.py | 11 +++---
 2 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/scrapy/trunk/scrapy/contrib/pbcluster/master/manager.py b/scrapy/trunk/scrapy/contrib/pbcluster/master/manager.py
index a03de4abc..20737275c 100644
--- a/scrapy/trunk/scrapy/contrib/pbcluster/master/manager.py
+++ b/scrapy/trunk/scrapy/contrib/pbcluster/master/manager.py
@@ -68,9 +68,6 @@ class Node(pb.Referenceable):
         else:
             self.alive = True
             self.running = status['running']
-            for proc in self.running:
-                if proc["domain"] in self.master.loading:
-                    self.master.loading.remove(proc["domain"])
             self.maxproc = status['maxproc']
             self.starttime = status['starttime']
             self.timestamp = status['timestamp']
@@ -137,8 +134,28 @@
         else:
             deferred.addCallbacks(callback=_run_callback, errback=_run_errback)
 
-    def remote_update(self, status):
+    def remote_update(self, status, domain, domain_status):
         self._set_status(status)
+        if domain in self.master.loading and domain_status == "running":
+            self.master.loading.remove(domain)
+            self.master.statistics["domains"]["running"].add(domain)
+        elif domain_status == "scraped":
+            self.master.statistics["domains"]["running"].remove(domain)
+            self.master.statistics["domains"]["scraped"][domain] = self.master.statistics["domains"]["scraped"].get(domain, 0) + 1
+            self.master.statistics["scraped_count"] = self.master.statistics.get("scraped_count", 0) + 1
+            if domain in self.master.statistics["domains"]["lost"]:
+                self.master.statistics["domains"]["lost"].remove(domain)
+
+class ScrapyPBClientFactory(pb.PBClientFactory):
+    def __init__(self, master, nodename):
+        pb.PBClientFactory.__init__(self)
+        self.master = master
+        self.nodename = nodename
+
+    def clientConnectionLost(self, *args, **kargs):
+        pb.PBClientFactory.clientConnectionLost(self, *args, **kargs)
+        del self.master.nodes[self.nodename]
+        log.msg("Removed node %s." % self.nodename)
 
 
 class ClusterMaster:
@@ -159,7 +176,9 @@
         self.pending = []
         self.loading = []
         self.nodes = {}
-        self.statistics = {"domains": {}, "scraped_total": 0, "start_time": datetime.datetime.utcnow()}
+        self.start_time = datetime.datetime.utcnow()
+        # on how the statistics work, see self.update_nodes() and Node.remote_update()
+        self.statistics = {"domains": {"running": set(), "scraped": {}, "lost_count": {}, "lost": set()}, "scraped_count": 0}
         self.global_settings = {}
         #load cluster global settings
         for sname in settings.getlist('GLOBAL_CLUSTER_SETTINGS'):
@@ -187,7 +206,7 @@
             port = eval(port)
             log.msg("Connecting to cluster worker %s..." % name)
             log.msg("Server: %s, Port: %s" % (server, port))
-            factory = pb.PBClientFactory()
+            factory = ScrapyPBClientFactory(self, name)
             try:
                 reactor.connectTCP(server, port, factory)
             except Exception, err:
@@ -203,7 +222,13 @@
         else:
             log.msg("Reloading node. name: %s, url: %s" % (name, url))
            self.load_node(name, url)
-
+
+        real_running = set(self.running.keys())
+        lost = self.statistics["domains"]["running"].difference(real_running)
+        for domain in lost:
+            self.statistics["domains"]["lost_count"][domain] = self.statistics["domains"]["lost_count"].get(domain, 0) + 1
+        self.statistics["domains"]["lost"] = self.statistics["domains"]["lost"].union(lost)
+
     def add_node(self, cworker, name):
         """Add node given its node"""
         node = Node(cworker, name, self)
diff --git a/scrapy/trunk/scrapy/contrib/pbcluster/worker/manager.py b/scrapy/trunk/scrapy/contrib/pbcluster/worker/manager.py
index ddb92c6f8..981e1263e 100644
--- a/scrapy/trunk/scrapy/contrib/pbcluster/worker/manager.py
+++ b/scrapy/trunk/scrapy/contrib/pbcluster/worker/manager.py
@@ -35,12 +35,13 @@ class ScrapyProcessProtocol(protocol.ProcessProtocol):
         log.msg("ClusterWorker: started domain=%s, pid=%d, log=%s" % (self.domain, self.pid, self.logfile))
         self.transport.closeStdin()
         self.status = "running"
-        self.procman.update_master()
+        self.procman.update_master(self.domain, "running")
 
-    def processEnded(self, status_object):
+    def processEnded(self, reason):
         log.msg("ClusterWorker: finished domain=%s, pid=%d, log=%s" % (self.domain, self.pid, self.logfile))
+        log.msg("Reason type: %s. value: %s" % (reason.type, reason.value))
         del self.procman.running[self.domain]
-        self.procman.update_master()
+        self.procman.update_master(self.domain, "scraped")
 
 
 class ClusterWorker(pb.Root):
@@ -67,9 +68,9 @@
         status["callresponse"] = (rcode, rstring) if rstring else (0, "Status Response.")
         return status
 
-    def update_master(self):
+    def update_master(self, domain, domain_status):
         try:
-            deferred = self.__master.callRemote("update", self.status())
+            deferred = self.__master.callRemote("update", self.status(), domain, domain_status)
         except pb.DeadReferenceError:
             self.__master = None
             log.msg("Lost connection to node %s." % (self.name), log.ERROR)
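
A note on how the statistics above fit together: workers report explicit per-domain transitions ("running", "scraped") through remote_update(), while the master's update_nodes() periodically diffs the set of domains it believes are running against what the workers actually report. Any domain that disappears without a "scraped" notification is counted as lost, and the lost flag is cleared if the domain later finishes. Below is a minimal standalone sketch of that reconciliation step, assuming a hypothetical Stats class as a stand-in for the ClusterMaster.statistics dict; it is an illustration, not code from the patch:

    # Hypothetical stand-in for the ClusterMaster.statistics bookkeeping;
    # mirrors the reconciliation done in update_nodes() above.
    class Stats:
        def __init__(self):
            self.running = set()   # domains the master believes are running
            self.lost = set()      # domains that vanished without a "scraped" update
            self.lost_count = {}   # per-domain count of disappearances

        def reconcile(self, reported_running):
            # Anything we think is running that no worker reports is lost.
            lost = self.running.difference(set(reported_running))
            for domain in lost:
                self.lost_count[domain] = self.lost_count.get(domain, 0) + 1
            self.lost = self.lost.union(lost)
            return lost

    # Example: the master believes two domains run, but workers report one.
    stats = Stats()
    stats.running = set(["example.com", "example.org"])
    print stats.reconcile(["example.com"])   # -> set(['example.org'])

As in the patch, reconcile() leaves lost domains in the running set; they are only removed when a later "scraped" update arrives, which also clears the lost flag and increments the scraped counters.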