"""
A proxy interface to the central dispatcher that is used by
branch-and-bound workers.
Copyright by Gabriel A. Hackebeil (gabe.hackebeil@gmail.com).
"""
import array
import collections
import marshal
from pybnb.configuration import config
from pybnb.common import _int_to_termination_condition
from pybnb.node import _SerializedNode
from pybnb.problem import _SolveInfo
from pybnb.mpi_utils import send_nothing, recv_data
try:
import mpi4py
except ImportError: # pragma:nocover
pass
import six
_ProcessType = collections.namedtuple("_ProcessType", ["worker", "dispatcher"])
ProcessType = _ProcessType(worker=0, dispatcher=1)
"""A namespace of typecodes that are used to categorize
processes during dispatcher startup."""
_DispatcherAction = collections.namedtuple(
"_DispatcherAction",
[
"update",
"finalize",
"log_info",
"log_warning",
"log_debug",
"log_error",
"log_critical",
"stop_listen",
],
)
DispatcherAction = _DispatcherAction(
update=111,
finalize=211,
log_info=311,
log_warning=411,
log_debug=511,
log_error=611,
log_critical=711,
stop_listen=811,
)
"""A namespace of typecodes that are used to categorize
messages received by the dispatcher from workers."""
_DispatcherResponse = collections.namedtuple("_DispatcherResponse", ["work", "nowork"])
DispatcherResponse = _DispatcherResponse(work=1111, nowork=2111)
"""A namespace of typecodes that are used to categorize
responses received by workers from the dispatcher."""
[docs]class DispatcherProxy(object):
"""A proxy class for interacting with the central
dispatcher via message passing."""
@staticmethod
def _init(comm, ptype):
"""Broadcasts the dispatcher rank to everyone and
sets up a worker communicator that excludes the
single dispatcher."""
import mpi4py.MPI
assert mpi4py.MPI.Is_initialized()
assert len(ProcessType) == 2
assert ProcessType.dispatcher == 1
assert ProcessType.worker == 0
assert ptype in ProcessType
ptype_, dispatcher_rank = comm.allreduce(
sendobj=(ptype, comm.rank), op=mpi4py.MPI.MAXLOC
)
assert ptype_ == ProcessType.dispatcher
color = None
if ptype == ProcessType.dispatcher:
assert dispatcher_rank == comm.rank
color = 1
else:
assert dispatcher_rank != comm.rank
color = 0
assert color is not None
worker_comm = comm.Split(color)
if color == 1:
worker_comm.Free()
return dispatcher_rank
else:
return dispatcher_rank, worker_comm
def __init__(self, comm):
import mpi4py.MPI
assert mpi4py.MPI.Is_initialized()
self.comm = comm
self.worker_comm = None
self._status = mpi4py.MPI.Status()
(self.dispatcher_rank, self.worker_comm) = self._init(comm, ProcessType.worker)
def __del__(self):
if self.worker_comm is not None:
self.worker_comm.Free()
self.worker_comm = None
[docs] def update(self, best_objective, best_node, previous_bound, solve_info, node_list_):
"""A proxy to :func:`pybnb.dispatcher.Dispatcher.update`."""
if best_node is not None:
best_node = _SerializedNode.to_slots(best_node)
node_list = []
for node_ in node_list_:
# make sure the user-defined queue_priority
# can be safely marshaled
assert (
node_.queue_priority is None
) or node_.queue_priority == marshal.loads(
marshal.dumps(node_.queue_priority, config.MARSHAL_PROTOCOL_VERSION)
)
node_list.append(_SerializedNode.to_slots(node_))
data = marshal.dumps(
(best_objective, best_node, previous_bound, solve_info.data, node_list),
config.MARSHAL_PROTOCOL_VERSION,
)
self.comm.Send(
[data, mpi4py.MPI.BYTE], self.dispatcher_rank, tag=DispatcherAction.update
)
self.comm.Probe(status=self._status)
assert not self._status.Get_error()
tag = self._status.Get_tag()
recv_size = self._status.Get_count(mpi4py.MPI.BYTE)
data = bytearray(recv_size)
recv_data(self.comm, self._status, datatype=mpi4py.MPI.BYTE, out=data)
if tag == DispatcherResponse.nowork:
if six.PY2:
data_ = str(data)
else:
data_ = data
(
best_objective,
best_node_slots,
global_bound,
termination_condition_int,
solve_info_data,
) = marshal.loads(data_)
best_node = None
if best_node_slots is not None:
best_node = _SerializedNode.restore_node(best_node_slots)
solve_info = _SolveInfo()
solve_info.data = array.array("d", solve_info_data)
return (
True,
best_objective,
best_node,
(
global_bound,
_int_to_termination_condition[termination_condition_int],
solve_info,
),
)
else:
assert tag == DispatcherResponse.work
if six.PY2:
data_ = str(data)
else:
data_ = data
(best_objective, best_node_slots, node_slots) = marshal.loads(data_)
best_node = None
if best_node_slots is not None:
best_node = _SerializedNode.restore_node(best_node_slots)
node = _SerializedNode.restore_node(node_slots)
return False, best_objective, best_node, node
[docs] def log_info(self, msg):
"""A proxy to :func:`pybnb.dispatcher.Dispatcher.log_info`."""
self.comm.Ssend(
[msg.encode("utf8"), mpi4py.MPI.CHAR],
self.dispatcher_rank,
tag=DispatcherAction.log_info,
)
[docs] def log_warning(self, msg):
"""A proxy to :func:`pybnb.dispatcher.Dispatcher.log_warning`."""
self.comm.Ssend(
[msg.encode("utf8"), mpi4py.MPI.CHAR],
self.dispatcher_rank,
tag=DispatcherAction.log_warning,
)
[docs] def log_debug(self, msg):
"""A proxy to :func:`pybnb.dispatcher.Dispatcher.log_debug`."""
self.comm.Ssend(
[msg.encode("utf8"), mpi4py.MPI.CHAR],
self.dispatcher_rank,
tag=DispatcherAction.log_debug,
)
[docs] def log_error(self, msg):
"""A proxy to :func:`pybnb.dispatcher.Dispatcher.log_error`."""
self.comm.Ssend(
[msg.encode("utf8"), mpi4py.MPI.CHAR],
self.dispatcher_rank,
tag=DispatcherAction.log_error,
)
[docs] def log_critical(self, msg):
"""A proxy to :func:`pybnb.dispatcher.Dispatcher.log_critical`."""
self.comm.Ssend(
[msg.encode("utf8"), mpi4py.MPI.CHAR],
self.dispatcher_rank,
tag=DispatcherAction.log_critical,
)
[docs] def stop_listen(self):
"""Tell the dispatcher to abruptly stop the listen loop."""
assert self.worker_comm.rank == 0
send_nothing(self.comm, self.dispatcher_rank, tag=DispatcherAction.stop_listen)