"""
A collection of priority queue implementations that can be
used by the dispatcher.
Copyright by Gabriel A. Hackebeil (gabe.hackebeil@gmail.com).
"""
import random
import collections
import heapq
import math
from pybnb.common import (minimize,
maximize,
inf)
from sortedcontainers import SortedList
import six
class _NoThreadingMaxPriorityFirstQueue(object):
"""A simple priority queue implementation that is not
thread safe. When the queue is not empty, the item with
the highest priority is next.
This queue implementation is not allowed to store None.
"""
requires_priority = True
def __init__(self):
self._count = 0
self._heap = []
def _negate(self, priority):
if hasattr(priority, '__neg__'):
return -priority
else:
return tuple(-v for v in priority)
def size(self):
"""Returns the size of the queue."""
return len(self._heap)
def put(self, item, priority, _push_=heapq.heappush):
"""Puts an item into the queue with the given
priority. Items placed in the queue may not be
None. This method returns a unique counter associated
with each put."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
_push_(self._heap, (self._negate(priority), cnt, item))
return cnt
def get(self, _pop_=heapq.heappop):
"""Removes and returns the highest priority item in
the queue, where ties are broken by the order items
were placed in the queue. If the queue is empty,
returns None."""
if len(self._heap) > 0:
return _pop_(self._heap)[2]
else:
return None
def put_get(self, item, priority, _push_pop_=heapq.heappushpop):
"""Combines a put and get call, which can be more
efficient than two separate put and get
calls. Returns a tuple containing the put and get
return values."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
if len(self._heap) > 0:
item_ = _push_pop_(self._heap,
(self._negate(priority), cnt, item))[2]
return cnt, item_
else:
return cnt, item
def next(self):
"""Returns, without modifying the queue, a tuple of
the form (cnt, item), where item is highest priority
entry in the queue and cnt is the unique counter
assigned to it when it was added to the queue.
Raises
------
IndexError
If the queue is empty.
"""
try:
return self._heap[0][1:]
except IndexError:
raise IndexError("The queue is empty")
def filter(self, func, include_counters=False):
"""Removes items from the queue for which
`func(item)` returns False. The list of items
removed is returned. If `include_counters` is set to
True, values in the returned list will have the form
(cnt, item), where cnt is a unique counter that was
created for the item when it was added to the
queue."""
heap_new = []
removed = []
for priority, cnt, item in self._heap:
if func(item):
heap_new.append((priority, cnt, item))
elif not include_counters:
removed.append(item)
else:
removed.append((cnt,item))
heapq.heapify(heap_new)
self._heap = heap_new
return removed
def items(self):
"""Iterates over the queued items in arbitrary order
without modifying the queue."""
for _,_,item in self._heap:
yield item
class _NoThreadingFIFOQueue(object):
"""A simple first-in, first-out queue implementation
that is not thread safe.
This queue implementation is not allowed to store None.
"""
requires_priority = False
def __init__(self):
self._count = 0
self._deque = collections.deque()
def size(self):
"""Returns the size of the queue."""
return len(self._deque)
def put(self, item):
"""Puts an item into the queue. Items placed in the
queue may not be None. This method returns a unique
counter associated with each put."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
self._deque.append((cnt, item))
return cnt
def get(self):
"""Removes and returns the next item in the
queue. If the queue is empty, returns None."""
if len(self._deque) > 0:
return self._deque.popleft()[1]
else:
return None
def put_get(self, item):
"""Combines a put and get call, which can be more
efficient than two separate put and get
calls. Returns a tuple containing the put and get
return values."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
if len(self._deque) > 0:
self._deque.rotate(-1)
return_item = self._deque[-1][1]
self._deque[-1] = (cnt, item)
return cnt, return_item
else:
return cnt, item
def next(self):
"""Returns, without modifying the queue, a tuple of
the form (cnt, item), where item is highest priority
entry in the queue and cnt is the unique counter
assigned to it when it was added to the queue.
Raises
------
IndexError
If the queue is empty.
"""
try:
return self._deque[0]
except IndexError:
raise IndexError("The queue is empty")
def filter(self, func, include_counters=False):
"""Removes items from the queue for which
`func(item)` returns False. The list of items
removed is returned. If `include_counters` is set to
True, values in the returned list will have the form
(cnt, item), where cnt is a unique counter that was
created for the item when it was added to the
queue."""
deque_new = collections.deque()
removed = []
for cnt, item in self._deque:
if func(item):
deque_new.append((cnt, item))
elif not include_counters:
removed.append(item)
else:
removed.append((cnt,item))
self._deque = deque_new
return removed
def items(self):
"""Iterates over the queued items in arbitrary order
without modifying the queue."""
for _,item in self._deque:
yield item
class _NoThreadingLIFOQueue(object):
"""A simple last-in, first-out queue implementation
that is not thread safe.
This queue implementation is not allowed to store None.
"""
requires_priority = False
def __init__(self):
self._count = 0
self._items = []
def size(self):
"""Returns the size of the queue."""
return len(self._items)
def put(self, item):
"""Puts an item into the queue. Items placed in the
queue may not be None. This method returns a unique
counter associated with each put."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
self._items.append((cnt, item))
return cnt
def get(self):
"""Removes and returns the next item in the
queue. If the queue is empty, returns None."""
if len(self._items) > 0:
return self._items.pop()[1]
else:
return None
def put_get(self, item):
"""Combines a put and get call, which can be more
efficient than two separate put and get
calls. Returns a tuple containing the put and get
return values."""
if item is None:
raise ValueError("queue item can not be None")
cnt = self._count
self._count += 1
return cnt, item
def next(self):
"""Returns, without modifying the queue, a tuple of
the form (cnt, item), where item is highest priority
entry in the queue and cnt is the unique counter
assigned to it when it was added to the queue.
Raises
------
IndexError
If the queue is empty.
"""
try:
return self._items[-1]
except IndexError:
raise IndexError("The queue is empty")
def filter(self, func, include_counters=False):
"""Removes items from the queue for which
`func(item)` returns False. The list of items
removed is returned. If `include_counters` is set to
True, values in the returned list will have the form
(cnt, item), where cnt is a unique counter that was
created for the item when it was added to the
queue."""
items_new = []
removed = []
for cnt, item in self._items:
if func(item):
items_new.append((cnt, item))
elif not include_counters:
removed.append(item)
else:
removed.append((cnt,item))
self._items = items_new
return removed
def items(self):
"""Iterates over the queued items in arbitrary order
without modifying the queue."""
for _,item in self._items:
yield item
[docs]class IPriorityQueue(object):
"""The abstract interface for priority queues that store
node data for the dispatcher."""
[docs] def size(self): #pragma:nocover
"""Returns the size of the queue."""
raise NotImplementedError
[docs] def put(self, node): #pragma:nocover
"""Puts an node in the queue, possibly updating the
value of :attr:`queue_priority <pybnb.node.Node.queue_priority>`,
depending on the queue implementation. This method
returns a unique counter associated with each
put."""
raise NotImplementedError()
[docs] def get(self): #pragma:nocover
"""Returns the next node in the queue. If the queue
is empty, returns None."""
raise NotImplementedError()
[docs] def bound(self): #pragma:nocover
"""Returns the weakest bound of all nodes in the
queue. If the queue is empty, returns None."""
raise NotImplementedError()
[docs] def filter(self, func): #pragma:nocover
"""Removes nodes from the queue for which
`func(node)` returns False. The list of nodes
removed is returned. If the queue is empty or no
nodes are removed, the returned list will be
empty."""
raise NotImplementedError()
[docs] def items(self): #pragma:nocover
"""Iterates over the queued nodes in arbitrary order
without modifying the queue."""
raise NotImplementedError()
[docs]class WorstBoundFirstPriorityQueue(IPriorityQueue):
"""A priority queue implementation that serves nodes
with the worst bound first.
Parameters
----------
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
track_bound : bool
Indicates whether or not to track the global queue
bound. Note that this particular queue
implementation always tracks the global bound. This
argument is ignored.
"""
def __init__(self,
sense,
track_bound):
assert sense in (minimize, maximize)
self._sense = sense
self._queue = _NoThreadingMaxPriorityFirstQueue()
@staticmethod
def generate_priority(node, sense, queue):
bound = node.bound
assert not math.isnan(bound)
if sense == minimize:
return -bound
else:
assert sense == maximize
return bound
[docs] def size(self):
return self._queue.size()
[docs] def put(self, node):
node.queue_priority = self.generate_priority(node,
self._sense,
None)
return self._queue.put(node, node.queue_priority)
[docs] def get(self):
return self._queue.get()
[docs] def bound(self):
try:
return self._queue.next()[1].bound
except IndexError:
return None
[docs] def filter(self, func):
return self._queue.filter(func)
[docs] def items(self):
return self._queue.items()
[docs]class CustomPriorityQueue(IPriorityQueue):
"""A priority queue implementation that can handle
custom node priorities. It uses an additional data
structure to reduce the amount of time it takes to
compute a queue bound.
Parameters
----------
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
track_bound : bool
Indicates whether or not to track the global queue
bound.
"""
def __init__(self,
sense,
track_bound,
_queue_type_=_NoThreadingMaxPriorityFirstQueue):
assert sense in (minimize, maximize)
self._sense = sense
self._queue = _queue_type_()
self._sorted_by_bound = None
if track_bound:
self._sorted_by_bound = SortedList()
[docs] def size(self):
return self._queue.size()
[docs] def put(self, node):
if self._queue.requires_priority:
priority = node.queue_priority
if priority is None:
raise ValueError("A node queue priority is required")
cnt = self._queue.put(node, priority)
else:
cnt = self._queue.put(node)
if self._sorted_by_bound is not None:
bound = node.bound
assert not math.isnan(bound)
if self._sense == maximize:
self._sorted_by_bound.add((-bound, cnt, node))
else:
self._sorted_by_bound.add((bound, cnt, node))
return cnt
[docs] def get(self):
if self._queue.size() > 0:
cnt, tmp_ = self._queue.next()
assert type(cnt) is int
node = self._queue.get()
assert tmp_ is node
if self._sorted_by_bound is not None:
bound = node.bound
if self._sense == maximize:
self._sorted_by_bound.remove((-bound, cnt, node))
else:
self._sorted_by_bound.remove((bound, cnt, node))
return node
else:
return None
[docs] def bound(self):
if self._sorted_by_bound is not None:
try:
return self._sorted_by_bound[0][2].bound
except IndexError:
return None
else:
if self.size() > 0:
if self._sense == maximize:
return inf
else:
return -inf
else:
return None
[docs] def filter(self, func):
removed = []
if self._sorted_by_bound is not None:
for cnt, node in self._queue.filter(
func,
include_counters=True):
removed.append(node)
bound = node.bound
if self._sense == maximize:
self._sorted_by_bound.remove((-bound, cnt, node))
else:
self._sorted_by_bound.remove((bound, cnt, node))
else:
for cnt, node in self._queue.filter(
func,
include_counters=True):
removed.append(node)
return removed
[docs] def items(self):
return self._queue.items()
[docs]class BestObjectiveFirstPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes
with the best objective first.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
@staticmethod
def generate_priority(node, sense, queue):
objective = node.objective
assert not math.isnan(objective)
if sense == minimize:
return -objective
else:
assert sense == maximize
return objective
[docs] def put(self, node):
node.queue_priority = self.generate_priority(node,
self._sense,
None)
return super(BestObjectiveFirstPriorityQueue, self).put(node)
[docs]class BreadthFirstPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes in
breadth-first order.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
@staticmethod
def generate_priority(node, sense, queue):
assert node.tree_depth >= 0
return -node.tree_depth
[docs] def put(self, node):
node.queue_priority = self.generate_priority(node,
None,
None)
return super(BreadthFirstPriorityQueue, self).put(node)
[docs]class DepthFirstPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes in
depth-first order.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
@staticmethod
def generate_priority(node, sense, queue):
assert node.tree_depth >= 0
return node.tree_depth
[docs] def put(self, node):
node.queue_priority = self.generate_priority(node,
None,
None)
return super(DepthFirstPriorityQueue, self).put(node)
[docs]class FIFOQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes in
first-in, first-out order.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
def __init__(self,
sense,
track_bound):
super(FIFOQueue, self).__init__(
sense,
track_bound,
_queue_type_=_NoThreadingFIFOQueue)
@staticmethod
def generate_priority(node, sense, queue):
return -queue._count
[docs] def put(self, node):
node.queue_priority = self.generate_priority(None,
None,
self._queue)
cnt = super(FIFOQueue, self).put(node)
assert node.queue_priority == -cnt
return cnt
[docs]class LIFOQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes in
last-in, first-out order.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
def __init__(self,
sense,
track_bound):
super(LIFOQueue, self).__init__(
sense,
track_bound,
_queue_type_=_NoThreadingLIFOQueue)
@staticmethod
def generate_priority(node, sense, queue):
return queue._count
[docs] def put(self, node):
node.queue_priority = self.generate_priority(None,
None,
self._queue)
cnt = super(LIFOQueue, self).put(node)
assert node.queue_priority == cnt
return cnt
[docs]class RandomPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that assigns
a random priority to each incoming node.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
@staticmethod
def generate_priority(node, sense, queue):
return random.random()
[docs] def put(self, node):
node.queue_priority = self.generate_priority(None,None,None)
return super(RandomPriorityQueue, self).put(node)
[docs]class LocalGapPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes
with the largest gap between the local objective and
bound first.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
@staticmethod
def generate_priority(node, sense, queue):
objective = node.objective
bound = node.bound
if sense == minimize:
gap = objective - bound
else:
assert sense == maximize
gap = bound - objective
assert not math.isnan(gap)
return gap
[docs] def put(self, node):
node.queue_priority = self.generate_priority(node,
self._sense,
None)
return super(LocalGapPriorityQueue, self).put(node)
[docs]class LexicographicPriorityQueue(CustomPriorityQueue):
"""A priority queue implementation that serves nodes
with the largest gap between the local objective and
bound first.
sense : {:obj:`minimize <pybnb.common.minimize>`, :obj:`maximize <pybnb.common.maximize>`}
The objective sense for the problem.
"""
def __init__(self,
queue_types,
sense,
track_bound):
self._queue_types = tuple(queue_types)
assert len(self._queue_types)
super(LexicographicPriorityQueue, self).__init__(sense,
track_bound)
def _generate_priority(self, node):
return tuple(qt.generate_priority(node,
self._sense,
self._queue)
for qt in self._queue_types)
[docs] def put(self, node):
node.queue_priority = self._generate_priority(node)
return super(LexicographicPriorityQueue, self).put(node)
[docs]def PriorityQueueFactory(name, *args, **kwds):
"""Returns a new instance of the priority queue type
registered under the given name."""
if isinstance(name, six.string_types):
if name not in PriorityQueueFactory._types:
raise ValueError("invalid queue type: %s"
% (name))
return PriorityQueueFactory._types[name](*args,
**kwds)
else:
names = []
for n_ in name:
if n_ not in PriorityQueueFactory._types:
raise ValueError("invalid queue type: %s"
% (n_))
if n_ == 'custom':
raise ValueError("'custom' queue type not "
"allowed when defining a "
"lexicographic queue strategy")
names.append(PriorityQueueFactory._types[n_])
if len(names) == 0:
raise ValueError("Can not define lexicographic queue "
"strategy with empty list")
return LexicographicPriorityQueue(names, *args, **kwds)
PriorityQueueFactory._types = {}
[docs]def register_queue_type(name, cls):
"""Registers a new priority queue class with the
PriorityQueueFactory."""
if (name in PriorityQueueFactory._types) and \
(PriorityQueueFactory._types[name] is not cls):
raise ValueError(
"The name '%s' has already been registered"
"for priority queue type '%s'"
% (name, cls))
PriorityQueueFactory._types[name] = cls
register_queue_type('bound',
WorstBoundFirstPriorityQueue)
register_queue_type('custom',
CustomPriorityQueue)
register_queue_type('objective',
BestObjectiveFirstPriorityQueue)
register_queue_type('breadth',
BreadthFirstPriorityQueue)
register_queue_type('depth',
DepthFirstPriorityQueue)
register_queue_type('fifo',
FIFOQueue)
register_queue_type('lifo',
LIFOQueue)
register_queue_type('random',
RandomPriorityQueue)
register_queue_type('local_gap',
LocalGapPriorityQueue)