Skip to content

Thread-Local Storage

threading.local() provides storage where each thread has its own independent copy of data.

Mental Model

threading.local() gives every thread its own private namespace that looks like a shared object. Each thread reads and writes the same attribute names, but they see different values -- like each employee having a personal desk drawer inside a shared desk. It is the simplest way to avoid sharing state without passing data through every function call.

The Problem: Shared State

Without thread-local storage, all threads share the same global variables:

```python import threading import time

Global variable - shared by all threads

request_id = None

def handle_request(req_id): global request_id request_id = req_id time.sleep(0.1) # Simulate processing # BUG: request_id may have been overwritten by another thread! print(f"Processing request {request_id}")

threads = [] for i in range(5): t = threading.Thread(target=handle_request, args=(i,)) threads.append(t) t.start()

for t in threads: t.join()

Output might show wrong request IDs!

```

Solution: threading.local()

Each thread gets its own copy of the data:

```python import threading import time

Thread-local storage

local_data = threading.local()

def handle_request(req_id): local_data.request_id = req_id time.sleep(0.1) # Each thread sees its own request_id print(f"Thread {threading.current_thread().name}: request {local_data.request_id}")

threads = [] for i in range(5): t = threading.Thread(target=handle_request, args=(i,)) threads.append(t) t.start()

for t in threads: t.join()

Output: Each thread shows its correct request_id

```

How It Works

```python import threading

local = threading.local()

def show_value(): thread_name = threading.current_thread().name if hasattr(local, 'value'): print(f"{thread_name}: value = {local.value}") else: print(f"{thread_name}: no value set")

def set_value(val): local.value = val show_value()

Main thread

local.value = "main" show_value() # main: value = main

Other threads don't see main's value

t1 = threading.Thread(target=show_value, name="Thread-1") t1.start() t1.join() # Thread-1: no value set

Each thread sets its own value

t2 = threading.Thread(target=set_value, args=("thread2",), name="Thread-2") t2.start() t2.join() # Thread-2: value = thread2

Main thread's value unchanged

show_value() # main: value = main ```

Practical Examples

1. Request Context in Web Server

```python import threading from contextlib import contextmanager

Thread-local request context

_request_context = threading.local()

@contextmanager def request_context(request_id, user_id): """Set request context for current thread.""" _request_context.request_id = request_id _request_context.user_id = user_id try: yield finally: del _request_context.request_id del _request_context.user_id

def get_current_user(): """Get user from current request context.""" return getattr(_request_context, 'user_id', None)

def get_request_id(): """Get current request ID.""" return getattr(_request_context, 'request_id', None)

def process_data(): user = get_current_user() req = get_request_id() print(f"Processing for user {user} (request {req})")

def handle_request(request_id, user_id): with request_context(request_id, user_id): process_data()

Simulate concurrent requests

threads = [] for i in range(3): t = threading.Thread(target=handle_request, args=(f"req-{i}", f"user-{i}")) threads.append(t) t.start()

for t in threads: t.join() ```

2. Database Connection Per Thread

```python import threading import sqlite3

class ThreadLocalDB: def init(self, db_path): self.db_path = db_path self._local = threading.local()

@property
def connection(self):
    """Get connection for current thread, create if needed."""
    if not hasattr(self._local, 'conn'):
        self._local.conn = sqlite3.connect(self.db_path)
    return self._local.conn

def execute(self, query, params=()):
    """Execute query on thread's connection."""
    cursor = self.connection.cursor()
    cursor.execute(query, params)
    return cursor.fetchall()

def close(self):
    """Close current thread's connection."""
    if hasattr(self._local, 'conn'):
        self._local.conn.close()
        del self._local.conn

Usage

db = ThreadLocalDB('mydb.sqlite')

def worker(worker_id): # Each thread gets its own connection db.execute("INSERT INTO logs VALUES (?)", (f"worker-{worker_id}",)) db.connection.commit()

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)] for t in threads: t.start() for t in threads: t.join() ```

3. Logging Context

