Source code for spyder.masterprocess

#
# Copyright (c) 2011 Daniel Truemper truemped@googlemail.com
#
# masterprocess.py 31-Jan-2011
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the default architecture for master process.

The main task for masterprocesses is to create and run the **Frontier**.
Starting a master involves the following steps:

1. Bind to the configured |zmq| sockets
2. Start the management interface
3. Create the frontier
4. Start the master

Once the master is up and you have configured a ``settings.MASTER_CALLBACK``,
this method will be called before the master is really started, i.e. before the
``IOLoop.start()`` is called. This will allow you to insert *Seed* |urls|, e.g.
"""

import logging
import os
import signal
import socket
import traceback

import zmq
from zmq.core.error import ZMQError
from zmq.eventloop.ioloop import IOLoop
from zmq.log.handlers import PUBHandler

from spyder.import_util import import_class
from spyder.core.master import ZmqMaster
from spyder.core.mgmt import ZmqMgmt


[docs]def create_master_management(settings, zmq_context, io_loop): """ Create the management interface for master processes. """ listening_socket = zmq_context.socket(zmq.SUB) listening_socket.setsockopt(zmq.SUBSCRIBE, "") listening_socket.bind(settings.ZEROMQ_MGMT_WORKER) publishing_socket = zmq_context.socket(zmq.PUB) publishing_socket.bind(settings.ZEROMQ_MGMT_MASTER) return ZmqMgmt(listening_socket, publishing_socket, io_loop=io_loop)
[docs]def create_frontier(settings, log_handler): """ Create the frontier to use. """ frontier = import_class(settings.FRONTIER_CLASS) return frontier(settings, log_handler)
[docs]def main(settings): """ Main method for master processes. """ # create my own identity identity = "master:%s:%s" % (socket.gethostname(), os.getpid()) ctx = zmq.Context() io_loop = IOLoop.instance() # initialize the logging subsystem log_pub = ctx.socket(zmq.PUB) log_pub.connect(settings.ZEROMQ_LOGGING) zmq_logging_handler = PUBHandler(log_pub) zmq_logging_handler.root_topic = "spyder.master" logger = logging.getLogger() logger.addHandler(zmq_logging_handler) logger.setLevel(settings.LOG_LEVEL_MASTER) logger.info("process::Starting up the master") mgmt = create_master_management(settings, ctx, io_loop) frontier = create_frontier(settings, zmq_logging_handler) publishing_socket = ctx.socket(zmq.PUSH) publishing_socket.setsockopt(zmq.HWM, settings.ZEROMQ_MASTER_PUSH_HWM) publishing_socket.bind(settings.ZEROMQ_MASTER_PUSH) receiving_socket = ctx.socket(zmq.SUB) receiving_socket.setsockopt(zmq.SUBSCRIBE, "") receiving_socket.bind(settings.ZEROMQ_MASTER_SUB) master = ZmqMaster(settings, identity, receiving_socket, publishing_socket, mgmt, frontier, zmq_logging_handler, settings.LOG_LEVEL_MASTER, io_loop) def handle_shutdown_signal(_sig, _frame): """ Called from the os when a shutdown signal is fired. """ master.shutdown() # zmq 2.1 stops blocking calls, restart the ioloop io_loop.start() # handle kill signals signal.signal(signal.SIGINT, handle_shutdown_signal) signal.signal(signal.SIGTERM, handle_shutdown_signal) if settings.MASTER_CALLBACK: callback = import_class(settings.MASTER_CALLBACK) callback(settings, ctx, io_loop, frontier) mgmt.start() master.start() # this will block until the master stops try: io_loop.start() except ZMQError: logger.debug("Caught a ZMQError. Hopefully during shutdown") logger.debug(traceback.format_exc()) master.close() mgmt.close() logger.info("process::Master is down.") log_pub.close() ctx.term()