Skip to content

Thread-Local Storage

threading.local() provides storage where each thread has its own independent copy of data.

The Problem: Shared State

Without thread-local storage, all threads share the same global variables:

import threading
import time

# Global variable - shared by all threads
request_id = None

def handle_request(req_id):
    """Deliberately racy handler used to demonstrate the shared-state problem.

    Stores ``req_id`` in the module-level ``request_id``, sleeps, then prints
    whatever ``request_id`` holds at that point.
    """
    global request_id
    request_id = req_id
    time.sleep(0.1)  # Simulate processing
    # BUG: request_id may have been overwritten by another thread!
    print(f"Processing request {request_id}")

# Build the five handler threads first, then start them all, then wait for all.
threads = [
    threading.Thread(target=handle_request, args=(i,))
    for i in range(5)
]
for t in threads:
    t.start()

for t in threads:
    t.join()

# Output might show wrong request IDs!

Solution: threading.local()

Each thread gets its own copy of the data:

import threading
import time

# Thread-local storage: attributes set on this object are per-thread
local_data = threading.local()

def handle_request(req_id):
    """Store ``req_id`` thread-locally, simulate work, then report it."""
    local_data.request_id = req_id
    time.sleep(0.1)
    # The value read back is the one this same thread stored above.
    worker = threading.current_thread()
    print(f"Thread {worker.name}: request {local_data.request_id}")

# Build the five handler threads first, then start them all, then wait for all.
threads = [
    threading.Thread(target=handle_request, args=(i,))
    for i in range(5)
]
for t in threads:
    t.start()

for t in threads:
    t.join()

# Output: Each thread shows its correct request_id

How It Works

import threading

local = threading.local()

def show_value():
    """Print the calling thread's name and its ``local.value``, if it has one."""
    name = threading.current_thread().name
    try:
        current = local.value
    except AttributeError:
        # This thread has never assigned local.value.
        print(f"{name}: no value set")
    else:
        print(f"{name}: value = {current}")

def set_value(val):
    """Assign ``val`` to the calling thread's ``local.value`` and print it."""
    local.value = val
    show_value()

# Main thread (its default name is "MainThread")
local.value = "main"
show_value()  # MainThread: value = main

# Other threads don't see main's value
t1 = threading.Thread(target=show_value, name="Thread-1")
t1.start()
t1.join()  # Thread-1: no value set

# Each thread sets its own value
t2 = threading.Thread(target=set_value, args=("thread2",), name="Thread-2")
t2.start()
t2.join()  # Thread-2: value = thread2

# Main thread's value unchanged
show_value()  # MainThread: value = main

Practical Examples

1. Request Context in Web Server

import threading
from contextlib import contextmanager

# Thread-local request context
_request_context = threading.local()

@contextmanager
def request_context(request_id, user_id):
    """Set request context for the current thread for the duration of a block.

    On exit the values that were active before entry are restored, so the
    context manager can be nested; if nothing was set before, the attributes
    are removed again.

    Args:
        request_id: Value returned by ``get_request_id()`` inside the block.
        user_id: Value returned by ``get_current_user()`` inside the block.
    """
    missing = object()  # sentinel: distinguishes "not set" from a stored None
    previous = {
        name: getattr(_request_context, name, missing)
        for name in ('request_id', 'user_id')
    }
    _request_context.request_id = request_id
    _request_context.user_id = user_id
    try:
        yield
    finally:
        for name, old in previous.items():
            if old is not missing:
                setattr(_request_context, name, old)
                continue
            try:
                delattr(_request_context, name)
            except AttributeError:
                # The body already removed it; nothing left to clean up.
                pass

def get_current_user():
    """Get user from current request context (``None`` when not set)."""
    return getattr(_request_context, 'user_id', None)

def get_request_id():
    """Get current request ID (``None`` when not set)."""
    return getattr(_request_context, 'request_id', None)

def process_data():
    """Print the user and request ID taken from the current request context."""
    # The user is looked up before the request ID, left to right.
    print(f"Processing for user {get_current_user()} (request {get_request_id()})")

def handle_request(request_id, user_id):
    """Run ``process_data`` with the given request context active."""
    scope = request_context(request_id, user_id)
    with scope:
        process_data()

# Simulate concurrent requests: one thread per (request, user) pair
threads = [
    threading.Thread(target=handle_request, args=(f"req-{i}", f"user-{i}"))
    for i in range(3)
]
for t in threads:
    t.start()

for t in threads:
    t.join()

2. Database Connection Per Thread

import threading
import sqlite3

class ThreadLocalDB:
    """SQLite access where each thread lazily opens its own connection."""

    def __init__(self, db_path):
        # Path handed to sqlite3.connect() whenever a thread needs a connection.
        self.db_path = db_path
        # Holds the calling thread's connection under the attribute ``conn``.
        self._local = threading.local()

    @property
    def connection(self):
        """Return the calling thread's connection, opening it on first use."""
        try:
            return self._local.conn
        except AttributeError:
            opened = sqlite3.connect(self.db_path)
            self._local.conn = opened
            return opened

    def execute(self, query, params=()):
        """Run ``query`` with ``params`` on this thread's connection.

        Returns the list produced by ``fetchall()``. Nothing is committed here.
        """
        cur = self.connection.cursor()
        cur.execute(query, params)
        rows = cur.fetchall()
        return rows

    def close(self):
        """Close and forget the calling thread's connection, if it has one."""
        if not hasattr(self._local, 'conn'):
            return
        self._local.conn.close()
        del self._local.conn

# Usage
db = ThreadLocalDB('mydb.sqlite')

# Create the table once, from the main thread, before any worker inserts
# into it; without this the INSERTs fail on a fresh database file.
db.execute("CREATE TABLE IF NOT EXISTS logs (message TEXT)")
db.connection.commit()

def worker(worker_id):
    """Insert one log row using the calling thread's own connection.

    The connection is closed in ``finally`` because ``db.close()`` only
    affects the thread that calls it, so each worker must close its own.
    """
    try:
        # Each thread gets its own connection
        db.execute("INSERT INTO logs VALUES (?)", (f"worker-{worker_id}",))
        db.connection.commit()
    finally:
        db.close()

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Release the main thread's connection (opened for the CREATE TABLE above).
db.close()

3. Logging Context

import threading
import logging

class ContextFilter(logging.Filter):
    """Logging filter that copies per-thread context values onto each record."""

    def __init__(self):
        super().__init__()
        # Per-thread attribute bag holding the context key/value pairs.
        self._local = threading.local()

    def set_context(self, **kwargs):
        """Store each keyword argument as context for the calling thread."""
        for name in kwargs:
            setattr(self._local, name, kwargs[name])

    def clear_context(self):
        """Drop every context value held for the calling thread."""
        vars(self._local).clear()

    def filter(self, record):
        """Attach the calling thread's context to ``record``; never reject it."""
        context = vars(self._local)
        for name in context:
            setattr(record, name, context[name])
        return True

# Setup: one filter instance, attached to a stream handler on this module's logger
context_filter = ContextFilter()

handler = logging.StreamHandler()
handler.addFilter(context_filter)
# The format references request_id, which the filter copies onto each record.
handler.setFormatter(
    logging.Formatter('%(asctime)s [%(request_id)s] %(message)s')
)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(handler)

def process_request(request_id):
    """Log the start and end of a request with ``request_id`` as log context.

    The context is set on ``context_filter`` before the first log call and is
    cleared in ``finally``, so it is removed even if the work raises.
    """
    context_filter.set_context(request_id=request_id)
    try:
        logger.info("Starting request")
        # ... do work ...
        logger.info("Request completed")
    finally:
        context_filter.clear_context()

4. Transaction Scope

import threading
from contextlib import contextmanager

class TransactionManager:
    """Tracks at most one active ``Transaction`` per thread."""

    def __init__(self):
        # Holds the calling thread's active transaction under ``transaction``.
        self._local = threading.local()

    @property
    def current_transaction(self):
        """The calling thread's active transaction, or ``None``."""
        return getattr(self._local, 'transaction', None)

    @contextmanager
    def transaction(self):
        """Open a transaction for the calling thread for the ``with`` block.

        Yields the new ``Transaction``. It is committed when the block ends
        normally and rolled back (then the error re-raised) when the block or
        the commit raises.

        Raises:
            RuntimeError: If this thread already has an active transaction.
        """
        if self.current_transaction is not None:
            raise RuntimeError("Nested transactions not supported")

        txn = Transaction()
        self._local.transaction = txn
        try:
            yield txn
            txn.commit()
        except BaseException:
            # BaseException rather than Exception: KeyboardInterrupt and
            # SystemExit must also roll back instead of leaving the
            # transaction neither committed nor rolled back.
            txn.rollback()
            raise
        finally:
            del self._local.transaction

class Transaction:
    """Minimal transaction: collects operations and reports commit/rollback."""

    def __init__(self):
        # Operations recorded via add(), in insertion order.
        self.operations = []

    def add(self, op):
        """Record ``op`` as part of this transaction."""
        self.operations.append(op)

    def commit(self):
        """Print how many operations are being committed."""
        print(f"Committing {len(self.operations)} operations")

    def rollback(self):
        """Print that the transaction is being rolled back."""
        print("Rolling back")

# Usage: a single manager shared by all threads
tm = TransactionManager()

def worker(worker_id):
    """Record one operation inside this thread's own transaction."""
    with tm.transaction() as active:
        active.add(f"operation from worker {worker_id}")

threads = []
for i in range(3):
    threads.append(threading.Thread(target=worker, args=(i,)))
for t in threads:
    t.start()
for t in threads:
    t.join()

Subclassing threading.local

For initialization logic, subclass threading.local:

import threading

class MyLocal(threading.local):
    """Thread-local object whose attributes start from a per-thread default."""

    def __init__(self, default_value):
        # Runs again, with the same argument, for each thread that first
        # touches the object, so every thread starts from these values.
        self.initialized = True
        self.value = default_value

local = MyLocal("default")

def show():
    """Print the calling thread's name followed by its own ``local.value``."""
    print(f"{threading.current_thread().name}: {local.value}")

# Main thread
show()  # MainThread: default
local.value = "main"
show()  # MainThread: main

# New thread gets fresh default
t = threading.Thread(target=show)
t.start()
t.join()  # Thread-1: default  (on Python 3.10+ the auto-generated name is "Thread-1 (show)")

Important Considerations

Data is Thread-Specific, Not Task-Specific

Thread-local data persists for the lifetime of the thread, not of the task, so a pooled worker thread carries it over from one task to the next:

import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()

def task(task_id):
    """Record ``task_id`` thread-locally and report what was stored before.

    Returns a string naming this task and the ``task_id`` previously stored
    by the same thread, or ``'none'`` if the thread had not stored one.
    """
    try:
        # May see leftover data from previous task on same thread!
        leftover = local.task_id
    except AttributeError:
        leftover = 'none'
    local.task_id = task_id
    return f"Task {task_id} (previous: {leftover})"

# Ten tasks over two pooled threads; list() drains the map inside the block.
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(task, range(10)))
print(results)
# Tasks reuse threads, so they may see previous task's data

Cleanup Pattern

def task_with_cleanup(task_id):
    """Run a task with ``local.task_id`` set, removing it again afterwards."""
    local.task_id = task_id
    try:
        # ... do work ...
        pass
    finally:
        # In ``finally`` so the attribute is removed even if the work raises.
        del local.task_id  # Clean up

threading.local vs contextvars

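| Feature               | threading.local | contextvars |
|-----------------------|-----------------|-------------|
| Thread isolation      | ✓               | ✓           |
| Async task isolation  | ✗               | ✓           |
| Copy on task creation | ✗               | ✓           |
| Python version        | 2.4+            | 3.7+        |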

For async code, use contextvars instead.

Key Takeaways

  • threading.local() provides thread-isolated storage
  • Each thread sees its own copy of attributes
  • Useful for request context, connections, transactions
  • Data persists for thread lifetime (clean up in thread pools)
  • For async code, use contextvars instead
  • Subclass for custom initialization logic

Runnable Example: thread_event_pattern.py

"""
TUTORIAL: Using threading.Event for Thread Coordination
========================================================

In this tutorial, you'll learn how to use threading.Event as a simple but
powerful synchronization mechanism for coordinating threads.

KEY CONCEPTS:
- threading.Event: A simple flag-like object for thread communication
- event.set() and event.wait(): Basic signaling mechanism
- Practical pattern: Using events to signal threads to stop gracefully
- Real-world example: Animated spinner that responds to task completion

CREDITS: Adapted from Michele Simionato's example in python-list:
https://mail.python.org/pipermail/python-list/2009-February/675659.html
"""

import itertools
import time
from threading import Thread, Event


# Title banner, followed by one blank line.
print(
    "=" * 70,
    "THREADING.EVENT FOR THREAD COORDINATION",
    "=" * 70,
    "",
    sep="\n",
)


# ============ EXAMPLE 1: Understanding threading.Event
# =====================================================

print(
    "EXAMPLE 1: What is threading.Event?",
    "-" * 70,
    "",
    "threading.Event is like a simple 'flag' that threads can use to",
    "communicate with each other. It has two states: set or unset.",
    "",
    sep="\n",
)

# A freshly created Event reports is_set() as False.
event = Event()
print(
    f"• Created an event: {event}",
    f"• Is it set? {event.is_set()}",
    "",
    sep="\n",
)

# When we call event.set(), the flag becomes True
event.set()
print(
    "After event.set():",
    f"• Is it set now? {event.is_set()}",
    "",
    sep="\n",
)

# When we call event.clear(), the flag becomes False again
event.clear()
print(
    "After event.clear():",
    f"• Is it set now? {event.is_set()}",
    "",
    sep="\n",
)


# ============ EXAMPLE 2: The Spinner Function - Showing Activity
# ================================================================

print(
    "EXAMPLE 2: Creating a Spinner Thread",
    "-" * 70,
    "",
    "A spinner shows the user that work is happening. We use an Event",
    "to let the spinner know when the work is complete so it can stop.",
    "",
    sep="\n",
)


def spin(msg: str, done: Event) -> None:
    """
    Animate a one-character spinner next to ``msg`` until ``done`` is set.

    HOW IT WORKS:
    - A cycling iterator hands out the four spinner characters forever
    - Each frame is printed after a carriage return so it overwrites the
      previous frame on the same line, and is flushed immediately
    - done.wait(0.1) doubles as the frame delay and the stop check

    Args:
        msg: The message to display next to the spinner
        done: An Event that signals when to stop spinning
    """
    print(f"\nStarting spinner with message: '{msg}'")
    print("(Watch the animation below - it's cycling through characters)")
    print()

    frames = itertools.cycle(r'\|/-')
    completed_waits = 0

    while True:
        status = f'\r{next(frames)} {msg}'
        print(status, end='', flush=True)

        # wait() returns True as soon as the event is set, or False once the
        # 0.1 second timeout passes without it being set.
        if done.wait(0.1):
            break

        # Only waits that timed out are counted; the final one is not.
        completed_waits += 1

    # Overwrite the last frame with spaces, then return to the line start.
    print('\r' + ' ' * len(status) + '\r', end='')

    print(f"Spinner stopped after {completed_waits} iterations")
    print()


def slow_task(duration: float = 3) -> int:
    """
    Simulate a long-running task by sleeping, then return a result.

    WHY: This represents real work - downloading, processing, etc.
    We'll run the spinner while this happens.

    Args:
        duration: Seconds to sleep. Defaults to 3, which reproduces the
            original fixed 3-second task; pass a smaller value for quick runs.

    Returns:
        The constant 42.

    Raises:
        ValueError: If ``duration`` is negative (raised by ``time.sleep``).
    """
    # ":g" prints 3 as "3" (not "3.0"), so the default message is unchanged.
    print(f"Long task: Starting {duration:g}-second sleep...")
    time.sleep(duration)
    print("Long task: Finished!")
    return 42


# ============ EXAMPLE 3: The Supervisor - Coordinating Threads
# ==============================================================

print(
    "EXAMPLE 3: Coordinating Threads with an Event",
    "-" * 70,
    "",
    "The supervisor orchestrates two things:",
    "1. A worker thread (spinner) showing progress",
    "2. The main thread doing the actual work",
    "Both use an Event to coordinate.",
    "",
    sep="\n",
)


def supervisor() -> int:
    """
    Manage the spinner thread and the work.

    WHY THIS PATTERN:
    - done Event starts as unset (clear). This tells spinner: keep spinning!
    - We start() the spinner thread to run in parallel
    - Main thread calls slow_task() to do the actual work
    - When work ends (normally or with an error), we call done.set() to
      stop the spinner
    - We join() to wait for the spinner thread to fully exit

    Returns:
        Whatever slow_task() returns.

    Raises:
        Any exception raised by slow_task(); the spinner is still stopped
        and joined before the exception propagates.
    """

    # Create an Event object. It starts in the "unset" state.
    done = Event()

    # Create a Thread object that will run the spin() function
    # We pass the message and the Event object
    spinner = Thread(target=spin, args=('thinking!', done))

    print(f"Created spinner thread: {spinner}")
    print()

    # Start the spinner thread. It now runs in parallel with this code.
    spinner.start()
    print("Spinner thread started (running in parallel now)")
    print()

    try:
        # Do the work. Meanwhile, the spinner thread is still animating.
        result = slow_task()
    finally:
        # Always signal and join, even if the work raised: otherwise the
        # non-daemon spinner thread would keep spinning and the program
        # could never exit.
        done.set()
        print("Main thread: Set the done event (telling spinner to stop)")
        print()

        # Wait for the spinner thread to fully finish
        spinner.join()
        print("Main thread: Joined with spinner thread (both are done now)")
        print()

    return result


# ============ EXAMPLE 4: Running the Full Demo
# ==============================================

def main() -> None:
    """Print the example header, run the supervisor, and show its result."""
    print(
        "EXAMPLE 4: Running the Full Demonstration",
        "-" * 70,
        "",
        sep="\n",
    )

    result = supervisor()

    # Blank line, then the result framed by two rules.
    rule = "=" * 70
    print("", rule, f"RESULT: The answer is {result}", rule, sep="\n")


# ============ EXAMPLE 5: Key Takeaways
# ======================================

# Printed at module level, i.e. before main() is invoked by the guard below.
print(
    "",
    "=" * 70,
    "KEY CONCEPTS TO REMEMBER:",
    "=" * 70,
    "",
    "1. threading.Event is like a flag threads can check/set",
    "",
    "2. event.set() signals 'True' - work is done, stop waiting",
    "3. event.wait(timeout) pauses until set or timeout, returns bool",
    "",
    "4. This pattern is much safer than forcefully killing threads",
    "",
    "5. Perfect for: progress indicators, graceful shutdown, signals",
    "",
    "=" * 70,
    "",
    sep="\n",
)


if __name__ == '__main__':
    main()