Skip to content

Back to the Basics

Published: at 09:32 AM in 1 min readSuggest Changes

Table of Contents

Open Table of Contents

🚧 Warning: Article Under Construction 🚧

For now, this is just a grab-bag of assorted exercises. I will probably come back and flesh it out later.

Memory Manager

import bisect
from dataclasses import dataclass


@dataclass
class MemoryBlock:
    """A contiguous region of memory described by its start and length.

    Blocks order by ``(size, start)``, so a sorted list of blocks can be
    bisected to find the smallest block of at least a given size.
    """

    start: int  # address of the first unit in the block
    size: int  # number of units the block spans

    def __lt__(self, other):
        """Return whether this block sorts before ``other``.

        Smaller blocks sort first; blocks of equal size are ordered by
        start address. Comparing against anything that is not a
        ``MemoryBlock`` returns ``NotImplemented`` so Python can try the
        reflected operation or raise ``TypeError``.
        """
        if not isinstance(other, MemoryBlock):
            # Evaluating `self < other` here would re-enter this method
            # and recurse until the interpreter's recursion limit.
            return NotImplemented
        return (self.size, self.start) < (other.size, other.start)


class MemoryManager:
    """Best-fit allocator over a linear address space of ``N`` units.

    ``free_blocks`` is kept sorted by ``MemoryBlock`` ordering (size first,
    then start address) so ``malloc`` can bisect for the smallest block
    that fits. ``allocated_blocks`` maps each live start address to the
    size that was allocated there.
    """

    def __init__(self, N: int):
        """Initializes the memory manager with a total memory size of N."""
        self.N = N
        self.free_blocks: list[MemoryBlock] = [MemoryBlock(0, N)]
        self.allocated_blocks = {}

    def malloc(self, k: int) -> int:
        """Allocates a block of memory of size k and returns the starting address.

        Args:
            k: The size of the block to allocate

        Returns:
            The starting address of the allocated block

        Raises:
            ValueError: If the allocation size is invalid
            MemoryError: If there's not enough contiguous space
        """
        if k <= 0 or k > self.N:
            raise ValueError("Invalid allocation size")

        # Blocks sort by (size, start), so the first block not less than a
        # probe of size k at address 0 is the smallest block that fits.
        index = bisect.bisect_left(self.free_blocks, MemoryBlock(0, k))
        if index == len(self.free_blocks):
            raise MemoryError("Not enough contiguous space")

        block = self.free_blocks.pop(index)
        start = block.start

        # Return the unused tail of the chosen block to the free list.
        if block.size > k:
            remaining = MemoryBlock(start + k, block.size - k)
            bisect.insort(self.free_blocks, remaining)

        self.allocated_blocks[start] = k
        return start

    def free(self, ptr: int):
        """Frees the block of memory starting at address ptr.

        Also merges adjacent free blocks to form larger free blocks.

        Args:
            ptr: The starting address of the block to free

        Raises:
            ValueError: If the pointer is invalid or the block is already freed
        """
        if ptr not in self.allocated_blocks:
            raise ValueError("Invalid pointer or already freed")

        size = self.allocated_blocks.pop(ptr)
        start, end = ptr, ptr + size

        # The free list is ordered by size, not by address, so address
        # neighbours cannot be found by bisecting; scan for the block that
        # ends exactly at `ptr` and the one that begins exactly at `end`.
        before = after = None
        for block in self.free_blocks:
            if block.start + block.size == ptr:
                before = block
            elif block.start == end:
                after = block

        if before is not None:
            self.free_blocks.remove(before)
            start = before.start
        if after is not None:
            self.free_blocks.remove(after)
            end = after.start + after.size

        bisect.insort(self.free_blocks, MemoryBlock(start, end - start))

    def __str__(self):
        """Render memory as a string with '1' per allocated unit and '0' per free unit."""
        memory = ["0"] * self.N
        for start, size in self.allocated_blocks.items():
            memory[start : start + size] = ["1"] * size
        return "".join(memory)


# Test the implementation
if __name__ == "__main__":
    # Walk through a short allocate / free scenario and print the memory
    # map after every step.
    m = MemoryManager(1000)
    print("Initial memory state:", m)

    a = m.malloc(100)
    print(f"Allocated 100 bytes at {a}. Memory state:", m)

    b = m.malloc(500)
    print(f"Allocated 500 bytes at {b}. Memory state:", m)

    # Only 400 units remain, so this request is expected to fail.
    try:
        c = m.malloc(950)
    except MemoryError as e:
        print("Error:", e)

    # Release both allocations, most recent first.
    for label, address in (("b", b), ("a", a)):
        m.free(address)
        print(f"Freed memory at {label}. Memory state:", m)

    # Freeing the same pointer twice is expected to be rejected.
    try:
        m.free(a)
    except ValueError as e:
        print("Error:", e)

    c = m.malloc(950)
    print(f"Allocated 950 bytes at {c}. Memory state:", m)
Test Memory Manager
import pytest
from memory_manager import MemoryManager


@pytest.fixture
def memory_manager():
    """Provide a fresh 1000-unit MemoryManager to each test."""
    manager = MemoryManager(1000)
    return manager


def test_initialization(memory_manager):
    """A new manager is entirely free and tracks one block spanning all memory."""
    assert str(memory_manager) == "0" * 1000

    free_list = memory_manager.free_blocks
    assert len(free_list) == 1

    whole = free_list[0]
    assert whole.start == 0
    assert whole.size == 1000


def test_malloc_basic(memory_manager):
    """The first allocation starts at address 0 and marks its units as used."""
    address = memory_manager.malloc(100)
    expected_map = "1" * 100 + "0" * 900

    assert address == 0
    assert str(memory_manager) == expected_map


def test_malloc_multiple(memory_manager):
    """Back-to-back allocations are laid out one after another."""
    addresses = [memory_manager.malloc(size) for size in (100, 200, 300)]

    assert addresses[0] == 0
    assert addresses[1] == 100
    assert addresses[2] == 300
    assert str(memory_manager) == "1" * 600 + "0" * 400


def test_malloc_full_memory(memory_manager):
    """A single allocation may consume the whole address space."""
    address = memory_manager.malloc(1000)

    assert address == 0
    assert str(memory_manager) == "1" * 1000