```python import threading import logging

class ContextFilter(logging.Filter): """Add thread-local context to log records."""

def __init__(self):
    super().__init__()
    self._local = threading.local()

def set_context(self, **kwargs):
    for key, value in kwargs.items():
        setattr(self._local, key, value)

def clear_context(self):
    self._local.__dict__.clear()

def filter(self, record):
    for key, value in self._local.__dict__.items():
        setattr(record, key, value)
    return True

Setup

context_filter = ContextFilter() handler = logging.StreamHandler() handler.setFormatter(logging.Formatter( '%(asctime)s [%(request_id)s] %(message)s' )) handler.addFilter(context_filter)

logger = logging.getLogger(name) logger.addHandler(handler) logger.setLevel(logging.INFO)

def process_request(request_id): context_filter.set_context(request_id=request_id) try: logger.info("Starting request") # ... do work ... logger.info("Request completed") finally: context_filter.clear_context() ```

4. Transaction Scope

```python import threading from contextlib import contextmanager

class TransactionManager: def init(self): self._local = threading.local()

@property
def current_transaction(self):
    return getattr(self._local, 'transaction', None)

@contextmanager
def transaction(self):
    if self.current_transaction is not None:
        raise RuntimeError("Nested transactions not supported")

    self._local.transaction = Transaction()
    try:
        yield self._local.transaction
        self._local.transaction.commit()
    except Exception:
        self._local.transaction.rollback()
        raise
    finally:
        del self._local.transaction

class Transaction: def init(self): self.operations = []

def add(self, op):
    self.operations.append(op)

def commit(self):
    print(f"Committing {len(self.operations)} operations")

def rollback(self):
    print("Rolling back")

Usage

tm = TransactionManager()

def worker(worker_id): with tm.transaction() as txn: txn.add(f"operation from worker {worker_id}")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)] for t in threads: t.start() for t in threads: t.join() ```

Subclassing threading.local

For initialization logic, subclass threading.local:

```python import threading

class MyLocal(threading.local): def init(self, default_value): # Called once per thread when first accessed self.value = default_value self.initialized = True

local = MyLocal("default")

def show(): print(f"{threading.current_thread().name}: {local.value}")

Main thread

show() # MainThread: default local.value = "main" show() # MainThread: main

New thread gets fresh default

t = threading.Thread(target=show) t.start() t.join() # Thread-1: default ```

Important Considerations

Data is Thread-Specific, Not Task-Specific

Thread-local data persists for the lifetime of the thread:

```python import threading from concurrent.futures import ThreadPoolExecutor

local = threading.local()

def task(task_id): # May see leftover data from previous task on same thread! old = getattr(local, 'task_id', 'none') local.task_id = task_id return f"Task {task_id} (previous: {old})"

with ThreadPoolExecutor(max_workers=2) as executor: results = list(executor.map(task, range(10))) print(results)

Tasks reuse threads, so they may see previous task's data

```

Cleanup Pattern

python def task_with_cleanup(task_id): local.task_id = task_id try: # ... do work ... pass finally: del local.task_id # Clean up

threading.local vs contextvars

Feature threading.local contextvars
Thread isolation
Async task isolation
Copy on task creation
Python version 2.4+ 3.7+

For async code, use contextvars instead.

Key Takeaways

  • threading.local() provides thread-isolated storage
  • Each thread sees its own copy of attributes
  • Useful for request context, connections, transactions
  • Data persists for thread lifetime (clean up in thread pools)
  • For async code, use contextvars instead
  • Subclass for custom initialization logic

Runnable Example: thread_event_pattern.py

