Stream Message Committer
In a multi-threaded system, messages are identified by unique, non-negative offsets starting from 0. Messages may finish processing out of order, but the system can only commit an offset once all messages with smaller offsets have been processed.
Commit Rules
- Commit Definition: Committing an offset x implies that all messages from 0 to x inclusive are now acknowledged.
- Eligibility: Offset 0 is always eligible once processed. Any offset x > 0 is eligible only if every offset from 0 to x-1 has already been processed or committed.
- Output: For each message in the input offsets array (representing the order in which they arrive/finish processing):
  - If the current message allows for a new commit, output the highest offset that can now be committed.
  - If the contiguous block starting from the last committed offset (or 0) is still incomplete, output -1.
Example 1
Input
offsets = [2, 0, 1]
Output
[-1, 0, 2]
Explanation
1. Offset 2 is processed. We cannot commit because 0 and 1 are missing. Output -1.
2. Offset 0 is processed. 0 is the starting point, so we commit 0. Output 0.
3. Offset 1 is processed. Now we have 0, 1, and 2. The highest contiguous offset is 2. Output 2.
Example 2
Input
offsets = [0, 1, 2]
Output
[0, 1, 2]
Explanation
Each message arrives in order, allowing the commit pointer to move forward immediately at each step.
Example 3
Input
offsets = [2, 1, 0, 5, 4]
Output
[-1, -1, 2, -1, -1]
Explanation
We only see a commit when offset 0 arrives (completing the block 0-2). When 5 and 4 arrive, they are still waiting on offset 3 to complete the next block.
Constraints
- 1 <= offsets.length <= 10⁵
- 0 <= offsets[i] < offsets.length
- All values in offsets are unique.
Overview
The problem asks us to track a contiguous sequence of processed offsets starting from 0. In a stream processing environment, messages often arrive out of order. However, the system state can only be "committed" up to a certain point if every single message prior to that point has been successfully handled.
Think of this like a sliding window where the left boundary is fixed at 0. We want to know, after every new message arrives, how far we can push the right boundary of a "completed" block. If the message that just arrived doesn't fill the immediate gap after our last commit, we can't move the boundary forward, so we return -1.
Brute force
A naive approach involves maintaining a set or a boolean array of all messages received so far. Every time a new offset arrives, we start from the offset immediately following our last committed value and check, one by one, if the subsequent offsets exist in our collection. We continue incrementing our "last committed" pointer until we hit a missing offset.
This logic is sound, but an implementation that re-scans from offset 0 on every arrival takes quadratic time in the worst case. The version below avoids that by resuming the scan from the last committed offset.
def commit_messages(offsets: list[int]) -> list[int]:
    n = len(offsets)
    processed = [False] * n
    results = []
    last_committed = -1
    for x in offsets:
        processed[x] = True
        # If the newly arrived offset is exactly what we need next
        if x == last_committed + 1:
            curr = x
            # Scan forward to see how many buffered messages we can now commit
            while curr + 1 < n and processed[curr + 1]:
                curr += 1
            last_committed = curr
            results.append(last_committed)
        else:
            results.append(-1)
    return results
Time complexity: O(N). Although there is a nested while loop, the last_committed pointer only moves forward and never resets, so each offset is visited by the inner loop at most once across the entire execution.
Space complexity: O(N). We maintain a boolean array processed of size N to track which offsets have arrived.
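For contrast, the re-scan-from-scratch variant described at the start of this section might look like the sketch below. The name commit_messages_rescan is hypothetical, and a set is used instead of a boolean array purely for brevity. The inner loop restarts at 0 on every arrival, which is what makes the worst case quadratic.

```python
def commit_messages_rescan(offsets: list[int]) -> list[int]:
    """Quadratic baseline: recount the contiguous prefix from 0 on every arrival."""
    seen: set[int] = set()
    results: list[int] = []
    last_committed = -1  # highest offset committed so far; -1 means none yet
    for x in offsets:
        seen.add(x)
        # Deliberately restart at 0 instead of resuming from last_committed + 1.
        highest = -1
        while highest + 1 in seen:
            highest += 1
        if highest > last_committed:
            # The contiguous prefix grew, so a new commit is possible.
            last_committed = highest
            results.append(highest)
        else:
            results.append(-1)
    return results
```

On in-order input such as [0, 1, 2, ...], the k-th arrival re-walks all k offsets seen so far, so the total work grows as roughly N²/2 steps.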
Optimal / best approach
The "Brute Force" implementation above already runs in O(N) time, because the commit pointer only ever moves forward and its total movement is amortized across all arrivals. What remains is to make the implementation as clean and idiomatic as possible.
We use a boolean array (or a bitset) to track seen offsets and a single integer variable next_expected to track the first offset that has not been committed yet. Whenever the incoming offset matches next_expected, we "flush" the buffer by incrementing next_expected as far as possible using the previously seen offsets.
def commit_messages(offsets: list[int]) -> list[int]:
    n = len(offsets)
    # Track which offsets have been received but not yet committed
    seen = [False] * n
    results = []
    # next_expected is the offset we are currently waiting for to advance the commit
    next_expected = 0
    for offset in offsets:
        seen[offset] = True
        # If the incoming offset is the one that completes the contiguous block
        if offset == next_expected:
            # Advance next_expected as far as possible
            while next_expected < n and seen[next_expected]:
                next_expected += 1
            # The highest committed offset is next_expected - 1
            results.append(next_expected - 1)
        else:
            # The gap is still there; cannot commit anything new
            results.append(-1)
    return results
Time complexity: O(N). Each element in the offsets array is processed once, and the next_expected pointer increments at most N times throughout the entire function.
Space complexity: O(N). We require a boolean array seen of size N and an output list of size N.
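One way to check the amortized bound empirically is to count how often the inner loop body runs. The helper below is a hypothetical instrumented copy of the loop above (count_pointer_steps is not part of the original solution) and assumes the stated constraints: unique offsets in the range 0 to N-1.

```python
def count_pointer_steps(offsets: list[int]) -> int:
    """Return how many times next_expected is incremented over the whole run."""
    n = len(offsets)
    seen = [False] * n
    next_expected = 0
    steps = 0  # total inner-loop iterations across all arrivals
    for offset in offsets:
        seen[offset] = True
        if offset == next_expected:
            while next_expected < n and seen[next_expected]:
                next_expected += 1
                steps += 1
    return steps
```

Under those assumptions the count never exceeds N, even for a fully reversed input where a single arrival (offset 0) triggers the entire flush.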
Why this works
The core constraint is that we cannot commit offset x unless all offsets 0...x-1 have been processed. This means the commit point only ever moves forward; it never has to be rolled back. By using a boolean array as a fast-access buffer, we can store "future" messages that arrive before their predecessors. As soon as the "missing link" (the next_expected offset) arrives, it acts as a trigger to release all contiguous messages stored in the buffer.
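The same buffer-and-release idea can be sketched as a small stateful object for the case where offsets arrive one at a time and the total count is not known in advance. The class name StreamCommitter and its process method are hypothetical, and the sketch assumes offsets are unique and non-negative. Because it buffers early arrivals in a set rather than a fixed-size array, it does not rely on the bound offsets[i] < offsets.length, so it also runs on Example 3, where offset 5 appears in a five-element input.

```python
class StreamCommitter:
    """Buffers out-of-order offsets and releases them once the gap is filled."""

    def __init__(self) -> None:
        self.pending: set[int] = set()  # offsets processed but not yet committable
        self.next_expected = 0          # smallest offset not yet committed

    def process(self, offset: int) -> int:
        """Record one processed offset; return the new commit point or -1."""
        if offset != self.next_expected:
            # A gap remains before this offset, so just buffer it.
            self.pending.add(offset)
            return -1
        # The missing link arrived: commit it, then drain the buffered run.
        self.next_expected += 1
        while self.next_expected in self.pending:
            self.pending.remove(self.next_expected)
            self.next_expected += 1
        return self.next_expected - 1
```

Feeding it Example 3 and then the missing offset 3 shows the buffered messages being released in one step:

```python
committer = StreamCommitter()
outputs = [committer.process(x) for x in [2, 1, 0, 5, 4]]  # [-1, -1, 2, -1, -1]
final = committer.process(3)                                # 5
```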