Merge monotonically increasing streams

1. Problem statement

Merge monotonically increasing streams (Aziz et al., 2018, p. 144; Sedgewick & Wayne, 2011, pp. 321–322). That is, given an input of some number of streams (where each stream's output is increasing monotonically), output a new stream such that the items remain sorted.

Inputs:
    - [0, 1, 2, 3, ...]
    - [5, 6, 7, ...]
    - [0, 2, 4, ...]

Output: [0, 0, 1, 2, 2, 3, 4, 5, 6, 7, ...]

2. Insights

2.1. We only care about the minimum

Similar to the discussion in "merge sorted linked lists", we only need to look at the first item in each of the input streams at any given time, because we just want to pick the smallest item out of all the streams.

In other words, if there are \(k\) streams, then we need some buffer of size \(k\) to determine which item is the smallest current minimum across all streams.

2.2. Streams can be very big

A key part of this problem is the assumption that the streams are very large, such that a stream's entire output cannot be captured in memory into a single array.

For our tests we'll use an array to simulate streams of small sizes, but we should keep in mind that the stream could be so big as to not fit in memory.

3. Solution

3.1. Brute force

The brute force approach looks at the very first element of all streams by putting them all into a buffer, then removes one of them as the smallest, and then advances the stream whose element was picked. Then it just repeats this process until the streams are fully drained.

In Python, the standard type for something that can generate values in a stream-like fashion is an Iterator, so that's the type that we use here.

def merge_brute_force(streams: List[Iterator[int]]) -> Iterator[int]:
    buf = []
    for stream_id, stream in enumerate(streams):
        item = next(stream, None)
        if item is not None:
            buf.append((item, stream_id))

    while buf:
        buf.sort() # Expensive!
        item, stream_id = buf[0]
        buf = buf[1:]

        yield item

        next_item = next(streams[stream_id], None)
        if next_item is not None:
            buf.append((next_item, stream_id))

The downside here is that we must search the buffer after every pick for the smallest item by sorting the entire buffer. This sort is still expensive as it can be as bad as \(O(k^2)\) (worst-case) where \(k\) is the number of streams. After the first sort though, either 1 or 0 elements (if we're lucky) will be out of place for all subsequent iterations of the while loop, so the time complexity will tend toward the best-case behavior.

Assuming our Python implementation uses Quicksort, which has best-case time complexity of \(O(k * \log{k})\), then our amortized overall time complexity will be \(O(n * k * \log{k})\) where \(n\) is the total number of items coming from the streams.

The space complexity is \(O(k)\), the size of our buffer.

3.2. Optimal

The optimal solution uses a priority queue instead of a list. Using a priority queue, we don't have to sort the buffer at all and (as is the nature of priority queues), we can quickly find the smallest element.

Python includes a min-heap priority queue. Running time is \(O(n * \log{k})\), because whereas getting the minimum of a min-heap is \(O(1)\), maintaining the heap property in the min-heap of size \(k\) is an \(O(\log{k})\) operation (after every extraction of the minimum value out of the priority queue).

The space complexity is still \(O(k)\), but we've improved greatly on the time complexity.

def merge_optimal(streams: List[Iterator[int]]) -> Iterator[int]:
    buf: List[Tuple[int, int]] = []
    for stream_id, stream in enumerate(streams):
        item = next(stream, None)
        if item is not None:
            heapq.heappush(buf, (item, stream_id))

    while buf:
        item, stream_id = heapq.heappop(buf)

        yield item

        next_item = next(streams[stream_id], None)
        if next_item is not None:
            heapq.heappush(buf, (next_item, stream_id))

Python's heapq module comes with a merge() method, so we can use that directly as well (and also use in our tests).

def merge_optimal_pythonic(streams: List[Iterator[int]]) -> Iterator[int]:
    combined_stream = heapq.merge(*streams)

    for item in combined_stream:
        yield item

4. Tests

🔗
from collections.abc import Iterator
import copy
import heapq
from hypothesis import given, strategies as st
from typing import List, Tuple
import unittest

brute_force
optimal
optimal_pythonic

# Utilities.
def drain(stream: Iterator[int]) -> List[int]:
    return [x for x in stream]

class Test(unittest.TestCase):
    test_cases

if __name__ == "__main__":
    unittest.main(exit=False)

4.1. Basic tests

def test_basic(self):
    # Empty streams result in nothing.
    s1 = iter([])
    s2 = iter([])
    s3 = iter([])
    streams = [s1, s2, s3]
    result = drain(merge_brute_force(streams))
    self.assertEqual(result, [])

    # Only one stream has content.
    s1 = iter([])
    s2 = iter([1, 2, 3])
    s3 = iter([])
    streams = [s1, s2, s3]
    result = drain(merge_brute_force(streams))
    self.assertEqual(result, [1, 2, 3])

    # Basic example, as described in the problem statement.
    s1 = iter([0, 1, 2, 3])
    s2 = iter([5, 6, 7])
    s3 = iter([0, 2, 4])
    streams = [s1, s2, s3]
    result = drain(merge_brute_force(streams))
    self.assertEqual(result, [0, 0, 1, 2, 2, 3, 4, 5, 6, 7])

4.2. Property-based tests

Check that the brute force solution agrees with the optimal solution.

@given(st.lists(st.lists(st.integers(min_value=1, max_value=1000),
                min_size=1,
                max_size=50), # Max number of items in a stream.
       min_size=1,
       max_size=5)) # Max number of streams.
def test_random(self, streams: List[List[int]]):
    # Sort values inside each stream first.
    iters = []
    for stream in streams:
        stream.sort()
        iters.append(iter(stream))

    # Create an identical set of streams (will be drained by the optimal
    # solution).
    iters_2 = copy.deepcopy(iters)
    iters_3 = copy.deepcopy(iters)

    # Run the streams through the available implementations.
    result_bf = drain(merge_brute_force(iters))
    result_optimal = drain(merge_optimal(iters_2))
    result_pythonic = drain(merge_optimal_pythonic(iters_3))

    # Do the solutions agree with each other?
    self.assertEqual(result_pythonic, result_bf)
    self.assertEqual(result_pythonic, result_optimal)

5. References

Aziz, A., Lee, T.-H., & Prakash, A. (2018). Elements of Programming Interviews in Python: The Insiders’ Guide. CreateSpace Independent Publishing Platform (25 July. 2018).
Sedgewick, R., & Wayne, K. D. (2011). Algorithms (4th ed). Addison-Wesley.