def test_malloc_invalid_size(memory_manager):
    """Zero, negative and larger-than-memory sizes are rejected with ValueError."""
    for bad_size in (0, -100, 1001):
        with pytest.raises(ValueError):
            memory_manager.malloc(bad_size)


def test_malloc_out_of_memory(memory_manager):
    """A request larger than the remaining space raises MemoryError."""
    for size in (600, 300):
        memory_manager.malloc(size)

    # Only 100 units are left, so 101 cannot fit.
    with pytest.raises(MemoryError):
        memory_manager.malloc(101)


def test_free_basic(memory_manager):
    """Freeing the only allocation leaves memory entirely free."""
    memory_manager.free(memory_manager.malloc(100))

    assert str(memory_manager) == "0" * 1000


def test_free_multiple(memory_manager):
    """Freeing blocks in arbitrary order updates the memory map step by step."""
    first, second, third = (memory_manager.malloc(size) for size in (100, 200, 300))

    # (pointer to free, expected memory map right after freeing it)
    steps = (
        (second, "1" * 100 + "0" * 200 + "1" * 300 + "0" * 400),
        (first, "0" * 300 + "1" * 300 + "0" * 400),
        (third, "0" * 1000),
    )
    for pointer, expected_map in steps:
        memory_manager.free(pointer)
        assert str(memory_manager) == expected_map


def test_free_invalid_pointer(memory_manager):
    """Unknown pointers and double frees both raise ValueError."""
    # Nothing has been allocated at address 100.
    with pytest.raises(ValueError):
        memory_manager.free(100)

    address = memory_manager.malloc(100)
    memory_manager.free(address)

    # The second free of the same address must be rejected.
    with pytest.raises(ValueError):
        memory_manager.free(address)


def test_malloc_after_free(memory_manager):
    """A freed region is reused by a later allocation that fits into it."""
    head = memory_manager.malloc(300)
    memory_manager.malloc(400)  # stays allocated for the whole test
    memory_manager.free(head)

    reused = memory_manager.malloc(200)

    assert reused == 0
    expected_map = "1" * 200 + "0" * 100 + "1" * 400 + "0" * 300
    assert str(memory_manager) == expected_map


def test_malloc_best_fit(memory_manager):
    """The smallest free block that fits is chosen over a larger one."""
    small_hole = memory_manager.malloc(200)
    memory_manager.malloc(300)  # stays allocated between the two holes
    tail_neighbour = memory_manager.malloc(100)

    for pointer in (small_hole, tail_neighbour):
        memory_manager.free(pointer)

    chosen = memory_manager.malloc(150)
    print(memory_manager.free_blocks)

    assert chosen == 0
    expected_map = "1" * 150 + "0" * 50 + "1" * 300 + "0" * 500
    assert str(memory_manager) == expected_map


def test_fragmentation_and_coalescing(memory_manager):
    """Adjacent free blocks are merged as the memory between them is released."""
    p100, p200, p300, p_last = (
        memory_manager.malloc(size) for size in (100, 200, 300, 100)
    )

    def free_list():
        return memory_manager.free_blocks

    # Two separate holes: [100, 300) and [600, 1000).
    memory_manager.free(p200)
    memory_manager.free(p_last)
    assert len(free_list()) == 2

    # Releasing the block between the holes joins everything after address 100.
    memory_manager.free(p300)
    assert len(free_list()) == 1
    assert free_list()[0].size == 600

    # Releasing the first block leaves one block covering all memory.
    memory_manager.free(p100)
    assert len(free_list()) == 1
    assert free_list()[0].size == 1000


def test_stress_test(memory_manager):
    """Fill memory with small blocks, fragment it, then release everything.

    One hundred 10-unit allocations fill the 1000-unit manager exactly.
    Freeing every other block leaves only isolated 10-unit holes, so a
    15-unit request cannot be served while a 10-unit request can.
    """
    pointers = []
    for _ in range(100):
        try:
            pointers.append(memory_manager.malloc(10))
        except MemoryError:
            break

    assert len(pointers) > 0

    for ptr in pointers[::2]:
        memory_manager.free(ptr)

    # No two freed blocks are adjacent, so there is no contiguous run of 15.
    with pytest.raises(MemoryError):
        memory_manager.malloc(15)

    # A request the size of a hole fits; release it again so the final
    # all-free assertion can hold.
    refill = memory_manager.malloc(10)
    memory_manager.free(refill)

    for ptr in pointers[1::2]:
        memory_manager.free(ptr)

    assert str(memory_manager) == "0" * 1000

Utils

from collections.abc import Callable
from operator import le, lt
from typing import Any, Literal, Protocol, TypeVar


class Comparable(Protocol):
    """Structural type for values that support ``<`` and ``<=``."""

    def __lt__(self, other: Any) -> bool:
        ...

    def __le__(self, other: Any) -> bool:
        ...


CT = TypeVar("CT", bound=Comparable)


def mergesort(arr: list[CT]) -> list[CT]:
    """Return a new list with the items of ``arr`` in ascending order.

    The sort is stable: items that compare equal keep their original
    relative order. Only ``<`` is used to compare items, and ``arr``
    itself is never modified.
    """

    def merge(left: list[CT], right: list[CT]) -> list[CT]:
        """Merge two sorted lists in one pass over both."""
        merged = []
        i = j = 0

        while i < len(left) and j < len(right):
            # Take from the right half only when it is strictly smaller;
            # on ties the left item (earlier in the input) goes first,
            # which is what keeps the sort stable.
            if right[j] < left[i]:
                merged.append(right[j])
                j += 1
            else:
                merged.append(left[i])
                i += 1

        # At most one of these tails is non-empty.
        merged.extend(left[i:])
        merged.extend(right[j:])
        return merged

    if len(arr) <= 1:
        # Copy so the caller always gets a list distinct from the input.
        return list(arr)

    mid = len(arr) // 2
    return merge(mergesort(arr[:mid]), mergesort(arr[mid:]))


