Table of Contents
Open Table of Contents
🚧 Warning: Article Under Construction 🚧
For now, we just have a grab-bag of random exercises. Probably come back later…
Memory Manager
import bisect
from dataclasses import dataclass
@dataclass
class MemoryBlock:
start: int
size: int
def __lt__(self, other):
if isinstance(other, MemoryBlock):
if self.size == other.size:
return self.start < other.start
return self.size < other.size
return self < other
class MemoryManager:
def __init__(self, N: int):
"""Initializes the memory manager with a total memory size of N."""
self.N = N
self.free_blocks: list[MemoryBlock] = [MemoryBlock(0, N)]
self.allocated_blocks = {}
def malloc(self, k: int) -> int:
"""Allocates a block of memory of size k and returns the starting address.
Args:
k: The size of the block to allocate
Returns:
The starting address of the allocated block
Raises:
ValueError: If the allocation size is invalid
MemoryError: If there's not enough contiguous space
"""
if k <= 0 or k > self.N:
raise ValueError("Invalid allocation size")
# Find the smallest free block that can accommodate the request
index = bisect.bisect_left(self.free_blocks, MemoryBlock(0, k))
if index == len(self.free_blocks):
raise MemoryError("Not enough contiguous space")
block = self.free_blocks.pop(index)
start = block.start
# If there's remaining space in the block, add it back to free_blocks
if block.size > k:
remaining = MemoryBlock(start + k, block.size - k)
bisect.insort(self.free_blocks, remaining)
# Add to allocated blocks
self.allocated_blocks[start] = k
return start
def free(self, ptr: int):
"""Frees the block of memory starting at address ptr.
Also merges adjacent free blocks to form larger free blocks.
Args:
ptr: The starting address of the block to free
Raises:
ValueError: If the pointer is invalid or the block is already freed
"""
if ptr not in self.allocated_blocks:
raise ValueError("Invalid pointer or already freed")
size = self.allocated_blocks.pop(ptr)
new_block = MemoryBlock(ptr, size)
# Merge with adjacent free blocks if possible
prev_index = bisect.bisect_right(self.free_blocks, MemoryBlock(ptr, 0)) - 1
next_index = prev_index + 1
if prev_index >= 0:
prev_block = self.free_blocks[prev_index]
if prev_block.start + prev_block.size == ptr:
new_block.start = prev_block.start
new_block.size += prev_block.size
self.free_blocks.pop(prev_index)
next_index -= 1
if next_index < len(self.free_blocks):
next_block = self.free_blocks[next_index]
if new_block.start + new_block.size == next_block.start:
new_block.size += next_block.size
self.free_blocks.pop(next_index)
bisect.insort(self.free_blocks, new_block)
def __str__(self):
memory = ["0"] * self.N
for start, size in self.allocated_blocks.items():
for i in range(start, start + size):
memory[i] = "1"
return "".join(memory)
# Test the implementation
if __name__ == "__main__":
m = MemoryManager(1000)
print("Initial memory state:", m)
a = m.malloc(100)
print(f"Allocated 100 bytes at {a}. Memory state:", m)
b = m.malloc(500)
print(f"Allocated 500 bytes at {b}. Memory state:", m)
try:
c = m.malloc(950) # This should raise an error
except MemoryError as e:
print("Error:", e)
m.free(b)
print("Freed memory at b. Memory state:", m)
m.free(a)
print("Freed memory at a. Memory state:", m)
try:
m.free(a) # This should raise an error
except ValueError as e:
print("Error:", e)
c = m.malloc(950)
print(f"Allocated 950 bytes at {c}. Memory state:", m)
Test Memory Manager
import pytest
from memory_manager import MemoryManager
@pytest.fixture
def memory_manager():
return MemoryManager(1000)
def test_initialization(memory_manager):
assert str(memory_manager) == "0" * 1000
assert len(memory_manager.free_blocks) == 1
assert memory_manager.free_blocks[0].start == 0
assert memory_manager.free_blocks[0].size == 1000
def test_malloc_basic(memory_manager):
ptr = memory_manager.malloc(100)
assert ptr == 0
assert str(memory_manager) == "1" * 100 + "0" * 900
def test_malloc_multiple(memory_manager):
ptr1 = memory_manager.malloc(100)
ptr2 = memory_manager.malloc(200)
ptr3 = memory_manager.malloc(300)
assert ptr1 == 0
assert ptr2 == 100
assert ptr3 == 300
assert str(memory_manager) == "1" * 600 + "0" * 400
def test_malloc_full_memory(memory_manager):
ptr = memory_manager.malloc(1000)
assert ptr == 0
assert str(memory_manager) == "1" * 1000
def test_malloc_invalid_size(memory_manager):
with pytest.raises(ValueError):
memory_manager.malloc(0)
with pytest.raises(ValueError):
memory_manager.malloc(-100)
with pytest.raises(ValueError):
memory_manager.malloc(1001)
def test_malloc_out_of_memory(memory_manager):
memory_manager.malloc(600)
memory_manager.malloc(300)
with pytest.raises(MemoryError):
memory_manager.malloc(101)
def test_free_basic(memory_manager):
ptr = memory_manager.malloc(100)
memory_manager.free(ptr)
assert str(memory_manager) == "0" * 1000
def test_free_multiple(memory_manager):
ptr1 = memory_manager.malloc(100)
ptr2 = memory_manager.malloc(200)
ptr3 = memory_manager.malloc(300)
memory_manager.free(ptr2)
assert str(memory_manager) == "1" * 100 + "0" * 200 + "1" * 300 + "0" * 400
memory_manager.free(ptr1)
assert str(memory_manager) == "0" * 300 + "1" * 300 + "0" * 400
memory_manager.free(ptr3)
assert str(memory_manager) == "0" * 1000
def test_free_invalid_pointer(memory_manager):
with pytest.raises(ValueError):
memory_manager.free(100)
ptr = memory_manager.malloc(100)
memory_manager.free(ptr)
with pytest.raises(ValueError):
memory_manager.free(ptr)
def test_malloc_after_free(memory_manager):
ptr1 = memory_manager.malloc(300)
ptr2 = memory_manager.malloc(400)
memory_manager.free(ptr1)
ptr3 = memory_manager.malloc(200)
assert ptr3 == 0
assert str(memory_manager) == "1" * 200 + "0" * 100 + "1" * 400 + "0" * 300
def test_malloc_best_fit(memory_manager):
ptr1 = memory_manager.malloc(200)
ptr2 = memory_manager.malloc(300)
ptr3 = memory_manager.malloc(100)
memory_manager.free(ptr1)
memory_manager.free(ptr3)
ptr4 = memory_manager.malloc(150)
print(memory_manager.free_blocks)
assert ptr4 == 0
assert (
str(memory_manager) == "1" * 150 + "0" * 50 + "1" * 300 + "0" * 500
)
def test_fragmentation_and_coalescing(memory_manager):
ptr1 = memory_manager.malloc(100)
ptr2 = memory_manager.malloc(200)
ptr3 = memory_manager.malloc(300)
ptr4 = memory_manager.malloc(100)
memory_manager.free(ptr2)
memory_manager.free(ptr4)
assert len(memory_manager.free_blocks) == 2
memory_manager.free(ptr3)
assert len(memory_manager.free_blocks) == 1
assert memory_manager.free_blocks[0].size == 600
memory_manager.free(ptr1)
assert len(memory_manager.free_blocks) == 1
assert memory_manager.free_blocks[0].size == 1000
def test_stress_test(memory_manager):
pointers = []
for _ in range(100):
try:
ptr = memory_manager.malloc(10)
pointers.append(ptr)
except MemoryError:
break
assert len(pointers) > 0
for ptr in pointers[::2]:
memory_manager.free(ptr)
try:
memory_manager.malloc(15)
except MemoryError:
pytest.fail("Should be able to allocate 15 bytes after freeing")
for ptr in pointers[1::2]:
memory_manager.free(ptr)
assert str(memory_manager) == "0" * 1000
Utils
from collections.abc import Callable
from operator import le, lt
from typing import Any, Literal, Protocol, TypeVar
class Comparable(Protocol):
def __lt__(self, other: Any) -> bool: ...
def __le__(self, other: Any) -> bool: ...
CT = TypeVar("CT", bound=Comparable)
def mergesort(arr: list[CT]):
def merge(left: list[CT], right: list[CT]) -> list[CT]:
"""O(n) To do merging"""
result = []
i, j = 0, 0
while i < len(left) and j < len(right):
if left[i] < right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
# Add remaining elements
result.extend(left[i:])
result.extend(right[j:])
return result
if len(arr) <= 1:
return arr
# Divide and conquer O(log n)
mid = len(arr) // 2
left = mergesort(arr[:mid])
right = mergesort(arr[mid:])
return merge(left, right)
def bisect(
arr: list[CT],
target: CT,
side: Literal["left", "right"] = "left",
key: Callable[[CT], CT] | None = None,
):
"""
If side == "right:
Returns a partition ip such that
all(elem <= target for elem in arr[lo: ip]) == True
all(elem > target for elem in arr[ip: hi]) == True
else:
Returns a partition ip such that
all(elem < target for elem in arr[lo: ip]) == True
all(elem >= target for elem in arr[ip: hi]) == True
"""
def key_func(x: CT) -> CT:
return x if key is None else key(x)
compare = lt if side == "left" else le
left, right = 0, len(arr)
while left < right:
mid = (left + right) >> 1
if compare(key_func(arr[mid]), target):
left = mid + 1
else:
right = mid
return left
def bisect_left(arr: list, target: Any, key: Callable | None = None):
return bisect(arr, target, side="left", key=key)
def bisect_right(arr: list, target: Any, key: Callable | None = None):
return bisect(arr, target, side="right", key=key)
Architecture Generator
from typing import List, Set, Tuple
import unittest
"""
Design a function that takes:
- input_size: Size of input layer
- output_size: Size of output layer
- hidden_layers: Maximum number of hidden layers allowed
- min_neurons: Minimum neurons per hidden layer
- max_neurons: Maximum neurons per hidden layer
Return all valid neural network architectures where:
- Each hidden layer must have neurons between min_neurons and max_neurons
- Number of hidden layers can be 1 to hidden_layers
- Each subsequent layer must be smaller than or equal to the previous layer
"""
def generate_architectures(
input_size,
output_size,
max_hidden_layers,
min_neurons,
max_neurons,
):
"""Return all valid neural network architectures.
Computational Complexity:
O((max_neurons - min_neurons) ^ max_hidden_layers)
Space Complexity:
"""
architectures = []
# a valid architecture should have 2 <= len(arch) <= hidden_layers + 2
def generate_helper(
prev_size,
layers_left,
min_neurons,
max_neurons,
curr_arch,
):
if layers_left == 0:
# If we have no more hidden layers to add, we can add the output layer
architectures.append(curr_arch + [output_size])
return
prev_neurons = curr_arch[-1]
for num_neurons in range(min(prev_size, max_neurons), min_neurons - 1, -1):
if num_neurons <= prev_neurons:
generate_helper(
num_neurons,
layers_left - 1,
min_neurons,
max_neurons,
curr_arch=curr_arch + [num_neurons],
)
for num_hidden_layers in range(1, max_hidden_layers + 1):
# Generate all possible archs. given a set number of hidden layers
generate_helper(
input_size,
num_hidden_layers,
min_neurons,
max_neurons,
curr_arch=[input_size], # seed with input layer
)
return sorted(architectures)
# Example:
# generate_architectures(784, 10, 2, 32, 128)
# Should return something like:
# [[784, 128, 10], [784, 64, 10], [784, 128, 64, 10], ...]
# out = generate_architectures(784, 10, 3, 32, 64)
# rich.print(out)
class TestNNArchitectureGenerator(unittest.TestCase):
def test_basic_case(self):
"""Test a simple case with clear constraints"""
architectures = generate_architectures(
input_size=4,
output_size=2,
max_hidden_layers=1,
min_neurons=2,
max_neurons=3,
)
expected = [[4, 2, 2], [4, 3, 2]]
self.assertEqual(architectures, expected)
def test_multiple_hidden_layers(self):
"""Test generation with multiple hidden layers allowed"""
architectures = generate_architectures(
input_size=6,
output_size=2,
max_hidden_layers=2,
min_neurons=3,
max_neurons=4,
)
# Verify basic properties
for arch in architectures:
# Check input/output sizes
self.assertEqual(arch[0], 6)
self.assertEqual(arch[-1], 2)
# Check hidden layer constraints
for hidden_size in arch[1:-1]:
self.assertTrue(3 <= hidden_size <= 4)
# Check monotonic decrease in hidden layers
for i in range(1, len(arch) - 2):
self.assertTrue(arch[i] >= arch[i + 1])
def test_edge_cases(self):
"""Test edge cases and constraints"""
# Test with min_neurons = max_neurons
architectures = generate_architectures(
input_size=4,
output_size=2,
max_hidden_layers=2,
min_neurons=3,
max_neurons=3,
)
for arch in architectures:
for hidden_size in arch[1:-1]:
self.assertEqual(hidden_size, 3)
# Test with single possible hidden layer
architectures = generate_architectures(
input_size=4,
output_size=2,
max_hidden_layers=1,
min_neurons=3,
max_neurons=3,
)
self.assertEqual(architectures, [[4, 3, 2]])
if __name__ == "__main__":
# Run tests
unittest.main(argv=["first-arg-is-ignored"], exit=False)
# Example usage
print("\nExample architectures:")
architectures = generate_architectures(
input_size=784, # MNIST input size
output_size=10, # MNIST output size
max_hidden_layers=2,
min_neurons=32,
max_neurons=128,
)
print(f"Found {len(architectures)} valid architectures")
print("First few architectures:", architectures[:5])
Bank Account
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable
def bisect_left(arr: list, target: Any, key: Callable | None = None):
def key_func(x):
return x if key is None else key(x)
left, right = 0, len(arr)
while left < right:
mid = (left + right) >> 1
if key_func(arr[mid]) <= target:
left = mid + 1
else:
right = mid
return left
def bisect_right(arr: list, target: Any, key: Callable | None = None):
def key_func(x):
return x if key is None else key(x)
left, right = 0, len(arr)
while left < right:
mid = (left + right) >> 1
if key_func(arr[mid]) <= target:
left = mid + 1
else:
right = mid
return left
def insort_left_(arr: list, target: Any, val: Any, key: Callable | None = None):
idx = bisect_left(arr, target, key=key)
arr.insert(idx, val)
class TransactionType(str, Enum):
DEPOSIT = "deposit"
WITHDRAWAL = "withdrawal"
@dataclass
class Transaction:
timestamp: int
amount: float
running_sum: float = 0
type: TransactionType = TransactionType.DEPOSIT
def __lt__(self, other: Any) -> bool:
if isinstance(other, Transaction):
return self.timestamp < other.timestamp
return self.timestamp < other
class AccountBalances:
def __init__(self):
self.transactions: list[Transaction] = []
def _insert(self, timestamp: int, amount: float, type_: TransactionType):
"""maintaines sorted invariant"""
if not isinstance(timestamp, int):
raise TypeError("Timestamp must be an integer")
if timestamp < 0:
raise ValueError("Timestamp must be non-negative")
if not isinstance(amount, (int, float)):
raise TypeError("Amount must be a number")
running_sum = amount
idx = bisect_left(self.transactions, timestamp, key=lambda x: x.timestamp)
if idx > 0:
running_sum += self.transactions[-1].running_sum
# Create new transaction
transaction = Transaction(
timestamp=timestamp,
amount=amount,
running_sum=running_sum,
type=type_,
)
self.transactions.insert(idx, transaction)
# Update subsequent running sums
for i in range(idx + 1, len(self.transactions)):
curr_transaction = self.transactions[i]
curr_running_sum = (
self.transactions[i - 1].running_sum + curr_transaction.amount
)
self.transactions[i] = Transaction(
timestamp=curr_transaction.timestamp,
amount=curr_transaction.amount,
running_sum=curr_running_sum,
type=curr_transaction.type,
)
def deposit(self, amount: float, timestamp: int) -> None:
"""Add amount at timestamp"""
if amount < 0:
raise ValueError("Deposit amount must be positive")
self._insert(timestamp, amount, TransactionType.DEPOSIT)
def withdraw(self, amount: float, timestamp: int) -> None:
"""Remove amount at timestamp"""
if amount < 0:
raise ValueError("Withdraw amount must be positive")
self._insert(timestamp, -amount, TransactionType.WITHDRAWAL)
def get_balance(self, timestamp: int) -> int | float:
"""Return balance at timestamp
Returns balance of all transactions with timestamps <= query timestamp
"""
if not isinstance(timestamp, int):
raise TypeError("Timestamp must be an integer")
if timestamp < 0:
raise ValueError("Timestamp must be non-negative")
idx = bisect_right(self.transactions, timestamp, key=lambda x: x.timestamp)
return 0 if idx == 0 else self.transactions[idx - 1].running_sum
def get_transactions_in_range(self, start_time: int, end_time: int) -> list:
start_idx = bisect_left(self.transactions, start_time, key=lambda x: x[0])
end_idx = bisect_right(self.transactions, end_time, key=lambda x: x[0])
return self.transactions[start_idx:end_idx]
def test_bank():
# Example 1: Simple case
balances = AccountBalances()
balances.deposit(100, 1) # Balance becomes 100 at t=1
balances.withdraw(50, 2) # Balance becomes 50 at t=2
assert balances.get_balance(1) == 100 # At t=1
assert balances.get_balance(2) == 50 # At t=2
assert balances.get_balance(3) == 50 # At t=3
# Example 2: Out of order transactions
balances = AccountBalances()
balances.withdraw(30, 2) # Balance becomes -30 at t=2
balances.deposit(100, 1) # Balance becomes 100 at t=1, then 70 at t=2
assert balances.get_balance(1) == 100
assert balances.get_balance(2) == 70
# Example 3: Query at timestamp with no transaction
balances = AccountBalances()
balances.deposit(50, 1)
balances.deposit(50, 3)
assert (
balances.get_balance(2) == 50
) # Should return balance as of last transaction <= 2
def test_account_balances():
# Test 1: Basic operations in order
ab = AccountBalances()
ab.deposit(100, 1)
ab.withdraw(30, 2)
assert ab.get_balance(1) == 100, "Balance at t=1 should be 100"
assert ab.get_balance(2) == 70, "Balance at t=2 should be 70"
assert ab.get_balance(3) == 70, "Balance at t=3 should be 70"
# Test 2: Out of order transactions
ab = AccountBalances()
ab.withdraw(30, 2)
ab.deposit(100, 1)
assert ab.get_balance(1) == 100, "Balance at t=1 should be 100"
assert ab.get_balance(2) == 70, "Balance at t=2 should be 70"
# Test 3: Query between transactions
ab = AccountBalances()
ab.deposit(50, 1)
ab.deposit(50, 3)
assert ab.get_balance(2) == 50, "Balance at t=2 should be 50"
# Test 4: Multiple transactions at same timestamp
ab = AccountBalances()
ab.deposit(100, 1)
ab.deposit(50, 1)
ab.withdraw(30, 1)
assert ab.get_balance(1) == 120, "Balance should include all t=1 transactions"
# Test 5: Query before first transaction
ab = AccountBalances()
ab.deposit(100, 5)
assert ab.get_balance(1) == 0, "Balance before first transaction should be 0"
# Test 6: Large number of transactions
ab = AccountBalances()
transactions = [(i, 10) for i in range(1000)] # 1000 posts
for t, amount in transactions:
ab.deposit(amount, t)
assert ab.get_balance(500) == 5010, "Balance should handle many transactions"
# Test 7: Zero balance after posts and deletes
ab = AccountBalances()
ab.deposit(100, 1)
ab.withdraw(100, 2)
assert ab.get_balance(3) == 0, "Balance should be 0 after equal posts and deletes"
print("All tests passed!")
Dynamic Programming
import numpy as np
def subset_sum(arr: list[int], target):
"""
dp[i][s] = True if the first i items can sum to s
# i-th item can sum to itself
dp[i][s] = arr[i - 1] == s # first item is index 0 in arr
# Once a sum is reached, adding more items doesn't change that
dp[i][s] = dp[i - 1][s]
# When adding a new item xi for a given target, it can sum to s
# if the previous elements can sum to the difference (s - xi) because
# s -xi + xi = s
"""
N = len(arr)
# dp = np.zeros((N + 1, target + 1), dtype=np.dtype("bool"))
dp = np.zeros((N + 1, target + 1))
dp[:, 0] = 1 # Can aways sum to zero
for i in range(1, N + 1):
for j in range(1, target + 1):
if arr[i - 1] <= j:
dp[i, j] = bool(dp[i - 1][j - arr[i - 1]] or dp[i - 1][j])
else:
dp[i][j] = dp[i - 1][j]
return bool(dp[N, target])
# Let's test it
if __name__ == "__main__":
x = [1, 3, 5, 9]
out = subset_sum(x, 4)
print(out)
Graph Algorithms
import contextlib
from collections import deque
from typing import TypeVar
T = TypeVar("T")
def topological_sort(graph: dict[T, list[T]]):
visited: set[T] = set()
path: set[T] = set()
sorted_nodes: list[T] = []
def dfs(node: T):
if node in path:
raise ValueError("Cycle detected")
if node in visited:
return
path.add(node)
for adj in graph[node]:
dfs(adj)
path.remove(node)
visited.add(node)
sorted_nodes.append(node)
for node in graph:
if node not in visited:
dfs(node)
return sorted_nodes[::-1]
def topological_sort_iterative(graph: dict[T, list[T]]):
visited: set[T] = set()
path: set[T] = set()
sorted_nodes: list[T] = []
def dfs(start: T):
stack = deque([start])
while stack:
node = stack.pop()
if node in stack:
raise ValueError("Cycle detected")
if node in visited:
return
for adj in graph[node]:
path.add(adj)
stack.append(adj)
with contextlib.suppress(Exception):
# path.remove(node)
print(f"Removing from Path: {node}")
visited.add(node)
print(f"Adding: {node}")
sorted_nodes.append(node)
for node in graph:
dfs(node)
return sorted_nodes
def dfs_recursive(graph: dict[T, list[T]], node: T, visited: set[T] | None = None):
# Initialize visited set on first call
if visited is None:
visited = set()
if node in visited:
return
visited.add(node)
for adj in graph[node]:
if adj not in visited:
dfs_recursive(graph, adj, visited)
return visited
def dfs_iterative(graph: dict[T, list[T]], start: T, visited: set[T] | None = None):
if visited is None:
visited = set()
stack = deque([start])
while stack:
node = stack.pop()
if node not in visited:
visited.add(node)
for adj in reversed(graph[node]):
if adj not in visited:
stack.append(adj)
return visited
def bfs(graph: dict[T, list[T]], start: T, visited: set[T] | None = None):
if visited is None:
visited = set()
queue = deque([start])
while queue:
node = queue.popleft()
if node not in visited:
visited.add(node)
for adj in graph[node]:
if adj not in visited:
queue.append(adj)
return visited
def has_cycle(graph):
visited = set()
path = set()
def dfs(node):
if node in path:
return True
if node in visited:
return False
path.add(node)
for adj in graph[node]:
if dfs(adj):
return True
path.remove(node)
visited.add(node)
return False
for node in graph:
if node not in visited and dfs(node):
return True
return False
graph = {"5": ["3", "7"], "3": ["2", "4"], "7": ["8"], "2": [], "4": ["8"], "8": []}
print(topological_sort(graph))
print(topological_sort_iterative(graph))
Heap Queue
from dataclasses import dataclass
from typing import Generic, List, Optional, TypeVar
T = TypeVar("T")
def _sift_down(heap: list[T], start_pos: int, pos: int) -> None:
"""
Move an item down in the heap until it finds its correct position.
This is the core operation that maintains the heap invariant:
parent <= children for a min heap. We move down the heap, swapping with
the smaller child until we find the right spot.
Args:
heap: The heap list
start_pos: The starting position in the heap
pos: Current position of item being sifted
"""
new_item = heap[pos]
# Follow the path to a leaf, moving parents down until we find where new_item belongs
while pos > start_pos:
parent_pos = (pos - 1) >> 1 # Same as (pos - 1) // 2 but faster
parent = heap[parent_pos]
if parent <= new_item: # Found the right spot
break
heap[pos] = parent # Move parent down
pos = parent_pos
heap[pos] = new_item
def _sift_up(heap: List[T], pos: int) -> None:
"""
Move an item up in the heap until it finds its correct position.
This operation maintains the heap invariant by moving up the tree,
comparing with children and swapping if necessary.
Args:
heap: The heap list
pos: Position of item being sifted
"""
end_pos = len(heap)
start_pos = pos
new_item = heap[pos]
# Get the left child position
child_pos = (pos << 1) + 1 # Same as 2*pos + 1 but faster
# Move down the heap, swapping with smaller child
while child_pos < end_pos:
right_pos = child_pos + 1
# Find the smaller of the two children
if right_pos < end_pos and heap[right_pos] < heap[child_pos]:
child_pos = right_pos
# Move the smaller child up
heap[pos] = heap[child_pos]
pos = child_pos
child_pos = (pos << 1) + 1
# Put the new item in its final position
heap[pos] = new_item
_sift_down(heap, start_pos, pos)
def heappush(heap: List[T], item: T) -> None:
"""
Push item onto heap, maintaining the heap invariant.
Args:
heap: The heap list
item: Item to add to the heap
"""
heap.append(item) # Add to end
_sift_down(heap, 0, len(heap) - 1) # Restore heap property
def heappop(heap: List[T]) -> T:
"""
Pop and return the smallest item from the heap.
Args:
heap: The heap list
Returns:
The smallest item
Raises:
IndexError: If heap is empty
"""
if not heap:
raise IndexError("Attempting to pop from empty heap")
last_item = heap.pop() # Remove last item
if heap: # If heap not empty
return_item = heap[0] # Get first item
heap[0] = last_item # Move last item to root
_sift_up(heap, 0) # Restore heap property
return return_item
return last_item
def heapify(x: List[T]) -> None:
"""
Transform list into a heap, in-place, in O(len(x)) time.
This works by sifting up each non-leaf node, starting from the last parent
and moving towards the root.
Args:
x: List to convert to heap
"""
n = len(x)
# Start with last parent node: (n-2)//2 for 0-based indexing
for i in reversed(range(n // 2)):
_sift_up(x, i)
def heapreplace(heap: List[T], item: T) -> T:
"""
Pop and return smallest item, and add new item.
This is more efficient than heappop() followed by heappush(),
and can be more appropriate when using a fixed-size heap.
Args:
heap: The heap list
item: Item to add to heap
Returns:
The smallest item
Raises:
IndexError: If heap is empty
"""
if not heap:
raise IndexError("Attempting to replace in empty heap")
return_item = heap[0]
heap[0] = item
_sift_up(heap, 0)
return return_item
def heappushpop(heap: List[T], item: T) -> T:
"""
Push item on heap, then pop and return smallest item.
This is more efficient than separate push then pop when the
pushed item is larger than the smallest item.
Args:
heap: The heap list
item: Item to add to heap
Returns:
The smallest item after the push
"""
if heap and heap[0] < item:
item, heap[0] = heap[0], item
_sift_up(heap, 0)
return item
@dataclass
class Priority:
"""Wrapper class to add priority to any object."""
priority: float
item: T
def __lt__(self, other: "Priority") -> bool:
return self.priority < other.priority
class PriorityQueue(Generic[T]):
"""Min-heap implementation of a priority queue."""
def __init__(self):
self._heap: List[Priority[T]] = []
def push(self, item: T, priority: float) -> None:
"""Add an item with given priority.
Args:
item: The item to store
priority: Lower values have higher priority
"""
entry = Priority(priority, item)
self._heap.append(entry)
self._sift_up(len(self._heap) - 1)
def _get_parent_idx(self, idx: int):
return (idx - 1) >> 1 # same as (idx - 1) //2
def pop(self) -> Optional[T]:
"""Remove and return the highest priority item.
Returns:
The item with lowest priority value, or None if queue is empty
"""
if not self._heap:
return None
result = self._heap[0].item
# Move last element to root and sift down
last_entry = self._heap.pop()
if self._heap:
self._heap[0] = last_entry
self._sift_down(0)
return result
def _sift_up(self, pos: int) -> None:
"""Restore heap property by moving item up."""
entry = self._heap[pos]
# Keep going up while parent has higher priority value
while pos > 0:
parent_pos = self._get_parent_idx(pos)
parent = self._heap[parent_pos]
# If the parent is smaller than the entry,
# we are done
if parent <= entry:
break
# Move the parent to current location
self._heap[pos] = parent
pos = parent_pos # assume the parents pos
self._heap[pos] = entry
def _sift_down(self, pos: int) -> None:
"""Restore heap property by moving item down."""
entry = self._heap[pos]
end_pos = len(self._heap)
while True:
child_pos = 2 * pos + 1
if child_pos >= end_pos:
break
# Find smaller child
right_pos = child_pos + 1
if right_pos < end_pos and self._heap[right_pos] < self._heap[child_pos]:
child_pos = right_pos
child = self._heap[child_pos]
if entry <= child:
break
self._heap[pos] = child
pos = child_pos
self._heap[pos] = entry
LRU Cache
import functools
from collections import OrderedDict
from typing import Any, Callable
class LRUCache:
def __init__(self, capacity: int | None = None):
self.capacity = capacity
self.cache = OrderedDict()
def get(self, key: str):
"""Retrieve an item from the cache, moving it to the end (most recently used).
Args:
key: The key to look up
Returns:
The value associated with the key
Raises:
KeyError: If the key isn't in the cache
"""
if key not in self.cache:
raise KeyError(f"Key '{key}' not found in cache")
value = self.cache.get(key)
self.cache.move_to_end(key)
return value
def put(self, key: str, value: Any) -> None:
"""Add an item to the cache, evicting least recently used item if necessary.
Args:
key: The key to store
value: The value to store
"""
if key in self.cache:
self.cache.pop(key)
elif self.capacity is not None and len(self.cache) > self.capacity:
self.cache.popitem(last=False)
self.cache[key] = value
def lru_cache(capacity: int | None = None):
"""Decorator that caches function results using LRU replacement policy.
Args:
capacity: Maximum number of results to cache
Returns:
Decorated function with caching
"""
def decorator(func: Callable):
cache = LRUCache(capacity)
@functools.wraps(func)
def wrapper(*args, **kwargs):
key = str((args, tuple(sorted(kwargs.items()))))
try:
return cache.get(key)
except KeyError:
value = func(*args, **kwargs)
cache.put(key, value)
return value
return wrapper
return decorator
Redistribute Load
"""
Given two arrays, serverCapacity, and serverLoad, each of size n, they represent the resource requirements or capacities of servers and the workload distribution across the servers. The total resource consumption is the sum of server Capacity[i] * serverLoad[i] (0 ≤ i < n).
The task is to rearrange the array serverload, redistributing the load across servers to minimize the total resource consumption, ensuring their efficient utilization. Return this optimal arrangement. In scenarios with multiple solutions, return the lexicographically smallest array of serverLoad.
Example:
```
serverCapacity = [4, 5, 6]
serverLoad = [1, 2, 3]
```
The task is to rearrange the elements in the array serverLoad to minimize the sum,
sum(server Capacity[i]* serverLoad[i])
if serverLoad = [3, 2, 1], the total resources are
calculated as follows: 4 * 3 + 5 * 2 + 6 * 1 = 28. This
arrangement yields the lowest sum of resources.
"""
from typing import Iterable
def redistribute_load(server_capacity, server_load):
"""
Notes:
- need to match maximum capacity with minimum load
"""
sorted_inds = sorted(range(len(server_capacity)), key=lambda i: server_capacity[i])
sorted_load = sorted(server_load, reverse=True)
return [sorted_load[i] for i in sorted_inds]
def dot_product(a: Iterable[float | int], b: Iterable[float | int]):
return sum(aa * bb for aa, bb in zip(a, b))
def test_min_load():
server_capacity = [4, 5, 6]
server_load = [1, 2, 3]
consumption = dot_product(server_capacity, server_load)
assert consumption == 32
new_load = redistribute_load(server_capacity, server_load)
consumption = dot_product(server_capacity, new_load)
assert consumption == 28
test_min_load()
Time Map
from collections import defaultdict
from typing import Any, Callable
import pytest
class TimeMap:
def __init__(self):
self.store = defaultdict(list)
def set(self, key: str, value: Any, timestamp: int) -> None:
# Append (timestamp, value) tuple
# List remains sorted because timestamps are strictly increasing
self.store[key].append((timestamp, value))
def get(self, key: str, timestamp: int) -> Any:
if key not in self.store:
return ""
timestamps_and_vals = self.store[key]
# idx = bisect.bisect_right(timestamps_and_vals, (timestamp, float("inf")))
# OR
idx = self._bisect_right(
timestamps_and_vals,
timestamp,
key=lambda x: x[0] if isinstance(x, tuple) else x,
)
return "" if idx == 0 else timestamps_and_vals[idx - 1][1]
def _bisect_right(self, arr, target, key: Callable | None = None):
def key_func(x):
return x if key is None else key(x)
left, right = 0, len(arr)
while left < right:
mid = (left + right) >> 1
val = key_func(arr[mid])
if val <= target:
left = mid + 1
else:
right = mid
return left
@pytest.fixture
def time_map():
return TimeMap()
def test_case(time_map):
calls = ["set", "get", "get", "set", "get", "get"]
inputs = [
["foo", "bar", 1],
["foo", 1],
["foo", 3],
["foo", "bar2", 4],
["foo", 4],
["foo", 5],
]
expected = [None, "bar", "bar", None, "bar2", "bar2"]
# sourcery skip: no-loop-in-tests
for call, input, expect in zip(calls, inputs, expected):
f = getattr(time_map, call)
assert f(*input) == expect
def test_time_map():
time_map = TimeMap()
test_case(time_map)
print("PASSED")
Utils
from collections.abc import Callable
from operator import le, lt
from typing import Any, Literal, Protocol, TypeVar
class Comparable(Protocol):
def __lt__(self, other: Any) -> bool: ...
def __le__(self, other: Any) -> bool: ...
CT = TypeVar("CT", bound=Comparable)
def mergesort(arr: list[CT]):
def merge(left: list[CT], right: list[CT]) -> list[CT]:
"""O(n) To do merging"""
result = []
i, j = 0, 0
while i < len(left) and j < len(right):
if left[i] < right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
# Add remaining elements
result.extend(left[i:])
result.extend(right[j:])
return result
if len(arr) <= 1:
return arr
# Divide and conquer O(log n)
mid = len(arr) // 2
left = mergesort(arr[:mid])
right = mergesort(arr[mid:])
return merge(left, right)
def bisect(
arr: list[CT],
target: CT,
side: Literal["left", "right"] = "left",
key: Callable[[CT], CT] | None = None,
):
"""
If side == "right:
Returns a partition ip such that
all(elem <= target for elem in arr[lo: ip]) == True
all(elem > target for elem in arr[ip: hi]) == True
else:
Returns a partition ip such that
all(elem < target for elem in arr[lo: ip]) == True
all(elem >= target for elem in arr[ip: hi]) == True
"""
def key_func(x: CT) -> CT:
return x if key is None else key(x)
compare = lt if side == "left" else le
left, right = 0, len(arr)
while left < right:
mid = (left + right) >> 1
if compare(key_func(arr[mid]), target):
left = mid + 1
else:
right = mid
return left
def bisect_left(arr: list, target: Any, key: Callable | None = None):
return bisect(arr, target, side="left", key=key)
def bisect_right(arr: list, target: Any, key: Callable | None = None):
return bisect(arr, target, side="right", key=key)