Sharing State Between Processes¶
Processes have isolated memory spaces. To share data between processes, you need special mechanisms provided by the multiprocessing module.
The Challenge: Memory Isolation¶
from multiprocessing import Process
# This does NOT work as expected!
counter = 0


def increment():
    """Add 100000 to the module-level ``counter`` of the calling process.

    When run as a ``Process`` target, this updates the child's own copy of
    ``counter`` only; the parent's variable is left untouched.
    """
    global counter
    for _ in range(100000):
        counter += 1


if __name__ == "__main__":
    p1 = Process(target=increment)
    p2 = Process(target=increment)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"Counter: {counter}")  # Still 0!
    # Each process has its own copy of counter
Shared Value¶
For sharing simple scalar values between processes.
Basic Usage¶
from multiprocessing import Process, Value
def increment(shared_counter, n):
    """Increment ``shared_counter.value`` ``n`` times WITHOUT holding a lock.

    ``+=`` is a separate read and write, so concurrent callers can
    overwrite each other's updates. This version is intentionally racy.
    """
    for _ in range(n):
        shared_counter.value += 1


if __name__ == "__main__":
    # Type codes: 'i' = int, 'd' = double, 'c' = char
    counter = Value('i', 0)
    p1 = Process(target=increment, args=(counter, 100000))
    p2 = Process(target=increment, args=(counter, 100000))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"Counter: {counter.value}")
    # May be less than 200000 — race condition!
With Lock (Process-Safe)¶
from multiprocessing import Process, Value, Lock
def increment(shared_counter, lock, n):
    """Increment ``shared_counter.value`` ``n`` times, holding ``lock`` for each update.

    The lock makes every read-modify-write exclusive, so no increment is lost
    when several processes run this function at the same time.
    """
    for _ in range(n):
        with lock:
            shared_counter.value += 1


if __name__ == "__main__":
    counter = Value('i', 0)
    lock = Lock()
    p1 = Process(target=increment, args=(counter, lock, 100000))
    p2 = Process(target=increment, args=(counter, lock, 100000))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"Counter: {counter.value}")  # Always 200000
Value with Built-in Lock¶
from multiprocessing import Process, Value
def increment(shared_counter, n):
    """Increment ``shared_counter.value`` ``n`` times under the Value's own lock.

    ``shared_counter`` must be a synchronized ``Value`` (the default), whose
    ``get_lock()`` returns the lock guarding it.
    """
    for _ in range(n):
        with shared_counter.get_lock():
            shared_counter.value += 1


if __name__ == "__main__":
    counter = Value('i', 0)  # Has built-in lock
    p1 = Process(target=increment, args=(counter, 100000))
    p2 = Process(target=increment, args=(counter, 100000))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"Counter: {counter.value}")  # Always 200000
Type Codes¶
| Code | C Type | Python Type |
|---|---|---|
| `'b'` | signed char | int |
| `'B'` | unsigned char | int |
| `'i'` | signed int | int |
| `'I'` | unsigned int | int |
| `'l'` | signed long | int |
| `'L'` | unsigned long | int |
| `'f'` | float | float |
| `'d'` | double | float |
Shared Array¶
For sharing sequences of values.
Basic Array¶
from multiprocessing import Process, Array
def fill_array(shared_array, start_value):
    """Overwrite every slot of ``shared_array`` with ``start_value + index``."""
    for i in range(len(shared_array)):
        shared_array[i] = start_value + i


if __name__ == "__main__":
    # 'd' = double, size 5
    arr = Array('d', 5)
    p = Process(target=fill_array, args=(arr, 10))
    p.start()
    p.join()
    print(list(arr))  # [10.0, 11.0, 12.0, 13.0, 14.0]
Initialize with Values¶
from multiprocessing import Array
# A sequence argument sets both the length and the initial contents.
initial_values = [1, 2, 3, 4, 5]
arr1 = Array('i', initial_values)
print(arr1[:])  # [1, 2, 3, 4, 5]

# An integer argument allocates that many zero-initialized elements.
zeroed_length = 10
arr2 = Array('d', zeroed_length)
print(arr2[:])  # [0.0, 0.0, ..., 0.0]
Concurrent Array Access¶
from multiprocessing import Process, Array, Lock
def increment_array(arr, lock, amount):
    """Add ``amount`` to every element of ``arr`` while holding ``lock``.

    The lock is held for the whole pass, so each caller's update of the
    array is applied as one uninterrupted unit.
    """
    with lock:
        for i in range(len(arr)):
            arr[i] += amount


if __name__ == "__main__":
    arr = Array('i', [0, 0, 0, 0, 0])
    lock = Lock()
    processes = []
    for _ in range(10):
        p = Process(target=increment_array, args=(arr, lock, 1))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(list(arr))  # [10, 10, 10, 10, 10]
