Source code for ast_toolbox.mcts.BoundedPriorityQueues

import copy

import numpy as np
from depq import DEPQ


class BoundedPriorityQueue:
    """Bounded priority queue of ``(key, priority)`` pairs backed by a ``DEPQ``.

    Parameters
    ----------
    N : int
        Size of the queue; forwarded to ``DEPQ`` as ``maxlen``.
    """

    def __init__(self, N):
        self.pq = DEPQ(iterable=None, maxlen=N)
        self.N = N

    @staticmethod
    def _same_key(a, b):
        """Return whether two keys are equal, comparing NumPy arrays as a whole.

        ``==`` on arrays is element-wise and its result has no single truth
        value, so arrays are compared with ``np.array_equal`` instead.
        """
        if isinstance(a, np.ndarray) or isinstance(b, np.ndarray):
            return np.array_equal(a, b)
        # Identity first, mirroring what the ``in`` operator does on lists.
        return a is b or a == b

    def enqueue(self, k, v, make_copy=False):
        """Store k in the queue with priority v.

        The call is a no-op when an equal key is already stored. If ``v``
        collides with a stored priority it is increased in steps of 1e-4
        until it is unique.

        Parameters
        ----------
        k :
            The object to be stored.
        v : float
            The priority value.
        make_copy : bool, optional
            Whether to store a deep copy of k instead of k itself.
        """
        if self.haskey(k):
            return

        # Snapshot the priorities once; the queue is not modified while we
        # search for a free priority value.
        # NOTE: if v is so large that ``v + 1e-4 == v`` in floating point,
        # a collision can never be resolved and this loop will not end.
        taken = [pair[1] for pair in self.pq]
        while v in taken:
            v += 1e-4

        self.pq.insert(copy.deepcopy(k) if make_copy else k, v)

    def length(self):
        """Return the current size of the queue.

        Returns
        ----------
        length : int
            The current size of the queue.
        """
        return self.pq.size()

    def empty(self):
        """Clear the queue."""
        self.pq.clear()

    def isempty(self):
        """Check whether the queue is empty.

        Returns
        ----------
        is_empty : bool
            Whether the queue is empty.
        """
        return self.pq.is_empty()

    def haskey(self, k):
        """Check whether k is in the queue.

        NumPy array keys are compared with ``np.array_equal``, the same rule
        ``enqueue`` uses to reject duplicates.

        Returns
        ----------
        has_key : bool
            Whether k is in the queue.
        """
        return any(self._same_key(k, pair[0]) for pair in self.pq)

    def __iter__(self):
        """Iterate over the stored pairs from highest to lowest priority.

        The pairs are sorted into a snapshot when the iterator is created.

        Returns
        ----------
        BPQ_Iterator : generator
            Generator over the ``(key, priority)`` pairs.
        """
        ascending = sorted(self.pq, key=lambda pair: pair[1])
        descending = ascending[::-1]
        return (pair for pair in descending)