Solutions: 39 — System of systems

Exercise 1 — Audit cadence

A typical simulator’s cadence audit:

| system | cadence | pattern |
|---|---|---|
| motion | every tick | standard |
| food_spawn | every tick | standard |
| next_event | every tick | standard |
| apply_eat/repro/starve | every tick | standard |
| cleanup | every tick | standard |
| sort_for_locality | every 50 ticks | periodic |
| snapshot | every 1000 ticks | periodic |
| log_flush | every 10 ticks | periodic |
| inspect (debug) | every tick (debug-on) / never (debug-off) | conditional |
| path_planner | per-creature, on demand, with deadline | anytime |
| spatial_search | per-creature, time-sliced | time-sliced |
| strategy_ai | out-of-loop, ~1 Hz update | out-of-loop |

The cadences that aren’t “every tick” are candidates for one of the chapter’s three patterns. Any system that’s currently capped (e.g. “only the first 100 path-finds per tick”) is a candidate for the anytime pattern.
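One way to keep the audit from going stale is to record it as data and query it. The following is a minimal sketch, not the chapter's code: the names `CADENCE`, `systems_due` and `periodic_systems` are hypothetical, and only the fixed-period rows of the audit are included (the conditional, anytime, time-sliced and out-of-loop rows have no single period).

```python
# Hypothetical cadence table: system name -> period in ticks (1 = every tick).
# The periods are copied from the audit; the names here are illustrative only.
CADENCE = {
    "motion": 1,
    "food_spawn": 1,
    "next_event": 1,
    "apply_eat/repro/starve": 1,
    "cleanup": 1,
    "log_flush": 10,
    "sort_for_locality": 50,
    "snapshot": 1000,
}

def systems_due(tick: int) -> list[str]:
    """Return the systems whose period divides this tick, in table order."""
    return [name for name, period in CADENCE.items() if tick % period == 0]

def periodic_systems() -> list[str]:
    """Systems that do not run every tick: the candidates the audit flags."""
    return [name for name, period in CADENCE.items() if period > 1]

if __name__ == "__main__":
    print(systems_due(7))
    print(systems_due(50))
    print(periodic_systems())
```

Keeping the table in one place means the audit and the scheduler cannot drift apart, at the cost of one dictionary lookup per system per tick.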

Exercise 2 — Anytime path-finder

import random
import time

def greedy_route(start, goal, obstacles):
    """Trivial baseline: take a straight line, ignoring obstacles."""
    return [start, goal]

def improve(route, obstacles):
    """Local search: try perturbing a random waypoint."""
    if len(route) < 3:
        # add a waypoint
        mid = ((route[0][0] + route[-1][0]) / 2 + random.uniform(-1, 1),
               (route[0][1] + route[-1][1]) / 2 + random.uniform(-1, 1))
        return [route[0], mid, route[-1]]
    i = random.randint(1, len(route) - 2)
    perturbed = list(route)
    perturbed[i] = (perturbed[i][0] + random.uniform(-0.5, 0.5),
                    perturbed[i][1] + random.uniform(-0.5, 0.5))
    return perturbed

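def score(route, obstacles):
    """Lower is better. Penalises length and segments that cross an obstacle."""
    length = 0.0
    collisions = 0
    for (x0, y0), (x1, y1) in zip(route, route[1:]):
        dx, dy = x1 - x0, y1 - y0
        seg2 = dx * dx + dy * dy
        length += seg2 ** 0.5
        for ox, oy, r in obstacles:
            # Closest point on the segment to the obstacle centre. Testing only the
            # waypoints would let the straight line pass through the obstacle unpenalised.
            t = 0.0 if seg2 == 0 else max(0.0, min(1.0, ((ox - x0) * dx + (oy - y0) * dy) / seg2))
            if (x0 + t * dx - ox) ** 2 + (y0 + t * dy - oy) ** 2 < r ** 2:
                collisions += 1
    return length + collisions * 100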

def plan_route(start, goal, obstacles, deadline: float):
    best = greedy_route(start, goal, obstacles)
    best_score = score(best, obstacles)
    while time.perf_counter() < deadline:
        candidate = improve(best, obstacles)
        s = score(candidate, obstacles)
        if s < best_score:
            best = candidate
            best_score = s
    return best, best_score

# At 5 ms deadline
deadline = time.perf_counter() + 0.005
r1, s1 = plan_route((0, 0), (10, 10), [(5, 5, 1)], deadline)

# At 50 ms deadline  
deadline = time.perf_counter() + 0.050
r2, s2 = plan_route((0, 0), (10, 10), [(5, 5, 1)], deadline)

print(f"5ms:  {len(r1)} waypoints, score={s1:.2f}")
print(f"50ms: {len(r2)} waypoints, score={s2:.2f}")

Quality improves with the deadline. Plot score against deadline by repeating the run at 1 ms, 5 ms, 10 ms, 50 ms, 100 ms and 500 ms. The curve typically shows diminishing returns, often roughly logarithmic: each doubling of time buys about the same quality increment, until the local search plateaus.
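The sweep itself can be sketched as follows. This is a self-contained illustration under stated assumptions, not the exercise's reference code: `route_cost`, `anytime_plan` and `sweep` are hypothetical names, the planner is a compact stand-in that only perturbs a single middle waypoint, and collisions are tested per segment. It collects (budget, cost) pairs; plotting is left to whatever tool you prefer.

```python
import random
import time

def route_cost(route, obstacles):
    """Length plus a penalty of 100 per segment that crosses an obstacle."""
    cost = 0.0
    for (x0, y0), (x1, y1) in zip(route, route[1:]):
        dx, dy = x1 - x0, y1 - y0
        seg2 = dx * dx + dy * dy
        cost += seg2 ** 0.5
        for ox, oy, r in obstacles:
            # Closest point on the segment to the obstacle centre.
            t = 0.0 if seg2 == 0 else max(0.0, min(1.0, ((ox - x0) * dx + (oy - y0) * dy) / seg2))
            if (x0 + t * dx - ox) ** 2 + (y0 + t * dy - oy) ** 2 < r * r:
                cost += 100.0
    return cost

def anytime_plan(start, goal, obstacles, budget_s, rng):
    """Keep the best route found so far; stop when the budget runs out."""
    mid = ((start[0] + goal[0]) / 2, (start[1] + goal[1]) / 2)
    best = [start, mid, goal]
    best_cost = route_cost(best, obstacles)
    deadline = time.perf_counter() + budget_s
    while time.perf_counter() < deadline:
        x, y = best[1]
        candidate = [start, (x + rng.uniform(-0.5, 0.5), y + rng.uniform(-0.5, 0.5)), goal]
        c = route_cost(candidate, obstacles)
        if c < best_cost:
            best, best_cost = candidate, c
    return best, best_cost

def sweep(budgets_ms, seed=0):
    """Run the planner once per budget and return (budget_ms, cost) pairs."""
    obstacles = [(5, 5, 1)]
    results = []
    for ms in budgets_ms:
        rng = random.Random(seed)   # same seed, so runs differ only in budget
        _, cost = anytime_plan((0, 0), (10, 10), obstacles, ms / 1000, rng)
        results.append((ms, cost))
    return results

if __name__ == "__main__":
    for ms, cost in sweep([1, 5, 10, 50, 100]):
        print(f"{ms:>4} ms  cost={cost:.3f}")
```

Reusing one seed per budget is a deliberate choice: a longer run then replays the shorter run's candidates before continuing, so differences in cost come from the budget rather than from luck. Wall-clock noise can still change how many iterations fit in a given budget.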

Exercise 3 — Time-sliced search

from dataclasses import dataclass
import numpy as np

@dataclass
class SpatialSearch:
    target_x: float
    target_y: float
    cells: list[np.ndarray]
    cursor: int = 0
    best_id: int = -1
    best_dist: float = float("inf")
    done: bool = False

    def step(self, world, max_cells: int):
        end = min(self.cursor + max_cells, len(self.cells))
        for i in range(self.cursor, end):
            for cid in self.cells[i]:
                d2 = (world.pos_x[cid] - self.target_x)**2 + \
                     (world.pos_y[cid] - self.target_y)**2
                if d2 < self.best_dist:
                    self.best_id = int(cid)
                    self.best_dist = float(d2)
        self.cursor = end
        if self.cursor >= len(self.cells):
            self.done = True
        return self.done

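# Hypothetical stand-in world so the comparison runs: 1000 creatures in 100 cells
from types import SimpleNamespace
rng = np.random.default_rng(0)
world = SimpleNamespace(
    pos_x=rng.uniform(0, 10, 1000),
    pos_y=rng.uniform(0, 10, 1000),
    cells=np.array_split(np.arange(1000), 100),
)

# Run a single-pass search
single = SpatialSearch(target_x=5.0, target_y=5.0, cells=world.cells)
single.step(world, max_cells=len(world.cells))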

# Run a time-sliced search, 10 cells per tick
sliced = SpatialSearch(target_x=5.0, target_y=5.0, cells=world.cells)
while not sliced.done:
    sliced.step(world, max_cells=10)

assert single.best_id == sliced.best_id, "time-sliced version must match single-pass"

The time-sliced result is identical to the single-pass result. The work is the same; the granularity differs. The simulator can call step(max_cells=budget_cells) every tick with a budget computed from the remaining tick time.
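How `budget_cells` might be derived from the remaining tick time is sketched below. The helper names (`cells_for_budget`, `CellCostEstimate`) and the clamping limits are assumptions for illustration; the idea is simply to divide the leftover time by a running estimate of the cost per cell.

```python
TICK_BUDGET_S = 1 / 30          # 30 Hz tick, as elsewhere in these solutions

def cells_for_budget(remaining_s: float, seconds_per_cell: float,
                     min_cells: int = 1, max_cells: int = 10_000) -> int:
    """Convert leftover tick time into a cell budget, clamped to [min_cells, max_cells]."""
    if seconds_per_cell <= 0:
        return max_cells
    return max(min_cells, min(max_cells, int(remaining_s / seconds_per_cell)))

class CellCostEstimate:
    """Exponential moving average of measured seconds per cell (hypothetical helper)."""

    def __init__(self, initial: float = 1e-5, alpha: float = 0.2):
        self.value = initial
        self.alpha = alpha

    def update(self, elapsed_s: float, cells_done: int) -> None:
        """Blend one new measurement into the running estimate."""
        if cells_done > 0:
            sample = elapsed_s / cells_done
            self.value = (1 - self.alpha) * self.value + self.alpha * sample

# Inside the tick, after the every-tick systems have run (sketch only):
#   remaining = TICK_BUDGET_S - (time.perf_counter() - tick_start)
#   budget_cells = cells_for_budget(remaining, estimate.value)
#   t0, before = time.perf_counter(), search.cursor
#   search.step(world, max_cells=budget_cells)
#   estimate.update(time.perf_counter() - t0, search.cursor - before)

if __name__ == "__main__":
    estimate = CellCostEstimate()
    print(cells_for_budget(0.010, estimate.value))    # 10 ms left in the tick
    print(cells_for_budget(-0.002, estimate.value))   # tick already over budget
```

The `min_cells` floor guarantees the search always advances, even on a tick that is already over budget. That trades a small overrun for the certainty that the search eventually finishes; set it to 0 if the deadline matters more.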

Exercise 4 — Out-of-loop AI

import multiprocessing, time, queue

def ai_planner_worker(snapshot_q, result_q):
    while True:
        try:
            snapshot = snapshot_q.get(timeout=1.0)
        except queue.Empty:
            continue
        if snapshot is None:
            break
        time.sleep(5.0)                                # simulate 5-second compute
        result_q.put(("strategy_update", "new_strategy_from_snapshot"))

if __name__ == "__main__":
    snapshot_q = multiprocessing.Queue(maxsize=1)
    result_q   = multiprocessing.Queue()
    worker = multiprocessing.Process(target=ai_planner_worker, args=(snapshot_q, result_q))
    worker.start()

    for tick in range(200):                            # ~7 seconds at 30 Hz
        if tick % 30 == 0:                             # every 1 second
            try:
                snapshot_q.put_nowait({"tick": tick})
            except queue.Full:
                pass                                    # AI still working on previous one

        # Check for strategy updates without blocking
        try:
            event = result_q.get_nowait()
            print(f"tick {tick}: received {event}")
        except queue.Empty:
            pass

        time.sleep(1/30)                                # tick

    snapshot_q.put(None)
    worker.join()

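The simulator’s tick continues at 30 Hz. A snapshot is offered every second; when the worker is still busy the queue is full and that snapshot is dropped. The AI takes 5 seconds, so its result lands in the result queue about 5 seconds after its snapshot was sent, and the simulator picks it up on its next poll. Nothing in the loop blocks, so the tick rate does not depend on how long the AI takes.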

This is the architecture for “AI as a side process,” “remote pricing service,” “GPU model inference” — any work that takes longer than a tick. The queue is the seam.

Exercise 5 — Mixed cadence

def tick(world, current_tick):
    motion(world)
    food_spawn(world)
    next_event(world)
    # parallel block
    apply_eat(world); apply_reproduce(world); apply_starve(world)
    cleanup(world)
    if current_tick % 50 == 0:
        sort_for_locality(world)
    if current_tick % 1000 == 0:
        snapshot(world, f"snap_{current_tick}.npz")
    # Out-of-loop AI: dispatched separately, results enter via in_queue

Run twice with the same seed; hash after 1000 ticks. The hashes must match. The mixed cadences don’t break determinism because:

  • The cadence (every 50, every 1000) is deterministic given the tick number.
  • The out-of-loop AI’s result enters via the queue (§35), which is deterministic if the recorded queue matches.

For the AI to be deterministic across runs, the snapshot it processes and the time it takes must match. In practice this means either (a) testing with mocked AI that returns deterministic results based on snapshot content, or (b) accepting that real AI introduces stochasticity at the input queue and treating it as “another input source” rather than part of the simulator’s determinism guarantee.
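Option (a) and the run-twice hash check can be sketched together. Everything here is a toy under stated assumptions: `mock_ai` and `run` are hypothetical names, the "world" is a list of floats, and the AI's latency is fixed at a recorded number of ticks instead of depending on a real worker's wall-clock time.

```python
import hashlib
import json
import random

def mock_ai(snapshot: dict) -> str:
    """Stand-in AI: the strategy is a pure function of snapshot content."""
    payload = json.dumps(snapshot, sort_keys=True).encode()
    digest = hashlib.sha256(payload).hexdigest()
    return "aggressive" if int(digest, 16) % 2 else "cautious"

def run(seed: int, ticks: int = 1000, ai_latency_ticks: int = 150) -> str:
    """Toy simulator with mixed cadences; returns a hash of the final state."""
    rng = random.Random(seed)
    energy = [rng.random() for _ in range(100)]
    strategy = "cautious"
    pending = {}                                   # apply_tick -> strategy
    for tick in range(ticks):
        # Every-tick system.
        gain = 0.01 if strategy == "aggressive" else 0.005
        energy = [e + gain * rng.random() for e in energy]
        # Periodic system: cadence depends only on the tick number.
        if tick % 50 == 0:
            energy.sort()
        # Out-of-loop AI: snapshot every 30 ticks, result applied after a
        # fixed, recorded latency instead of whenever a real worker finishes.
        if tick % 30 == 0:
            snapshot = {"tick": tick, "total": round(sum(energy), 6)}
            pending[tick + ai_latency_ticks] = mock_ai(snapshot)
        if tick in pending:
            strategy = pending.pop(tick)
    state = json.dumps([round(e, 9) for e in energy]).encode()
    return hashlib.sha256(state).hexdigest()

if __name__ == "__main__":
    first, second = run(seed=42), run(seed=42)
    print(first == second)
```

The two things that make this reproducible are the ones the text names: the result depends only on the snapshot, and the tick at which it is applied is fixed. Replace either with a live worker and the hash check no longer holds unless the queue contents are recorded and replayed.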

Exercise 6 — The scale-up arithmetic

Suppose your simulator at full scale needs 1B creatures × 32 bytes/row = 32 GB of state, 30 Hz tick, 16-core parallelism.

Modern boxes:

| spec | typical 2026 |
|---|---|
| RAM | 64 GB - 1 TB |
| cores | 16-128 |
| memory channels | 2-8 (DDR5) |
| NVMe storage | 4-30 TB |

32 GB fits comfortably on any 64 GB box, and 16 cores is within the range of a modern desktop. On capacity, the simulator can run on a single high-end laptop or a mid-range workstation. No distributed system needed.

For a workload of 100B creatures (3.2 TB state): now you’ve left single-machine territory. But the cost of one machine with 4 TB RAM (server class, ~$10K) is much less than the engineering cost of distributing the simulator. Rent or buy the bigger box.

The threshold where distribution becomes mandatory: when one machine can’t physically hold the workload (~10 TB+ for most cloud providers, even larger on bare metal). Below that, every reasonable problem fits on one box.
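The capacity arithmetic is short enough to check in code. This sketch uses decimal units (1 GB = 10^9 bytes) to match the figures above; `state_bytes`, `fits` and the 80% headroom default are assumptions for illustration, not values from the chapter.

```python
GB = 10**9
TB = 10**12

def state_bytes(creatures: int, bytes_per_row: int) -> int:
    """Total size of the creature table in bytes."""
    return creatures * bytes_per_row

def fits(creatures: int, bytes_per_row: int, ram_bytes: int,
         headroom_pct: int = 80) -> bool:
    """True if the state uses at most headroom_pct percent of RAM.

    The 80% default is an assumed safety margin for the OS and scratch
    buffers. Integer arithmetic avoids floating-point rounding at the boundary.
    """
    return state_bytes(creatures, bytes_per_row) * 100 <= ram_bytes * headroom_pct

if __name__ == "__main__":
    for creatures, ram in [(10**9, 64 * GB), (100 * 10**9, 1 * TB), (100 * 10**9, 4 * TB)]:
        size = state_bytes(creatures, 32)
        print(f"{creatures:>15,} creatures: {size / GB:>7,.0f} GB state, "
              f"fits in {ram / GB:,.0f} GB RAM: {fits(creatures, 32, ram)}")
```

This covers capacity only. Whether a given box can also stream that much state through its memory channels 30 times a second is a separate measurement.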

Exercise 7 — Anytime under varying budget (stretch)

def plan_route_with_budget(start, goal, obstacles, remaining_ms: float):
    deadline = time.perf_counter() + remaining_ms / 1000
    return plan_route(start, goal, obstacles, deadline)

# In the tick:
def tick(world, current_tick):
    tick_start = time.perf_counter()
    motion(world); next_event(world); apply_eat(world); ...

    # Spend the remaining budget on path planning, split evenly across the
    # creatures still waiting so the first one cannot consume all of it
    pending = list(world.creatures_needing_paths)
    for n, creature in enumerate(pending):
        elapsed_ms = (time.perf_counter() - tick_start) * 1000
        remaining_ms = max(0.0, 33.0 - elapsed_ms)         # 30 Hz budget = 33 ms
        share_ms = remaining_ms / (len(pending) - n)
        # plan_route returns (route, score); with no budget left it returns
        # the greedy baseline immediately, which is still a valid path
        creature.path, _ = plan_route_with_budget(
            creature.pos, creature.goal, world.obstacles, share_ms)

Some ticks have plenty of budget left (the rest of the work was cheap); some have very little (a heavy cleanup happened). The path-finder takes whatever’s available.

Plot the path quality over a 1000-tick run. The line is jittery — quality varies tick-to-tick with the available budget — but the trend is positive: even the worst tick still produces a valid path; better ticks produce better paths; the simulator never blocks.

This is the production pattern for AI in real-time systems: spend whatever budget you have, never block, never miss a deadline. The simulator’s overall frame rate is preserved; AI quality is a function of the time it gets.