def bisect(
    arr: list[CT],
    target: CT,
    side: Literal["left", "right"] = "left",
    key: Callable[[CT], CT] | None = None,
):
    """Return the insertion point for ``target`` in the sorted list ``arr``.

    When ``key`` is given it is applied to each probed element of ``arr``
    before comparing; it is not applied to ``target``.

    If side == "left":
        Returns a partition ip such that
        all(elem <  target for elem in arr[:ip]) == True
        all(elem >= target for elem in arr[ip:]) == True
    otherwise:
        Returns a partition ip such that
        all(elem <= target for elem in arr[:ip]) == True
        all(elem >  target for elem in arr[ip:]) == True
    """
    # `lt` stops at the first element equal to target, `le` steps past it.
    belongs_before = lt if side == "left" else le

    lo, hi = 0, len(arr)
    while lo < hi:
        mid = (lo + hi) // 2
        probe = arr[mid] if key is None else key(arr[mid])
        if belongs_before(probe, target):
            lo = mid + 1
        else:
            hi = mid

    return lo


def bisect_left(arr: list, target: Any, key: Callable | None = None):
    """Return the leftmost insertion point for ``target`` via ``bisect``."""
    insertion_point = bisect(arr, target, side="left", key=key)
    return insertion_point


def bisect_right(arr: list, target: Any, key: Callable | None = None):
    """Return the rightmost insertion point for ``target`` via ``bisect``."""
    insertion_point = bisect(arr, target, side="right", key=key)
    return insertion_point

Architecture Generator

from typing import List, Set, Tuple
import unittest

"""
Design a function that takes:
- input_size: Size of the input layer
- output_size: Size of the output layer
- max_hidden_layers: Maximum number of hidden layers allowed
- min_neurons: Minimum neurons per hidden layer
- max_neurons: Maximum neurons per hidden layer

Return all valid neural network architectures where:
- Each hidden layer must have between min_neurons and max_neurons neurons
- The number of hidden layers can be 1 to max_hidden_layers
- Each hidden layer must be smaller than or equal to the layer before it
"""


def generate_architectures(
    input_size,
    output_size,
    max_hidden_layers,
    min_neurons,
    max_neurons,
):
    """Return all valid neural network architectures, sorted.

    Each architecture is a list ``[input_size, h1, ..., hk, output_size]``
    with ``1 <= k <= max_hidden_layers``. Every hidden width lies in
    ``[min_neurons, max_neurons]`` and is no larger than the layer before
    it (the input layer included). The output size is appended as given
    and is not checked against the last hidden layer.
    """
    finished = []
    # Prefixes built so far: the input layer plus zero or more hidden layers.
    prefixes = [[input_size]]

    for _ in range(max_hidden_layers):
        # Extend every prefix by one more hidden layer of each allowed width.
        prefixes = [
            prefix + [width]
            for prefix in prefixes
            for width in range(min(prefix[-1], max_neurons), min_neurons - 1, -1)
        ]
        # Each extended prefix is a complete architecture once closed off.
        finished.extend(prefix + [output_size] for prefix in prefixes)

    finished.sort()
    return finished


# Example:
# generate_architectures(784, 10, 2, 32, 128)
# Should return something like:
# [[784, 128, 10], [784, 64, 10], [784, 128, 64, 10], ...]
# out = generate_architectures(784, 10, 3, 32, 64)
# rich.print(out)


class TestNNArchitectureGenerator(unittest.TestCase):
    """Unit tests for ``generate_architectures``."""

    def test_basic_case(self):
        """Test a simple case with clear constraints"""
        result = generate_architectures(
            input_size=4,
            output_size=2,
            max_hidden_layers=1,
            min_neurons=2,
            max_neurons=3,
        )
        self.assertEqual(result, [[4, 2, 2], [4, 3, 2]])

    def test_multiple_hidden_layers(self):
        """Test generation with multiple hidden layers allowed"""
        result = generate_architectures(
            input_size=6,
            output_size=2,
            max_hidden_layers=2,
            min_neurons=3,
            max_neurons=4,
        )
        for arch in result:
            hidden = arch[1:-1]
            # Input and output layers are fixed.
            self.assertEqual(arch[0], 6)
            self.assertEqual(arch[-1], 2)
            # Every hidden width stays inside the allowed range.
            for width in hidden:
                self.assertTrue(3 <= width <= 4)
            # Hidden widths never increase from one layer to the next.
            for earlier, later in zip(hidden, hidden[1:]):
                self.assertTrue(earlier >= later)

    def test_edge_cases(self):
        """Test edge cases and constraints"""
        # With min_neurons == max_neurons every hidden layer has that width.
        fixed_width = generate_architectures(
            input_size=4,
            output_size=2,
            max_hidden_layers=2,
            min_neurons=3,
            max_neurons=3,
        )
        for arch in fixed_width:
            for width in arch[1:-1]:
                self.assertEqual(width, 3)

        # One hidden layer of one possible width gives a single architecture.
        only_one = generate_architectures(
            input_size=4,
            output_size=2,
            max_hidden_layers=1,
            min_neurons=3,
            max_neurons=3,
        )
        self.assertEqual(only_one, [[4, 3, 2]])


if __name__ == "__main__":
    # Run the unit tests first; exit=False keeps the interpreter alive so
    # the example below still runs afterwards.
    unittest.main(argv=["first-arg-is-ignored"], exit=False)

    # Example: MNIST-sized input (784) and output (10) layers.
    print("\nExample architectures:")
    example_config = {
        "input_size": 784,
        "output_size": 10,
        "max_hidden_layers": 2,
        "min_neurons": 32,
        "max_neurons": 128,
    }
    architectures = generate_architectures(**example_config)
    print(f"Found {len(architectures)} valid architectures")
    print("First few architectures:", architectures[:5])

Bank Account

from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable


def bisect_left(arr: list, target: Any, key: Callable | None = None):
    def key_func(x):
        return x if key is None else key(x)

    left, right = 0, len(arr)

    while left < right:
        mid = (left + right) >> 1
        if key_func(arr[mid]) <= target:
            left = mid + 1
        else:
            right = mid
    return left


def bisect_right(arr: list, target: Any, key: Callable | None = None):
    def key_func(x):
        return x if key is None else key(x)

    left, right = 0, len(arr)

    while left < right:
        mid = (left + right) >> 1
        if key_func(arr[mid]) <= target:
            left = mid + 1
        else:
            right = mid
    return left


def insort_left_(arr: list, target: Any, val: Any, key: Callable | None = None):
    """Insert ``val`` into ``arr`` at the position ``bisect_left`` finds for ``target``."""
    arr.insert(bisect_left(arr, target, key=key), val)


