Error Handling in Concurrent Code¶
Proper error handling is critical in concurrent programs where exceptions can occur in multiple threads or processes simultaneously.
Mental Model
In sequential code an exception stops everything immediately. In concurrent code, exceptions happen in worker threads or processes -- they are silently captured inside Futures and only re-raised when you call result(). If you never check, you never know something failed. Always retrieve results or attach error callbacks; otherwise failures vanish silently and corrupt your output.
Challenges in Concurrent Error Handling¶
- Exceptions don't propagate automatically across threads/processes
- Multiple failures can occur simultaneously
- Partial completion — some tasks succeed, others fail
- Resource cleanup must happen even when errors occur
- Deadlocks can occur if error handling isn't careful
Error Handling with submit()¶
Basic Exception Handling¶
```python
from concurrent.futures import ThreadPoolExecutor
def risky_task(x):
    """Return ``x`` squared, failing on purpose for the value 5.

    Raises:
        ValueError: If ``x`` is 5, so callers have an error to handle.
    """
    if x == 5:
        raise ValueError(f"Cannot process {x}")
    return x ** 2
with ThreadPoolExecutor() as executor:
    future = executor.submit(risky_task, 5)

    # The worker's exception is stored on the Future and is only re-raised
    # in the calling thread when result() is invoked.
    try:
        result = future.result()  # Exception raised here
    except ValueError as e:
        print(f"Task failed: {e}")
```
Check Exception Before Result¶
```python
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
    future = executor.submit(risky_task, 5)

    # Wait without raising: exception() blocks until the task completes and
    # returns the raised exception (or None) instead of re-raising it.
    exc = future.exception()

    # Check if exception occurred
    if exc is not None:
        print(f"Exception: {exc}")
    else:
        print(f"Result: {future.result()}")
```
Error Handling with map()¶
First Exception Stops Iteration¶
```python
from concurrent.futures import ThreadPoolExecutor
def process(x):
    """Return ``x`` squared, failing on purpose for the value 3.

    Raises:
        ValueError: If ``x`` is 3.
    """
    if x == 3:
        raise ValueError(f"Bad value: {x}")
    return x ** 2
with ThreadPoolExecutor() as executor:
    try:
        # Raises on first exception encountered in order
        results = list(executor.map(process, range(10)))
    except ValueError as e:
        print(f"Error: {e}")
        # But some tasks may have completed or are still running!
```
Wrap Function for Safe map()¶
```python
from concurrent.futures import ThreadPoolExecutor
def safe_process(x):
    """Wrap function to catch exceptions.

    Returns:
        ``("success", x ** 2)`` normally, or ``("error", message)`` when the
        work raises, so that ``executor.map()`` never sees an exception.
    """
    try:
        if x == 3:
            raise ValueError(f"Bad value: {x}")
        return ("success", x ** 2)
    except Exception as e:
        return ("error", str(e))
with ThreadPoolExecutor() as executor:
    results = list(executor.map(safe_process, range(10)))

# Each result is a (status, payload) pair; split them by status.
successes = [payload for status, payload in results if status == "success"]
errors = [payload for status, payload in results if status == "error"]

print(f"Successes: {len(successes)}")
print(f"Errors: {errors}")
```
Error Handling with as_completed()¶
Handle Each Future Individually¶
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
def process(x):
    """Return ``x`` squared, failing on purpose for multiples of 3.

    Raises:
        ValueError: If ``x`` is divisible by 3 (including 0).
    """
    if x % 3 == 0:
        raise ValueError(f"Divisible by 3: {x}")
    return x ** 2
with ThreadPoolExecutor() as executor:
    # Map each Future back to its input so outcomes can be keyed by input.
    futures = {executor.submit(process, x): x for x in range(10)}

    results = {}
    errors = {}

    for future in as_completed(futures):
        x = futures[future]
        try:
            results[x] = future.result()
        except Exception as e:
            errors[x] = str(e)

