Source code for spyder.core.master
#
# Copyright (c) 2011 Daniel Truemper truemped@googlemail.com
#
# master.py 31-Jan-2011
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A ZeroMQ master, i.e. the producer of URIs.
"""
import traceback
from Queue import Empty
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream
from spyder.core.constants import ZMQ_SPYDER_MGMT_WORKER
from spyder.core.constants import ZMQ_SPYDER_MGMT_WORKER_AVAIL
from spyder.core.constants import ZMQ_SPYDER_MGMT_WORKER_QUIT
from spyder.core.constants import ZMQ_SPYDER_MGMT_WORKER_QUIT_ACK
from spyder.core.messages import DataMessage
from spyder.core.log import LoggingMixin
[docs]class ZmqMaster(object, LoggingMixin):
"""
This is the ZMQ Master implementation.
The master will send :class:`DataMessage` object to the workers and receive
the processed messages. Unknown links will then be added to the frontier.
"""
def __init__(self, settings, identity, insocket, outsocket, mgmt, frontier,
log_handler, log_level, io_loop):
"""
Initialize the master.
"""
LoggingMixin.__init__(self, log_handler, log_level)
self._identity = identity
self._io_loop = io_loop or IOLoop.instance()
self._in_stream = ZMQStream(insocket, io_loop)
self._out_stream = ZMQStream(outsocket, io_loop)
self._mgmt = mgmt
self._frontier = frontier
self._running = False
self._available_workers = []
# periodically check if there are pending URIs to crawl
self._periodic_update = PeriodicCallback(self._send_next_uri,
settings.MASTER_PERIODIC_UPDATE_INTERVAL, io_loop=io_loop)
# start this periodic callback when you are waiting for the workers to
# finish
self._periodic_shutdown = PeriodicCallback(self._shutdown_wait, 500,
io_loop=io_loop)
self._shutdown_counter = 0
self._logger.debug("zmqmaster::initialized")
[docs] def start(self):
"""
Start the master.
"""
self._mgmt.add_callback(ZMQ_SPYDER_MGMT_WORKER, self._worker_msg)
self._in_stream.on_recv(self._receive_processed_uri)
self._periodic_update.start()
self._running = True
self._logger.debug("zmqmaster::starting...")
[docs] def stop(self):
"""
Stop the master gracefully, i.e. stop sending more URIs that should get
processed.
"""
self._logger.debug("zmqmaster::stopping...")
self._running = False
self._periodic_update.stop()
[docs] def shutdown(self):
"""
Shutdown the master and notify the workers.
"""
self._logger.debug("zmqmaster::shutdown...")
self.stop()
self._mgmt.publish(topic=ZMQ_SPYDER_MGMT_WORKER,
identity=self._identity, data=ZMQ_SPYDER_MGMT_WORKER_QUIT)
self._frontier.close()
self._periodic_shutdown.start()
def _shutdown_wait(self):
"""
Callback called from `self._periodic_shutdown` in order to wait for the
workers to finish.
"""
self._shutdown_counter += 1
if 0 == len(self._available_workers) or self._shutdown_counter > 5:
self._periodic_shutdown.stop()
self._logger.debug("zmqmaster::bye bye...")
self._io_loop.stop()
[docs] def close(self):
"""
Close all open sockets.
"""
self._in_stream.close()
self._out_stream.close()
[docs] def finished(self):
"""
Return true if all uris have been processed and the master is ready to
be shut down.
"""
return not self._running
def _worker_msg(self, msg):
"""
Called when a worker has sent a :class:`MgmtMessage`.
"""
if ZMQ_SPYDER_MGMT_WORKER_AVAIL == msg.data:
self._available_workers.append(msg.identity)
self._logger.info("zmqmaster::A new worker is available (%s)" %
msg.identity)
self._send_next_uri()
if ZMQ_SPYDER_MGMT_WORKER_QUIT_ACK == msg.data:
if msg.identity in self._available_workers:
self._available_workers.remove(msg.identity)
self._logger.info("zmqmaster::Removing worker (%s)" %
msg.identity)
def _send_next_uri(self):
"""
See if there are more uris to process and send them to the workers if
there are any.
At this point there is a very small heuristic in order to maximize the
throughput: try to keep the `self._out_stream._send_queue` full.
"""
if not self._running:
self._logger.error("Master is not running, not sending more uris")
return
num_workers = len(self._available_workers)
if self._running and num_workers > 0:
while self._out_stream._send_queue.qsize() < num_workers * 4:
try:
next_curi = self._frontier.get_next()
except Empty:
# well, frontier has nothing to process right now
self._logger.debug("zmqmaster::Nothing to crawl right now")
break
self._logger.info("zmqmaster::Begin crawling next URL (%s)" %
next_curi.url)
msg = DataMessage(identity=self._identity, curi=next_curi)
self._out_stream.send_multipart(msg.serialize())
def _receive_processed_uri(self, raw_msg):
"""
Receive and reschedule an URI that has been processed. Additionally add
all extracted URLs to the frontier.
"""
msg = DataMessage(raw_msg)
self._logger.info("zmqmaster::Crawling URL (%s) finished" %
msg.curi.url)
try:
if 200 <= msg.curi.status_code < 300:
# we have some kind of success code! yay
self._frontier.process_successful_crawl(msg.curi)
elif 300 <= msg.curi.status_code < 400:
# Some kind of redirect code. This will only happen if the number
# of redirects exceeds settings.MAX_REDIRECTS
self._frontier.process_redirect(msg.curi)
elif 400 <= msg.curi.status_code < 500:
# some kind of error where the resource could not be found.
self._frontier.process_not_found(msg.curi)
elif 500 <= msg.curi.status_code < 600:
# some kind of server error
self._frontier.process_server_error(msg.curi)
except:
self._logger.critical("zmqmaster::Uncaught exception in the sink")
self._logger.critical("zmqmaster::%s" % (traceback.format_exc(),))
self.stop()
self._send_next_uri()