Source code for pybnb.dispatcher

"""
Branch-and-bound dispatcher implementation.

Copyright by Gabriel A. Hackebeil (gabe.hackebeil@gmail.com).
"""
import array
import time
import collections
import os
import socket
import logging
import marshal
import math

from pybnb.configuration import config
from pybnb.common import (
    minimize,
    maximize,
    inf,
    TerminationCondition,
    _termination_condition_to_int,
)
from pybnb.misc import get_gap_labels
from pybnb.dispatcher_proxy import (
    ProcessType,
    DispatcherAction,
    DispatcherResponse,
    DispatcherProxy,
)
from pybnb.node import _SerializedNode
from pybnb.problem import _SolveInfo
from pybnb.mpi_utils import Message
from pybnb.priority_queue import PriorityQueueFactory

try:
    import mpi4py
except ImportError:  # pragma:nocover
    pass

from sortedcontainers import SortedList

import six


[docs]class DispatcherQueueData(object): """A namedtuple storing data that can be used to initialize a dispatcher queue. Attributes ---------- nodes : list A list of :class:`Node <pybnb.node.Node>` objects. worst_terminal_bound : float or None The worst bound of any node where branching did not continue. sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`} The objective sense for the problem that produced this queue. """ __slots__ = ("nodes", "worst_terminal_bound", "sense") def __init__(self, nodes, worst_terminal_bound, sense): self.nodes = nodes self.worst_terminal_bound = worst_terminal_bound self.sense = sense assert sense in (minimize, maximize)
[docs] def bound(self): """Returns the global bound defined by this queue data.""" bound = None if len(self.nodes) > 0: if self.sense == minimize: bound = min(n_.bound for n_ in self.nodes) else: assert self.sense == maximize bound = max(n_.bound for n_ in self.nodes) if self.worst_terminal_bound is not None: if bound is not None: if self.sense == minimize: bound = min(bound, self.worst_terminal_bound) else: assert self.sense == maximize bound = max(bound, self.worst_terminal_bound) else: bound = self.worst_terminal_bound return bound
[docs]class StatusPrinter(object): """Logs status information about the branch-and-bound solve. Parameters ---------- dispatcher : :class:`pybnb.dispatcher.Dispatcher` The central dispatcher that will be monitored. log : :class:`logging.Logger` A log object where solver output should be sent. log_interval_seconds : float The approximate maximum time (in seconds) between solver log updates. More time may pass between log updates if no updates have been received from any workers, and less time may pass if a new incumbent is found. (default: 1.0) """ def __init__(self, dispatcher, log, log_interval_seconds=1.0): assert log_interval_seconds >= 0 self._dispatcher = dispatcher self._log_interval_seconds = log_interval_seconds self._log = log percent_relative_gap_tol = 1e-4 if ( self._dispatcher.converger.relative_gap is not None ) and self._dispatcher.converger.relative_gap != 0: percent_relative_gap_tol = 100.0 * self._dispatcher.converger.relative_gap rgap_str_length, rgap_label_str, rgap_number_str = get_gap_labels( percent_relative_gap_tol, key="rgap" ) absolute_gap_tol = 1e-8 if ( self._dispatcher.converger.absolute_gap is not None ) and self._dispatcher.converger.absolute_gap != 0: absolute_gap_tol = self._dispatcher.converger.absolute_gap agap_str_length, agap_label_str, agap_number_str = get_gap_labels( absolute_gap_tol, key="agap", format="g" ) assert rgap_str_length >= 10 assert agap_str_length >= 10 extra_space = (rgap_str_length - 10) + (agap_str_length - 10) extra_space_left = extra_space // 2 extra_space_right = (extra_space // 2) + (extra_space % 2) self._lines = ( "--------------------" "--------------------" "--------------------" "--------------------" "--------------------" "----------------" ) + ("-" * extra_space) self._initial_header_line = ( self._lines + "\n" " Nodes |" + (" " * extra_space_left) + " Objective Bounds " + (" " * extra_space_right) + "| Work " ) self._header_line = ( " {explored:>9} {unexplored:>9} |{objective:>15} " "{bound:>15} " + rgap_label_str + " " + agap_label_str + " |{runtime:>9} {rate:>10} " "{imbalance:>9} {idle:>5}" ).format( explored="Expl", unexplored="Unexpl", objective="Incumbent", bound="Bound", runtime="Time (s)", rgap="Rel. Gap", agap="Abs. Gap", rate="Nodes/Sec", imbalance="Imbalance", idle="Idle", ) self._line_template = ( "{tag:>1}{explored:>9d} {unexplored:>9d} |{objective:>15.7g} " "{bound:>15.7g} " + rgap_number_str + "% " + agap_number_str + " |{runtime:>9.1f} {rate:>10.2f} " "{imbalance:>8.2f}% {idle:>6d}" ) self._line_template_big_gap = ( "{tag:>1}{explored:>9d} {unexplored:>9d} |{objective:>15.7g} " "{bound:>15.7g} " + rgap_label_str + "% " + agap_number_str + " |{runtime:>9.1f} {rate:>10.2f} " "{imbalance:>8.2f}% {idle:>6d}" ) self._last_print_time = float("-inf") served, explored, unexplored = self._dispatcher._get_node_counts() assert served == 0 assert explored == 0 assert unexplored == 0 self._last_explored_nodes_count = 0 self._last_rgap = None self._smoothing = 0.95 self._avg_time_per_node = None self._print_count = 0 self._new_objective = False self._report_new_objective = False
[docs] def log_info(self, msg): """Pass a message to ``log.info``""" self._log.info(msg)
[docs] def log_warning(self, msg): """Pass a message to ``log.warning``""" self._log.warning(msg)
[docs] def log_debug(self, msg): """Pass a message to ``log.debug``""" self._log.debug(msg)
[docs] def log_error(self, msg): """Pass a message to ``log.error``""" self._log.error(msg)
[docs] def log_critical(self, msg): """Pass a message to ``log.critical``""" self._log.critical(msg)
[docs] def new_objective(self, report=True): """Indicate that a new objective has been found Parameters ---------- report : bool, optional Indicate whether or not to force the next `tic` log output. (default: False) """ self._new_objective = True self._report_new_objective = report
[docs] def tic(self, force=False): """Provide an opportunity to log output if certain criteria are met. Parameters ---------- force : bool, optional Indicate whether or not to force logging of output, even if logging criteria are not met. (default: False) """ if not self._log.isEnabledFor(logging.INFO): return current_time = self._dispatcher.clock() new_objective = self._new_objective report_new_objective = self._report_new_objective delta_t = current_time - self._last_print_time self._report_new_objective = False if not force: if not report_new_objective: if delta_t < self._log_interval_seconds: return self._new_objective = False ( served_nodes_count, explored_nodes_count, unexplored_nodes_count, ) = self._dispatcher._get_node_counts() delta_n = explored_nodes_count - self._last_explored_nodes_count if delta_t and delta_n: if self._avg_time_per_node is None: self._avg_time_per_node = delta_t / float(delta_n) else: self._avg_time_per_node = ( self._smoothing * delta_t / float(delta_n) + (1 - self._smoothing) * self._avg_time_per_node ) if self._avg_time_per_node: rate = 1.0 / self._avg_time_per_node else: rate = 0.0 imbalance = self._dispatcher._compute_load_imbalance() idle = 0 if hasattr(self._dispatcher, "needs_work_queue"): idle = len(self._dispatcher.needs_work_queue) tag = "" if (not new_objective) else "*" bound = self._dispatcher._get_current_bound() objective = self._dispatcher.best_objective agap = self._dispatcher.converger.compute_absolute_gap(bound, objective) rgap = self._dispatcher.converger.compute_relative_gap(bound, objective) rgap *= 100.0 if (self._print_count % 5) == 0: if self._print_count == 0: self._log.info(self._initial_header_line) self._log.info(self._header_line) if (rgap != inf) and (rgap > 9999.0): self._log.info( self._line_template_big_gap.format( tag=tag, explored=explored_nodes_count, unexplored=unexplored_nodes_count, objective=objective, bound=bound, rgap="9999+", agap=agap, runtime=current_time - self._dispatcher.start_time, rate=rate, imbalance=imbalance, idle=idle, ) ) self._last_rgap = None else: self._log.info( self._line_template.format( tag=tag, explored=explored_nodes_count, unexplored=unexplored_nodes_count, objective=objective, bound=bound, rgap=rgap, agap=agap, runtime=current_time - self._dispatcher.start_time, rate=rate, imbalance=imbalance, idle=idle, ) ) self._last_rgap = rgap self._last_explored_nodes_count = explored_nodes_count self._print_count += 1 self._last_print_time = current_time
[docs]class DispatcherBase(object): """The base dispatcher implementation with some core functionality shared by the distributed and local implementations.""" def __init__(self): self.start_time = None self.initialized = False self.log_new_incumbent = False self.termination_condition = None self.converger = None self.last_global_bound = None self.track_bound = False self.queue = None self.best_node = None self.best_objective = None self.journalist = None self.node_limit = None self.time_limit = None self.queue_limit = None self.served_nodes_count = None self.worst_terminal_bound = None self.clock = None def _add_work_to_queue(self, node): bound = node.bound if self.converger.eligible_for_queue(bound, self.best_objective): self.queue.put(node) if ( (self.queue_limit is not None) and (self.queue.size() > self.queue_limit) and (self.termination_condition is None) ): self.termination_condition = TerminationCondition.queue_limit return True else: self._check_update_worst_terminal_bound(bound) return False def _check_update_worst_terminal_bound(self, bound): if (self.worst_terminal_bound is None) or self.converger.bound_worsened( bound, self.worst_terminal_bound ): self.worst_terminal_bound = bound def _check_update_best_objective(self, objective): assert objective is not None assert not math.isnan(objective) updated = False if self.converger.objective_improved(objective, self.best_objective): self.best_objective = objective updated = True if self.journalist is not None: self.journalist.new_objective(report=self.log_new_incumbent) eligible_for_queue_ = self.converger.eligible_for_queue removed = self.queue.filter( lambda node_: eligible_for_queue_(node_.bound, self.best_objective) ) for node_ in removed: self._check_update_worst_terminal_bound(node_.bound) return updated def _check_update_best_node(self, node): best_objective_updated = False objective = node.objective assert objective is not None assert not math.isnan(objective) if (objective != self.converger.infeasible_objective) and ( (self.best_node is None) or self.converger.objective_improved(objective, self.best_node.objective) ): assert node._uuid is not None self.best_node = node best_objective_updated = self._check_update_best_objective(objective) return best_objective_updated def _check_termination(self): # check if we are done if self.termination_condition is None: if self.track_bound: global_bound = self._get_current_bound() self.last_global_bound = global_bound self.termination_condition = self.converger.check_termination_criteria( global_bound, self.best_objective ) if self.termination_condition is None: if self.node_limit is not None: ( served_nodes_count, explored_nodes_count, _, ) = self._get_node_counts() # we need to check the explored count as # it may be greater than the served # count due to the use of a nested solver if (served_nodes_count >= self.node_limit) or ( explored_nodes_count >= self.node_limit ): self.termination_condition = TerminationCondition.node_limit if self.termination_condition is None: if (self.time_limit is not None) and ( (self.clock() - self.start_time) >= self.time_limit ): self.termination_condition = TerminationCondition.time_limit def _get_work_item(self): node = self.queue.get() assert node is not None self.served_nodes_count += 1 return node # # Interface #
[docs] def initialize( self, best_objective, best_node, initialize_queue, queue_strategy, converger, node_limit, time_limit, queue_limit, track_bound, log, log_interval_seconds, log_new_incumbent, ): """Initialize the dispatcher for a new solve. Parameters ---------- best_objective : float The assumed best objective to start with. best_node : :class:`Node <pybnb.node.Node>` A node storing the assumed best objective. initialize_queue : :class:`pybnb.dispatcher.DispatcherQueueData` The initial queue. queue_strategy : :class:`QueueStrategy <pybnb.common.QueueStrategy>` Sets the strategy for prioritizing nodes in the central dispatcher queue. See the :class:`QueueStrategy <pybnb.common.QueueStrategy>` enum for the list of acceptable values. converger : :class:`pybnb.convergence_checker.ConvergenceChecker` The branch-and-bound convergence checker object. node_limit : int or None An integer representing the maximum number of nodes to processes before beginning to terminate the solve. If None, no node limit will be enforced. time_limit : float or None The maximum amount of time to spend processing nodes before beginning to terminate the solve. If None, no time limit will be enforced. queue_limit : int or None The maximum allowed queue size. If exceeded, the solve will terminate. If None, no size limit on the queue will be enforced. log : ``logging.Logger`` A log object where solver output should be sent. log_interval_seconds : float The approximate maximum time (in seconds) between solver log updates. More time may pass between log updates if no updates have been received from any workers, and less time may pass if a new incumbent is found. log_new_incumbent : bool Controls whether updates to the best objective are logged immediately (overriding the log interval). Setting this to false can be useful when frequent updates to the incumbent are expected and the additional logging slows down the dispatcher. """ assert (node_limit is None) or ( (node_limit > 0) and (node_limit == int(node_limit)) ) assert (time_limit is None) or (time_limit >= 0) assert (queue_limit is None) or (queue_limit >= 0) self.start_time = self.clock() self.initialized = True self.log_new_incumbent = log_new_incumbent self.termination_condition = None self.converger = converger if initialize_queue.sense != self.converger.sense: raise ValueError( "The objective sense does not match that of the initial queue." ) self.last_global_bound = self.converger.unbounded_objective self.track_bound = track_bound self.queue = PriorityQueueFactory( queue_strategy, self.converger.sense, self.track_bound ) self.best_objective = best_objective self.best_node = None if best_node is not None: assert best_node._uuid is not None self._check_update_best_node(best_node) self.node_limit = None if node_limit is not None: self.node_limit = int(node_limit) self.time_limit = None if time_limit is not None: self.time_limit = float(time_limit) self.queue_limit = None if queue_limit is not None: self.queue_limit = int(queue_limit) self.served_nodes_count = 0 self.worst_terminal_bound = initialize_queue.worst_terminal_bound self.journalist = None if (log is not None) and (not log.disabled): self.journalist = StatusPrinter( self, log, log_interval_seconds=log_interval_seconds ) for node in initialize_queue.nodes: self._add_work_to_queue(node)
[docs] def log_info(self, msg): """Pass a message to ``log.info``""" if self.journalist is not None: self.journalist.log_info(msg)
[docs] def log_warning(self, msg): """Pass a message to ``log.warning``""" if self.journalist is not None: self.journalist.log_warning(msg)
[docs] def log_debug(self, msg): """Pass a message to ``log.debug``""" if self.journalist is not None: self.journalist.log_debug(msg)
[docs] def log_error(self, msg): """Pass a message to ``log.error``""" if self.journalist is not None: self.journalist.log_error(msg)
[docs] def log_critical(self, msg): """Pass a message to ``log.critical``""" if self.journalist is not None: self.journalist.log_critical(msg)
[docs] def save_dispatcher_queue(self): """Saves the current dispatcher queue. The result can be used to re-initialize a solve. Returns ------- queue_data : :class:`pybnb.dispatcher.DispatcherQueueData` An object storing information that can be used to re-initialize the dispatcher queue to its current state. """ return DispatcherQueueData( nodes=list(self.queue.items()), worst_terminal_bound=self.worst_terminal_bound, sense=self.converger.sense, )
# # Abstract Methods # def update(self, *args, **kwds): # pragma:nocover raise NotImplementedError def _compute_load_imbalance(self): # pragma:nocover """Get the worker load imbalance.""" raise NotImplementedError() def _get_current_bound(self): # pragma:nocover """Get the current global bound""" raise NotImplementedError() def _get_final_solve_info(self): # pragma:nocover """Get the final solve information""" raise NotImplementedError() def _get_node_counts(self): # pragma:nocover """Get the served and explored node counts""" raise NotImplementedError()
[docs]class DispatcherLocal(DispatcherBase): """The central dispatcher for a serial branch-and-bound algorithm.""" def __init__(self): super(DispatcherLocal, self).__init__() self.external_bound = None self.solve_info = _SolveInfo() self.active_nodes = 0 self.clock = time.time def _compute_load_imbalance(self): """Get the worker load imbalance.""" return 0.0 def _get_current_bound(self): """Get the current global bound""" bound = self.queue.bound() if self.converger.sense == maximize: if (self.external_bound is not None) and ( (bound is None) or (self.external_bound > bound) ): bound = self.external_bound if (self.worst_terminal_bound is not None) and ( (bound is None) or (self.worst_terminal_bound > bound) ): bound = self.worst_terminal_bound else: if (self.external_bound is not None) and ( (bound is None) or (self.external_bound < bound) ): bound = self.external_bound if (self.worst_terminal_bound is not None) and ( (bound is None) or (self.worst_terminal_bound < bound) ): bound = self.worst_terminal_bound if bound is not None: return bound else: # likely means the dispatcher was initialized # with an empty queue return self.converger.unbounded_objective def _get_final_solve_info(self): """Get the final solve information""" solve_info = _SolveInfo() solve_info.data[:] = self.solve_info.data return solve_info def _get_node_counts(self): return ( self.served_nodes_count, self.solve_info.explored_nodes_count, self.queue.size() + self.active_nodes, ) # # Overloaded base class methods # def _check_update_best_objective(self, objective): updated = super(DispatcherLocal, self)._check_update_best_objective(objective) if updated: assert self.best_objective == objective if self.external_bound is not None: if not self.converger.eligible_for_queue( self.external_bound, objective ): self.external_bound = None # # Interface #
[docs] def initialize( self, best_objective, best_node, initialize_queue, queue_strategy, converger, node_limit, time_limit, queue_limit, track_bound, log, log_interval_seconds, log_new_incumbent, ): """Initialize the dispatcher. See the :func:`pybnb.dispatcher.DispatcherBase.initialize` method for argument descriptions.""" self.solve_info.reset() self.active_nodes = 0 super(DispatcherLocal, self).initialize( best_objective, best_node, initialize_queue, queue_strategy, converger, node_limit, time_limit, queue_limit, track_bound, log, log_interval_seconds, log_new_incumbent, ) if self.journalist is not None: self.log_info( "Starting branch & bound solve:\n" " - dispatcher pid: %s (%s)\n" " - worker processes: 1" % (os.getpid(), socket.gethostname()) ) self.journalist.tic()
[docs] def update(self, best_objective, best_node, terminal_bound, solve_info, node_list): """Update local worker information. Parameters ---------- best_objective : float or None A new potential best objective found by the worker. best_node : :class:`Node <pybnb.node.Node>` or None A new potential best node found by the worker. terminal_bound : float or None The worst bound of any terminal nodes that were processed by the worker since the last update. solve_info : :class:`_SolveInfo` The most up-to-date worker solve information. node_list : list A list of nodes to add to the queue. Returns ------- solve_finished : bool Indicates if the dispatcher has terminated the solve. new_objective : float The best objective known to the dispatcher. best_node : :class:`Node <pybnb.node.Node>` or None The best node known to the dispatcher. data : :class:`Node <pybnb.node.Node>` or None If solve_finished is false, a new node for the worker to process. Otherwise, a tuple containing the global bound, the termination condition string, and the number of explored nodes. """ assert self.initialized if best_objective is not None: self._check_update_best_objective(best_objective) if best_node is not None: assert best_node._uuid is not None self._check_update_best_node(best_node) self.solve_info.data[:] = solve_info.data self.external_bound = None self.active_nodes = 0 if len(node_list) > 0: for node in node_list: self._add_work_to_queue(node) if terminal_bound is not None: self._check_update_worst_terminal_bound(terminal_bound) last_global_bound = self.last_global_bound self._check_termination() if (self.queue.size() == 0) and (self.termination_condition is None): self.termination_condition = TerminationCondition.queue_empty if self.termination_condition is None: node = self._get_work_item() self.active_nodes = 1 self.external_bound = node.bound if self.journalist is not None: force = (last_global_bound == self.converger.unbounded_objective) and ( last_global_bound != self.last_global_bound ) self.journalist.tic(force=force) return (False, self.best_objective, self.best_node, node) else: if self.journalist is not None: self.journalist.tic(force=True) self.journalist.log_info(self.journalist._lines) self.initialized = False return ( True, self.best_objective, self.best_node, ( self._get_current_bound(), self.termination_condition, self._get_final_solve_info(), ), )
[docs]class DispatcherDistributed(DispatcherBase): """The central dispatcher for a distributed branch-and-bound algorithm. Parameters ---------- comm : ``mpi4py.MPI.Comm``, optional The MPI communicator to use. If set to None, this will disable the use of MPI and avoid an attempted import of `mpi4py.MPI` (which avoids triggering a call to `MPI_Init()`). """ def __init__(self, comm): assert comm.size > 1 import mpi4py.MPI assert mpi4py.MPI.Is_initialized() super(DispatcherDistributed, self).__init__() self.clock = mpi4py.MPI.Wtime self.comm = comm # send rank of dispatcher to all workers self.dispatcher_rank = DispatcherProxy._init(self.comm, ProcessType.dispatcher) assert self.dispatcher_rank == self.comm.rank self.worker_ranks = [i for i in range(self.comm.size) if i != self.comm.rank] self.needs_work_queue = collections.deque([], len(self.worker_ranks)) self._solve_info_by_source = {i: _SolveInfo() for i in self.worker_ranks} self.last_known_bound = dict() self.external_bounds = SortedList() self.has_work = set() self._send_requests = None self.explored_nodes_count = 0 def _compute_load_imbalance(self): pmin = inf pmax = -inf psum = 0 for solve_info in self._solve_info_by_source.values(): count = solve_info.explored_nodes_count if count < pmin: pmin = count if count > pmax: pmax = count psum += count if psum > 0: pavg = psum / float(len(self._solve_info_by_source)) return (pmax - pmin) / pavg * 100.0 else: return 0.0 def _get_current_bound(self): """Get the current global bound""" bound = self.queue.bound() if self.converger.sense == maximize: if len(self.external_bounds) and ( (bound is None) or (self.external_bounds[-1] > bound) ): bound = self.external_bounds[-1] if (self.worst_terminal_bound is not None) and ( (bound is None) or (self.worst_terminal_bound > bound) ): bound = self.worst_terminal_bound else: if len(self.external_bounds) and ( (bound is None) or (self.external_bounds[0] < bound) ): bound = self.external_bounds[0] if (self.worst_terminal_bound is not None) and ( (bound is None) or (self.worst_terminal_bound < bound) ): bound = self.worst_terminal_bound if bound is not None: return bound else: # likely means the dispatcher was initialized # with an empty queue return self.converger.unbounded_objective def _get_final_solve_info(self): solve_info = _SolveInfo() for worker_solve_info in self._solve_info_by_source.values(): solve_info.add_from(worker_solve_info) return solve_info def _get_node_counts(self): return ( self.served_nodes_count, self.explored_nodes_count, self.queue.size() + len(self.has_work), ) # # Overloaded base class methods # def _check_update_best_objective(self, objective): updated = super(DispatcherDistributed, self)._check_update_best_objective( objective ) if updated: assert self.best_objective == objective self_external_bounds = self.external_bounds eligible_for_queue = self.converger.eligible_for_queue # trim the sorted external_bounds list N = len(self_external_bounds) if self.converger.sense == maximize: i = 0 for i in range(N): if eligible_for_queue(self_external_bounds[i], objective): break if i != 0: self.external_bounds = SortedList(self_external_bounds.islice(i, N)) else: i = N - 1 for i in range(N - 1, -1, -1): if eligible_for_queue(self_external_bounds[i], objective): break if i != N - 1: self.external_bounds = SortedList( self_external_bounds.islice(0, i + 1) ) def _get_work_to_send(self, dest): node = self._get_work_item() bound = node.bound self.last_known_bound[dest] = bound self.external_bounds.add(bound) self.has_work.add(dest) return node def _send_work(self): stop = False data = None if len(self.needs_work_queue) > 0: if self._send_requests is None: self._send_requests = {i: None for i in self.worker_ranks} if self.termination_condition is None: while (self.queue.size() > 0) and (len(self.needs_work_queue) > 0): stop = False dest = self.needs_work_queue.popleft() node = self._get_work_to_send(dest) best_node_slots = None if self.best_node is not None: best_node_slots = self.best_node.slots send_ = marshal.dumps( (self.best_objective, best_node_slots, node.slots), config.MARSHAL_PROTOCOL_VERSION, ) if self._send_requests[dest] is not None: self._send_requests[dest].Wait() self._send_requests[dest] = self.comm.Isend( [send_, mpi4py.MPI.BYTE], dest, tag=DispatcherResponse.work ) # a shortcut to check if we should keep sending nodes if (self.node_limit is not None) and ( self.served_nodes_count >= self.node_limit ): break if len(self.needs_work_queue) == (self.comm.size - 1): if self.termination_condition is None: self.termination_condition = TerminationCondition.queue_empty requests = [] for r_ in self._send_requests.values(): if r_ is not None: requests.append(r_) mpi4py.MPI.Request.Waitall(requests) self._send_requests = None stop = True data = ( self._get_current_bound(), self.termination_condition, self._get_final_solve_info(), ) best_node_slots = None if self.best_node is not None: best_node_slots = self.best_node.slots send_ = marshal.dumps( ( self.best_objective, best_node_slots, data[0], _termination_condition_to_int[data[1]], data[2].data, ), config.MARSHAL_PROTOCOL_VERSION, ) # everyone needs work, so we must be done requests = [] while len(self.needs_work_queue) > 0: dest = self.needs_work_queue.popleft() requests.append( self.comm.Isend( [send_, mpi4py.MPI.BYTE], dest, DispatcherResponse.nowork ) ) mpi4py.MPI.Request.Waitall(requests) return (stop, self.best_objective, self.best_node, data) def _update_solve_info(self, solve_info_data, source): self.explored_nodes_count -= self._solve_info_by_source[ source ].explored_nodes_count self._solve_info_by_source[source].data[:] = solve_info_data self.explored_nodes_count += self._solve_info_by_source[ source ].explored_nodes_count # # Interface #
[docs] def initialize( self, best_objective, best_node, initialize_queue, queue_strategy, converger, node_limit, time_limit, queue_limit, track_bound, log, log_interval_seconds, log_new_incumbent, ): """Initialize the dispatcher. See the :func:`pybnb.dispatcher.DispatcherBase.initialize` method for argument descriptions.""" self.needs_work_queue.clear() for solve_info in self._solve_info_by_source.values(): solve_info.reset() self.last_known_bound.clear() self.external_bounds.clear() self.has_work.clear() self._send_requests = None self.explored_nodes_count = 0 if best_node is not None: best_node = _SerializedNode.from_node(best_node) initialize_queue_ = DispatcherQueueData( nodes=[ _SerializedNode.from_node(node) if (type(node) is not _SerializedNode) else node for node in initialize_queue.nodes ], worst_terminal_bound=initialize_queue.worst_terminal_bound, sense=initialize_queue.sense, ) super(DispatcherDistributed, self).initialize( best_objective, best_node, initialize_queue_, queue_strategy, converger, node_limit, time_limit, queue_limit, track_bound, log, log_interval_seconds, log_new_incumbent, ) if self.journalist is not None: self.log_info( "Starting branch & bound solve:\n" " - dispatcher pid: %s (%s)\n" " - worker processes: %d" % (os.getpid(), socket.gethostname(), len(self.worker_ranks)) ) self.journalist.tic()
[docs] def update( self, best_objective, best_node, terminal_bound, solve_info, node_list, source ): """Update local worker information. Parameters ---------- best_objective : float or None A new potential best objective found by the worker. best_node : :class:`Node <pybnb.node.Node>` or None A new potential best node found by the worker. terminal_bound : float or None The worst bound of any terminal nodes that were processed by the worker since the last update. solve_info : :class:`_SolveInfo` The most up-to-date worker solve information. node_list : list A list of nodes to add to the queue. source : int The worker process rank that the update came from. Returns ------- solve_finished : bool Indicates if the dispatcher has terminated the solve. new_objective : float The best objective value known to the dispatcher. best_node : :class:`Node <pybnb.node.Node>` or None The best node known to the dispatcher. data : ``array.array`` or None If solve_finished is false, a data array representing a new node for the worker to process. Otherwise, a tuple containing the global bound, the termination condition string, and the number of explored nodes. """ assert self.initialized self._update_solve_info(solve_info.data, source) self.needs_work_queue.append(source) self.has_work.discard(source) if source in self.last_known_bound: val_ = self.last_known_bound[source] try: self.external_bounds.remove(val_) except ValueError: # rare, but can happen when # _check_update_best_node modifies # the external_bounds list pass if best_objective is not None: self._check_update_best_objective(best_objective) if best_node is not None: assert best_node._uuid is not None self._check_update_best_node(best_node) if len(node_list) > 0: for node in node_list: self._add_work_to_queue(node) if terminal_bound is not None: self._check_update_worst_terminal_bound(terminal_bound) last_global_bound = self.last_global_bound self._check_termination() ret = self._send_work() stop = ret[0] if not stop: if self.journalist is not None: force = (last_global_bound == self.converger.unbounded_objective) and ( last_global_bound != self.last_global_bound ) self.journalist.tic(force=force) else: if self.journalist is not None: self.journalist.tic(force=True) self.journalist.log_info(self.journalist._lines) assert self.initialized self.initialized = False return ret
# # Distributed Interface #
[docs] def serve(self): """Start listening for distributed branch-and-bound commands and map them to commands in the local dispatcher interface.""" def rebuild_update_requests(size): update_requests = {} update_data = bytearray(size) for i in self.worker_ranks: update_requests[i] = self.comm.Recv_init( update_data, source=i, tag=DispatcherAction.update ) return update_requests, update_data update_requests = None solve_info_ = _SolveInfo() data = None msg = Message(self.comm) while 1: msg.probe() tag = msg.tag source = msg.source if tag == DispatcherAction.update: size = msg.status.Get_count(datatype=mpi4py.MPI.BYTE) if (data is None) or (len(data) < size): update_requests, data = rebuild_update_requests(size) req = update_requests[msg.status.Get_source()] req.Start() req.Wait() if six.PY2: data_ = str(data) else: data_ = data ( best_objective, best_node, terminal_bound, solve_info_data, node_list, ) = marshal.loads(data_) solve_info_.data = array.array("d", solve_info_data) if best_node is not None: best_node = _SerializedNode(best_node) node_list = [_SerializedNode(state) for state in node_list] ret = self.update( best_objective, best_node, terminal_bound, solve_info_, node_list, source, ) stop = ret[0] if stop: best_node = ret[2] if best_node is not None: best_node = _SerializedNode.restore_node(best_node.slots) return ( ret[1], # best_objective best_node, ret[3][0], # global_bound ret[3][1], # termination_condition ret[3][2], ) # global_solve_info elif tag == DispatcherAction.log_info: msg.recv(mpi4py.MPI.CHAR) self.log_info(msg.data) elif tag == DispatcherAction.log_warning: msg.recv(mpi4py.MPI.CHAR) self.log_warning(msg.data) elif tag == DispatcherAction.log_debug: msg.recv(mpi4py.MPI.CHAR) self.log_debug(msg.data) elif tag == DispatcherAction.log_error: msg.recv(mpi4py.MPI.CHAR) self.log_error(msg.data) elif tag == DispatcherAction.log_critical: msg.recv(mpi4py.MPI.CHAR) self.log_critical(msg.data) elif tag == DispatcherAction.stop_listen: msg.recv() assert msg.data is None return (None, None, None, None, None) else: # pragma:nocover raise RuntimeError( "Dispatcher received invalid " "message tag '%s' from rank '%s'" % (tag, source) )
[docs] def save_dispatcher_queue(self): """Saves the current dispatcher queue. The result can be used to re-initialize a solve. Returns ------- queue_data : :class:`pybnb.dispatcher.DispatcherQueueData` An object storing information that can be used to re-initialize the dispatcher queue to its current state. """ nodes = [] for node in self.queue.items(): node_ = node.restore_node(node.slots) nodes.append(node_) return DispatcherQueueData( nodes=nodes, worst_terminal_bound=self.worst_terminal_bound, sense=self.converger.sense, )