print(f"Results: {results}")
print(f"Errors: {errors}")
```
Continue on Errors¶
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_data(url):
    """Fetch ``url`` and return its decoded JSON body.

    The request uses a 5 second timeout. ``raise_for_status()`` is called on
    the response before the body is decoded, so any exception it raises
    propagates to the caller.
    """
    import requests

    response = requests.get(url, timeout=5)
    response.raise_for_status()
    return response.json()
urls = [
    "https://api.example.com/data/1",
    "https://api.example.com/data/2",
    "https://invalid-url.com/fail",  # Will fail
    "https://api.example.com/data/3",
]

with ThreadPoolExecutor(max_workers=10) as executor:
    future_to_url = {executor.submit(fetch_data, url): url for url in urls}

    successful_data = []
    failed_urls = []

    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            successful_data.append(data)
        except Exception as e:
            failed_urls.append((url, str(e)))
            # Continue processing other results

print(f"Fetched: {len(successful_data)}")
print(f"Failed: {failed_urls}")
```
Callbacks for Error Handling¶
Error Callback with apply_async¶
```python
from multiprocessing import Pool
def task(x):
    """Return ``x`` squared, rejecting negative input.

    Raises:
        ValueError: If ``x`` is negative.
    """
    if x < 0:
        raise ValueError("Negative!")
    return x ** 2
def on_success(result):
    """Print the value produced by a task that finished without error."""
    message = f"Success: {result}"
    print(message)
def on_error(error):
    """Print the exception raised by a task that failed."""
    message = f"Error: {error}"
    print(message)
# The __main__ guard keeps this block from running when the module is
# imported rather than executed as a script.
if __name__ == "__main__":
    with Pool() as pool:
        # With callbacks
        pool.apply_async(task, args=(5,),
                         callback=on_success,
                         error_callback=on_error)

        pool.apply_async(task, args=(-1,),
                         callback=on_success,
                         error_callback=on_error)

        # Stop accepting work and wait for the submitted tasks to finish
        # before leaving the with block.
        pool.close()
        pool.join()
```
Done Callback with Future¶
```python
from concurrent.futures import ThreadPoolExecutor
def task(x):
    """Return ``x`` squared, rejecting negative input.

    Raises:
        ValueError: If ``x`` is negative.
    """
    if x < 0:
        raise ValueError("Negative!")
    return x ** 2
def handle_completion(future):
    """Done-callback that reports a finished future's result or failure.

    Args:
        future: The completed ``Future`` passed in by ``add_done_callback``.
    """
    try:
        result = future.result()
    except Exception as e:
        print(f"Failed: {e}")
    else:
        # Only the result() call sits in the try body, so a problem while
        # printing is not misreported as a task failure.
        print(f"Success: {result}")
with ThreadPoolExecutor() as executor:
    for x in [5, -1, 10, -5]:
        future = executor.submit(task, x)
        future.add_done_callback(handle_completion)
```
Timeout Handling¶
Single Task Timeout¶
```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
def slow_task(seconds=10):
    """Sleep for ``seconds`` (default 10) and then return ``"done"``.

    The default keeps the task slower than the caller's timeout; a shorter
    duration can be passed when experimenting.
    """
    time.sleep(seconds)
    return "done"
with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)

    try:
        result = future.result(timeout=2)
    except TimeoutError:
        print("Task timed out!")
        # Note: Task may still be running!
        # cancel() only works if task hasn't started
        future.cancel()
# Leaving the with block shuts the executor down with wait=True, so the
# program still waits here until the running task finishes.
```
Batch Timeout with as_completed¶
```python
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
def task(x):
    """Sleep for ``x`` seconds, then return ``x``."""
    import time

    time.sleep(x)
    return x
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, i) for i in [1, 5, 2, 8, 1]]

    results = []
    timed_out = []

    try:
        # The timeout covers the whole iteration, not each individual future.
        for future in as_completed(futures, timeout=3):
            results.append(future.result())
    except TimeoutError:
        # Some futures didn't complete in time
        for future in futures:
            if not future.done():
                timed_out.append(future)
                future.cancel()

    print(f"Completed: {results}")
    print(f"Timed out: {len(timed_out)}")
```
Exception Chaining¶
Preserve Original Exception¶
```python
from concurrent.futures import ThreadPoolExecutor
def worker(x):
    """Always fail with ``ValueError``; the argument is not used."""
    raise ValueError("Original error")
def safe_worker(x):
    """Call ``worker(x)`` and wrap any failure in a ``RuntimeError``.

    Raises:
        RuntimeError: If ``worker`` raises; the original exception stays
            available as ``__cause__`` because of ``raise ... from e``.
    """
    try:
        return worker(x)
    except Exception as e:
        raise RuntimeError(f"Worker {x} failed") from e
with ThreadPoolExecutor() as executor:
    future = executor.submit(safe_worker, 5)

    try:
        future.result()
    except RuntimeError as e:
        print(f"Error: {e}")
        # __cause__ holds the exception named in the "raise ... from" clause.
        print(f"Caused by: {e.__cause__}")
```
Logging Errors¶
Centralized Error Logging¶
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
logging.basicConfig(level=logging.INFO)
# Name the logger after the current module (the original `name` was undefined).
logger = logging.getLogger(__name__)
def process(x):
    """Return ``x`` squared, failing on purpose for multiples of 5.

    Raises:
        ValueError: If ``x`` is divisible by 5 (including 0).
    """
    if x % 5 == 0:
        raise ValueError(f"Bad value: {x}")
    return x ** 2