```python """ TUTORIAL: Using threading.Event for Thread Coordination ========================================================

In this tutorial, you'll learn how to use threading.Event as a simple but powerful synchronization mechanism for coordinating threads.

KEY CONCEPTS: - threading.Event: A simple flag-like object for thread communication - event.set() and event.wait(): Basic signaling mechanism - Practical pattern: Using events to signal threads to stop gracefully - Real-world example: Animated spinner that responds to task completion

CREDITS: Adapted from Michele Simionato's example in python-list: https://mail.python.org/pipermail/python-list/2009-February/675659.html """

import itertools import time from threading import Thread, Event

print("=" * 70) print("THREADING.EVENT FOR THREAD COORDINATION") print("=" * 70) print()

============ EXAMPLE 1: Understanding threading.Event

=====================================================

print("EXAMPLE 1: What is threading.Event?") print("-" * 70) print() print("threading.Event is like a simple 'flag' that threads can use to") print("communicate with each other. It has two states: set or unset.") print()

event = Event() print(f"• Created an event: {event}") print(f"• Is it set? {event.is_set()}") print()

When we call event.set(), the flag becomes True

event.set() print(f"After event.set():") print(f"• Is it set now? {event.is_set()}") print()

When we call event.clear(), the flag becomes False again

event.clear() print(f"After event.clear():") print(f"• Is it set now? {event.is_set()}") print()

============ EXAMPLE 2: The Spinner Function - Showing Activity

================================================================

print("EXAMPLE 2: Creating a Spinner Thread") print("-" * 70) print() print("A spinner shows the user that work is happening. We use an Event") print("to let the spinner know when the work is complete so it can stop.") print()

def spin(msg: str, done: Event) -> None: """ Display an animated spinner while work is being done.

WHY THIS DESIGN:
- itertools.cycle creates a never-ending loop of characters
- done.wait(timeout) checks if work is finished every 0.1 seconds
- Using \r (carriage return) overwrites the same line for animation
- flush=True ensures output appears immediately

Args:
    msg: The message to display next to the spinner
    done: An Event that signals when to stop spinning
"""
print(f"\nStarting spinner with message: '{msg}'")
print("(Watch the animation below - it's cycling through characters)")
print()

spinner_chars = r'\|/-'
char_count = 0

for char in itertools.cycle(spinner_chars):
    # WHY cycle()? It lets us loop through 4 characters infinitely.
    # When we reach the end, it automatically starts over.

    status = f'\r{char} {msg}'
    print(status, end='', flush=True)

    # done.wait(0.1) does two things:
    # 1. Waits up to 0.1 seconds for the event to be set
    # 2. Returns True if event was set, False if timeout occurred
    if done.wait(0.1):
        # The event was set! This means we should stop spinning.
        break

    char_count += 1

# Clear the spinner line so it doesn't stay visible
blanks = ' ' * len(status)
print(f'\r{blanks}\r', end='')

print(f"Spinner stopped after {char_count} iterations")
print()

def slow_task() -> int: """ Simulate a long-running task that takes 3 seconds.

WHY: This represents real work - downloading, processing, etc.
We'll run the spinner while this happens.
"""
print("Long task: Starting 3-second sleep...")
time.sleep(3)
print("Long task: Finished!")
return 42

============ EXAMPLE 3: The Supervisor - Coordinating Threads

==============================================================

print("EXAMPLE 3: Coordinating Threads with an Event") print("-" * 70) print() print("The supervisor orchestrates two things:") print("1. A worker thread (spinner) showing progress") print("2. The main thread doing the actual work") print("Both use an Event to coordinate.") print()

def supervisor() -> int: """ Manage the spinner thread and the work.

WHY THIS PATTERN:
- done Event starts as unset (clear). This tells spinner: keep spinning!
- We start() the spinner thread to run in parallel
- Main thread calls slow_task() to do the actual work
- When work is done, we call done.set() to stop the spinner
- We join() to wait for the spinner thread to fully exit

This is a clean, safe way to coordinate threads.
"""

# Create an Event object. It starts in the "unset" state.
done = Event()

# Create a Thread object that will run the spin() function
# We pass the message and the Event object
spinner = Thread(target=spin, args=('thinking!', done))

print(f"Created spinner thread: {spinner}")
print()

# Start the spinner thread. It now runs in parallel with this code.
spinner.start()
print("Spinner thread started (running in parallel now)")
print()

# Do the work. Meanwhile, the spinner thread is still animating.
result = slow_task()

# Now tell the spinner to stop by setting the event
done.set()
print("Main thread: Set the done event (telling spinner to stop)")
print()

# Wait for the spinner thread to fully finish
spinner.join()
print("Main thread: Joined with spinner thread (both are done now)")
print()

return result

============ EXAMPLE 4: Running the Full Demo

==============================================

def main() -> None: """Run the complete demonstration.""" print("EXAMPLE 4: Running the Full Demonstration") print("-" * 70) print()

result = supervisor()

print()
print("=" * 70)
print(f"RESULT: The answer is {result}")
print("=" * 70)

============ EXAMPLE 5: Key Takeaways

======================================

print() print("=" * 70) print("KEY CONCEPTS TO REMEMBER:") print("=" * 70) print() print("1. threading.Event is like a flag threads can check/set") print() print("2. event.set() signals 'True' - work is done, stop waiting") print("3. event.wait(timeout) pauses until set or timeout, returns bool") print() print("4. This pattern is much safer than forcefully killing threads") print() print("5. Perfect for: progress indicators, graceful shutdown, signals") print() print("=" * 70) print()

if name == 'main': main() ```