class TransactionType(str, Enum):
    """Kind of account transaction; members are also plain strings."""

    # Money added to the account.
    DEPOSIT = "deposit"
    # Money removed from the account.
    WITHDRAWAL = "withdrawal"


@dataclass
class Transaction:
    """One ledger entry together with the account balance right after it."""

    timestamp: int  # when the transaction takes effect
    amount: float  # signed amount applied to the balance
    running_sum: float = 0  # balance including this transaction
    type: TransactionType = TransactionType.DEPOSIT

    def __lt__(self, other: Any) -> bool:
        """Order by timestamp; ``other`` may be a Transaction or a bare timestamp."""
        other_time = other.timestamp if isinstance(other, Transaction) else other
        return self.timestamp < other_time


class AccountBalances:
    """Ledger of timestamped transactions with balance-at-time queries.

    ``transactions`` is kept sorted by timestamp and every entry stores the
    running balance up to and including itself, so a balance query is a
    single binary search.
    """

    def __init__(self):
        self.transactions: list[Transaction] = []

    def _insert(self, timestamp: int, amount: float, type_: TransactionType):
        """Insert a transaction, keeping the list sorted and running sums correct.

        Args:
            timestamp: Non-negative integer time of the transaction
            amount: Signed amount to apply (negative for withdrawals)
            type_: Kind of transaction being recorded

        Raises:
            TypeError: If timestamp is not an int or amount is not a number
            ValueError: If timestamp is negative
        """
        if not isinstance(timestamp, int):
            raise TypeError("Timestamp must be an integer")
        if timestamp < 0:
            raise ValueError("Timestamp must be non-negative")
        if not isinstance(amount, (int, float)):
            raise TypeError("Amount must be a number")

        idx = bisect_left(self.transactions, timestamp, key=lambda x: x.timestamp)

        # The new balance builds on the entry immediately *before* the
        # insertion point. Using the last entry of the list instead gives
        # the wrong balance whenever the insert is not at the end.
        running_sum = amount
        if idx > 0:
            running_sum += self.transactions[idx - 1].running_sum

        self.transactions.insert(
            idx,
            Transaction(
                timestamp=timestamp,
                amount=amount,
                running_sum=running_sum,
                type=type_,
            ),
        )

        # Every later entry now has one more transaction ahead of it, so
        # its running sum is recomputed from its predecessor.
        for i in range(idx + 1, len(self.transactions)):
            current = self.transactions[i]
            current.running_sum = self.transactions[i - 1].running_sum + current.amount

    def deposit(self, amount: float, timestamp: int) -> None:
        """Add amount at timestamp"""
        if amount < 0:
            raise ValueError("Deposit amount must be positive")
        self._insert(timestamp, amount, TransactionType.DEPOSIT)

    def withdraw(self, amount: float, timestamp: int) -> None:
        """Remove amount at timestamp"""
        if amount < 0:
            raise ValueError("Withdraw amount must be positive")
        self._insert(timestamp, -amount, TransactionType.WITHDRAWAL)

    def get_balance(self, timestamp: int) -> int | float:
        """Return balance at timestamp
        Returns balance of all transactions with timestamps <= query timestamp
        """
        if not isinstance(timestamp, int):
            raise TypeError("Timestamp must be an integer")
        if timestamp < 0:
            raise ValueError("Timestamp must be non-negative")

        idx = bisect_right(self.transactions, timestamp, key=lambda x: x.timestamp)
        return 0 if idx == 0 else self.transactions[idx - 1].running_sum

    def get_transactions_in_range(self, start_time: int, end_time: int) -> list:
        """Return the slice of transactions located between the two bisection points.

        The slice starts at ``bisect_left`` for ``start_time`` and ends at
        ``bisect_right`` for ``end_time``.
        """
        # Transactions are dataclass instances, not tuples, so the sort key
        # must read the `timestamp` attribute; indexing with [0] raises
        # TypeError.
        start_idx = bisect_left(
            self.transactions, start_time, key=lambda x: x.timestamp
        )
        end_idx = bisect_right(self.transactions, end_time, key=lambda x: x.timestamp)
        return self.transactions[start_idx:end_idx]


def test_bank():
    """Smoke-test deposits, withdrawals and balance queries."""
    # Example 1: transactions arrive in timestamp order.
    in_order = AccountBalances()
    in_order.deposit(100, 1)  # balance is 100 from t=1
    in_order.withdraw(50, 2)  # balance is 50 from t=2
    for when, expected in ((1, 100), (2, 50), (3, 50)):
        assert in_order.get_balance(when) == expected

    # Example 2: the later transaction is recorded first.
    out_of_order = AccountBalances()
    out_of_order.withdraw(30, 2)  # balance is -30 at t=2 for now
    out_of_order.deposit(100, 1)  # 100 at t=1, which makes t=2 come to 70
    assert out_of_order.get_balance(1) == 100
    assert out_of_order.get_balance(2) == 70

    # Example 3: query a timestamp that has no transaction of its own.
    gaps = AccountBalances()
    gaps.deposit(50, 1)
    gaps.deposit(50, 3)
    # The balance as of the last transaction at or before t=2 applies.
    assert gaps.get_balance(2) == 50


def test_account_balances():
    """Exercise AccountBalances across ordering, ties, gaps and volume."""
    # Test 1: basic operations in timestamp order
    ledger = AccountBalances()
    ledger.deposit(100, 1)
    ledger.withdraw(30, 2)
    assert ledger.get_balance(1) == 100, "Balance at t=1 should be 100"
    assert ledger.get_balance(2) == 70, "Balance at t=2 should be 70"
    assert ledger.get_balance(3) == 70, "Balance at t=3 should be 70"

    # Test 2: the later transaction is recorded first
    ledger = AccountBalances()
    ledger.withdraw(30, 2)
    ledger.deposit(100, 1)
    assert ledger.get_balance(1) == 100, "Balance at t=1 should be 100"
    assert ledger.get_balance(2) == 70, "Balance at t=2 should be 70"

    # Test 3: query a timestamp between two transactions
    ledger = AccountBalances()
    for when in (1, 3):
        ledger.deposit(50, when)
    assert ledger.get_balance(2) == 50, "Balance at t=2 should be 50"

    # Test 4: several transactions share one timestamp
    ledger = AccountBalances()
    ledger.deposit(100, 1)
    ledger.deposit(50, 1)
    ledger.withdraw(30, 1)
    assert ledger.get_balance(1) == 120, "Balance should include all t=1 transactions"

    # Test 5: query before the first transaction
    ledger = AccountBalances()
    ledger.deposit(100, 5)
    assert ledger.get_balance(1) == 0, "Balance before first transaction should be 0"

    # Test 6: 1000 deposits of 10, one per timestamp 0..999
    ledger = AccountBalances()
    for when in range(1000):
        ledger.deposit(10, when)
    assert ledger.get_balance(500) == 5010, "Balance should handle many transactions"

    # Test 7: a deposit cancelled out by an equal withdrawal
    ledger = AccountBalances()
    ledger.deposit(100, 1)
    ledger.withdraw(100, 2)
    assert ledger.get_balance(3) == 0, "Balance should be 0 after equal posts and deletes"

    print("All tests passed!")

