Monotonic Deque Pattern – Sliding Window Maximum
LeetCode 239. Given an array and window size k, return the maximum in each window. Naive O(nk) using deque to get O(n).
from collections import deque
def maxSlidingWindow(nums, k):
dq = deque() # stores indices, front is always max
result = []
for i, num in enumerate(nums):
# Remove indices outside window
while dq and dq[0] < i - k + 1:
dq.popleft()
# Maintain decreasing order: remove smaller elements from back
while dq and nums[dq[-1]] = k - 1:
result.append(nums[dq[0]])
return result
Key insight: if element j is in the window and nums[j] <= nums[i] where j < i, then j can never be the maximum for any future window (i is newer and at least as large). So we discard j. The deque stays monotonically decreasing, and the front is always the current maximum. Each element is pushed and popped at most once: O(n) total.
Monotonic Stack – Next Greater Element
LC 496 (Next Greater Element I) and LC 503 (circular array variant).
def nextGreaterElement(nums):
n = len(nums)
result = [-1] * n
stack = [] # indices of elements waiting for their next greater
for i in range(n):
while stack and nums[stack[-1]] < nums[i]:
idx = stack.pop()
result[idx] = nums[i]
stack.append(i)
return result
# Circular array variant (LC 503): iterate twice, use i % n
def nextGreaterElements_circular(nums):
n = len(nums)
result = [-1] * n
stack = []
for i in range(2 * n):
while stack and nums[stack[-1]] < nums[i % n]:
idx = stack.pop()
result[idx] = nums[i % n]
if i < n:
stack.append(i)
return result
Stack always holds indices in increasing order of their values (monotone increasing from bottom). When a new larger element arrives, it is the “next greater” for everything smaller at the top of the stack.
Task Scheduler LC 621
Given tasks with cooldown n, find the minimum time to execute all tasks.
import heapq
from collections import Counter, deque
def leastInterval(tasks, n):
counts = Counter(tasks)
max_heap = [-c for c in counts.values()]
heapq.heapify(max_heap)
time = 0
cooldown_queue = deque() # (count_when_done, available_at_time)
while max_heap or cooldown_queue:
time += 1
if max_heap:
count = heapq.heappop(max_heap) + 1 # decrement (neg)
if count < 0:
cooldown_queue.append((count, time + n))
else:
# CPU idle - jump time to next available task
time = cooldown_queue[0][1]
if cooldown_queue and cooldown_queue[0][1] <= time:
heapq.heappush(max_heap, cooldown_queue.popleft()[0])
return time
O(1) math formula: result = max(len(tasks), (max_freq – 1) * (n + 1) + count_of_max_freq_tasks). The formula computes the minimum slots needed if you arrange max-frequency tasks in a grid. If other tasks fill the gaps, len(tasks) wins.
BFS with Levels
from collections import deque
def bfs_levels(root):
if not root:
return []
queue = deque([root])
result = []
while queue:
level_size = len(queue) # key: snapshot size before processing level
level = []
for _ in range(level_size):
node = queue.popleft()
level.append(node.val)
for child in node.children:
queue.append(child)
result.append(level)
return result
# Multi-source BFS: start from multiple sources simultaneously
def multi_source_bfs(grid, sources):
queue = deque()
visited = set()
for r, c in sources:
queue.append((r, c, 0))
visited.add((r, c))
while queue:
r, c, dist = queue.popleft()
for dr, dc in [(0,1),(0,-1),(1,0),(-1,0)]:
nr, nc = r+dr, c+dc
if (nr, nc) not in visited and in_bounds(grid, nr, nc):
visited.add((nr, nc))
queue.append((nr, nc, dist+1))
Multi-source BFS is used in: 01 Matrix (LC 542), Rotting Oranges (LC 994), Walls and Gates (LC 286). All sources are seeded into the queue at distance 0 before the main loop starts.
Circular Buffer Implementation
class MyCircularQueue:
def __init__(self, k: int):
self.size = k + 1 # +1 to distinguish full from empty
self.buf = [0] * self.size
self.head = 0 # points to first element
self.tail = 0 # points to next write position
def enQueue(self, value: int) -> bool:
if self.isFull():
return False
self.buf[self.tail] = value
self.tail = (self.tail + 1) % self.size
return True
def deQueue(self) -> bool:
if self.isEmpty():
return False
self.head = (self.head + 1) % self.size
return True
def Front(self) -> int:
return -1 if self.isEmpty() else self.buf[self.head]
def Rear(self) -> int:
return -1 if self.isEmpty() else self.buf[(self.tail - 1) % self.size]
def isEmpty(self) -> bool:
return self.head == self.tail
def isFull(self) -> bool:
return (self.tail + 1) % self.size == self.head
The classic trick: allocate k+1 slots, declare full when (tail+1)%size == head. This wastes one slot but avoids needing a separate count variable. Alternative: use a count variable and allocate exactly k slots.
Shortest Subarray with Sum >= K (LC 862)
Finding shortest subarray with sum >= K is easy with a sliding window when all values are non-negative. With negative values, a monotonic deque on prefix sums is required.
def shortestSubarray(nums, k):
n = len(nums)
prefix = [0] * (n + 1)
for i in range(n):
prefix[i+1] = prefix[i] + nums[i]
result = float('inf')
dq = deque() # indices into prefix, monotone increasing by value
for i in range(n + 1):
# Check if prefix[i] - prefix[dq[0]] >= k
while dq and prefix[i] - prefix[dq[0]] >= k:
result = min(result, i - dq.popleft())
# Maintain monotone increasing: remove indices with prefix >= prefix[i]
while dq and prefix[dq[-1]] >= prefix[i]:
dq.pop()
dq.append(i)
return result if result != float('inf') else -1
Why deque is needed: with negatives, a prefix sum can decrease and then increase. A later prefix[j] might be smaller than an earlier prefix[i], making the subarray [i,j] invalid for a left-pointer approach. The deque keeps potential left boundaries in increasing order of their prefix sum, so once prefix[i] – prefix[dq[0]] >= k, dq[0] is used and discarded (any later right boundary would give a longer subarray from that left).
Jump Game VI LC 1696
From index i you can jump to [i+1, i+k]. Maximize score sum. Classic sliding window maximum + DP.
def maxResult(nums, k):
n = len(nums)
dp = [float('-inf')] * n
dp[0] = nums[0]
dq = deque([0]) # indices, decreasing by dp value
for i in range(1, n):
# Remove indices outside window
while dq and dq[0] < i - k:
dq.popleft()
dp[i] = dp[dq[0]] + nums[i]
# Maintain decreasing order
while dq and dp[dq[-1]] <= dp[i]:
dq.pop()
dq.append(i)
return dp[n-1]
Pattern: whenever you have dp[i] = max(dp[i-k..i-1]) + cost(i), use a sliding window maximum deque. The deque replaces an O(k) inner loop with O(1) amortized.
Pattern Recognition Table
| Problem Type | Data Structure | Key LeetCode Problems |
|---|---|---|
| Sliding window max/min | Monotonic deque | 239, 1438, 1696 |
| Next greater/smaller element | Monotonic stack | 496, 503, 739, 84, 85 |
| Level-order traversal | Queue + len snapshot | 102, 103, 107, 199 |
| Multi-source shortest distance | Multi-source BFS | 542, 994, 286 |
| Fixed-size FIFO with O(1) ops | Circular buffer | 622, 641 |
| Task scheduling with cooldown | Max-heap + cooldown queue | 621, 358 |
| Shortest subarray, negative values | Monotonic deque on prefix sums | 862 |
| DP with sliding window max | Monotonic deque + DP array | 1696, 1425, 375 |
| Histogram largest rectangle | Monotonic stack | 84, 85, 42 |
Meta coding interviews test monotonic deque and BFS patterns. See common questions for Meta interview: queue and sliding window problems.
Apple coding rounds test queue-based algorithms and sliding window. Review patterns for Apple interview: queue and deque data structure problems.
Snowflake engineering interviews test data structure implementations. See patterns for Snowflake interview: queue and data structure problems.