Skip to content

Sharing State Between Processes

Processes have isolated memory spaces. To share data between processes, you need special mechanisms provided by the multiprocessing module.


The Challenge: Memory Isolation

from multiprocessing import Process

# This does NOT work as expected!
counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1

if __name__ == "__main__":
    p1 = Process(target=increment)
    p2 = Process(target=increment)

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print(f"Counter: {counter}")  # Still 0!
    # Each process has its own copy of counter

Shared Value

For sharing simple scalar values between processes.

Basic Usage

from multiprocessing import Process, Value

def increment(shared_counter, n):
    for _ in range(n):
        shared_counter.value += 1

if __name__ == "__main__":
    # Type codes: 'i' = int, 'd' = double, 'c' = char
    counter = Value('i', 0)

    p1 = Process(target=increment, args=(counter, 100000))
    p2 = Process(target=increment, args=(counter, 100000))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print(f"Counter: {counter.value}")
    # May be less than 200000 — race condition!

With Lock (Thread-Safe)

from multiprocessing import Process, Value, Lock

def increment(shared_counter, lock, n):
    for _ in range(n):
        with lock:
            shared_counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)
    lock = Lock()

    p1 = Process(target=increment, args=(counter, lock, 100000))
    p2 = Process(target=increment, args=(counter, lock, 100000))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print(f"Counter: {counter.value}")  # Always 200000

Value with Built-in Lock

from multiprocessing import Process, Value

def increment(shared_counter, n):
    for _ in range(n):
        with shared_counter.get_lock():
            shared_counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)  # Has built-in lock

    p1 = Process(target=increment, args=(counter, 100000))
    p2 = Process(target=increment, args=(counter, 100000))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print(f"Counter: {counter.value}")  # Always 200000

Type Codes

Code C Type Python Type
'b' signed char int
'B' unsigned char int
'i' signed int int
'I' unsigned int int
'l' signed long int
'L' unsigned long int
'f' float float
'd' double float

Shared Array

For sharing sequences of values.

Basic Array

from multiprocessing import Process, Array

def fill_array(shared_array, start_value):
    for i in range(len(shared_array)):
        shared_array[i] = start_value + i

if __name__ == "__main__":
    # 'd' = double, size 5
    arr = Array('d', 5)

    p = Process(target=fill_array, args=(arr, 10))
    p.start()
    p.join()

    print(list(arr))  # [10.0, 11.0, 12.0, 13.0, 14.0]

Initialize with Values

from multiprocessing import Array

# Initialize with values
arr1 = Array('i', [1, 2, 3, 4, 5])
print(list(arr1))  # [1, 2, 3, 4, 5]

# Initialize with size (zeros)
arr2 = Array('d', 10)
print(list(arr2))  # [0.0, 0.0, ..., 0.0]

Concurrent Array Access

from multiprocessing import Process, Array, Lock

def increment_array(arr, lock, amount):
    with lock:
        for i in range(len(arr)):
            arr[i] += amount

if __name__ == "__main__":
    arr = Array('i', [0, 0, 0, 0, 0])
    lock = Lock()

    processes = []
    for _ in range(10):
        p = Process(target=increment_array, args=(arr, lock, 1))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(list(arr))  # [10, 10, 10, 10, 10]

Queue for Communication

Queue is the recommended way to pass data between processes.

Basic Queue

from multiprocessing import Process, Queue

def producer(q, items):
    for item in items:
        q.put(item)
        print(f"Produced: {item}")
    q.put(None)  # Sentinel

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed: {item}")

if __name__ == "__main__":
    q = Queue()

    p1 = Process(target=producer, args=(q, [1, 2, 3, 4, 5]))
    p2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Collecting Results with Queue

from multiprocessing import Process, Queue

def worker(task_id, q):
    result = task_id ** 2
    q.put((task_id, result))

if __name__ == "__main__":
    result_queue = Queue()
    processes = []

    for i in range(5):
        p = Process(target=worker, args=(i, result_queue))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    # Collect results
    results = {}
    while not result_queue.empty():
        task_id, result = result_queue.get()
        results[task_id] = result

    print(results)  # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}

Pipe for Two-Way Communication

Pipe creates a connection between two processes.

Basic Pipe

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("Hello from sender!")
    conn.send([1, 2, 3])
    conn.close()

def receiver(conn):
    msg1 = conn.recv()
    msg2 = conn.recv()
    print(f"Received: {msg1}")
    print(f"Received: {msg2}")

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()

    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Bidirectional Pipe

from multiprocessing import Process, Pipe

def ping_pong(conn, name):
    for i in range(3):
        msg = conn.recv()
        print(f"{name} received: {msg}")
        conn.send(f"{name} reply {i}")
    conn.close()

if __name__ == "__main__":
    conn1, conn2 = Pipe()

    p = Process(target=ping_pong, args=(conn2, "Worker"))
    p.start()

    # Main process
    for i in range(3):
        conn1.send(f"Main message {i}")
        reply = conn1.recv()
        print(f"Main received: {reply}")

    p.join()

