Source code for spyder.core.worker
#
# Copyright (c) 2011 Daniel Truemper truemped@googlemail.com
#
# worker.py 10-Jan-2011
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains a ZeroMQ based Worker abstraction.
The `ZmqWorker` class expects an incoming and one outgoing `zmq.socket` as well
as an instance of the `spyder.core.mgmt.ZmqMgmt` class.
"""
import traceback
from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream
from spyder.core.constants import ZMQ_SPYDER_MGMT_WORKER
from spyder.core.constants import ZMQ_SPYDER_MGMT_WORKER_QUIT
from spyder.core.log import LoggingMixin
from spyder.core.messages import DataMessage
[docs]class ZmqWorker(object, LoggingMixin):
"""
This is the ZMQ worker implementation.
The worker will register a :class:`ZMQStream` with the configured
:class:`zmq.Socket` and :class:`zmq.eventloop.ioloop.IOLoop` instance.
Upon `ZMQStream.on_recv` the configured `processors` will be executed
with the deserialized context and the result will be published through the
configured `zmq.socket`.
"""
def __init__(self, insocket, outsocket, mgmt, processing, log_handler,
log_level, io_loop=None):
"""
Initialize the `ZMQStream` with the `insocket` and `io_loop` and store
the `outsocket`.
`insocket` should be of the type `zmq.socket.PULL` `outsocket` should
be of the type `zmq.socket.PUB`
`mgmt` is an instance of `spyder.core.mgmt.ZmqMgmt` that handles
communication between master and worker processes.
"""
LoggingMixin.__init__(self, log_handler, log_level)
self._insocket = insocket
self._io_loop = io_loop or IOLoop.instance()
self._outsocket = outsocket
self._processing = processing
self._mgmt = mgmt
self._in_stream = ZMQStream(self._insocket, self._io_loop)
self._out_stream = ZMQStream(self._outsocket, self._io_loop)
def _quit(self, msg):
"""
The worker is quitting, stop receiving messages.
"""
if ZMQ_SPYDER_MGMT_WORKER_QUIT == msg.data:
self.stop()
def _receive(self, msg):
"""
We have a message!
`msg` is a serialized version of a `DataMessage`.
"""
message = DataMessage(msg)
try:
# this is the real work we want to do
curi = self._processing(message.curi)
message.curi = curi
except:
# catch any uncaught exception and only log it as CRITICAL
self._logger.critical(
"worker::Uncaught exception executing the worker for URL %s!" %
(message.curi.url,))
self._logger.critical("worker::%s" % (traceback.format_exc(),))
# finished, now send the result back to the master
self._out_stream.send_multipart(message.serialize())
[docs] def start(self):
"""
Start the worker.
"""
self._mgmt.add_callback(ZMQ_SPYDER_MGMT_WORKER, self._quit)
self._in_stream.on_recv(self._receive)
[docs] def stop(self):
"""
Stop the worker.
"""
# stop receiving
self._in_stream.stop_on_recv()
self._mgmt.remove_callback(ZMQ_SPYDER_MGMT_WORKER, self._quit)
# but work on anything we might already have
self._in_stream.flush()
self._out_stream.flush()
[docs] def close(self):
"""
Close all open sockets.
"""
self._in_stream.close()
self._insocket.close()
self._out_stream.close()
self._outsocket.close()
[docs]class AsyncZmqWorker(ZmqWorker):
"""
Asynchronous version of the `ZmqWorker`.
This worker differs in that the `self._processing` method should have two
arguments: the message and the socket where the result should be sent to!
"""
def _receive(self, msg):
"""
We have a message!
Instead of the synchronous version we do not handle serializing and
sending the result to the `self._outsocket`. This has to be handled by
the `self._processing` method.
"""
message = DataMessage(msg)
try:
self._processing(message, self._out_stream)
except:
# catch any uncaught exception and only log it as CRITICAL
self._logger.critical("Uncaught exception executing the worker!")
self._logger.critical(traceback.format_exc())