Error Handling in Concurrent Code¶
Proper error handling is critical in concurrent programs where exceptions can occur in multiple threads or processes simultaneously.
Challenges in Concurrent Error Handling¶
- Exceptions don't propagate automatically across threads/processes
- Multiple failures can occur simultaneously
- Partial completion — some tasks succeed, others fail
- Resource cleanup must happen even when errors occur
- Deadlocks can occur if error handling isn't careful
Error Handling with submit()¶
Basic Exception Handling¶
from concurrent.futures import ThreadPoolExecutor


def risky_task(x):
    """Square *x*, raising ValueError for the rejected value 5."""
    if x == 5:
        raise ValueError(f"Cannot process {x}")
    return x ** 2


with ThreadPoolExecutor() as executor:
    future = executor.submit(risky_task, 5)
    try:
        # The worker's exception is stored on the future and re-raised
        # only when .result() is called.
        result = future.result()
    except ValueError as e:
        print(f"Task failed: {e}")
Check Exception Before Result¶
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    future = executor.submit(risky_task, 5)
    # exception() blocks until the task completes and returns the raised
    # exception (or None) WITHOUT re-raising it.  The original called it
    # twice, discarding the first blocking call's result -- once is enough.
    exc = future.exception()
    if exc is not None:
        print(f"Exception: {exc}")
    else:
        print(f"Result: {future.result()}")
Error Handling with map()¶
First Exception Stops Iteration¶
from concurrent.futures import ThreadPoolExecutor


def process(x):
    """Square *x*; the value 3 is rejected."""
    if x == 3:
        raise ValueError(f"Bad value: {x}")
    return x ** 2


with ThreadPoolExecutor() as executor:
    try:
        # map() re-raises the first exception, in submission order,
        # when the failing result is consumed from the iterator.
        results = list(executor.map(process, range(10)))
    except ValueError as e:
        print(f"Error: {e}")
        # But some tasks may have completed or are still running!
Wrap Function for Safe map()¶
from concurrent.futures import ThreadPoolExecutor


def safe_process(x):
    """Run the risky computation, converting any exception into a
    ("error", message) tuple so map() never stops early."""
    try:
        if x == 3:
            raise ValueError(f"Bad value: {x}")
        return ("success", x ** 2)
    except Exception as e:
        return ("error", str(e))


with ThreadPoolExecutor() as executor:
    outcomes = list(executor.map(safe_process, range(10)))

successes = [value for tag, value in outcomes if tag == "success"]
errors = [value for tag, value in outcomes if tag == "error"]
print(f"Successes: {len(successes)}")
print(f"Errors: {errors}")
Error Handling with as_completed()¶
Handle Each Future Individually¶
from concurrent.futures import ThreadPoolExecutor, as_completed


def process(x):
    """Square *x*; multiples of 3 are rejected."""
    if x % 3 == 0:
        raise ValueError(f"Divisible by 3: {x}")
    return x ** 2


with ThreadPoolExecutor() as executor:
    # Map each future back to its input so failures can be attributed.
    futures = {executor.submit(process, x): x for x in range(10)}
    results = {}
    errors = {}
    for future in as_completed(futures):
        x = futures[future]
        try:
            results[x] = future.result()
        except Exception as e:
            errors[x] = str(e)

print(f"Results: {results}")
print(f"Errors: {errors}")
Continue on Errors¶
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_data(url):
    """GET *url* and return the decoded JSON body; raises on HTTP errors."""
    import requests
    response = requests.get(url, timeout=5)
    response.raise_for_status()
    return response.json()


urls = [
    "https://api.example.com/data/1",
    "https://api.example.com/data/2",
    "https://invalid-url.com/fail",  # Will fail
    "https://api.example.com/data/3",
]

successful_data = []
failed_urls = []
with ThreadPoolExecutor(max_workers=10) as executor:
    future_to_url = {executor.submit(fetch_data, url): url for url in urls}
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            successful_data.append(future.result())
        except Exception as e:
            # Record the failure and continue processing other results.
            failed_urls.append((url, str(e)))

print(f"Fetched: {len(successful_data)}")
print(f"Failed: {failed_urls}")
Callbacks for Error Handling¶
Error Callback with apply_async¶
from multiprocessing import Pool


def task(x):
    """Square *x*; negative input is rejected."""
    if x < 0:
        raise ValueError("Negative!")
    return x ** 2


def on_success(result):
    print(f"Success: {result}")


def on_error(error):
    print(f"Error: {error}")


if __name__ == "__main__":
    with Pool() as pool:
        # callback fires on success, error_callback on exception --
        # without error_callback a worker failure would go unnoticed.
        pool.apply_async(task, args=(5,), callback=on_success,
                         error_callback=on_error)
        pool.apply_async(task, args=(-1,), callback=on_success,
                         error_callback=on_error)
        pool.close()
        pool.join()
Done Callback with Future¶
from concurrent.futures import ThreadPoolExecutor


def task(x):
    """Square *x*; negative input is rejected."""
    if x < 0:
        raise ValueError("Negative!")
    return x ** 2


def handle_completion(future):
    """Done-callback: report the outcome of a finished future."""
    try:
        print(f"Success: {future.result()}")
    except Exception as e:
        print(f"Failed: {e}")


with ThreadPoolExecutor() as executor:
    for x in (5, -1, 10, -5):
        # The callback runs as soon as each future finishes.
        executor.submit(task, x).add_done_callback(handle_completion)
Timeout Handling¶
Single Task Timeout¶
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time


def slow_task():
    """Simulate long-running work (10 s), then report completion."""
    time.sleep(10)
    return "done"


with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)
    try:
        result = future.result(timeout=2)
    except TimeoutError:
        print("Task timed out!")
        # Note: Task may still be running!
        # cancel() only works if task hasn't started
        future.cancel()