Queue for Communication¶
Queue is the recommended way to pass data between processes.
Basic Queue¶
from multiprocessing import Process, Queue
def producer(q, items):
    """Put each element of ``items`` on ``q``, then a ``None`` end-of-stream sentinel."""
    for item in items:
        q.put(item)
        print(f"Produced: {item}")
    q.put(None)  # Sentinel


def consumer(q):
    """Take items from ``q`` and print them until the ``None`` sentinel arrives."""
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed: {item}")


if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q, [1, 2, 3, 4, 5]))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
Collecting Results with Queue¶
from multiprocessing import Process, Queue
def worker(task_id, q):
    """Square ``task_id`` and put ``(task_id, result)`` on ``q``."""
    result = task_id ** 2
    q.put((task_id, result))


if __name__ == "__main__":
    num_tasks = 5
    result_queue = Queue()
    processes = []
    for i in range(num_tasks):
        p = Process(target=worker, args=(i, result_queue))
        processes.append(p)
        p.start()

    # Collect results BEFORE joining: a child that has put data on a Queue
    # may not exit until that data is consumed, so join-then-drain can
    # deadlock. Each worker puts exactly one item, so read exactly
    # ``num_tasks`` items instead of polling the unreliable ``empty()``.
    results = {}
    for _ in range(num_tasks):
        task_id, result = result_queue.get()
        results[task_id] = result

    for p in processes:
        p.join()

    # Results arrive in completion order; sort for a stable display.
    print(dict(sorted(results.items())))  # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}
Pipe for Two-Way Communication¶
Pipe creates a connection between two processes.
Basic Pipe¶
from multiprocessing import Process, Pipe
def sender(conn):
    """Send a string and then a list over ``conn``, and close the connection."""
    conn.send("Hello from sender!")
    conn.send([1, 2, 3])
    conn.close()


def receiver(conn):
    """Receive two messages from ``conn``, print them, and close the connection."""
    msg1 = conn.recv()
    msg2 = conn.recv()
    print(f"Received: {msg1}")
    print(f"Received: {msg2}")
    conn.close()  # Release this end, mirroring sender()


if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
Bidirectional Pipe¶
from multiprocessing import Process, Pipe
def ping_pong(conn, name):
    """Answer three incoming messages on ``conn`` with ``"<name> reply <i>"``.

    Each round blocks until a message arrives, prints it, and sends the
    reply. The connection is closed after the third round.
    """
    for i in range(3):
        msg = conn.recv()
        print(f"{name} received: {msg}")
        conn.send(f"{name} reply {i}")
    conn.close()


if __name__ == "__main__":
    conn1, conn2 = Pipe()
    p = Process(target=ping_pong, args=(conn2, "Worker"))
    p.start()
    # Main process
    for i in range(3):
        conn1.send(f"Main message {i}")
        reply = conn1.recv()
        print(f"Main received: {reply}")
    p.join()
    conn1.close()  # Done talking; release the parent's end too
Manager for Complex Objects¶
Manager provides a way to share complex Python objects (lists, dicts) between processes.
Shared List¶
from multiprocessing import Process, Manager
def worker(shared_list, item):
    """Append ``item`` to ``shared_list``."""
    shared_list.append(item)


if __name__ == "__main__":
    with Manager() as manager:
        shared_list = manager.list()
        processes = []
        for i in range(5):
            p = Process(target=worker, args=(shared_list, i))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        # Read the proxy while the manager is still running
        print(list(shared_list))  # [0, 1, 2, 3, 4] (order may vary)
Shared Dictionary¶
from multiprocessing import Process, Manager
def worker(shared_dict, key, value):
    """Store ``value`` under ``key`` in ``shared_dict``."""
    shared_dict[key] = value


if __name__ == "__main__":
    with Manager() as manager:
        shared_dict = manager.dict()
        processes = []
        for i in range(5):
            p = Process(target=worker, args=(shared_dict, f"key_{i}", i ** 2))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        # Read the proxy while the manager is still running
        print(dict(shared_dict))
        # {'key_0': 0, 'key_1': 1, 'key_2': 4, 'key_3': 9, 'key_4': 16}
Shared Namespace¶
from multiprocessing import Process, Manager
def _bump(ns):
    """Increment ``ns.x`` and record the new value in ``ns.items``."""
    ns.x += 1
    ns.items.append(ns.x)


def worker(ns, lock=None):
    """Increment ``ns.x`` and append the new value to ``ns.items``.

    ``ns.x += 1`` is a separate read and write, so concurrent workers can
    lose updates. Pass a ``lock`` shared by all workers to make the whole
    update exclusive; without one the update runs unprotected.
    """
    if lock is None:
        _bump(ns)
    else:
        with lock:
            _bump(ns)


if __name__ == "__main__":
    with Manager() as manager:
        ns = manager.Namespace()
        ns.x = 0
        ns.items = manager.list()
        lock = manager.Lock()  # Shared by all workers to serialize updates
        processes = []
        for _ in range(5):
            p = Process(target=worker, args=(ns, lock))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        print(f"x = {ns.x}")  # x = 5
        print(f"items = {list(ns.items)}")  # items = [1, 2, 3, 4, 5]