Dynamic Programming

import numpy as np


def subset_sum(arr: list[int], target):
    """Return True if some subset of ``arr`` sums exactly to ``target``.

    Each item may be used at most once. The empty subset sums to 0, so a
    target of 0 is always reachable.

    The table is a single boolean vector where ``reachable[s]`` is True
    when some subset of the items processed so far sums to ``s``. Adding
    an item ``x`` makes ``s`` reachable whenever ``s - x`` was reachable
    before, because ``(s - x) + x == s``; sums that were already reachable
    stay reachable.

    Args:
        arr: Non-negative integers to choose from
        target: The sum to reach

    Returns:
        True if a subset sums to ``target``, else False. A negative target
        is never reachable with non-negative items, so it returns False.

    Raises:
        ValueError: If ``arr`` contains a negative number
    """
    if any(value < 0 for value in arr):
        # A negative item would turn `s - x` into an index past the table.
        raise ValueError("subset_sum only supports non-negative integers")
    if target < 0:
        return False

    reachable = np.zeros(target + 1, dtype=bool)
    reachable[0] = True  # the empty subset

    for value in arr:
        if value == 0 or value > target:
            # Zero adds no new sums; an oversized item cannot be part of a
            # subset that sums to target.
            continue
        # Shift the previous row right by `value` and OR it in. The copy
        # guarantees the shift reads only sums reachable *without* this
        # item, so the item is never counted twice.
        reachable[value:] |= reachable[: target + 1 - value].copy()

    return bool(reachable[target])


# Let's test it
if __name__ == "__main__":
    # Quick manual check of subset_sum on a small example.
    x = [1, 3, 5, 9]
    out = subset_sum(x, target=4)
    print(out)

Graph Algorithms

import contextlib
from collections import deque
from typing import TypeVar


T = TypeVar("T")


def topological_sort(graph: dict[T, list[T]]):
    """Return the nodes of ``graph`` so that each node precedes its successors.

    ``graph`` maps every node to the list of nodes it points to. The order
    is the reverse of a depth-first post-order.

    Raises:
        ValueError: If the graph contains a cycle
    """
    done: set[T] = set()
    in_progress: set[T] = set()  # nodes on the current DFS path
    postorder: list[T] = []

    def visit(vertex: T):
        # Reaching a node that is still being explored means a back edge.
        if vertex in in_progress:
            raise ValueError("Cycle detected")
        if vertex not in done:
            in_progress.add(vertex)
            for successor in graph[vertex]:
                visit(successor)
            in_progress.remove(vertex)
            done.add(vertex)
            postorder.append(vertex)

    for vertex in graph:
        if vertex not in done:
            visit(vertex)

    postorder.reverse()
    return postorder


def topological_sort_iterative(graph: dict[T, list[T]]):
    """Return a topological order of ``graph`` without using recursion.

    ``graph`` maps every node to the list of nodes it points to. An
    explicit stack replaces the call stack of a recursive depth-first
    search: each stack entry pairs a node with an iterator over the
    successors it has not examined yet. A node is emitted only after all
    of its successors have been emitted (post-order), and the reversed
    post-order is returned.

    Raises:
        ValueError: If the graph contains a cycle
    """
    visited: set[T] = set()
    path: set[T] = set()  # nodes currently on the explicit DFS stack
    postorder: list[T] = []

    for root in graph:
        if root in visited:
            continue

        path.add(root)
        stack = [(root, iter(graph[root]))]

        while stack:
            node, successors = stack[-1]
            for adj in successors:
                if adj in path:
                    # `adj` is an ancestor of `node` on the current path.
                    raise ValueError("Cycle detected")
                if adj not in visited:
                    # Descend; the iterator remembers where to resume.
                    path.add(adj)
                    stack.append((adj, iter(graph[adj])))
                    break
            else:
                # Every successor is finished, so this node is finished too.
                stack.pop()
                path.remove(node)
                visited.add(node)
                postorder.append(node)

    return postorder[::-1]


def dfs_recursive(graph: dict[T, list[T]], node: T, visited: set[T] | None = None):
    """Return the set of nodes reachable from ``node`` by depth-first search.

    Args:
        graph: Maps each node to the list of nodes it points to
        node: The node to start from
        visited: Optional set of nodes to treat as already seen; it is
            updated in place and returned

    Returns:
        The ``visited`` set, including when ``node`` was already in it.
    """
    # A fresh set is created on the outermost call only.
    if visited is None:
        visited = set()

    if node in visited:
        # Return the set rather than None so every call path yields the
        # same type.
        return visited

    visited.add(node)
    for adj in graph[node]:
        if adj not in visited:
            dfs_recursive(graph, adj, visited)
    return visited


def dfs_iterative(graph: dict[T, list[T]], start: T, visited: set[T] | None = None):
    """Return the set of nodes reachable from ``start`` using an explicit stack.

    ``visited`` may be supplied to treat some nodes as already seen; it is
    updated in place and returned.
    """
    seen = set() if visited is None else visited
    pending = [start]

    while pending:
        current = pending.pop()
        if current in seen:
            continue
        seen.add(current)
        # Push successors in reverse so the first listed one is popped first.
        pending.extend(adj for adj in reversed(graph[current]) if adj not in seen)

    return seen