Manager for Complex Objects

Manager provides a way to share complex Python objects (lists, dicts) between processes.

Shared List

from multiprocessing import Process, Manager

def worker(shared_list, item):
    shared_list.append(item)

if __name__ == "__main__":
    with Manager() as manager:
        shared_list = manager.list()

        processes = []
        for i in range(5):
            p = Process(target=worker, args=(shared_list, i))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print(list(shared_list))  # [0, 1, 2, 3, 4] (order may vary)

Shared Dictionary

from multiprocessing import Process, Manager

def worker(shared_dict, key, value):
    shared_dict[key] = value

if __name__ == "__main__":
    with Manager() as manager:
        shared_dict = manager.dict()

        processes = []
        for i in range(5):
            p = Process(target=worker, args=(shared_dict, f"key_{i}", i ** 2))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print(dict(shared_dict))
        # {'key_0': 0, 'key_1': 1, 'key_2': 4, 'key_3': 9, 'key_4': 16}

Shared Namespace

from multiprocessing import Process, Manager

def worker(ns):
    ns.x += 1
    ns.items.append(ns.x)

if __name__ == "__main__":
    with Manager() as manager:
        ns = manager.Namespace()
        ns.x = 0
        ns.items = manager.list()

        processes = []
        for _ in range(5):
            p = Process(target=worker, args=(ns,))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print(f"x = {ns.x}")
        print(f"items = {list(ns.items)}")

Manager Performance Warning

Manager objects are slower than Value/Array because they use a separate server process for synchronization:

# Fast: Direct shared memory
counter = Value('i', 0)  # Direct memory access

# Slower: Proxy through manager process
with Manager() as manager:
    counter = manager.Value('i', 0)  # Proxied access

Shared Memory (Python 3.8+)

SharedMemory provides raw shared memory for high-performance data sharing.

Basic SharedMemory

from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np

def worker(shm_name, shape, dtype):
    # Attach to existing shared memory
    existing_shm = SharedMemory(name=shm_name)
    array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

    # Modify array
    array[:] = array * 2

    existing_shm.close()

if __name__ == "__main__":
    # Create array in shared memory
    arr = np.array([1, 2, 3, 4, 5], dtype=np.float64)

    shm = SharedMemory(create=True, size=arr.nbytes)
    shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    shared_arr[:] = arr[:]  # Copy data

    print(f"Before: {shared_arr}")

    p = Process(target=worker, args=(shm.name, arr.shape, arr.dtype))
    p.start()
    p.join()

    print(f"After: {shared_arr}")

    shm.close()
    shm.unlink()  # Clean up

SharedMemory with NumPy

from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np

def compute_in_place(shm_name, shape, dtype, start, end):
    """Process a slice of shared array."""
    shm = SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)

    arr[start:end] = arr[start:end] ** 2

    shm.close()

if __name__ == "__main__":
    size = 1000000
    arr = np.arange(size, dtype=np.float64)

    # Create shared memory
    shm = SharedMemory(create=True, size=arr.nbytes)
    shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    shared_arr[:] = arr

    # Process in parallel
    num_workers = 4
    chunk_size = size // num_workers

    processes = []
    for i in range(num_workers):
        start = i * chunk_size
        end = start + chunk_size if i < num_workers - 1 else size
        p = Process(target=compute_in_place, 
                   args=(shm.name, arr.shape, arr.dtype, start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"First 10: {shared_arr[:10]}")

    shm.close()
    shm.unlink()

Comparison of Sharing Methods

Method Speed Use Case Complexity
Value Fast Single scalar Simple
Array Fast Fixed-size arrays Simple
Queue Medium Message passing Simple
Pipe Medium Two-process communication Simple
Manager Slow Complex objects (list, dict) Medium
SharedMemory Fastest Large arrays, NumPy Complex

Best Practices

1. Prefer Message Passing (Queue)

# Good: Clear data flow
result_queue = Queue()
p = Process(target=worker, args=(data, result_queue))

2. Use Locks for Shared State

# Good: Protected access
with shared_value.get_lock():
    shared_value.value += 1

3. Minimize Shared State

# Better: Pass data, return results
def worker(input_data, result_queue):
    result = process(input_data)
    result_queue.put(result)

4. Clean Up SharedMemory

# Always unlink shared memory
try:
    # ... use shared memory ...
finally:
    shm.close()
    shm.unlink()

Key Takeaways

  • Processes have isolated memory — global variables aren't shared
  • Use Value and Array for simple shared data (fast)
  • Use Queue for message passing between processes (recommended)
  • Use Pipe for two-way communication between two processes
  • Use Manager for complex objects like lists and dicts (slower)
  • Use SharedMemory for high-performance NumPy array sharing
  • Always use locks when multiple processes modify shared data
  • Prefer message passing over shared state when possible