Error Handling in Concurrent Code

Proper error handling is critical in concurrent programs where exceptions can occur in multiple threads or processes simultaneously.


Challenges in Concurrent Error Handling

  1. Exceptions don't propagate automatically across threads/processes (see the sketch after this list)
  2. Multiple failures can occur simultaneously
  3. Partial completion — some tasks succeed, others fail
  4. Resource cleanup must happen even when errors occur
  5. Deadlocks can occur if error handling isn't careful
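
A minimal sketch of the first challenge (the boom helper is illustrative): an exception raised in a bare threading.Thread never reaches the main thread, while an executor captures it on the Future so result() can re-raise it.

import threading
from concurrent.futures import ThreadPoolExecutor

def boom():
    raise ValueError("lost in the worker")

# With a raw thread, the traceback is printed to stderr by the
# threading machinery but never reaches the main thread.
t = threading.Thread(target=boom)
t.start()
t.join()  # No exception is raised here

# With an executor, the exception is stored on the Future and
# re-raised when result() is called.
with ThreadPoolExecutor() as executor:
    future = executor.submit(boom)
    try:
        future.result()
    except ValueError as e:
        print(f"Recovered in main thread: {e}")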

Error Handling with submit()

Basic Exception Handling

from concurrent.futures import ThreadPoolExecutor

def risky_task(x):
    if x == 5:
        raise ValueError(f"Cannot process {x}")
    return x ** 2

with ThreadPoolExecutor() as executor:
    future = executor.submit(risky_task, 5)

    try:
        result = future.result()  # Exception raised here
    except ValueError as e:
        print(f"Task failed: {e}")

Check Exception Before Result

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    future = executor.submit(risky_task, 5)

    # exception() blocks until the task completes and returns the
    # exception instance (or None) instead of raising it
    exc = future.exception()
    if exc is not None:
        print(f"Exception: {exc}")
    else:
        print(f"Result: {future.result()}")

Error Handling with map()

First Exception Stops Iteration

from concurrent.futures import ThreadPoolExecutor

def process(x):
    if x == 3:
        raise ValueError(f"Bad value: {x}")
    return x ** 2

with ThreadPoolExecutor() as executor:
    try:
        # The exception is re-raised once iteration reaches the failed task
        results = list(executor.map(process, range(10)))
    except ValueError as e:
        print(f"Error: {e}")
        # But some tasks may have completed or are still running!

Wrap Function for Safe map()

from concurrent.futures import ThreadPoolExecutor

def safe_process(x):
    """Wrap function to catch exceptions."""
    try:
        if x == 3:
            raise ValueError(f"Bad value: {x}")
        return ("success", x ** 2)
    except Exception as e:
        return ("error", str(e))

with ThreadPoolExecutor() as executor:
    results = list(executor.map(safe_process, range(10)))

    successes = [r[1] for r in results if r[0] == "success"]
    errors = [r[1] for r in results if r[0] == "error"]

    print(f"Successes: {len(successes)}")
    print(f"Errors: {errors}")

Error Handling with as_completed()

Handle Each Future Individually

from concurrent.futures import ThreadPoolExecutor, as_completed

def process(x):
    if x % 3 == 0:
        raise ValueError(f"Divisible by 3: {x}")
    return x ** 2

with ThreadPoolExecutor() as executor:
    futures = {executor.submit(process, x): x for x in range(10)}

    results = {}
    errors = {}

    for future in as_completed(futures):
        x = futures[future]
        try:
            results[x] = future.result()
        except Exception as e:
            errors[x] = str(e)

    print(f"Results: {results}")
    print(f"Errors: {errors}")

Continue on Errors

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_data(url):
    import requests
    response = requests.get(url, timeout=5)
    response.raise_for_status()
    return response.json()

urls = [
    "https://api.example.com/data/1",
    "https://api.example.com/data/2",
    "https://invalid-url.com/fail",  # Will fail
    "https://api.example.com/data/3",
]

with ThreadPoolExecutor(max_workers=10) as executor:
    future_to_url = {executor.submit(fetch_data, url): url for url in urls}

    successful_data = []
    failed_urls = []

    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            successful_data.append(data)
        except Exception as e:
            failed_urls.append((url, str(e)))
            # Continue processing other results

    print(f"Fetched: {len(successful_data)}")
    print(f"Failed: {failed_urls}")

Callbacks for Error Handling

Error Callback with apply_async

from multiprocessing import Pool

def task(x):
    if x < 0:
        raise ValueError("Negative!")
    return x ** 2

def on_success(result):
    print(f"Success: {result}")

def on_error(error):
    print(f"Error: {error}")

if __name__ == "__main__":
    with Pool() as pool:
        # With callbacks
        pool.apply_async(task, args=(5,), 
                        callback=on_success, 
                        error_callback=on_error)

        pool.apply_async(task, args=(-1,), 
                        callback=on_success, 
                        error_callback=on_error)

        pool.close()
        pool.join()

Done Callback with Future

from concurrent.futures import ThreadPoolExecutor

def task(x):
    if x < 0:
        raise ValueError("Negative!")
    return x ** 2

def handle_completion(future):
    try:
        result = future.result()
        print(f"Success: {result}")
    except Exception as e:
        print(f"Failed: {e}")

with ThreadPoolExecutor() as executor:
    for x in [5, -1, 10, -5]:
        future = executor.submit(task, x)
        future.add_done_callback(handle_completion)

Timeout Handling

Single Task Timeout

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_task():
    time.sleep(10)
    return "done"

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)

    try:
        result = future.result(timeout=2)
    except TimeoutError:
        print("Task timed out!")
        # Note: the task may still be running!
        # cancel() returns False if the task has already started
        future.cancel()
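
Because cancel() cannot interrupt a task that is already executing, long-running work that must be stoppable usually checks a shared flag cooperatively. A minimal sketch of that pattern, where stop_event and interruptible_task are illustrative names:

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import threading
import time

stop_event = threading.Event()

def interruptible_task():
    # Poll the event so the task can stop early when asked to
    for _ in range(100):
        if stop_event.is_set():
            return "stopped early"
        time.sleep(0.1)
    return "done"

with ThreadPoolExecutor() as executor:
    future = executor.submit(interruptible_task)
    try:
        result = future.result(timeout=2)
    except TimeoutError:
        stop_event.set()           # Ask the running task to stop
        result = future.result()   # Returns "stopped early" shortly after

print(result)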

Batch Timeout with as_completed

from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError

def task(x):
    import time
    time.sleep(x)
    return x

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, i) for i in [1, 5, 2, 8, 1]]

    results = []
    timed_out = []

    try:
        for future in as_completed(futures, timeout=3):
            results.append(future.result())
    except TimeoutError:
        # Some futures didn't complete in time
        for future in futures:
            if not future.done():
                timed_out.append(future)
                future.cancel()

    print(f"Completed: {results}")
    print(f"Timed out: {len(timed_out)}")

Exception Chaining

Preserve Original Exception

from concurrent.futures import ThreadPoolExecutor

def worker(x):
    raise ValueError("Original error")

def safe_worker(x):
    try:
        return worker(x)
    except Exception as e:
        raise RuntimeError(f"Worker {x} failed") from e

with ThreadPoolExecutor() as executor:
    future = executor.submit(safe_worker, 5)

    try:
        future.result()
    except RuntimeError as e:
        print(f"Error: {e}")
        print(f"Caused by: {e.__cause__}")

Logging Errors

Centralized Error Logging

from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process(x):
    if x % 5 == 0:
        raise ValueError(f"Bad value: {x}")
    return x ** 2

def process_with_logging(x):
    try:
        result = process(x)
        logger.info(f"Task {x}: Success")
        return result
    except Exception as e:
        logger.error(f"Task {x}: Failed - {e}")
        raise

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_with_logging, x) for x in range(20)]

    for future in as_completed(futures):
        try:
            future.result()
        except Exception:
            pass  # Already logged

Cleanup on Error

Using try/finally

from concurrent.futures import ThreadPoolExecutor

def task_with_cleanup(resource):
    try:
        # Do work
        result = process(resource)
        return result
    finally:
        # Always cleanup
        resource.close()

with ThreadPoolExecutor() as executor:
    resources = [open_resource(i) for i in range(10)]
    futures = [executor.submit(task_with_cleanup, r) for r in resources]

    # All resources will be cleaned up
    results = [f.result() for f in futures]

Context Manager Pattern

from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager

@contextmanager
def managed_resource(name):
    resource = acquire_resource(name)
    try:
        yield resource
    finally:
        release_resource(resource)

def task(name):
    with managed_resource(name) as resource:
        return process(resource)

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, f"resource_{i}") for i in range(10)]
    results = [f.result() for f in futures]

Aggregating Multiple Errors

Collect All Errors

from concurrent.futures import ThreadPoolExecutor, as_completed

class MultipleErrors(Exception):
    def __init__(self, errors):
        self.errors = errors
        super().__init__(f"{len(errors)} errors occurred")

def process_all(items, func, raise_on_error=True):
    """Process all items, optionally raising aggregated errors."""
    with ThreadPoolExecutor() as executor:
        future_to_item = {executor.submit(func, item): item for item in items}

        results = []
        errors = []

        for future in as_completed(future_to_item):
            item = future_to_item[future]
            try:
                results.append((item, future.result()))
            except Exception as e:
                errors.append((item, e))

        if errors and raise_on_error:
            raise MultipleErrors(errors)

        return results, errors

# Usage
try:
    results, errors = process_all(items, risky_func)
except MultipleErrors as e:
    print(f"Failed items: {len(e.errors)}")
    for item, error in e.errors:
        print(f"  {item}: {error}")

Best Practices

1. Always Handle Exceptions

# Bad: unhandled; the first failing future raises and aborts the loop,
# skipping the remaining results
for future in futures:
    future.result()

# Good: Handle each exception
for future in futures:
    try:
        result = future.result()
    except Exception as e:
        handle_error(e)

2. Use Specific Exception Types

# Bad: Catch everything
except Exception:
    pass

# Good: Catch specific exceptions
except (ConnectionError, TimeoutError) as e:
    retry_task(e)
except ValueError as e:
    log_invalid_input(e)

3. Don't Ignore Partial Results

# Bad: All or nothing
try:
    results = list(executor.map(func, items))
except Exception:
    results = []

# Good: Collect partial results
results = []
errors = []
for future in as_completed(futures):
    try:
        results.append(future.result())
    except Exception as e:
        errors.append(e)

4. Set Timeouts

# Bad: Wait forever
result = future.result()

# Good: Set reasonable timeout
result = future.result(timeout=30)

Key Takeaways

  • Exceptions in workers don't propagate automatically
  • Use future.result() to get or raise exceptions
  • as_completed() allows handling each task's error individually
  • Wrap functions to capture exceptions without stopping map()
  • Use callbacks for async error handling
  • Always set timeouts for production code
  • Log errors for debugging
  • Ensure cleanup happens even on errors
  • Collect partial results when some tasks fail