def bfs(graph: dict[T, list[T]], start: T, visited: set[T] | None = None):
    """Return the set of nodes reachable from ``start`` by breadth-first search.

    ``visited`` may be supplied to treat some nodes as already seen; it is
    updated in place and returned.
    """
    seen = set() if visited is None else visited
    frontier = deque([start])

    while frontier:
        current = frontier.popleft()
        if current in seen:
            continue
        seen.add(current)
        # Queue only the successors that have not been seen so far.
        frontier.extend(adj for adj in graph[current] if adj not in seen)

    return seen


def has_cycle(graph):
    """Return True if the directed ``graph`` contains a cycle.

    ``graph`` maps every node to the list of nodes it points to.
    """
    finished = set()
    active = set()  # nodes on the current DFS path

    def reaches_cycle(node):
        # Meeting a node that is still on the current path is a back edge.
        if node in active:
            return True
        if node in finished:
            return False

        active.add(node)
        if any(reaches_cycle(successor) for successor in graph[node]):
            return True

        active.remove(node)
        finished.add(node)
        return False

    return any(node not in finished and reaches_cycle(node) for node in graph)


# Small example DAG: each key lists the nodes it points to.
graph = {
    "5": ["3", "7"],
    "3": ["2", "4"],
    "7": ["8"],
    "2": [],
    "4": ["8"],
    "8": [],
}
for sorter in (topological_sort, topological_sort_iterative):
    print(sorter(graph))

Heap Queue

from dataclasses import dataclass
from typing import Generic, List, Optional, TypeVar


T = TypeVar("T")


def _sift_down(heap: list[T], start_pos: int, pos: int) -> None:
    """
    Move an item down in the heap until it finds its correct position.

    This is the core operation that maintains the heap invariant:
    parent <= children for a min heap. We move down the heap, swapping with
    the smaller child until we find the right spot.

    Args:
        heap: The heap list
        start_pos: The starting position in the heap
        pos: Current position of item being sifted
    """
    new_item = heap[pos]

    # Follow the path to a leaf, moving parents down until we find where new_item belongs
    while pos > start_pos:
        parent_pos = (pos - 1) >> 1  # Same as (pos - 1) // 2 but faster
        parent = heap[parent_pos]
        if parent <= new_item:  # Found the right spot
            break
        heap[pos] = parent  # Move parent down
        pos = parent_pos

    heap[pos] = new_item


def _sift_up(heap: List[T], pos: int) -> None:
    """Re-place the item at ``pos`` within the subtree rooted there.

    The smaller child is pulled up into the vacated slot at every level
    until a leaf is reached; the original item is then written at that
    leaf and ``_sift_down`` moves it back toward ``pos`` as far as needed.

    Args:
        heap: The heap list, modified in place
        pos: Index of the item being placed
    """
    size = len(heap)
    origin = pos
    moving = heap[pos]

    smaller = 2 * pos + 1  # left child of `pos`
    while smaller < size:
        right = smaller + 1
        # Prefer the right child only when it is strictly smaller.
        if right < size and heap[right] < heap[smaller]:
            smaller = right

        # Pull the smaller child up and continue from its old slot.
        heap[pos] = heap[smaller]
        pos = smaller
        smaller = 2 * pos + 1

    # `pos` is now a leaf; drop the item there and let it move back
    # toward `origin`.
    heap[pos] = moving
    _sift_down(heap, origin, pos)


def heappush(heap: List[T], item: T) -> None:
    """
    Push item onto heap, maintaining the heap invariant.

    Args:
        heap: The heap list
        item: Item to add to the heap
    """
    new_pos = len(heap)  # index the item will occupy once appended
    heap.append(item)
    # Let the new last item move toward the root as far as it needs to.
    _sift_down(heap, 0, new_pos)


def heappop(heap: List[T]) -> T:
    """
    Pop and return the smallest item from the heap.

    Args:
        heap: The heap list

    Returns:
        The smallest item

    Raises:
        IndexError: If heap is empty
    """
    if not heap:
        raise IndexError("Attempting to pop from empty heap")

    tail = heap.pop()
    if not heap:
        # The heap held a single item, which was also the smallest.
        return tail

    # Put the former last item at the root and restore the invariant.
    smallest, heap[0] = heap[0], tail
    _sift_up(heap, 0)
    return smallest


def heapify(x: List[T]) -> None:
    """
    Transform list into a heap, in-place.

    Every index that has at least one child is passed to ``_sift_up``,
    from the last such index back to the root.

    Args:
        x: List to convert to heap
    """
    last_parent = len(x) // 2 - 1  # highest index that has a child
    for index in range(last_parent, -1, -1):
        _sift_up(x, index)


def heapreplace(heap: List[T], item: T) -> T:
    """
    Pop and return smallest item, and add new item.

    The root is overwritten with ``item`` and the heap is repaired with a
    single ``_sift_up`` call, so the heap keeps its size.

    Args:
        heap: The heap list
        item: Item to add to heap

    Returns:
        The smallest item

    Raises:
        IndexError: If heap is empty
    """
    if heap:
        smallest, heap[0] = heap[0], item
        _sift_up(heap, 0)
        return smallest
    raise IndexError("Attempting to replace in empty heap")


def heappushpop(heap: List[T], item: T) -> T:
    """
    Push item on heap, then pop and return smallest item.

    When the heap is empty or its root is not smaller than ``item``, the
    item itself is returned and the heap is left untouched.

    Args:
        heap: The heap list
        item: Item to add to heap

    Returns:
        The smallest item after the push
    """
    if not heap or not (heap[0] < item):
        return item

    # The root is smaller: hand it back and let `item` take its place.
    smallest = heap[0]
    heap[0] = item
    _sift_up(heap, 0)
    return smallest


@dataclass
class Priority:
    """Wrapper class to add priority to any object.

    Entries are ordered by ``priority`` alone; ``item`` never takes part
    in ``<`` or ``<=``.
    """

    priority: float  # lower values sort first
    item: T  # the wrapped payload

    def __lt__(self, other: "Priority") -> bool:
        """Return True when this entry has a strictly lower priority value."""
        return self.priority < other.priority

    def __le__(self, other: "Priority") -> bool:
        """Return True when this entry's priority value is not higher.

        Needed because a dataclass without ``order=True`` defines no
        ordering methods of its own, so ``<=`` between two entries would
        otherwise raise ``TypeError``.
        """
        return self.priority <= other.priority


