CPU-Bound vs I/O-Bound Tasks

Understanding whether your task is CPU-bound or I/O-bound is crucial for choosing the right concurrency strategy.

Mental Model

Ask yourself: "Is my program slow because the CPU is busy computing, or because it is sitting idle waiting for something external?" CPU-bound tasks need more cores (use processes); I/O-bound tasks need something to do while waiting (use threads or asyncio). Getting this distinction right is the single most important decision in concurrent Python.
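
The paragraph above mentions asyncio as an option for I/O-bound work. As a minimal sketch, assuming `asyncio.sleep` stands in for a real network wait (the names `fake_request`, `run_all`, and `timed_run` are hypothetical), this runs several simulated requests concurrently on a single thread:

```python
import asyncio
import time

async def fake_request(delay):
    """Stand-in for a network call: yields control while 'waiting'."""
    await asyncio.sleep(delay)
    return delay

async def run_all(delays):
    """Start every simulated request, then wait for all of them together."""
    return await asyncio.gather(*(fake_request(d) for d in delays))

def timed_run(delays):
    """Return (results, elapsed seconds) for one concurrent batch."""
    start = time.perf_counter()
    results = asyncio.run(run_all(delays))
    return results, time.perf_counter() - start

if __name__ == "__main__":
    results, elapsed = timed_run([0.5] * 4)
    print(f"{len(results)} requests in {elapsed:.2f}s")
```

Because the waits overlap, the batch should take roughly as long as the slowest request rather than the sum of all of them.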


Definitions

CPU-Bound

A task is CPU-bound when it spends most of its time doing computation — the CPU is the bottleneck.

Examples:

  • Mathematical calculations
  • Image/video processing
  • Data compression
  • Machine learning training
  • Cryptographic operations
  • Simulations

```python
def cpu_bound_task(n):
    """Compute sum of squares; the CPU is working hard."""
    total = 0
    for i in range(n):
        total += i ** 2
    return total
```

I/O-Bound

A task is I/O-bound when it spends most of its time waiting for input/output operations — the CPU is mostly idle.

Examples:

  • Network requests (HTTP, database queries)
  • File reading/writing
  • User input
  • API calls
  • Web scraping

```python
import requests

def io_bound_task(url):
    """Fetch URL; the CPU is mostly waiting."""
    response = requests.get(url)  # Waiting for network
    return response.text
```


Visual Comparison

CPU-Bound Execution

```
CPU Activity: ████████████████████████████████████████ 100% busy

Timeline: [compute][compute][compute][compute][done]
```

I/O-Bound Execution

```
CPU Activity: ██░░░░░░░░░░░░░░██░░░░░░░░░░░░░░██░░░░░░ ~20% busy

Timeline: [send request][.....waiting.....][process response]
```


Identifying Task Type

Method 1: CPU Usage Monitoring

```python
import time
import psutil

def monitor_cpu(func, *args):
    """Run function while monitoring CPU usage."""
    process = psutil.Process()

    start = time.perf_counter()
    cpu_start = process.cpu_times()

    result = func(*args)

    cpu_end = process.cpu_times()
    elapsed = time.perf_counter() - start

    cpu_time = (cpu_end.user - cpu_start.user) + (cpu_end.system - cpu_start.system)
    cpu_percent = (cpu_time / elapsed) * 100

    print(f"Elapsed: {elapsed:.2f}s")
    print(f"CPU time: {cpu_time:.2f}s")
    print(f"CPU usage: {cpu_percent:.1f}%")

    return result

# CPU-bound: ~100% CPU usage
monitor_cpu(cpu_bound_task, 10_000_000)

# I/O-bound: ~5% CPU usage
monitor_cpu(io_bound_task, "https://httpbin.org/delay/2")
```

Method 2: Analyze the Code

| Look for... | Task Type |
|---|---|
| Loops with computation | CPU-bound |
| Mathematical operations | CPU-bound |
| `requests`, `urllib` | I/O-bound |
| File `open()`, `read()`, `write()` | I/O-bound |
| Database queries | I/O-bound |
| `time.sleep()` | I/O-bound (simulated) |
| `subprocess` calls | Usually I/O-bound |

Concurrency Strategy by Task Type

CPU-Bound → Use Processes

```python
from concurrent.futures import ProcessPoolExecutor
import os

def compute_heavy(n):
    """CPU-intensive task."""
    print(f"Process {os.getpid()}: computing...")
    return sum(i * i for i in range(n))

numbers = [10_000_000, 20_000_000, 30_000_000, 40_000_000]

# The guard is required where workers are started with "spawn"
# (Windows, macOS), because each worker re-imports this module.
if __name__ == "__main__":
    # ProcessPoolExecutor bypasses the GIL
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(compute_heavy, numbers))

    print(results)
```

Why processes?

  • Each process has its own Python interpreter
  • Each process has its own GIL
  • True parallel execution on multiple CPU cores
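
To make the first bullet concrete, here is a minimal sketch showing that module-level state is not shared between worker processes. The names `counter`, `bump_and_report`, and `run_demo` are hypothetical, and the exact values each worker reports depend on how the pool happens to distribute tasks.

```python
import os
from concurrent.futures import ProcessPoolExecutor

counter = 0  # module-level state; each worker process gets its own copy

def bump_and_report(_):
    """Increment this process's copy of counter and report who did it."""
    global counter
    counter += 1
    return os.getpid(), counter

def run_demo(workers=2, tasks=4):
    """Run bump_and_report in worker processes and collect (pid, counter) pairs."""
    with ProcessPoolExecutor(max_workers=workers) as ex:
        return list(ex.map(bump_and_report, range(tasks)))

if __name__ == "__main__":
    worker_results = run_demo()
    # The parent's counter is untouched: workers only changed their own copies.
    print(f"parent  pid={os.getpid()} counter={counter}")
    for pid, value in worker_results:
        print(f"worker  pid={pid} counter={value}")
```

The same isolation is why arguments and return values must be picklable: they are copied between processes rather than shared.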

I/O-Bound → Use Threads

```python
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url):
    """I/O-intensive task."""
    response = requests.get(url)
    return len(response.content)

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

# ThreadPoolExecutor works well: the GIL is released during I/O
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fetch_url, urls))

print(results)
```

Why threads?

  • GIL is released during I/O operations
  • Threads are lighter weight than processes
  • Shared memory makes data passing easy
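
The shared-memory point can be sketched as follows: worker threads write straight into one dictionary owned by the caller. This is an illustration only; `run_demo` and `record_length` are hypothetical names, and the lock is there because shared structures still need protection from concurrent writes.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def run_demo(words):
    """Have worker threads fill one shared dict, then return it."""
    lengths = {}             # visible to every thread in this process
    lock = threading.Lock()  # guards the shared dict

    def record_length(word):
        # A closure works here because threads share the caller's memory.
        with lock:
            lengths[word] = len(word)

    with ThreadPoolExecutor(max_workers=4) as ex:
        # list() waits for all tasks and re-raises any worker exception
        list(ex.map(record_length, words))
    return lengths

if __name__ == "__main__":
    print(run_demo(["alpha", "beta", "gamma", "delta"]))
```

The same pattern would not carry over to a process pool, since a nested function cannot be pickled and each worker would mutate its own copy of the dictionary.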

Benchmark: Threads vs Processes

CPU-Bound Benchmark

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_task(n):
    return sum(i ** 2 for i in range(n))

if __name__ == "__main__":  # required for process pools on Windows/macOS
    data = [5_000_000] * 4

    # Sequential
    start = time.perf_counter()
    results = [cpu_task(n) for n in data]
    seq_time = time.perf_counter() - start
    print(f"Sequential: {seq_time:.2f}s")

    # Threads (limited by GIL)
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as ex:
        results = list(ex.map(cpu_task, data))
    thread_time = time.perf_counter() - start
    print(f"Threads: {thread_time:.2f}s")

    # Processes (true parallelism)
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as ex:
        results = list(ex.map(cpu_task, data))
    proc_time = time.perf_counter() - start
    print(f"Processes: {proc_time:.2f}s")

# Typical results (4-core machine):
# Sequential: 3.2s
# Threads:    3.5s  ← No speedup (GIL)
# Processes:  0.9s  ← ~3.6x speedup ✓
```

I/O-Bound Benchmark

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_task(seconds):
    time.sleep(seconds)  # Simulate I/O
    return seconds

if __name__ == "__main__":  # required for process pools on Windows/macOS
    data = [1] * 4

    # Sequential
    start = time.perf_counter()
    results = [io_task(n) for n in data]
    seq_time = time.perf_counter() - start
    print(f"Sequential: {seq_time:.2f}s")

    # Threads
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as ex:
        results = list(ex.map(io_task, data))
    thread_time = time.perf_counter() - start
    print(f"Threads: {thread_time:.2f}s")

    # Processes
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as ex:
        results = list(ex.map(io_task, data))
    proc_time = time.perf_counter() - start
    print(f"Processes: {proc_time:.2f}s")

# Typical results:
# Sequential: 4.0s
# Threads:    1.0s  ← 4x speedup ✓
# Processes:  1.1s  ← Similar, but more overhead
```


Mixed Workloads

Many real applications have both CPU and I/O components.

Example: Download and Process Images

```python
import requests
from PIL import Image
from io import BytesIO
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def download_image(url):
    """I/O-bound: fetch from network."""
    response = requests.get(url)
    return response.content

def process_image(image_bytes):
    """CPU-bound: resize and transform."""
    img = Image.open(BytesIO(image_bytes))
    img = img.resize((100, 100))
    # More CPU-intensive operations...
    return img

urls = ["https://example.com/img1.jpg", "https://example.com/img2.jpg", ...]

if __name__ == "__main__":  # required for process pools on Windows/macOS
    # Stage 1: Download (I/O-bound), use threads
    with ThreadPoolExecutor(max_workers=10) as executor:
        image_bytes_list = list(executor.map(download_image, urls))

    # Stage 2: Process (CPU-bound), use processes
    with ProcessPoolExecutor() as executor:
        processed_images = list(executor.map(process_image, image_bytes_list))
```

Example: Web Scraping with Parsing

```python
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor

def scrape_and_parse(url):
    """Mixed: I/O (fetch) + CPU (parse)."""
    # I/O-bound part
    response = requests.get(url)

    # CPU-bound part (but usually fast enough)
    soup = BeautifulSoup(response.text, 'html.parser')
    return soup.find_all('a')

# For mixed tasks where I/O dominates, threads work well
urls = [...]
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(scrape_and_parse, urls))
```
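
Whether I/O or CPU dominates a mixed task is easiest to settle by timing the two parts separately on a sample item. The following is a rough sketch under stated assumptions: `stage_fractions` is a hypothetical helper, and `fake_fetch` and `fake_process` are stand-ins (a sleep and a small computation) for the real fetch and parse steps.

```python
import time

def stage_fractions(fetch, process, item):
    """Time the fetch stage and the processing stage of one item.

    Returns (io_fraction, cpu_fraction) of the combined wall-clock time.
    """
    start = time.perf_counter()
    raw = fetch(item)
    io_time = time.perf_counter() - start

    start = time.perf_counter()
    process(raw)
    cpu_time = time.perf_counter() - start

    total = io_time + cpu_time
    return io_time / total, cpu_time / total

def fake_fetch(n):
    """Stand-in for a network fetch: mostly waiting."""
    time.sleep(0.2)
    return list(range(n))

def fake_process(values):
    """Stand-in for parsing: a small amount of computation."""
    return sum(v * v for v in values)

if __name__ == "__main__":
    io_frac, cpu_frac = stage_fractions(fake_fetch, fake_process, 1_000)
    print(f"I/O: {io_frac:.0%}, CPU: {cpu_frac:.0%}")
```

If the I/O fraction is clearly the larger one, a single thread pool is likely enough; if the CPU fraction dominates, the two-stage approach with a process pool is worth considering.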


Decision Matrix

| Task Type | Sequential | Threads | Processes |
|---|---|---|---|
| CPU-bound | Baseline | No speedup | ✓ Best |
| I/O-bound | Baseline | ✓ Best | Good (more overhead) |
| Mixed (I/O dominant) | Baseline | ✓ Best | Good |
| Mixed (CPU dominant) | Baseline | Some speedup | ✓ Best |

Quick Decision Guide

```python
def choose_executor(task_type):
    """Choose the right executor for your task."""

    if task_type == "cpu_bound":
        # CPU-bound: need true parallelism
        from concurrent.futures import ProcessPoolExecutor
        return ProcessPoolExecutor()

    elif task_type == "io_bound":
        # I/O-bound: threads work, lower overhead
        from concurrent.futures import ThreadPoolExecutor
        return ThreadPoolExecutor(max_workers=20)

    elif task_type == "mixed_io_dominant":
        # Mixed but mostly waiting: threads fine
        from concurrent.futures import ThreadPoolExecutor
        return ThreadPoolExecutor(max_workers=10)

    elif task_type == "mixed_cpu_dominant":
        # Mixed but mostly computing: processes
        from concurrent.futures import ProcessPoolExecutor
        return ProcessPoolExecutor()
```


Key Takeaways

  • CPU-bound: CPU is busy computing → use processes
  • I/O-bound: CPU is waiting for I/O → use threads
  • The GIL prevents threads from running Python bytecode in parallel, but it is released while a thread waits on I/O
  • Measure your task's CPU usage to determine type
  • Mixed workloads: Consider which component dominates, or use two-stage pipeline
  • When in doubt: Start with threads for simplicity, switch to processes if no speedup
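
The last takeaway can be turned into a quick check: time the work sequentially and with a thread pool, and keep threads only if they clearly help. This is a sketch, not a definitive rule; `thread_speedup` and `suggest_executor` are hypothetical helpers, and the 1.5x threshold is an arbitrary assumption.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def thread_speedup(func, items, max_workers=4):
    """Return sequential_time / threaded_time for func applied to items."""
    start = time.perf_counter()
    for item in items:
        func(item)
    seq = time.perf_counter() - start

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        list(ex.map(func, items))
    thr = time.perf_counter() - start
    return seq / thr

def suggest_executor(func, items, threshold=1.5):
    """Suggest 'threads' if they clearly beat sequential, else 'processes'."""
    speedup = thread_speedup(func, items)
    return "threads" if speedup >= threshold else "processes"

if __name__ == "__main__":
    # time.sleep simulates an I/O wait, so threads should overlap the waits.
    print(suggest_executor(time.sleep, [0.2] * 4))
```

Run it on a small, representative sample of the real workload, since the function executes every item twice.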

Exercises

Exercise 1. Write a function classify_task that accepts a callable and a set of arguments, runs it while measuring wall-clock time and CPU time (using time.perf_counter and time.process_time), and returns "cpu_bound" if the CPU-time-to-wall-time ratio is above 0.8, or "io_bound" otherwise. Test it with a pure computation (sum of squares up to 5,000,000) and a simulated I/O task (time.sleep(1)).

Solution to Exercise 1
```python
import time

def classify_task(func, *args):
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    func(*args)
    cpu_elapsed = time.process_time() - cpu_start
    wall_elapsed = time.perf_counter() - wall_start
    ratio = cpu_elapsed / wall_elapsed if wall_elapsed > 0 else 1.0
    label = "cpu_bound" if ratio > 0.8 else "io_bound"
    print(f"Wall: {wall_elapsed:.3f}s, CPU: {cpu_elapsed:.3f}s, "
          f"Ratio: {ratio:.2f} -> {label}")
    return label

def compute(n):
    return sum(i * i for i in range(n))

def io_wait():
    time.sleep(1)

print(classify_task(compute, 5_000_000))   # cpu_bound
print(classify_task(io_wait))               # io_bound
```

Exercise 2. Create a two-stage pipeline: (1) an I/O stage that uses ThreadPoolExecutor with 4 workers to simulate downloading 4 files (time.sleep(0.5) each, returning a random list of 100,000 integers), and (2) a CPU stage that uses ProcessPoolExecutor to sort each list. Measure the total time and compare it against a fully sequential version.

Solution to Exercise 2
```python
import time
import random
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def download(file_id):
    time.sleep(0.5)
    return [random.randint(0, 1_000_000) for _ in range(100_000)]

def sort_list(data):
    return sorted(data)

if __name__ == "__main__":
    ids = list(range(4))

    # Sequential
    start = time.perf_counter()
    for i in ids:
        sort_list(download(i))
    seq_time = time.perf_counter() - start

    # Pipeline
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as tex:
        raw = list(tex.map(download, ids))
    with ProcessPoolExecutor() as pex:
        sorted_data = list(pex.map(sort_list, raw))
    pipe_time = time.perf_counter() - start

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Pipeline:   {pipe_time:.2f}s")
    print(f"Speedup:    {seq_time / pipe_time:.2f}x")
```

Exercise 3. Write a benchmark that runs the same pure-Python CPU-bound function (computing the sum of cubes up to 2,000,000) four times using: (a) sequential execution, (b) ThreadPoolExecutor with 4 workers, and (c) ProcessPoolExecutor with 4 workers. Print the elapsed time for each and compute the speedup relative to sequential.

Solution to Exercise 3
```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sum_of_cubes(n):
    return sum(i ** 3 for i in range(n))

if __name__ == "__main__":
    N = 2_000_000
    args = [N] * 4

    # Sequential
    start = time.perf_counter()
    [sum_of_cubes(n) for n in args]
    seq = time.perf_counter() - start

    # Threads
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as ex:
        list(ex.map(sum_of_cubes, args))
    thr = time.perf_counter() - start

    # Processes
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as ex:
        list(ex.map(sum_of_cubes, args))
    proc = time.perf_counter() - start

    print(f"Sequential: {seq:.2f}s")
    print(f"Threads:    {thr:.2f}s  (speedup {seq/thr:.2f}x)")
    print(f"Processes:  {proc:.2f}s  (speedup {seq/proc:.2f}x)")
```