Manager Performance Warning¶
Manager objects are slower than Value/Array because they use a separate server process for synchronization:
# Fast: Direct shared memory
counter = Value('i', 0)  # Direct memory access

# Slower: Proxy through manager process
with Manager() as manager:
    counter = manager.Value('i', 0)  # Proxied access
Shared Memory (Python 3.8+)¶
SharedMemory provides raw shared memory for high-performance data sharing.
Basic SharedMemory¶
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np
def worker(shm_name, shape, dtype):
    """Double every element of the array stored in an existing shared-memory block.

    Args:
        shm_name: Name of the shared-memory block to attach to.
        shape: Shape of the NumPy array laid over the block.
        dtype: Element type of that array.
    """
    # Attach to existing shared memory
    existing_shm = SharedMemory(name=shm_name)
    try:
        array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
        # Modify array
        array[:] = array * 2
    finally:
        # Detach this process's handle even if the update fails
        existing_shm.close()


if __name__ == "__main__":
    # Create array in shared memory
    arr = np.array([1, 2, 3, 4, 5], dtype=np.float64)
    shm = SharedMemory(create=True, size=arr.nbytes)
    try:
        shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
        shared_arr[:] = arr[:]  # Copy data
        print(f"Before: {shared_arr}")
        p = Process(target=worker, args=(shm.name, arr.shape, arr.dtype))
        p.start()
        p.join()
        print(f"After: {shared_arr}")
    finally:
        # Always release and destroy the block, even on error
        shm.close()
        shm.unlink()  # Clean up
SharedMemory with NumPy¶
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np
def compute_in_place(shm_name, shape, dtype, start, end):
    """Square the elements ``[start:end]`` of an array held in shared memory.

    Args:
        shm_name: Name of the shared-memory block to attach to.
        shape: Shape of the NumPy array laid over the block.
        dtype: Element type of that array.
        start: First index of the slice to process (inclusive).
        end: Last index of the slice to process (exclusive).
    """
    shm = SharedMemory(name=shm_name)
    try:
        arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
        arr[start:end] = arr[start:end] ** 2
    finally:
        # Detach this process's handle even if the computation fails
        shm.close()


if __name__ == "__main__":
    size = 1000000
    arr = np.arange(size, dtype=np.float64)
    # Create shared memory
    shm = SharedMemory(create=True, size=arr.nbytes)
    try:
        shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
        shared_arr[:] = arr
        # Process in parallel
        num_workers = 4
        chunk_size = size // num_workers
        processes = []
        for i in range(num_workers):
            start = i * chunk_size
            # The last worker also takes any remainder
            end = start + chunk_size if i < num_workers - 1 else size
            p = Process(target=compute_in_place,
                        args=(shm.name, arr.shape, arr.dtype, start, end))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        print(f"First 10: {shared_arr[:10]}")
    finally:
        # Always release and destroy the block, even on error
        shm.close()
        shm.unlink()
Comparison of Sharing Methods¶
| Method | Speed | Use Case | Complexity |
|---|---|---|---|
| `Value` | Fast | Single scalar | Simple |
| `Array` | Fast | Fixed-size arrays | Simple |
| `Queue` | Medium | Message passing | Simple |
| `Pipe` | Medium | Two-process communication | Simple |
| `Manager` | Slow | Complex objects (list, dict) | Medium |
| `SharedMemory` | Fastest | Large arrays, NumPy | Complex |
Best Practices¶
1. Prefer Message Passing (Queue)¶
# Good: Clear data flow
# The input and the queue are both handed to the worker explicitly.
result_queue = Queue()
p = Process(args=(data, result_queue), target=worker)
2. Use Locks for Shared State¶
# Good: Protected access
# Hold the Value's own lock across the whole read-modify-write.
with shared_value.get_lock():
    shared_value.value += 1
3. Minimize Shared State¶
# Better: Pass data, return results
def worker(input_data, result_queue):
    """Process ``input_data`` and put the result on ``result_queue``."""
    result = process(input_data)
    result_queue.put(result)
4. Clean Up SharedMemory¶
# Always unlink shared memory
try:
    ...  # use shared memory
finally:
    shm.close()   # Release this process's handle to the block
    shm.unlink()  # Destroy the block itself (call once, from the creator)
Key Takeaways¶
- Processes have isolated memory — global variables aren't shared
- Use `Value` and `Array` for simple shared data (fast)
- Use `Queue` for message passing between processes (recommended)
- Use `Pipe` for two-way communication between two processes
- Use `Manager` for complex objects like lists and dicts (slower)
- Use `SharedMemory` for high-performance NumPy array sharing
- Always use locks when multiple processes modify shared data
- Prefer message passing over shared state when possible