Batch Timeout with as_completed¶
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError


def task(x):
    """Sleep *x* seconds, then return *x*."""
    import time
    time.sleep(x)
    return x


with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, i) for i in [1, 5, 2, 8, 1]]
    results = []
    timed_out = []
    try:
        # as_completed raises TimeoutError once the 3 s budget is spent,
        # no matter how many futures are still pending.
        for future in as_completed(futures, timeout=3):
            results.append(future.result())
    except TimeoutError:
        # Some futures didn't complete in time
        for future in futures:
            if not future.done():
                timed_out.append(future)
                future.cancel()

print(f"Completed: {results}")
print(f"Timed out: {len(timed_out)}")
Exception Chaining¶
Preserve Original Exception¶
from concurrent.futures import ThreadPoolExecutor


def worker(x):
    """Stand-in for real work that always fails."""
    raise ValueError("Original error")


def safe_worker(x):
    """Re-raise worker failures as RuntimeError, chaining the cause."""
    try:
        return worker(x)
    except Exception as e:
        # "raise ... from e" records the original in __cause__.
        raise RuntimeError(f"Worker {x} failed") from e


with ThreadPoolExecutor() as executor:
    future = executor.submit(safe_worker, 5)
    try:
        future.result()
    except RuntimeError as e:
        print(f"Error: {e}")
        print(f"Caused by: {e.__cause__}")
Logging Errors¶
Centralized Error Logging¶
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def process(x):
    """Square *x*; multiples of 5 are rejected."""
    if x % 5 == 0:
        raise ValueError(f"Bad value: {x}")
    return x ** 2


def process_with_logging(x):
    """Run process(x), logging the outcome; failures are re-raised."""
    try:
        result = process(x)
        logger.info(f"Task {x}: Success")
        return result
    except Exception as e:
        logger.error(f"Task {x}: Failed - {e}")
        raise


with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_with_logging, x) for x in range(20)]
    for future in as_completed(futures):
        try:
            future.result()
        except Exception:
            pass  # Already logged
Cleanup on Error¶
Using try/finally¶
from concurrent.futures import ThreadPoolExecutor


def task_with_cleanup(resource):
    """Process *resource*, guaranteeing close() runs even on error."""
    try:
        # Do work
        return process(resource)
    finally:
        # Always cleanup
        resource.close()


with ThreadPoolExecutor() as executor:
    resources = [open_resource(i) for i in range(10)]
    futures = [executor.submit(task_with_cleanup, r) for r in resources]
    # All resources will be cleaned up
    results = [f.result() for f in futures]
Context Manager Pattern¶
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager


@contextmanager
def managed_resource(name):
    """Acquire the named resource for the duration of a with-block."""
    resource = acquire_resource(name)
    try:
        yield resource
    finally:
        # Runs on normal exit and on error alike.
        release_resource(resource)


def task(name):
    """Process one resource acquired through the context manager."""
    with managed_resource(name) as resource:
        return process(resource)


with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, f"resource_{i}") for i in range(10)]
    results = [f.result() for f in futures]
Aggregating Multiple Errors¶
Collect All Errors¶
from concurrent.futures import ThreadPoolExecutor, as_completed
class MultipleErrors(Exception):
    """Aggregate exception carrying every individual failure."""

    def __init__(self, errors):
        # Keep the full (item, exception) list available to handlers.
        super().__init__(f"{len(errors)} errors occurred")
        self.errors = errors
def process_all(items, func, raise_on_error=True):
    """Process all items, optionally raising aggregated errors.

    Returns (results, errors) where results is a list of (item, value)
    pairs and errors a list of (item, exception) pairs.  When
    raise_on_error is true and anything failed, raises MultipleErrors.
    """
    results, errors = [], []
    with ThreadPoolExecutor() as executor:
        future_to_item = {executor.submit(func, item): item for item in items}
        for future in as_completed(future_to_item):
            item = future_to_item[future]
            try:
                results.append((item, future.result()))
            except Exception as e:
                errors.append((item, e))
    if errors and raise_on_error:
        raise MultipleErrors(errors)
    return results, errors
# Usage
try:
    results, errors = process_all(items, risky_func)
except MultipleErrors as e:
    # Every failure is available on the aggregate exception.
    print(f"Failed items: {len(e.errors)}")
    for item, error in e.errors:
        print(f" {item}: {error}")
Best Practices¶
1. Always Handle Exceptions¶
# Bad: Exceptions silently ignored
for future in futures:
future.result() # May raise!
# Good: Handle each exception
for future in futures:
try:
result = future.result()
except Exception as e:
handle_error(e)
2. Use Specific Exception Types¶
# Bad: Catch everything
except Exception:
pass
# Good: Catch specific exceptions
except (ConnectionError, TimeoutError) as e:
retry_task(e)
except ValueError as e:
log_invalid_input(e)
3. Don't Ignore Partial Results¶
# Bad: All or nothing
try:
results = list(executor.map(func, items))
except Exception:
results = []
# Good: Collect partial results
results = []
errors = []
for future in as_completed(futures):
try:
results.append(future.result())
except Exception as e:
errors.append(e)
4. Set Timeouts¶
# Bad: Wait forever
result = future.result()
# Good: Set reasonable timeout
result = future.result(timeout=30)
Key Takeaways¶
- Exceptions in workers don't propagate automatically
- Use future.result() to get the value or re-raise the worker's exception
- as_completed() allows handling each task's error individually
- Wrap functions to capture exceptions without stopping map()
- Use callbacks for async error handling
- Always set timeouts for production code
- Log errors for debugging
- Ensure cleanup happens even on errors
- Collect partial results when some tasks fail