Heaps

1. Problem statement

Implement a priority queue, using a heap.

2. Insights

2.1. What is a priority queue?

A priority queue is an abstract data structure. Abstract here means that there can be multiple different (concrete) implementations, each with its own advantages.

Another example of an abstract data structure is a list. Possible concrete implementations include arrays and linked lists.

A priority queue maintains a set S of elements, and requires the following operations:

insert(S, x)
add the element x into the set
get_max(S)
return the element with the largest key
pop_max(S)
same as get_max(S), but also delete the returned element from the set

The following are additional operations that may be useful depending on the application:

increase_key(S, x, k)
increases the value of x's key to k (assumed to be at least as large as x's current key value)
merge(S1, S2)
merge two sets together

We use the word key to mean "a standard unit of measurement that is comparable and which is present for every node in the set".

2.1.1. Practical applications

Priority queues are great any time you need to build up a list of "seen" items, and when you also need to examine the largest items. In a sense, a priority queue is like a regular queue. Whereas a regular queue "gets" or "pops" the oldest-inserted item first (priority determined by when the item was inserted into the queue), a priority queue gets or pops items based on their key value, independent of when they were inserted into the priority queue.

With a priority queue, you could for example efficiently maintain a task scheduler that needs to process the most important (heavily weighted) task first. New tasks could be coming in with insert(), but at any given point in time, we can get to the most important task with get_max() and run pop_max() when we're finished with it.
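The scheduler idea can be sketched in a few lines. This is only an illustration: the TaskScheduler class and the task names are hypothetical, and it leans on Python's standard heapq module (a min-heap) with negated weights rather than on the heap we build later in this document.

```python
import heapq
import itertools


class TaskScheduler:
    """Hypothetical scheduler that always hands out the heaviest task first."""

    def __init__(self):
        self._heap = []                     # entries are (-weight, sequence, name)
        self._sequence = itertools.count()  # tie-breaker: insertion order

    def insert(self, name: str, weight: int) -> None:
        # heapq is a min-heap, so negate the weight to get the largest first.
        heapq.heappush(self._heap, (-weight, next(self._sequence), name))

    def get_max(self) -> str:
        return self._heap[0][2]

    def pop_max(self) -> str:
        return heapq.heappop(self._heap)[2]


scheduler = TaskScheduler()
scheduler.insert("rotate logs", 1)
scheduler.insert("page on-call", 9)
scheduler.insert("send report", 4)

first = scheduler.get_max()
order = [scheduler.pop_max() for _ in range(3)]
```

Note that the order in which tasks come out depends only on their weights, not on when they were inserted.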

2.2. What are heaps?

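Heaps are concrete data structures typically used to implement priority queues, because they are a natural fit. A binary heap supports get_max() in O(1) time and both insert() and pop_max() in O(log n) time, a balance that simpler alternatives such as sorted or unsorted arrays cannot match.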

Heaps are tree-based data structures that come in two (symmetric) flavors: max-heaps and min-heaps. Max-heaps have the following heap property: a child node cannot be greater than its parent (keys never increase as you go down the tree). In a min-heap, it's the opposite: a child node cannot be smaller than its parent (keys never decrease as you go down the tree).

Note that heaps could technically be implemented using any type of tree, but in practice binary trees are used because they are the simplest (they have the minimum number of children at each node to be considered a tree).

A tree with just one child per node wouldn't be considered a tree any more, because it would be exactly the same as a linked list.
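To make the max-heap property concrete, here is a minimal sketch of a checker for it. The Node class and the is_max_heap() function are hypothetical helpers for illustration only; they are not part of the implementation developed below, and they check only the ordering of keys, not the shape of the tree.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    """Hypothetical binary tree node with links to its children only."""
    key: int
    left: Optional["Node"] = None
    right: Optional["Node"] = None


def is_max_heap(node: Optional[Node]) -> bool:
    """Return True if no child anywhere in the tree exceeds its parent."""
    if node is None:
        return True
    for child in (node.left, node.right):
        if child is not None and child.key > node.key:
            return False
    return is_max_heap(node.left) and is_max_heap(node.right)


good = Node(9, Node(5, Node(1), Node(3)), Node(8))
bad = Node(9, Node(5, Node(7)), Node(8))  # 7 is greater than its parent 5
```

Flipping the comparison to `child.key < node.key` would give the corresponding check for a min-heap.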

2.2.1. Performance

The maximum value in a max-heap is simply the root node. If we are using a binary tree, and also building it up as a complete tree (all levels are filled up left-to-right, top-down, as we insert new nodes), then the minimum value is at one of the leaf nodes, which sit on the bottom level or, when the bottom level is not full, also on the level just above it. Finding the minimum value in a max-heap therefore requires extra work: either scanning the leaves, or doing additional bookkeeping during insertions and deletions. The same performance penalty exists for finding the maximum in a min-heap.
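As a rough sketch of the "extra work" involved, the function below scans the leaves of a max-heap for the minimum. It assumes the 1-based array layout described in section 3.3 (index 0 unused), where the leaves of a heap with count items occupy indices count // 2 + 1 through count. The name min_of_max_heap() is made up for this example.

```python
def min_of_max_heap(heap: list, count: int) -> int:
    """Return the smallest item of a 1-based array max-heap by scanning leaves."""
    if count == 0:
        raise IndexError("min of an empty heap")
    first_leaf = count // 2 + 1
    # Every non-leaf has a child that is no greater than it, so the minimum
    # must be one of the leaves. This still inspects about half of the items.
    return min(heap[first_leaf:count + 1])


# Minimum (1) is on the bottom level.
heap_a = [None, 9, 5, 8, 1, 3, 7]
# Minimum (2) is a leaf on the level just above the bottom one.
heap_b = [None, 9, 8, 2, 7, 6]
```

Even with this shortcut, the scan takes O(n) time, compared to O(1) for reading the maximum at the root.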

3. Solution

We'll look at two solutions: a brute force approach, and an array-based approach that uses a clever power-of-two indexing scheme to represent a complete binary tree. We use the word "array" in this section because that's the preferred term in the existing computer science literature, but both our brute force and optimal implementations use Python lists (Python's name for dynamic arrays).

3.1. Brute force

The simplest way to implement a priority queue (without a heap) is to just use a sorted array. The get_max() and pop_max() operations are just array lookups at the end of the sorted array.

The insert() operation would be slow though, because we'd have to make sure to insert the element in the correct (sorted) location.

class MaxHeapBruteForce:
    def __init__(self):
        self.items = []
        self._count = 0

    def __len__(self):
        return self._count

    # Insertion is slow because we have to sort all items *every* time.
    def insert(self, x: Any):
        self.items.append(x)
        self.items.sort()
        self._count += 1

    def get_max(self) -> Any:
        if self._count == 0:
            raise IndexError("get from an empty heap")

        return self.items[-1]

    def pop_max(self) -> Any:
        if self._count == 0:
            raise IndexError("pop from an empty heap")

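        # list.pop() removes the last item in place, without copying the list
        # (and we avoid shadowing the built-in max()).
        max_item = self.items.pop()
        self._count -= 1

        return max_item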

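The only downside is the slow speed of insert(): it re-sorts the entire list on every call, so its cost grows with the number of items already stored. The other methods, get_max() and pop_max(), only touch the end of the list and are straightforward enough.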
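For comparison, here is a sketch of a variant that places each new item directly at its sorted position instead of re-sorting. It uses bisect.insort() from the standard library; the class name SortedListQueue is hypothetical. The binary search needs only O(log n) comparisons, but the list still has to shift items to make room, so insertion remains O(n) in the worst case.

```python
import bisect
from typing import Any


class SortedListQueue:
    """Hypothetical sorted-list priority queue using binary-search insertion."""

    def __init__(self):
        self.items = []

    def __len__(self):
        return len(self.items)

    def insert(self, x: Any):
        # Find the sorted position with binary search, then insert there.
        bisect.insort(self.items, x)

    def get_max(self) -> Any:
        if not self.items:
            raise IndexError("get from an empty queue")
        return self.items[-1]

    def pop_max(self) -> Any:
        if not self.items:
            raise IndexError("pop from an empty queue")
        return self.items.pop()


queue = SortedListQueue()
for value in [4, 1, 9, 7]:
    queue.insert(value)
drained = [queue.pop_max() for _ in range(len(queue))]
```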

3.2. Heap based on complete binary trees

We can use a standard binary tree data structure, but make sure to always fill it up left-to-right at every level (and only move down to the next level when we're out of room at the current level). This way, we can minimize the depth of the tree. These types of binary trees are called complete binary trees.

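Why do we want to minimize the depth of the tree? Because insert() and pop_max() each move an item along a single path between the root and a leaf, their cost is proportional to the depth of the tree. A complete binary tree with n nodes has the smallest possible depth, floor(log2(n)), so both operations take O(log n) time.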
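To get a feel for how slowly the depth grows, here is a small sketch that computes the depth of a complete binary tree from its node count (counting the root as depth 0). The function name complete_tree_depth() is made up for this example.

```python
def complete_tree_depth(n: int) -> int:
    """Depth of a complete binary tree with n nodes (root is at depth 0)."""
    if n < 1:
        raise ValueError("the tree must have at least one node")
    # For positive n, bit_length() - 1 equals floor(log2(n)).
    return n.bit_length() - 1


depths = {n: complete_tree_depth(n) for n in (1, 2, 3, 4, 7, 8, 1_000_000)}
```

A heap with a million items is only 19 levels deep below the root, so an insert() or pop_max() needs at most 19 swaps.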

We won't be implementing a tree-based heap though, mainly because doing so requires going up from a child node to its parent, and the nodes in our existing binary trees implementation only have links to their children, not to their parents.

3.3. Optimal solution: array-based heap

Did you know that you could represent complete binary trees using an array? All you have to do is make sure to store each level of the tree contiguously in the array, with the upper levels on the left and the lower levels on the right.

Array indices:
1 2 3 4 5 6 7 8 9 ...

Tree representation:

              1
     2                 3
4         5       6         7

We use 1-based indexing, because it makes the conversion between an array index and its location in the binary tree a little bit easier. Using the above scheme, it becomes easy to both represent the location of the array index in the binary tree, and also to figure out which index is the parent or left/right child.

Consider the index 3. Its binary representation (this is the crux of the scheme) is 11. We ignore the high-order bit (that is, the leftmost bit) to get just 1. The "1" here means "go right" (and "0" means "go left"). The children of 3 are 6 and 7. We just tack on an additional 0 and 1 to the original 11 to get the children (110 and 111). Again we ignore the leftmost bit, and arrive at 10 and 11, which mean "right, left" and "right, right" from the root node, respectively.

In summary we use binary digits to mean "go left" (0) or "go right" (1), and if we use 1-based indexing we can very easily associate a binary number with a position in the binary tree. This scheme is the one we used in our binary trees implementation to specify "directions" for figuring out the location of the node in the overall tree.
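The bit-reading rule described above can be sketched as a small function. The name path_from_root() is hypothetical; it simply drops the high-order bit of a 1-based index and translates the remaining bits into directions.

```python
from typing import List


def path_from_root(i: int) -> List[str]:
    """Directions from the root to the node stored at 1-based index i."""
    if i < 1:
        raise ValueError("indices are 1-based")
    # bin(6) is "0b110"; skip the "0b" prefix and the high-order bit.
    bits = bin(i)[3:]
    return ["right" if bit == "1" else "left" for bit in bits]


paths = {i: path_from_root(i) for i in (1, 3, 6, 7)}
```

The root (index 1) has an empty path, and the length of any path equals the depth of that node in the tree.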

For the following reasons, this array-based implementation is considered to be the optimal implementation for most scenarios:

Space-efficient
No need to store links to other nodes — every node just sits in an array.
Cache-friendly
Arrays live in contiguous blocks of memory, and are easier to load into (and stay in) the CPU's cache. Trees store links, and links can lead to random locations in memory, and are therefore less cache-friendly.
O(1) access to all nodes
Because we use an array, accessing any node in the heap takes constant time; contrast this with a tree-based heap that must jump through links starting from the root to get to the desired node.
Easy to know where the "end" of the heap is
The place where we can insert a new item in the heap is just to the right of the last element in the array, and can be accessed in O(1) time.

class MaxHeap:
    optimal_max_heap_methods

class MinHeap(MaxHeap):
    optimal_min_heap_methods

The min-heap (MinHeap) is just a symmetric mirror of the max-heap. As such, we inherit from MaxHeap to reduce code duplication.

3.3.1. Initialization

Initialization is pretty simple. We just need an array and a _count field to know how many elements were inserted. Having the _count field allows us to quickly determine where the last element is.

Note that because Python lists are already dynamic arrays, we could just use len() instead of a _count field (and there's also no need to set an explicit size). However, setting an explicit size and using a _count field will let us simulate using a fixed-size array, which would give the best performance (as long as we are operating within the allocated size of the array, we won't get hit with an array reallocation penalty if we needed to grow the array).

We also make use of a _get_key() function that returns the key of an item in the heap. The idea is that you can provide custom functions for this so that you can compare arbitrary objects against each other (that is, this heap implementation delegates the retrieval of some comparable, key-like property of items to the caller).

Instead of using _get_key(), we could just expect the objects to implement a __lt__() method, making them inherently comparable without an explicit key. But then the author of the object, not the caller, would have control over how the objects are compared. This may be a deal-breaker for users of our heap if they don't have control over how the __lt__() method is implemented.

def __init__(self, size=1, get_key: Optional[Callable]=None):
    if size < 1:
        size = 1
    self.heap = [None] * size
    self._count = 0

    # get_key is a function used to return the "key" --- some aspect of the item
    # that makes it comparable with other items. If this function is not
    # specified we just return the item itself, which works for simple types
    # like integers.
    def identity(item: Any) -> Any:
        if item is None:
            raise TypeError("cannot get key of None type")
        return item

    if get_key is None:
        get_key = identity
    self._get_key = get_key
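To illustrate why delegating the key to the caller is useful, here is a small standalone sketch. The Task class and the larger() helper are hypothetical; larger() stands in for the comparisons that the heap performs through _get_key().

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Task:
    """Hypothetical item type that defines no ordering of its own."""
    name: str
    weight: int


def larger(a: Any, b: Any, get_key: Callable[[Any], Any]) -> Any:
    """Return whichever item has the larger key, as chosen by the caller."""
    return a if get_key(a) >= get_key(b) else b


backup = Task("backup database", weight=2)
deploy = Task("deploy", weight=7)

# The caller, not the Task class, decides what "larger" means.
heaviest = larger(backup, deploy, get_key=lambda task: task.weight)
longest_name = larger(backup, deploy, get_key=lambda task: len(task.name))
```

Comparing two Task objects directly with `<` would raise a TypeError here, because the class defines no ordering. With the heap in this document, the equivalent would be to pass the same kind of function as the get_key argument of __init__().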

Implementing __len__() allows us to use the len() built-in function against our MaxHeap object.

def __len__(self):
    return self._count

For completeness, let's also include a _grow() method to grow the heap if we run out of space, along with a _full() helper to detect when that is necessary. This way we control how often the heap array gets reallocated.

def _grow(self):
    self.heap.extend([None] * len(self.heap))

# Used to check if we need to grow the heap.
def _full(self) -> bool:
    return self._count + 1 == len(self.heap)
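The following sketch simulates how the array capacity evolves when _full() and _grow() are applied before every insertion, starting from the default size of 1. The function capacity_after() is a hypothetical helper that mirrors the same check and the same doubling; remember that slot 0 is never used because of the 1-based indexing.

```python
def capacity_after(n_inserts: int, size: int = 1) -> int:
    """Length of the backing array after n_inserts insertions."""
    heap = [None] * max(size, 1)
    count = 0
    for _ in range(n_inserts):
        if count + 1 == len(heap):           # same check as _full()
            heap.extend([None] * len(heap))  # same doubling as _grow()
        count += 1
    return len(heap)


capacities = [capacity_after(n) for n in range(1, 9)]
```

Because the array doubles each time, reallocations become rarer as the heap grows: the first eight insertions trigger only four of them.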

3.3.2. Get max

Getting the max in a max-heap is easy — it's the first element. The hard part is just making sure that it is even a valid entry.

def get_max(self) -> Any:
    # If the heap is empty, there's no max item.
    if self._count == 0:
        return None

    return self.heap[1]

3.3.3. Insertion

When we insert an item into the max-heap, we must make sure that when we are done, all parents are greater than or equal to their children. That is, we must maintain the heap property for max-heaps.

Since we are using an array-backed heap, the simplest way of adding a new item is to add it to the end of the array. Then in order to ensure that we maintain the heap property, we just have to swap this item with its parent (up the tree) as many times as necessary. We would stop if either we don't have a parent (we're at the root node) or if the current parent is greater than or equal to us.

The above operation is also known as "bottom-up reheapify", because the newly inserted item always starts out at the bottom of the tree, but is moved up the tree (if necessary) to preserve the heap property (essentially converting a non-heap back into a heap).

The first thing we need to be able to do is to exchange the value of two nodes in the binary tree (parent with the child).

def _swap_heap_nodes(self, i: int, j: int):
    self.heap[i], self.heap[j] = self.heap[j], self.heap[i]

Speaking of the parent and child nodes, let's add some helper methods to figure out how to find their indices.

@staticmethod
def _parent_index(i: int) -> int:
    return i >> 1

@staticmethod
def _left_child_index(i: int) -> int:
    return i << 1

@staticmethod
def _right_child_index(i: int) -> int:
    return (i << 1) + 1
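As a quick sanity check of the index arithmetic, the sketch below reproduces the three helpers as plain functions (without the leading underscore, purely for illustration) and tabulates the results for the first few indices.

```python
def parent_index(i: int) -> int:
    return i >> 1        # same as i // 2


def left_child_index(i: int) -> int:
    return i << 1        # same as 2 * i


def right_child_index(i: int) -> int:
    return (i << 1) + 1  # same as 2 * i + 1


# Maps index -> (parent, left child, right child).
family = {
    i: (parent_index(i), left_child_index(i), right_child_index(i))
    for i in (1, 2, 3)
}
```

The parent of the root comes out as index 0, the unused slot of the 1-based array, which is why code walking up the tree has to stop once it reaches index 1.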

Now let's implement bottom-up reheapification. Let's call it "reheapify_up" for short. Given some index in the heap, it will move the node at that index up the heap as many times as necessary.

def _reheapify_up(self, i: int):
    while True:
        # We're already at the top; NOP.
        if i < 2:
            break

        parent_index = self._parent_index(i)
        parent_key = self._get_key(self.heap[parent_index])

        # Abort if we satisfy the heap condition.
        if parent_key >= self._get_key(self.heap[i]):
            break

        # Swap upward.
        self._swap_heap_nodes(parent_index, i)

        i = parent_index

We now have all the pieces to implement insertion.

def insert(self, x: Any):
    # Grow the heap first if we're out of room.
    if self._full():
        self._grow()

    # Insert at the end of the array (just after the last item).
    self._count += 1
    self.heap[self._count] = x

    # Reheapify.
    self._reheapify_up(self._count)
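Here is a worked trace of an insertion, written as a standalone sketch on a plain list of integers. The function sift_up() is a simplified, hypothetical stand-in for _reheapify_up() that compares the items directly instead of going through _get_key().

```python
def sift_up(heap: list, i: int) -> None:
    """Move the item at 1-based index i up until its parent is not smaller."""
    while i > 1:
        parent = i >> 1
        if heap[parent] >= heap[i]:
            break
        heap[parent], heap[i] = heap[i], heap[parent]
        i = parent


# A valid max-heap with index 0 unused:
#         8
#       5   7
#      1 3
heap = [None, 8, 5, 7, 1, 3]

# Insert 9 at the end (index 6), then restore the heap property.
heap.append(9)
sift_up(heap, len(heap) - 1)
# Step 1: 9 swaps with its parent 7 (index 3).
# Step 2: 9 swaps with its parent 8 (index 1) and becomes the new root.
```

Only the items on the path from the new leaf to the root are touched; the rest of the array stays where it is.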

3.3.4. Pop max

Removing the max item means deleting the root node, and then making sure to put a node (it doesn't matter which one exactly) in the root's position so that the tree structure stays intact. And then we also have to make sure that the new root node is the maximum value in the whole tree.

First, which node should we use to replace the old root node? The easiest choice is the node at the end of the array: this way the array can naturally "shrink" as we pop off the max items repeatedly (such that the array doesn't get any "holes" in it).

For making sure that the new root node is the new max value, we have to repeatedly exchange it with the larger of its children. This is basically like _reheapify_up(), but in the opposite direction (top-down, instead of bottom-up).

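def pop_max(self) -> Any:
    # Popping from an empty heap returns None (like get_max()). We must not
    # touch _count in that case, or it would become negative.
    if self._count == 0:
        return None

    max_item = self.get_max()

    # The last item in the array is the new root node (for now).
    self.heap[1] = self.heap[self._count]

    # Clear last item's old position in the heap (shrink the heap by 1).
    self.heap[self._count] = None
    self._count -= 1

    # Reheapify.
    self._reheapify_down(1)

    return max_item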

For reheapifying downward, we just need to make sure to get the larger of the two children.

def _reheapify_down(self, i: int):
    while True:
        # If our children would be out of bounds, abort. "Out of bounds" here
        # means outside of the tree, not the overall heap size (which includes
        # unused parts of the array).
        if self._left_child_index(i) > self._count:
            break

        left_child_index = self._left_child_index(i)
        right_child_index = self._right_child_index(i)

        # Find the bigger child. It may be that we cannot look at both child
        # indices, because one of them (the right child) might be out of bounds.
        # In that case there is only 1 possible child (the left child), and so
        # there's no need to compare children to see which one is bigger.
        if right_child_index > self._count:
            bigger_child_index = left_child_index
        else:
            left_child = self._get_key(self.heap[left_child_index])
            right_child = self._get_key(self.heap[right_child_index])

            bigger_child_index = left_child_index
            if left_child < right_child:
                bigger_child_index = right_child_index
        bigger_child = self._get_key(self.heap[bigger_child_index])

        # If we already satisfy the heap property, stop swapping.
        if bigger_child <= self._get_key(self.heap[i]):
            break

        # Swap downward.
        self._swap_heap_nodes(bigger_child_index, i)

        i = bigger_child_index
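Here is a worked trace of a pop, written as a standalone sketch on a plain list of integers. The function sift_down() is a simplified, hypothetical stand-in for _reheapify_down() that compares the items directly instead of going through _get_key().

```python
def sift_down(heap: list, i: int, count: int) -> None:
    """Move the item at 1-based index i down until no child is bigger."""
    while True:
        left = i << 1
        if left > count:
            break  # no children: we are at a leaf
        right = left + 1
        bigger = left
        if right <= count and heap[right] > heap[left]:
            bigger = right
        if heap[bigger] <= heap[i]:
            break  # heap property holds
        heap[bigger], heap[i] = heap[i], heap[bigger]
        i = bigger


# A valid max-heap with index 0 unused:
#         9
#       5   8
#      1 3 7
heap = [None, 9, 5, 8, 1, 3, 7]
count = 6

popped = heap[1]       # the max item
heap[1] = heap[count]  # the last item (7) becomes the temporary root
heap[count] = None
count -= 1
sift_down(heap, 1, count)
# 7 swaps with its bigger child 8 (index 3), and then has no children left.
```

Choosing the bigger child matters: swapping 7 with the smaller child 5 would have left 5 as the parent of 8, breaking the heap property.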

3.3.5. MinHeap

This is the mirror of MaxHeap. We have it here for completeness, but we don't bother with discussing the implementation because all of the previous discussion applies, but in "reverse".

def get_min(self) -> Any:
    if self._count == 0:
        return None

    return self.heap[1]

def _reheapify_up(self, i: int):
    while True:
        if i < 2:
            break

        parent_index = self._parent_index(i)
        parent_key = self._get_key(self.heap[parent_index])

        if parent_key <= self._get_key(self.heap[i]):
            break

        self._swap_heap_nodes(parent_index, i)

        i = parent_index

def pop_min(self) -> Any:
    # As with pop_max(), leave an empty heap untouched.
    if self._count == 0:
        return None

    min_item = self.get_min()

    self.heap[1] = self.heap[self._count]

    self.heap[self._count] = None
    self._count -= 1

    self._reheapify_down(1)

    return min_item

def _reheapify_down(self, i: int):
    while True:
        if self._left_child_index(i) > self._count:
            break

        left_child_index = self._left_child_index(i)
        right_child_index = self._right_child_index(i)

        if right_child_index > self._count:
            smaller_child_index = left_child_index
        else:
            left_child = self._get_key(self.heap[left_child_index])
            right_child = self._get_key(self.heap[right_child_index])

            smaller_child_index = left_child_index
            if left_child > right_child:
                smaller_child_index = right_child_index
        smaller_child = self._get_key(self.heap[smaller_child_index])

        if smaller_child >= self._get_key(self.heap[i]):
            break

        self._swap_heap_nodes(smaller_child_index, i)

        i = smaller_child_index

Lastly, we take care to invalidate the get_max() and pop_max() methods we've inherited from MaxHeap, because they don't make sense for a min-heap.

def get_max(self):
    raise AttributeError("get_max is not supported")
def pop_max(self):
    raise AttributeError("pop_max is not supported")

4. Tests

import heapq
from hypothesis import given, strategies as st
from typing import List
import unittest

from .heap import MaxHeapBruteForce, MaxHeap, MinHeap

class Test(unittest.TestCase):
    test_cases

if __name__ == "__main__":
    unittest.main(exit=False)

4.1. Initialization

def test_init_bf(self):
    h = MaxHeapBruteForce()
    self.assertEqual(len(h), 0)

4.2. Get max

def test_get_max_bf(self):
    h = MaxHeapBruteForce()
    self.assertEqual(len(h), 0)
    self.assertRaises(IndexError, h.get_max)
    h.insert(1)
    self.assertEqual(h.get_max(), 1)
    h.insert(2)
    self.assertEqual(h.get_max(), 2)

4.3. Insertion

def test_insertion_bf(self):
    h = MaxHeapBruteForce()
    h.insert(1)
    h.insert(2)
    h.insert(3)
    self.assertEqual(len(h), 3)

4.4. Pop max

def test_pop_max_bf(self):
    h = MaxHeapBruteForce()
    h.insert(1)
    h.insert(2)
    h.insert(3)
    self.assertEqual(len(h), 3)

    x = h.pop_max()
    self.assertEqual(x, 3)
    self.assertEqual(len(h), 2)

    x = h.pop_max()
    self.assertEqual(x, 2)
    self.assertEqual(len(h), 1)

    x = h.pop_max()
    self.assertEqual(x, 1)
    self.assertEqual(len(h), 0)

4.5. Property-based tests

Check that we can use the heap to sort items (aka "heapsort"). We also check that the optimal version agrees with the brute force version.

Traditional heapsort actually does not use extra space. It "heapifies" the given array of items in-place, which saves memory compared to the auxiliary heap we use in test_heap_sort().

@given(st.lists(st.integers(min_value=1, max_value=1000),
                min_size=1,
                max_size=50))
def test_heap_sort(self, to_insert: list[int]):
    max_heap_bf = MaxHeapBruteForce()
    max_heap_optimal = MaxHeap()
    self.assertEqual(len(max_heap_bf), len(max_heap_optimal))

    # Populate heaps.
    for item in to_insert:
        max_heap_bf.insert(item)
        max_heap_optimal.insert(item)
    self.assertEqual(len(max_heap_bf), len(max_heap_optimal))

    # Drain heaps.
    output_bf = []
    output_optimal = []
    for _ in range(len(to_insert)):
        x = max_heap_bf.pop_max()
        y = max_heap_optimal.pop_max()
        output_bf.append(x)
        output_optimal.append(y)
    self.assertEqual(output_bf, output_optimal)
    self.assertEqual(len(max_heap_bf), len(max_heap_optimal))

    # Are the outputs sorted (in reverse, because it's a max-heap)?
    s = list(sorted(to_insert))
    self.assertEqual(s, list(reversed(output_bf)))
    self.assertEqual(s, list(reversed(output_optimal)))
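As an aside on the in-place heapsort mentioned above, here is a minimal sketch of how it could look. It is not part of the heap implementation in this document: the function heapsort() is written from scratch for illustration and, unlike our MaxHeap, it uses 0-based indexing (the children of index i are 2i + 1 and 2i + 2) because it operates directly on the caller's list.

```python
def heapsort(items: list) -> None:
    """Sort items in ascending order, in place, using a max-heap."""
    n = len(items)

    def sift_down(root: int, end: int) -> None:
        # Restore the max-heap property within items[:end], starting at root.
        while True:
            child = 2 * root + 1
            if child >= end:
                break
            if child + 1 < end and items[child + 1] > items[child]:
                child += 1  # pick the bigger child
            if items[child] <= items[root]:
                break
            items[root], items[child] = items[child], items[root]
            root = child

    # Phase 1: heapify in place, from the last parent up to the root.
    for root in range(n // 2 - 1, -1, -1):
        sift_down(root, n)

    # Phase 2: move the current max to the end, then shrink the heap by one.
    for end in range(n - 1, 0, -1):
        items[0], items[end] = items[end], items[0]
        sift_down(0, end)


data = [5, 3, 8, 1, 9, 2]
heapsort(data)
```

The sorted region grows from the right end of the list while the heap shrinks on the left, so no second list is needed.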

For our MinHeap implementation, check that it performs the same way as Python's heapq module (which implements a min-heap).

@given(st.lists(st.integers(min_value=1, max_value=1000),
                min_size=1,
                max_size=50))
def test_min_heap(self, to_insert: list[int]):
    min_heap = MinHeap()
    min_heap_std: List[int] = []

    # Populate heaps.
    for item in to_insert:
        min_heap.insert(item)
        heapq.heappush(min_heap_std, item)
    self.assertEqual(len(min_heap), len(min_heap_std))

    # Drain heaps.
    output = []
    output_std = []
    for _ in range(len(to_insert)):
        x = min_heap.pop_min()
        y = heapq.heappop(min_heap_std)
        output.append(x)
        output_std.append(y)

    # Check drained output (should be in agreement with heapq output).
    self.assertEqual(output, output_std)
    self.assertEqual(len(min_heap), len(min_heap_std))

5. Export

from __future__ import annotations
from typing import Any, Callable, Optional
code

6. References