"""
A collection of priority queue implementations that can be
used by the dispatcher.
Copyright by Gabriel A. Hackebeil (gabe.hackebeil@gmail.com).
"""
from typing import (
Type,
Dict,
Any,
Optional,
Tuple,
Callable,
List,
Iterator,
Iterable,
Union,
TypeVar,
Generic,
cast,
)
import random
import collections
import heapq
import math
from sortedcontainers import SortedList
import six
from pybnb.common import minimize, maximize, ProblemSense, inf
from pybnb.node import Node, PriorityType
T = TypeVar("T")
class _NoThreadingMaxPriorityFirstQueue(Generic[T]):
"""A simple priority queue implementation that is not
thread safe. When the queue is not empty, the item with
the highest priority is next.
This queue implementation is not allowed to store None.
"""
requires_priority = True # type: bool
def __init__(self):
# type: () -> None
self._count = 0 # type: int
self._heap = [] # type: List[Tuple[PriorityType, int, T]]
def _negate(self, priority):
# type: (PriorityType) -> PriorityType
if not hasattr(priority, "__iter__"):
return -priority # type: ignore
else:
return tuple(-v for v in priority) # type: ignore
def size(self):
# type: () -> int
"""Returns the size of the queue."""
return len(self._heap)
def put(self, item, priority, _push_=heapq.heappush):
# type: (T, PriorityType, Any) -> int
"""Puts an item into the queue with the given
priority. Items placed in the queue may not be
None. This method returns a unique counter associated
with each put."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
_push_(self._heap, (self._negate(priority), cnt, item))
return cnt
def get(self, _pop_=heapq.heappop):
# type: (Any) -> Optional[T]
"""Removes and returns the highest priority item in
the queue, where ties are broken by the order items
were placed in the queue. If the queue is empty,
returns None."""
if len(self._heap) > 0:
return _pop_(self._heap)[2]
else:
return None
def put_get(self, item, priority, _push_pop_=heapq.heappushpop):
# type: (T, PriorityType, Any) -> Tuple[int, T]
"""Combines a put and get call, which can be more
efficient than two separate put and get
calls. Returns a tuple containing the put and get
return values."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
if len(self._heap) > 0:
item_ = _push_pop_(self._heap, (self._negate(priority), cnt, item))[2]
return cnt, item_
else:
return cnt, item
def next(self):
# type: () -> Tuple[int, T]
"""Returns, without modifying the queue, a tuple of
the form (cnt, item), where item is highest priority
entry in the queue and cnt is the unique counter
assigned to it when it was added to the queue.
Raises
------
IndexError
If the queue is empty.
"""
try:
return self._heap[0][1:]
except IndexError:
raise IndexError("The queue is empty")
def filter(self, func, include_counters=False):
# type: (Callable[[T], bool], bool) -> List[Union[T, Tuple[int, T]]]
"""Removes items from the queue for which
`func(item)` returns False. The list of items
removed is returned. If `include_counters` is set to
True, values in the returned list will have the form
(cnt, item), where cnt is a unique counter that was
created for the item when it was added to the
queue."""
heap_new = []
removed = [] # type: List[Union[T, Tuple[int, T]]]
for priority, cnt, item in self._heap:
if func(item):
heap_new.append((priority, cnt, item))
elif not include_counters:
removed.append(item)
else:
removed.append((cnt, item))
heapq.heapify(heap_new)
self._heap = heap_new
return removed
def items(self):
# type: () -> Iterator[T]
"""Iterates over the queued items in arbitrary order
without modifying the queue."""
for _, _, item in self._heap:
yield item
class _NoThreadingFIFOQueue(Generic[T]):
"""A simple first-in, first-out queue implementation
that is not thread safe.
This queue implementation is not allowed to store None.
"""
requires_priority = False # type: bool
def __init__(self):
# type: () -> None
self._count = 0 # type: int
self._deque = collections.deque() # type: collections.deque
def size(self):
# type: () -> int
"""Returns the size of the queue."""
return len(self._deque)
def put(self, item):
# type: (T) -> int
"""Puts an item into the queue. Items placed in the
queue may not be None. This method returns a unique
counter associated with each put."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
self._deque.append((cnt, item))
return cnt
def get(self):
# type: () -> Optional[T]
"""Removes and returns the next item in the
queue. If the queue is empty, returns None."""
if len(self._deque) > 0:
return self._deque.popleft()[1]
else:
return None
def put_get(self, item):
# type: (T) -> Tuple[int, T]
"""Combines a put and get call, which can be more
efficient than two separate put and get
calls. Returns a tuple containing the put and get
return values."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
if len(self._deque) > 0:
self._deque.rotate(-1)
return_item = self._deque[-1][1]
self._deque[-1] = (cnt, item)
return cnt, return_item
else:
return cnt, item
def next(self):
# type: () -> Tuple[int, T]
"""Returns, without modifying the queue, a tuple of
the form (cnt, item), where item is highest priority
entry in the queue and cnt is the unique counter
assigned to it when it was added to the queue.
Raises
------
IndexError
If the queue is empty.
"""
try:
return self._deque[0]
except IndexError:
raise IndexError("The queue is empty")
def filter(self, func, include_counters=False):
# type: (Callable[[T], bool], bool) -> List[T]
"""Removes items from the queue for which
`func(item)` returns False. The list of items
removed is returned. If `include_counters` is set to
True, values in the returned list will have the form
(cnt, item), where cnt is a unique counter that was
created for the item when it was added to the
queue."""
deque_new = collections.deque() # type: collections.deque
removed = []
for cnt, item in self._deque:
if func(item):
deque_new.append((cnt, item))
elif not include_counters:
removed.append(item)
else:
removed.append((cnt, item))
self._deque = deque_new
return removed
def items(self):
# type: () -> Iterator[T]
"""Iterates over the queued items in arbitrary order
without modifying the queue."""
for _, item in self._deque:
yield item
class _NoThreadingLIFOQueue(Generic[T]):
"""A simple last-in, first-out queue implementation
that is not thread safe.
This queue implementation is not allowed to store None.
"""
requires_priority = False # type: bool
def __init__(self):
# type: () -> None
self._count = 0 # type: int
self._items = [] # type: List[Tuple[int, T]]
def size(self):
# type: () -> int
"""Returns the size of the queue."""
return len(self._items)
def put(self, item):
# type: (T) -> int
"""Puts an item into the queue. Items placed in the
queue may not be None. This method returns a unique
counter associated with each put."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
self._items.append((cnt, item))
return cnt
def get(self):
# type: () -> Optional[T]
"""Removes and returns the next item in the
queue. If the queue is empty, returns None."""
if len(self._items) > 0:
return self._items.pop()[1]
else:
return None
def put_get(self, item):
# type: (T) -> Tuple[int, T]
"""Combines a put and get call, which can be more
efficient than two separate put and get
calls. Returns a tuple containing the put and get
return values."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
return cnt, item
def next(self):
# type: () -> Tuple[int, T]
"""Returns, without modifying the queue, a tuple of
the form (cnt, item), where item is highest priority
entry in the queue and cnt is the unique counter
assigned to it when it was added to the queue.
Raises
------
IndexError
If the queue is empty.
"""
try:
return self._items[-1]
except IndexError:
raise IndexError("The queue is empty")
def filter(self, func, include_counters=False):
# type: (Callable[[T], bool], bool) -> List[Union[T, Tuple[int, T]]]
"""Removes items from the queue for which
`func(item)` returns False. The list of items
removed is returned. If `include_counters` is set to
True, values in the returned list will have the form
(cnt, item), where cnt is a unique counter that was
created for the item when it was added to the
queue."""
items_new = []
removed = [] # type: List[Union[T, Tuple[int, T]]]
for cnt, item in self._items:
if func(item):
items_new.append((cnt, item))
elif not include_counters:
removed.append(item)
else:
removed.append((cnt, item))
self._items = items_new
return removed
def items(self):
# type: () -> Iterator[T]
"""Iterates over the queued items in arbitrary order
without modifying the queue."""
for _, item in self._items:
yield item
SimpleQueueType = Union[
Type[_NoThreadingMaxPriorityFirstQueue[Node]],
Type[_NoThreadingFIFOQueue[Node]],
Type[_NoThreadingLIFOQueue[Node]],
]
[docs]class IPriorityQueue(object):
"""The abstract interface for priority queues that store
node data for the dispatcher."""
def __init__(self, *args, **kwds):
raise NotImplementedError # pragma:nocover
@staticmethod
def generate_priority(node, sense, queue):
# type: (Node, ProblemSense, Any) -> Union[int, float]
raise NotImplementedError() # pragma:nocover
[docs] def size(self):
# type: () -> int
"""Returns the size of the queue."""
raise NotImplementedError() # pragma:nocover
[docs] def put(self, node):
# type: (Node) -> int
"""Puts an node in the queue, possibly updating the
value of :attr:`queue_priority <pybnb.node.Node.queue_priority>`,
depending on the queue implementation. This method
returns a unique counter associated with each
put."""
raise NotImplementedError() # pragma:nocover
[docs] def get(self):
# type: () -> Optional[Node]
"""Returns the next node in the queue. If the queue
is empty, returns None."""
raise NotImplementedError() # pragma:nocover
[docs] def bound(self):
# type: () -> Optional[Union[int, float]]
"""Returns the weakest bound of all nodes in the
queue. If the queue is empty, returns None."""
raise NotImplementedError() # pragma:nocover
[docs] def filter(self, func):
# type: (Callable[[Node], bool]) -> List[Node]
"""Removes nodes from the queue for which
`func(node)` returns False. The list of nodes
removed is returned. If the queue is empty or no
nodes are removed, the returned list will be
empty."""
raise NotImplementedError() # pragma:nocover
[docs] def items(self):
# type: () -> Iterator[Node]
"""Iterates over the queued nodes in arbitrary order
without modifying the queue."""
raise NotImplementedError() # pragma:nocover
[docs]class WorstBoundFirstPriorityQueue(IPriorityQueue):
"""A priority queue implementation that serves nodes
with the worst bound first.
Parameters
----------
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
track_bound : bool
Indicates whether or not to track the global queue
bound. Note that this particular queue
implementation always tracks the global bound. This
argument is ignored.
"""
def __init__(self, sense, track_bound):
# type: (ProblemSense, bool) -> None
assert sense in ProblemSense
self._sense = sense # type: ProblemSense
self._queue = _NoThreadingMaxPriorityFirstQueue[Node]()
@staticmethod
def generate_priority(node, sense, queue):
# type: (Node, ProblemSense, Any) -> Union[int, float]
bound = node.bound
assert bound is not None
assert not math.isnan(bound)
if sense == minimize:
return -bound
else:
assert sense == maximize
return bound
[docs] def size(self):
# type: () -> int
return self._queue.size()
[docs] def put(self, node):
# type: (Node) -> int
node.queue_priority = self.generate_priority(node, self._sense, None)
return self._queue.put(node, node.queue_priority)
[docs] def get(self):
# type: () -> Optional[Node]
return self._queue.get()
[docs] def bound(self):
# type: () -> Optional[Union[int, float]]
try:
return self._queue.next()[1].bound
except IndexError:
return None
[docs] def filter(self, func):
# type: (Callable[[Node], bool]) -> List[Node]
return cast(List[Node], self._queue.filter(func))
[docs] def items(self):
# type: () -> Iterator[Node]
return self._queue.items()
[docs]class CustomPriorityQueue(IPriorityQueue):
"""A priority queue implementation that can handle
custom node priorities. It uses an additional data
structure to reduce the amount of time it takes to
compute a queue bound.
Parameters
----------
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
track_bound : bool
Indicates whether or not to track the global queue
bound.
"""
def __init__(
self, sense, track_bound, _queue_type_=_NoThreadingMaxPriorityFirstQueue[Node]
):
# type: (ProblemSense, bool, SimpleQueueType) -> None
assert sense in ProblemSense
self._sense = sense
self._queue = _queue_type_()
self._sorted_by_bound = None
if track_bound:
self._sorted_by_bound = SortedList()
[docs] def size(self):
# type: () -> int
return self._queue.size()
[docs] def put(self, node):
# type: (Node) -> int
if self._queue.requires_priority:
priority = node.queue_priority
if priority is None:
raise ValueError("A node queue priority is required")
cnt = self._queue.put(node, priority) # type: ignore
else:
cnt = self._queue.put(node) # type: ignore
if self._sorted_by_bound is not None:
bound = node.bound
assert bound is not None
assert not math.isnan(bound)
if self._sense == maximize:
self._sorted_by_bound.add((-bound, cnt, node))
else:
self._sorted_by_bound.add((bound, cnt, node))
return cnt
[docs] def get(self):
# type: () -> Optional[Node]
if self._queue.size() > 0:
cnt, tmp_ = self._queue.next()
node = self._queue.get()
assert node is not None
assert tmp_ is node
if self._sorted_by_bound is not None:
bound = node.bound
assert bound is not None
if self._sense == maximize:
self._sorted_by_bound.remove((-bound, cnt, node))
else:
self._sorted_by_bound.remove((bound, cnt, node))
return node
else:
return None
[docs] def bound(self):
# type: () -> Optional[Union[int, float]]
if self._sorted_by_bound is not None:
try:
return self._sorted_by_bound[0][2].bound
except IndexError:
return None
else:
if self.size() > 0:
if self._sense == maximize:
return inf
else:
return -inf
else:
return None
[docs] def filter(self, func):
# type: (Callable[[Node], bool]) -> List[Node]
removed = []
if self._sorted_by_bound is not None:
for item in self._queue.filter(func, include_counters=True):
cnt, node = cast(Tuple[int, Node], item)
removed.append(node)
bound = node.bound
assert bound is not None
if self._sense == maximize:
self._sorted_by_bound.remove((-bound, cnt, node))
else:
self._sorted_by_bound.remove((bound, cnt, node))
else:
removed.extend(
cast(List[Node], self._queue.filter(func, include_counters=False))
)
return removed
[docs] def items(self):
# type: () -> Iterator[Node]
return self._queue.items()
[docs]class BestObjectiveFirstPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes
with the best objective first.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
@staticmethod
def generate_priority(node, sense, queue):
# type: (Node, ProblemSense, Any) -> Union[int, float]
objective = node.objective
assert objective is not None
assert not math.isnan(objective)
if sense == minimize:
return -objective
else:
assert sense == maximize
return objective
[docs] def put(self, node):
# type: (Node) -> int
node.queue_priority = self.generate_priority(node, self._sense, None)
return super(BestObjectiveFirstPriorityQueue, self).put(node)
[docs]class BreadthFirstPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes in
breadth-first order.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
@staticmethod
def generate_priority(node, sense, queue):
# type: (Node, ProblemSense, Any) -> Union[int, float]
tree_depth = node.tree_depth
assert tree_depth is not None
assert tree_depth >= 0
return -tree_depth
[docs] def put(self, node):
# type: (Node) -> int
node.queue_priority = self.generate_priority(node, self._sense, None)
return super(BreadthFirstPriorityQueue, self).put(node)
[docs]class DepthFirstPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes in
depth-first order.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
@staticmethod
def generate_priority(node, sense, queue):
# type: (Node, ProblemSense, Any) -> Union[int, float]
tree_depth = node.tree_depth
assert tree_depth is not None
assert tree_depth >= 0
return tree_depth
[docs] def put(self, node):
# type: (Node) -> int
node.queue_priority = self.generate_priority(node, self._sense, None)
return super(DepthFirstPriorityQueue, self).put(node)
[docs]class FIFOQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes in
first-in, first-out order.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
def __init__(self, sense, track_bound):
# type: (ProblemSense, bool) -> None
super(FIFOQueue, self).__init__(
sense, track_bound, _queue_type_=_NoThreadingFIFOQueue[Node]
)
@staticmethod
def generate_priority(node, sense, queue):
# type: (Node, ProblemSense, Any) -> Union[int, float]
return -queue._count
[docs] def put(self, node):
# type: (Node) -> int
node.queue_priority = self.generate_priority(node, self._sense, self._queue)
cnt = super(FIFOQueue, self).put(node)
assert node.queue_priority == -cnt
return cnt
[docs]class LIFOQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes in
last-in, first-out order.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
def __init__(self, sense, track_bound):
# type: (ProblemSense, bool) -> None
super(LIFOQueue, self).__init__(
sense, track_bound, _queue_type_=_NoThreadingLIFOQueue[Node]
)
@staticmethod
def generate_priority(node, sense, queue):
# type: (Node, ProblemSense, Any) -> Union[int, float]
return queue._count
[docs] def put(self, node):
# type: (Node) -> int
node.queue_priority = self.generate_priority(node, self._sense, self._queue)
cnt = super(LIFOQueue, self).put(node)
assert node.queue_priority == cnt
return cnt
[docs]class RandomPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that assigns
a random priority to each incoming node.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
@staticmethod
def generate_priority(node, sense, queue):
# type: (Node, ProblemSense, Any) -> Union[int, float]
return random.random()
[docs] def put(self, node):
# type: (Node) -> int
node.queue_priority = self.generate_priority(node, self._sense, None)
return super(RandomPriorityQueue, self).put(node)
[docs]class LocalGapPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes
with the largest gap between the local objective and
bound first.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
@staticmethod
def generate_priority(node, sense, queue):
# type: (Node, ProblemSense, Any) -> Union[int, float]
objective = node.objective
bound = node.bound
assert objective is not None
assert bound is not None
if sense == minimize:
gap = objective - bound
else:
assert sense == maximize
gap = bound - objective
assert not math.isnan(gap)
return gap
[docs] def put(self, node):
# type: (Node) -> int
node.queue_priority = self.generate_priority(node, self._sense, None)
return super(LocalGapPriorityQueue, self).put(node)
[docs]class LexicographicPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes
with the largest gap between the local objective and
bound first.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
def __init__(self, queue_types, sense, track_bound):
# type: (Iterable[Type[IPriorityQueue]], ProblemSense, bool) -> None
self._queue_types = tuple(queue_types) # Tuple[Type[IPriorityQueue], ...]
assert len(self._queue_types)
super(LexicographicPriorityQueue, self).__init__(sense, track_bound)
def _generate_priority(self, node):
# type: (Node) -> Tuple[Union[int, float], ...]
return tuple(
qt.generate_priority(node, self._sense, self._queue)
for qt in self._queue_types
)
[docs] def put(self, node):
# type: (Node) -> int
node.queue_priority = self._generate_priority(node)
return super(LexicographicPriorityQueue, self).put(node)
_registered_queue_types = {} # type: Dict[str, Type[IPriorityQueue]]
[docs]def PriorityQueueFactory(name, *args, **kwds):
# type: (str, Any, Any) -> IPriorityQueue
"""Returns a new instance of the priority queue type
registered under the given name."""
if isinstance(name, six.string_types):
if name not in _registered_queue_types:
raise ValueError("invalid queue type: %s" % (name))
return _registered_queue_types[name](*args, **kwds)
else:
names = []
for n_ in name:
if n_ not in _registered_queue_types:
raise ValueError("invalid queue type: %s" % (n_))
if n_ == "custom":
raise ValueError(
"'custom' queue type not "
"allowed when defining a "
"lexicographic queue strategy"
)
names.append(_registered_queue_types[n_])
if len(names) == 0:
raise ValueError(
"Can not define lexicographic queue strategy with empty list"
)
return LexicographicPriorityQueue(names, *args, **kwds)
[docs]def register_queue_type(name, cls):
# type: (str, Type[IPriorityQueue]) -> None
"""Registers a new priority queue class with the
PriorityQueueFactory."""
if (name in _registered_queue_types) and (_registered_queue_types[name] is not cls):
raise ValueError(
"The name '%s' has already been registered"
"for priority queue type '%s'" % (name, cls)
)
_registered_queue_types[name] = cls
register_queue_type("bound", WorstBoundFirstPriorityQueue)
register_queue_type("custom", CustomPriorityQueue)
register_queue_type("objective", BestObjectiveFirstPriorityQueue)
register_queue_type("breadth", BreadthFirstPriorityQueue)
register_queue_type("depth", DepthFirstPriorityQueue)
register_queue_type("fifo", FIFOQueue)
register_queue_type("lifo", LIFOQueue)
register_queue_type("random", RandomPriorityQueue)
register_queue_type("local_gap", LocalGapPriorityQueue)