class PriorityQueue(Generic[T]):
    """Min-heap implementation of a priority queue.

    Entries live in ``self._heap``, a list used as an implicit binary tree:
    the children of index ``i`` are at ``2 * i + 1`` and ``2 * i + 2``.

    Entries are compared with ``<`` only. Python does not synthesise ``<=``
    from ``__lt__``, so relying on ``<=`` would raise ``TypeError`` for entry
    types that define just ``__lt__``.
    """

    def __init__(self):
        self._heap: List[Priority[T]] = []

    def push(self, item: T, priority: float) -> None:
        """Add an item with given priority.

        Args:
            item: The item to store
            priority: Lower values have higher priority
        """
        entry = Priority(priority, item)
        self._heap.append(entry)
        # The new entry starts as the last leaf and bubbles towards the root.
        self._sift_up(len(self._heap) - 1)

    def _get_parent_idx(self, idx: int) -> int:
        """Return the index of the parent of ``idx`` in the implicit tree."""
        return (idx - 1) >> 1  # same as (idx - 1) // 2

    def pop(self) -> Optional[T]:
        """Remove and return the highest priority item.

        Returns:
            The item with lowest priority value, or None if queue is empty
        """
        if not self._heap:
            return None

        result = self._heap[0].item

        # Fill the hole at the root with the last entry, then push it down
        # until both of its children are no smaller than it.
        last_entry = self._heap.pop()
        if self._heap:
            self._heap[0] = last_entry
            self._sift_down(0)

        return result

    def _sift_up(self, pos: int) -> None:
        """Restore heap property by moving the entry at ``pos`` up."""
        entry = self._heap[pos]

        # Climb while the parent is strictly larger than the entry.
        while pos > 0:
            parent_pos = self._get_parent_idx(pos)
            parent = self._heap[parent_pos]

            # Equivalent to ``parent <= entry`` but needs only ``__lt__``.
            if not (entry < parent):
                break

            # Shift the parent down into the vacated slot.
            self._heap[pos] = parent
            pos = parent_pos

        self._heap[pos] = entry

    def _sift_down(self, pos: int) -> None:
        """Restore heap property by moving the entry at ``pos`` down."""
        entry = self._heap[pos]
        end_pos = len(self._heap)

        while True:
            child_pos = 2 * pos + 1
            if child_pos >= end_pos:
                break  # reached a leaf

            # Pick the smaller of the two children (left one on ties).
            right_pos = child_pos + 1
            if right_pos < end_pos and self._heap[right_pos] < self._heap[child_pos]:
                child_pos = right_pos

            child = self._heap[child_pos]
            # Equivalent to ``entry <= child`` but needs only ``__lt__``.
            if not (child < entry):
                break

            # Shift the child up into the vacated slot.
            self._heap[pos] = child
            pos = child_pos

        self._heap[pos] = entry

LRU Cache

import functools
from collections import OrderedDict
from typing import Any, Callable


class LRUCache:
    def __init__(self, capacity: int | None = None):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key: str):
        """Retrieve an item from the cache, moving it to the end (most recently used).

        Args:
            key: The key to look up

        Returns:
            The value associated with the key

        Raises:
            KeyError: If the key isn't in the cache
        """
        if key not in self.cache:
            raise KeyError(f"Key '{key}' not found in cache")

        value = self.cache.get(key)
        self.cache.move_to_end(key)
        return value

    def put(self, key: str, value: Any) -> None:
        """Add an item to the cache, evicting least recently used item if necessary.

        Args:
                key: The key to store
                value: The value to store
        """

        if key in self.cache:
            self.cache.pop(key)
        elif self.capacity is not None and len(self.cache) > self.capacity:
            self.cache.popitem(last=False)

        self.cache[key] = value


def lru_cache(capacity: int | None = None):
    """Build a decorator that memoises a function in an ``LRUCache``.

    Each decorated function gets its own cache. Calls are keyed by the
    string form of ``(args, sorted kwargs)``, so arguments do not need to be
    hashable, but two calls share an entry whenever that string is the same.

    Args:
        capacity: Passed to ``LRUCache`` as its capacity

    Returns:
        A decorator producing the caching wrapper
    """

    def decorator(func: Callable):
        results = LRUCache(capacity)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Keyword arguments are sorted so their call-site order is irrelevant.
            ordered_kwargs = tuple(sorted(kwargs.items()))
            cache_key = str((args, ordered_kwargs))
            try:
                outcome = results.get(cache_key)
            except KeyError:
                # Miss: compute the value once and remember it.
                outcome = func(*args, **kwargs)
                results.put(cache_key, outcome)
            return outcome

        return wrapper

    return decorator

Redistribute Load

"""
Given two arrays, serverCapacity and serverLoad, each of size n, which represent the resource capacities of the servers and the workload assigned to each server. The total resource consumption is the sum of serverCapacity[i] * serverLoad[i] (0 ≤ i < n).

The task is to rearrange the array serverLoad, redistributing the load across servers to minimize the total resource consumption, ensuring their efficient utilization. Return this optimal arrangement. In scenarios with multiple solutions, return the lexicographically smallest array of serverLoad.

Example:
```
serverCapacity = [4, 5, 6]
serverLoad = [1, 2, 3]
```
The task is to rearrange the elements in the array serverLoad to minimize the sum,
sum(serverCapacity[i] * serverLoad[i])

if serverLoad = [3, 2, 1], the total resources are
calculated as follows: 4 * 3 + 5 * 2 + 6 * 1 = 28. This
arrangement yields the lowest sum of resources.

"""

from typing import Iterable


def redistribute_load(server_capacity, server_load):
    """Rearrange ``server_load`` to minimise sum(capacity[i] * load[i]).

    The sum is minimised by pairing the smallest capacity with the largest
    load, the second smallest with the second largest, and so on. Servers
    with equal capacity can exchange loads without changing the total; among
    them the smaller load goes to the smaller index, so the returned list is
    the lexicographically smallest optimal arrangement.

    Args:
        server_capacity: Capacity of each server, indexed by server
        server_load: Loads to distribute, one per server

    Returns:
        A new list holding the loads in their assigned server positions

    Raises:
        ValueError: If the two sequences differ in length
    """
    n = len(server_capacity)
    if len(server_load) != n:
        raise ValueError("server_capacity and server_load must have the same length")

    # Servers ordered by ascending capacity. Within a tie the higher index
    # comes first, so it receives the larger of the loads shared by that
    # group and the lower index ends up with the smaller one.
    servers_by_capacity = sorted(range(n), key=lambda i: (server_capacity[i], -i))
    loads_descending = sorted(server_load, reverse=True)

    # Scatter: the k-th largest load goes to the server with the k-th
    # smallest capacity. Indexing the sorted loads by server index instead
    # would only be right when the capacities are already sorted.
    arrangement = [None] * n
    for server, load in zip(servers_by_capacity, loads_descending):
        arrangement[server] = load
    return arrangement