def process_with_logging(x):
    """Run ``process(x)`` and log the outcome from inside the worker.

    Success is logged at INFO level and the result returned. A failure is
    logged at ERROR level and then re-raised, so the caller still sees it
    through the Future.
    """
    try:
        result = process(x)
        logger.info(f"Task {x}: Success")
        return result
    except Exception as e:
        logger.error(f"Task {x}: Failed - {e}")
        raise
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_with_logging, x) for x in range(20)]

    for future in as_completed(futures):
        try:
            future.result()
        except Exception:
            # Deliberately ignored here: the worker logged the failure
            # before re-raising it.
            pass  # Already logged
```
Cleanup on Error¶
Using try/finally¶
```python
from concurrent.futures import ThreadPoolExecutor
def task_with_cleanup(resource):
    """Process ``resource`` and close it whether or not processing succeeds.

    Returns whatever ``process(resource)`` returns; any exception it raises
    propagates after ``resource.close()`` has run.
    """
    try:
        # Do work
        result = process(resource)
        return result
    finally:
        # Always cleanup
        resource.close()
with ThreadPoolExecutor() as executor:
    resources = [open_resource(i) for i in range(10)]
    futures = [executor.submit(task_with_cleanup, r) for r in resources]

    # All resources will be cleaned up
    results = [f.result() for f in futures]
```
Context Manager Pattern¶
```python
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
@contextmanager
def managed_resource(name):
    """Acquire the resource called ``name`` and release it on exit.

    Yields the acquired resource. ``release_resource`` runs in a ``finally``
    clause, so it is called even when the ``with`` body raises.
    """
    resource = acquire_resource(name)
    try:
        yield resource
    finally:
        release_resource(resource)
def task(name):
    """Process the resource called ``name`` inside ``managed_resource``."""
    with managed_resource(name) as resource:
        return process(resource)
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, f"resource_{i}") for i in range(10)]
    results = [f.result() for f in futures]
```
Aggregating Multiple Errors¶
Collect All Errors¶
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
class MultipleErrors(Exception):
    """Exception that carries every failure from a batch of tasks.

    Attributes are documented inline; the exception message reports how many
    errors were collected.
    """

    def __init__(self, errors):
        # The collection of failures exactly as supplied by the caller.
        self.errors = errors
        super().__init__(f"{len(errors)} errors occurred")
def process_all(items, func, raise_on_error=True):
    """Process all items, optionally raising aggregated errors.

    Args:
        items: Inputs to submit; each must be hashable-independent since the
            Future, not the item, is used as the dictionary key.
        func: Callable applied to each item in a thread pool.
        raise_on_error: When true and at least one item failed, raise
            ``MultipleErrors`` instead of returning.

    Returns:
        ``(results, errors)`` where ``results`` is a list of
        ``(item, value)`` pairs and ``errors`` a list of
        ``(item, exception)`` pairs, both in completion order.

    Raises:
        MultipleErrors: If any item failed and ``raise_on_error`` is true.
    """
    with ThreadPoolExecutor() as executor:
        future_to_item = {executor.submit(func, item): item for item in items}

        results = []
        errors = []

        for future in as_completed(future_to_item):
            item = future_to_item[future]
            try:
                results.append((item, future.result()))
            except Exception as e:
                errors.append((item, e))

    if errors and raise_on_error:
        raise MultipleErrors(errors)

    return results, errors
# Usage
try:
    results, errors = process_all(items, risky_func)
except MultipleErrors as e:
    print(f"Failed items: {len(e.errors)}")
    for item, error in e.errors:
        print(f" {item}: {error}")
```
Best Practices¶
1. Always Handle Exceptions¶
```python
# Bad: Exceptions silently ignored
for future in futures:
    future.result()  # May raise!

# Good: Handle each exception
for future in futures:
    try:
        result = future.result()
    except Exception as e:
        handle_error(e)
```
2. Use Specific Exception Types¶
```python
# Bad: Catch everything
except Exception:
    pass

# Good: Catch specific exceptions
except (ConnectionError, TimeoutError) as e:
    retry_task(e)
except ValueError as e:
    log_invalid_input(e)
```
3. Don't Ignore Partial Results¶
```python
# Bad: All or nothing
try:
    results = list(executor.map(func, items))
except Exception:
    results = []

# Good: Collect partial results
results = []
errors = []
for future in as_completed(futures):
    try:
        results.append(future.result())
    except Exception as e:
        errors.append(e)
```
4. Set Timeouts¶
```python
# Bad: Wait forever
result = future.result()

# Good: Set reasonable timeout
result = future.result(timeout=30)
```
Key Takeaways¶
- Exceptions in workers don't propagate automatically
- Use `future.result()` to get or raise exceptions
- `as_completed()` allows handling each task's error individually
- Wrap functions to capture exceptions without stopping `map()`
- Use callbacks for async error handling
- Always set timeouts for production code
- Log errors for debugging
- Ensure cleanup happens even on errors
- Collect partial results when some tasks fail
Exercises¶
Exercise 1.
Use ThreadPoolExecutor.submit() and as_completed() to process 10 items where items 3, 6, and 9 raise a ValueError. Collect successful results and errors separately into two dictionaries mapping input to result/error. Print both dictionaries.
Solution to Exercise 1
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
def process(x):
    """Return ``x`` squared, failing on purpose for 3, 6 and 9.

    Raises:
        ValueError: If ``x`` is 3, 6 or 9.
    """
    if x in (3, 6, 9):
        raise ValueError(f"Cannot process {x}")
    return x ** 2