Exercises

Exercise 1. Demonstrate the difference between a global variable and threading.local(). Create a global request_id and a threading.local() with a request_id attribute. Spawn 5 threads that each set both to their thread number, sleep for 0.1 seconds, then print both values. Show that the global variable is overwritten while the thread-local variable retains the correct value.

Solution to Exercise 1
```python
import threading
import time

global_req_id = None
local_data = threading.local()

def worker(tid):
    global global_req_id
    global_req_id = tid
    local_data.request_id = tid
    time.sleep(0.1)
    print(f"Thread {tid}: global={global_req_id}, "
          f"local={local_data.request_id}")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# global_req_id may show wrong value; local always correct
```

Exercise 2. Implement a ThreadLocalDB class that lazily creates one sqlite3 in-memory connection per thread. Write a method execute(sql, params) that uses the thread's connection. Spawn 3 threads that each create a table with a unique name, insert a row, and query it back. Verify each thread only sees its own table.

Solution to Exercise 2
```python
import threading
import sqlite3

class ThreadLocalDB:
    def __init__(self):
        self._local = threading.local()

    @property
    def connection(self):
        if not hasattr(self._local, 'conn'):
            self._local.conn = sqlite3.connect(':memory:')
        return self._local.conn

    def execute(self, sql, params=()):
        cur = self.connection.cursor()
        cur.execute(sql, params)
        self.connection.commit()
        return cur.fetchall()

db = ThreadLocalDB()

def worker(wid):
    table = f"t_{wid}"
    db.execute(f"CREATE TABLE {table} (val TEXT)")
    db.execute(f"INSERT INTO {table} VALUES (?)", (f"data-{wid}",))
    rows = db.execute(f"SELECT * FROM {table}")
    print(f"Thread {wid}: {rows}")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Exercise 3. Write a request_context context manager using threading.local() that stores user_id and request_id. Write helper functions get_user() and get_request() that read from the context. Spawn 4 threads that each enter a context with different values, call a process() function that reads and prints both values, then exit the context. Verify there is no cross-thread leakage.

Solution to Exercise 3
```python
import threading
from contextlib import contextmanager

_ctx = threading.local()

@contextmanager
def request_context(user_id, request_id):
    _ctx.user_id = user_id
    _ctx.request_id = request_id
    try:
        yield
    finally:
        del _ctx.user_id
        del _ctx.request_id

def get_user():
    return getattr(_ctx, 'user_id', None)

def get_request():
    return getattr(_ctx, 'request_id', None)

def process():
    print(f"  [{threading.current_thread().name}] "
          f"user={get_user()}, request={get_request()}")

def handler(uid, rid):
    with request_context(uid, rid):
        process()
    assert get_user() is None  # cleaned up

threads = [
    threading.Thread(target=handler, args=(f"user{i}", f"req{i}"))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("No cross-thread leakage.")
```