Source code for pybnb.pyomo.range_reduction

"""
A Problem interface for implementing parallel range
reduction on a PyomoProblem during a branch-and-bound solve.

Copyright by Gabriel A. Hackebeil (gabe.hackebeil@gmail.com).
"""
import math
import array

from pybnb import inf, Problem
from pybnb.node import Node
from pybnb.mpi_utils import dispatched_partition
from pybnb.pyomo.misc import (
    hash_joblist,
    add_tmp_component,
    create_optimality_bound,
)
from pybnb.pyomo.problem import PyomoProblem

import pyomo.kernel as pmo

try:
    import mpi4py
except ImportError:  # pragma:nocover
    pass


[docs]class RangeReductionProblem(Problem): """A specialized implementation of the :class:`pybnb.Problem <pybnb.problem.Problem>` interface that can be used to perform optimality-based range reduction on a fully implemented :class:`PyomoProblem` by defining additional abstract methods.""" def __init__(self, problem, best_objective=None, comm=None): assert isinstance(problem, PyomoProblem) self.problem = problem assert best_objective != self.unbounded_objective self._best_objective = best_objective if self._best_objective is None: self._best_objective = self.problem.infeasible_objective() assert not math.isnan(self._best_objective) self._comm = comm if self._comm is not None: import mpi4py.MPI # noqa: F401 self._current_node = None def _notify_continue_listen(self, node): assert (self._comm is not None) and (self._comm.size > 1) data = array.array("d", [True, self._best_objective, len(node.state)]) self._comm.Bcast([data, mpi4py.MPI.DOUBLE], root=self._comm.rank) node.state = self._comm.bcast(node.state, root=self._comm.rank) def _notify_stop_listen(self): assert (self._comm is not None) and (self._comm.size > 1) data = array.array("d", [False, self._best_objective, 0]) self._comm.Bcast([data, mpi4py.MPI.DOUBLE], root=self._comm.rank) def _tighten_bounds(self): self.range_reduction_model_setup() assert self._best_objective != self.unbounded_objective() # setup objective assert self.problem.pyomo_model_objective.active self.problem.pyomo_model_objective.deactivate() tmp_objective = pmo.objective() tmp_objective_name = add_tmp_component( self.problem.pyomo_model, "rr_objective", tmp_objective ) # setup optimality bound if necessary tmp_optbound_name = None tmp_optbound = None if self._best_objective != self.infeasible_objective(): tmp_optbound = create_optimality_bound( self, self.problem.pyomo_model_objective, self._best_objective ) tmp_optbound_name = add_tmp_component( self.problem.pyomo_model, "optimality_bound", tmp_optbound ) self.range_reduction_constraint_added(tmp_optbound) try: return self._tighten_bounds_impl(tmp_objective) finally: # reset objective delattr(self.problem.pyomo_model, tmp_objective_name) self.problem.pyomo_model_objective.activate() self.range_reduction_objective_changed(self.problem.pyomo_model_objective) # remove optimality bound if it was added if tmp_optbound is not None: self.range_reduction_constraint_removed(tmp_optbound) delattr(self.problem.pyomo_model, tmp_optbound_name) self.range_reduction_model_cleanup() def _tighten_bounds_impl(self, tmp_objective): objlist = self.range_reduction_get_objects() joblist = [] objects = [] lower_bounds = [] upper_bounds = [] objects_seen = set() for i, val in enumerate(objlist): obj = None include = False val = val if type(val) is tuple else (val, True, True) assert len(val) == 3 obj = val[0] cid = self.problem.pyomo_object_to_cid[obj] if val[1]: include = True joblist.append((i, cid, "L")) if val[2]: include = True joblist.append((i, cid, "U")) joblist.append((i, cid, "U")) if include: assert obj is not None assert id(obj) not in objects_seen objects_seen.add(id(obj)) objects.append(obj) lower_bounds.append(pmo.value(obj.lb) if obj.has_lb() else -inf) upper_bounds.append(pmo.value(obj.ub) if obj.has_ub() else inf) lower_bounds = array.array("d", lower_bounds) upper_bounds = array.array("d", upper_bounds) # verify that everyone has the exact same list # (order and values), assumes everything in the list # has a well-defined hash if self._comm is not None: my_joblist_hash = hash_joblist(joblist) joblist_hash = self._comm.bcast(my_joblist_hash, root=0) assert joblist_hash == my_joblist_hash for i, cid, which in dispatched_partition(self._comm, joblist): obj = self.problem.cid_to_pyomo_object[cid] tmp_objective.expr = obj if which == "L": tmp_objective.sense = pmo.minimize else: assert which == "U" tmp_objective.sense = pmo.maximize self.range_reduction_objective_changed(tmp_objective) bound = self.range_reduction_solve_for_object_bound(obj) if bound is not None: if which == "L": lower_bounds[i] = bound else: assert which == "U" upper_bounds[i] = bound if self._comm is not None: lower_bounds_local = lower_bounds upper_bounds_local = upper_bounds lower_bounds = array.array("d", lower_bounds) upper_bounds = array.array("d", upper_bounds) self._comm.Allreduce( [lower_bounds_local, mpi4py.MPI.DOUBLE], [lower_bounds, mpi4py.MPI.DOUBLE], op=mpi4py.MPI.MAX, ) self._comm.Allreduce( [upper_bounds_local, mpi4py.MPI.DOUBLE], [upper_bounds, mpi4py.MPI.DOUBLE], op=mpi4py.MPI.MIN, ) return objects, lower_bounds, upper_bounds # # Interface #
[docs] def listen(self, root=0): """Listen for requests to run range reduction. All processes within the communicator, except for the root process, should call this method. Parameters ---------- root : int The rank of the process acting as the root. The root process should not call this function. """ assert self._comm.size > 1 assert self._comm.rank != root orig = Node() self.save_state(orig) node = Node() try: data = array.array("d", [0]) * 3 self._comm.Bcast([data, mpi4py.MPI.DOUBLE], root=root) again = bool(data[0]) self._best_objective = float(data[1]) while again: node.state = self._comm.bcast(node.state, root=root) self.load_state(node) self._tighten_bounds() self._comm.Bcast([data, mpi4py.MPI.DOUBLE], root=root) again = bool(data[0]) self._best_objective = float(data[1]) finally: self.load_state(orig)
# # Implement Problem abstract methods #
[docs] def sense(self): return self.problem.sense()
[docs] def objective(self): return self.problem.objective()
[docs] def bound(self): # tell the listeners to start bounds tightening node = Node() self.save_state(node) continue_loop = True while continue_loop: if (self._comm is not None) and (self._comm.size > 1): self._notify_continue_listen(node) continue_loop = self.range_reduction_process_bounds(*self._tighten_bounds()) self.save_state(node) return self.problem.bound()
[docs] def save_state(self, node): self.problem.save_state(node)
[docs] def load_state(self, node): self.problem.load_state(node)
[docs] def branch(self): return self.problem.branch()
[docs] def notify_new_best_node(self, node, current): self._best_objective = node.objective
[docs] def notify_solve_finished(self, comm, worker_comm, results): if (self._comm is not None) and (self._comm.size > 1): self._notify_stop_listen()
# # Abstract Methods #
[docs] def range_reduction_model_setup(self): """Called prior to starting range reduction solves to set up the Pyomo model""" raise NotImplementedError() # pragma:nocover
[docs] def range_reduction_objective_changed(self, objective): """Called to notify that the range reduction routine has changed the objective""" raise NotImplementedError() # pragma:nocover
[docs] def range_reduction_constraint_added(self, constraint): """Called to notify that the range reduction routine has added a constraint""" raise NotImplementedError() # pragma:nocover
[docs] def range_reduction_constraint_removed(self, constraint): """Called to notify that the range reduction routine has removed a constraint""" raise NotImplementedError() # pragma:nocover
[docs] def range_reduction_get_objects(self): """Called to collect the set of objects over which to perform range reduction solves""" raise NotImplementedError() # pragma:nocover
[docs] def range_reduction_solve_for_object_bound(self, x): """Called to perform a range reduction solve for a Pyomo model object""" raise NotImplementedError() # pragma:nocover
[docs] def range_reduction_model_cleanup(self): """Called after range reduction has finished to allow the user to execute any cleanup to the Pyomo model.""" raise NotImplementedError() # pragma:nocover
[docs] def range_reduction_process_bounds(self, objects, lower_bounds, upper_bounds): """Called to process the bounds obtained by the range reduction solves""" raise NotImplementedError() # pragma:nocover