diff --git a/scrapy/trunk/scrapy/contrib/pbcluster/master/manager.py b/scrapy/trunk/scrapy/contrib/pbcluster/master/manager.py
index 2d29a4b2d..08f0a99fe 100644
--- a/scrapy/trunk/scrapy/contrib/pbcluster/master/manager.py
+++ b/scrapy/trunk/scrapy/contrib/pbcluster/master/manager.py
@@ -27,15 +27,21 @@ def my_import(name):
         mod = getattr(mod, comp)
     return mod
 
-class Node:
-    def __init__(self, remote, status, name, master):
+class Node(pb.Referenceable):
+    def __init__(self, remote, name, master):
         self.__remote = remote
-        self._set_status(status)
+        self.alive = False
         self.name = name
         self.master = master
         self.available = True
-        self.statistics = {"domains": {}, "scraped_total": 0 }
-
+        try:
+            deferred = self.__remote.callRemote("set_master", self)
+        except pb.DeadReferenceError:
+            self._set_status(None)
+            log.msg("Lost connection to node %s." % (self.name), log.ERROR)
+        else:
+            deferred.addCallbacks(callback=self._set_status, errback=lambda reason: log.msg(reason, log.ERROR))
+
     def status_as_dict(self, verbosity=0):
         status = {"alive": self.alive}
         if self.alive:
@@ -92,19 +98,6 @@ class Node:
         else:
             deferred.addCallbacks(callback=self._set_status, errback=lambda reason: log.msg(reason, log.ERROR))
 
-    def update_statistics(self):
-
-        def _set_statistics(statistics):
-            self.statistics = statistics
-
-        try:
-            deferred = self.__remote.callRemote("statistics")
-        except pb.DeadReferenceError:
-            self._set_status(None)
-            log.msg("Lost connection to node %s." % (self.name), log.ERROR)
-        else:
-            deferred.addCallbacks(callback=_set_statistics, errback=lambda reason: log.msg(reason, log.ERROR))
-
     def stop(self, domain):
         try:
             deferred = self.__remote.callRemote("stop", domain)
@@ -142,8 +135,10 @@ class Node:
         else:
             deferred.addCallbacks(callback=_run_callback, errback=_run_errback)
 
+    def remote_update(self, status):
+        self._set_status(status)
 
-class ClusterMaster(pb.Root):
+class ClusterMaster:
 
     def __init__(self):
 
@@ -170,18 +165,14 @@ class ClusterMaster(pb.Root):
         dispatcher.connect(self._engine_started, signal=signals.engine_started)
         dispatcher.connect(self._engine_stopped, signal=signals.engine_stopped)
-        port = settings.getint('CLUSTER_MASTER_PORT')
-        scrapyengine.listenTCP(port, pb.PBServerFactory(self))
 
     def load_nodes(self):
-
-        """Loads nodes from the CLUSTER_MASTER_NODES setting"""
-
+        """Load the nodes listed in the CLUSTER_MASTER_NODES setting"""
        for name, url in settings.get('CLUSTER_MASTER_NODES', {}).iteritems():
            self.load_node(name, url)
 
     def load_node(self, name, url):
-
+        """Create a remote reference to the given worker node"""
 
         def _make_callback(_factory, _name, _url):
 
            def _errback(_reason):
@@ -203,25 +194,17 @@
             _make_callback(factory, name, url)
 
     def update_nodes(self):
-        self.statistics["scraped_total"] = 0
         for name, url in settings.get('CLUSTER_MASTER_NODES', {}).iteritems():
             if name in self.nodes and self.nodes[name].alive:
                 log.msg("Updating node. name: %s, url: %s" % (name, url) )
                 self.nodes[name].update_status()
-                self.nodes[name].update_statistics()
-                self.statistics["scraped_total"] += self.nodes[name].statistics["scraped_total"]
-                self.statistics["timestamp"] = datetime.datetime.utcnow()
-                for domain in self.nodes[name].statistics["domains"]:
-                    if not domain in self.statistics["domains"] or self.nodes[name].statistics["domains"][domain]["last_start_time"] >= self.statistics["domains"][domain]["last_start_time"]:
-                        self.statistics["domains"][domain] = self.nodes[name].statistics["domains"][domain]
             else:
                 log.msg("Reloading node. name: %s, url: %s" % (name, url) )
                 self.load_node(name, url)
 
     def add_node(self, cworker, name):
         """Add node given its node"""
-        node = Node(cworker, None, name, self)
-        node.update_status()
+        node = Node(cworker, name, self)
         self.nodes[name] = node
         log.msg("Added cluster worker %s" % name)
 
@@ -242,7 +225,7 @@
             else:
                 break
         for domain in domains:
-            pd = self.find_ifpending(domain)
+            pd = self.find_inpending(domain)
             if pd: #domain already pending, so just change priority if new is higher
                 if priority < pd['priority']:
                     self.pending.remove(pd)
@@ -300,7 +283,7 @@
     def available_nodes(self):
         return (node for node in self.nodes.itervalues() if node.available)
 
-    def find_ifpending(self, domain):
+    def find_inpending(self, domain):
         for p in self.pending:
             if domain == p['domain']:
                 return p
diff --git a/scrapy/trunk/scrapy/contrib/pbcluster/worker/manager.py b/scrapy/trunk/scrapy/contrib/pbcluster/worker/manager.py
index d47d74627..ddb92c6f8 100644
--- a/scrapy/trunk/scrapy/contrib/pbcluster/worker/manager.py
+++ b/scrapy/trunk/scrapy/contrib/pbcluster/worker/manager.py
@@ -35,13 +35,12 @@ class ScrapyProcessProtocol(protocol.ProcessProtocol):
         log.msg("ClusterWorker: started domain=%s, pid=%d, log=%s" % (self.domain, self.pid, self.logfile))
         self.transport.closeStdin()
         self.status = "running"
-        self.procman.statistics["domains"][self.domain] = {"status": "running", "last_start_time": self.start_time}
+        self.procman.update_master()
 
     def processEnded(self, status_object):
         log.msg("ClusterWorker: finished domain=%s, pid=%d, log=%s" % (self.domain, self.pid, self.logfile))
         del self.procman.running[self.domain]
-        self.procman.statistics["domains"][self.domain].update({"status": "scraped", "last_end_time": datetime.datetime.utcnow()})
-        self.procman.statistics["scraped_total"] += 1
+        self.procman.update_master()
 
 
 class ClusterWorker(pb.Root):
@@ -53,11 +52,34 @@
         self.logdir = settings['CLUSTER_LOGDIR']
         self.running = {}
         self.starttime = datetime.datetime.utcnow()
-        self.statistics = {"domains": {}, "scraped_total": 0}
         port = settings.getint('CLUSTER_WORKER_PORT')
         scrapyengine.listenTCP(port, pb.PBServerFactory(self))
         log.msg("PYTHONPATH: %s" % repr(sys.path))
 
+    def status(self, rcode=0, rstring=None):
+        status = {}
+        status["running"] = [ self.running[k].as_dict() for k in self.running.keys() ]
+        status["starttime"] = self.starttime
+        status["timestamp"] = datetime.datetime.utcnow()
+        status["maxproc"] = self.maxproc
+        status["loadavg"] = os.getloadavg()
+        status["logdir"] = self.logdir
+        status["callresponse"] = (rcode, rstring) if rstring else (0, "Status Response.")
+        return status
+
+    def update_master(self):
+        try:
+            deferred = self.__master.callRemote("update", self.status())
+        except (pb.DeadReferenceError, AttributeError):
+            self.__master = None
+            log.msg("Lost connection to master.", log.ERROR)
+        else:
+            deferred.addCallbacks(callback=lambda x: x, errback=lambda reason: log.msg(reason, log.ERROR))
+
+    def remote_set_master(self, master):
+        self.__master = master
+        return self.status()
+
     def remote_stop(self, domain):
         """Stop running domain."""
         if domain in self.running:
@@ -71,26 +93,6 @@
 
     def remote_status(self):
         return self.status()
-
-    def remote_statistics(self):
-        #This can detect processes that were abnormally killed (for example, by the kernel
-        #because of a memory ran out.)
-        for domain in self.statistics["domains"]:
-            if self.statistics["domains"][domain]["status"] == "running" and not domain in self.running:
-                self.statistics["domains"][domain]["status"] = "lost"
-
-        return self.statistics
-
-    def status(self, rcode=0, rstring=None):
-        status = {}
-        status["running"] = [ self.running[k].as_dict() for k in self.running.keys() ]
-        status["starttime"] = self.starttime
-        status["timestamp"] = datetime.datetime.utcnow()
-        status["maxproc"] = self.maxproc
-        status["loadavg"] = os.getloadavg()
-        status["logdir"] = self.logdir
-        status["callresponse"] = (rcode, rstring) if rstring else (0, "Status Response.")
-        return status
 
     def remote_run(self, domain, spider_settings=None):
         """Spawn process to run the given domain."""
@@ -108,7 +110,9 @@
                 r = c.update(settings.get("CLUSTER_WORKER_SVNWORKDIR", "."))
                 log.msg("Updated to revision %s." %r[0].number, level=log.DEBUG)
-            except pysvn.ClientError, e:
-                log.msg("Unable to svn update: %s" % e)
+            except ImportError:
+                log.msg("pysvn module not available.", level=log.WARNING)
+            except pysvn.ClientError, e:
+                log.msg("Unable to svn update: %s" % e, level=log.WARNING)
             proc = reactor.spawnProcess(scrapy_proc, sys.executable, args=args, env=scrapy_proc.env)
             return self.status(0, "Started process %s." % scrapy_proc)
         return self.status(2, "Domain %s already running." % domain )
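
Note for reviewers: this patch inverts the old polling flow. Previously the master polled each worker with callRemote("statistics"); now the master's Node is itself a pb.Referenceable, hands itself to the worker via callRemote("set_master", self), and the worker pushes fresh status back with callRemote("update", self.status()) whenever a spider process starts or ends. Below is a minimal, self-contained sketch of that round trip using Twisted Perspective Broker; the Master/Worker names, port 8789, and the dummy status dicts are illustrative assumptions for the sketch, not code from this patch.

from twisted.spread import pb
from twisted.internet import reactor

class Master(pb.Referenceable):
    """Plays the role of Node: receives status pushed by the worker."""
    def remote_update(self, status):
        # Called by the worker on every state change, instead of being polled.
        print "master received status:", status
        reactor.stop()

class Worker(pb.Root):
    """Plays the role of ClusterWorker: pushes status to its master."""
    def remote_set_master(self, master):
        # 'master' arrives as a RemoteReference to the Master instance above.
        self.master = master
        reactor.callLater(1, self.update_master)  # simulate a later process start/end
        return {"alive": True}  # initial status returned to the caller

    def update_master(self):
        try:
            d = self.master.callRemote("update", {"running": [], "loadavg": (0, 0, 0)})
        except pb.DeadReferenceError:
            self.master = None  # master went away; stop pushing until it re-registers
        else:
            d.addErrback(lambda failure: failure.printTraceback())

def connected(root):
    # The master registers itself with the worker and gets the initial status back.
    root.callRemote("set_master", Master())

reactor.listenTCP(8789, pb.PBServerFactory(Worker()))
factory = pb.PBClientFactory()
reactor.connectTCP("localhost", 8789, factory)
factory.getRootObject().addCallback(connected)
reactor.run()

One consequence of the push model worth noting: when a push fails with pb.DeadReferenceError, the worker simply drops its master reference, so recovery depends on the master reconnecting and calling set_master again.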