with ThreadPoolExecutor() as executor:
    # Map each Future back to its input so outcomes can be keyed by input.
    futures = {executor.submit(process, x): x for x in range(10)}
    results = {}
    errors = {}
    for future in as_completed(futures):
        x = futures[future]
        try:
            results[x] = future.result()
        except Exception as e:
            errors[x] = str(e)

# Completion order is arbitrary, so sort by input for stable output.
print(f"Results: {dict(sorted(results.items()))}")
print(f"Errors: {dict(sorted(errors.items()))}")
```
Exercise 2.
Write a safe_map(executor, func, items) function that wraps each call so that exceptions are returned as ("error", str(e)) tuples instead of raising. Use executor.map() internally. Test it with a function that fails on negative inputs. Print the count of successes and errors.
Solution to Exercise 2
```python
from concurrent.futures import ThreadPoolExecutor
def safe_map(executor, func, items):
    """Map ``func`` over ``items`` on ``executor`` without ever raising.

    Args:
        executor: Object providing a ``map(callable, iterable)`` method.
        func: Callable applied to each item.
        items: Iterable of inputs.

    Returns:
        A list, in input order, of ``("success", value)`` or
        ``("error", str(exception))`` tuples.
    """
    def safe_func(x):
        # Turn a raised exception into a tagged value so that map() keeps
        # yielding the remaining results.
        try:
            return ("success", func(x))
        except Exception as e:
            return ("error", str(e))

    return list(executor.map(safe_func, items))
def process(x):
    """Return ``x`` squared, rejecting negative input.

    Raises:
        ValueError: If ``x`` is negative.
    """
    if x < 0:
        raise ValueError(f"Negative: {x}")
    return x ** 2
data = [3, -1, 5, -2, 7, 0, -4, 10]

with ThreadPoolExecutor() as executor:
    results = safe_map(executor, process, data)

# Each result is a (status, payload) pair; split them by status.
successes = [payload for status, payload in results if status == "success"]
errors = [payload for status, payload in results if status == "error"]

print(f"Successes ({len(successes)}): {successes}")
print(f"Errors ({len(errors)}): {errors}")
```
Exercise 3.
Implement a batch processor with timeout handling. Submit 8 tasks to a ThreadPoolExecutor, where each task sleeps for a random duration (0.1-2.0s). Use as_completed(futures, timeout=1.0) to collect results that finish within 1 second. Cancel remaining tasks and print how many completed, how many timed out, and the results of the completed ones.
Solution to Exercise 3
```python
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
def task(task_id, min_delay=0.1, max_delay=2.0):
    """Sleep for a random duration and report how long it was.

    Args:
        task_id: Identifier echoed back in the result.
        min_delay: Lower bound of the random sleep, in seconds.
        max_delay: Upper bound of the random sleep, in seconds.

    Returns:
        ``(task_id, delay)`` where ``delay`` is the number of seconds slept.
    """
    delay = random.uniform(min_delay, max_delay)
    time.sleep(delay)
    return (task_id, delay)
with ThreadPoolExecutor() as executor:
    futures = {executor.submit(task, i): i for i in range(8)}
    completed = []
    try:
        # The timeout covers the whole iteration, not each individual future.
        for future in as_completed(futures, timeout=1.0):
            completed.append(future.result())
    except TimeoutError:
        pass

    # cancel() only stops tasks that have not started running yet.
    for future in futures:
        if not future.done():
            future.cancel()

    # Count every task that was not collected in time, including any that
    # finished between the timeout and the done() check above.
    timed_out = len(futures) - len(completed)

    print(f"Completed: {len(completed)}")
    print(f"Timed out: {timed_out}")
    for tid, delay in completed:
        print(f" Task {tid}: {delay:.2f}s")
# Leaving the with block still waits for tasks that are already running.
```