def dot_product(a: Iterable[float | int], b: Iterable[float | int]):
    return sum(aa * bb for aa, bb in zip(a, b))


def test_min_load():
    """Check the worked example: consumption drops from 32 to 28."""
    capacities = [4, 5, 6]
    loads = [1, 2, 3]

    # Cost of the loads in their given order.
    assert dot_product(capacities, loads) == 32

    # Cost after redistribution.
    rebalanced = redistribute_load(capacities, loads)
    assert dot_product(capacities, rebalanced) == 28


test_min_load()

Time Map

from collections import defaultdict
from typing import Any, Callable

import pytest


class TimeMap:
    def __init__(self):
        self.store = defaultdict(list)

    def set(self, key: str, value: Any, timestamp: int) -> None:
        # Append (timestamp, value) tuple
        # List remains sorted because timestamps are strictly increasing
        self.store[key].append((timestamp, value))

    def get(self, key: str, timestamp: int) -> Any:
        if key not in self.store:
            return ""

        timestamps_and_vals = self.store[key]
        # idx = bisect.bisect_right(timestamps_and_vals, (timestamp, float("inf")))
        # OR
        idx = self._bisect_right(
            timestamps_and_vals,
            timestamp,
            key=lambda x: x[0] if isinstance(x, tuple) else x,
        )
        return "" if idx == 0 else timestamps_and_vals[idx - 1][1]

    def _bisect_right(self, arr, target, key: Callable | None = None):
        def key_func(x):
            return x if key is None else key(x)

        left, right = 0, len(arr)
        while left < right:
            mid = (left + right) >> 1
            val = key_func(arr[mid])
            if val <= target:
                left = mid + 1
            else:
                right = mid

        return left


@pytest.fixture
def time_map():
    """Provide a freshly constructed, empty ``TimeMap``."""
    fresh_map = TimeMap()
    return fresh_map


def test_case(time_map):
    """Replay a scripted sequence of set/get calls and check each result.

    Each step names the method to call on ``time_map``, the positional
    arguments to pass, and the value the call must return (``set`` returns
    None).
    """
    script = [
        ("set", ["foo", "bar", 1], None),
        ("get", ["foo", 1], "bar"),
        ("get", ["foo", 3], "bar"),
        ("set", ["foo", "bar2", 4], None),
        ("get", ["foo", 4], "bar2"),
        ("get", ["foo", 5], "bar2"),
    ]
    # sourcery skip: no-loop-in-tests
    for method_name, arguments, outcome in script:
        method = getattr(time_map, method_name)
        assert method(*arguments) == outcome


def test_time_map():
    """Run the scripted scenario against a new ``TimeMap`` and report success."""
    test_case(TimeMap())
    print("PASSED")

Utils

from collections.abc import Callable
from operator import le, lt
from typing import Any, Literal, Protocol, TypeVar


class Comparable(Protocol):
    """Structural type for values that support ``<`` and ``<=``."""

    def __lt__(self, other: Any) -> bool:
        """Return whether ``self`` orders strictly before ``other``."""

    def __le__(self, other: Any) -> bool:
        """Return whether ``self`` orders before, or equal to, ``other``."""


# Type variable limited to types that satisfy ``Comparable``.
CT = TypeVar("CT", bound=Comparable)


def mergesort(arr: list[CT]) -> list[CT]:
    """Return a new list with the elements of ``arr`` in ascending order.

    The sort is stable: elements that compare equal keep the relative order
    they had in ``arr``. Only ``<`` is used to compare elements. The input is
    never modified and the result is always a fresh list, including for
    inputs of length 0 or 1.

    Args:
        arr: The list to sort

    Returns:
        A new sorted list
    """

    def merge(left: list[CT], right: list[CT]) -> list[CT]:
        """Merge two sorted runs into one sorted list in O(n)."""
        result = []
        i, j = 0, 0

        while i < len(left) and j < len(right):
            # Take from the right run only when it is strictly smaller, so
            # on ties the element from the left (earlier) run goes first.
            if right[j] < left[i]:
                result.append(right[j])
                j += 1
            else:
                result.append(left[i])
                i += 1
        # At most one of the runs still has elements; append them as-is.
        result.extend(left[i:])
        result.extend(right[j:])
        return result

    if len(arr) <= 1:
        # Copy so the result never aliases the caller's list.
        return list(arr)

    # Split in half, sort each half recursively, then merge the halves.
    mid = len(arr) // 2
    left = mergesort(arr[:mid])
    right = mergesort(arr[mid:])
    return merge(left, right)


def bisect(
    arr: list[CT],
    target: CT,
    side: Literal["left", "right"] = "left",
    key: Callable[[CT], CT] | None = None,
):
    """

    If side == "right:
        Returns a partition ip such that
        all(elem <= target for elem in arr[lo: ip]) == True
        all(elem >  target for elem in arr[ip: hi]) == True
    else:
        Returns a partition ip such that
        all(elem < target for elem in arr[lo: ip]) == True
        all(elem >= target for elem in arr[ip: hi]) == True
    """

    def key_func(x: CT) -> CT:
        return x if key is None else key(x)

    compare = lt if side == "left" else le

    left, right = 0, len(arr)
    while left < right:
        mid = (left + right) >> 1
        if compare(key_func(arr[mid]), target):
            left = mid + 1
        else:
            right = mid

    return left


def bisect_left(arr: list, target: Any, key: Callable | None = None):
    """Shorthand for ``bisect`` with ``side`` fixed to "left"."""
    return bisect(arr, target, "left", key)


def bisect_right(arr: list, target: Any, key: Callable | None = None):
    """Shorthand for ``bisect`` with ``side`` fixed to "right"."""
    return bisect(arr, target, "right", key)

Previous Post
A Straight Line Through Linear Algebra
Next Post
Softmax to the Max