Skip to content

Async HTTP with aiohttp

aiohttp is the standard library for async HTTP in Python, supporting both client and server functionality.

Installation

pip install aiohttp

Basic Client Usage

Simple GET Request

import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(f"Status: {response.status}")
            return await response.text()

async def main():
    html = await fetch("https://example.com")
    print(html[:200])

asyncio.run(main())

Session Best Practices

# WRONG: Creating session per request (slow)
async def fetch_bad(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# CORRECT: Reuse session for multiple requests
async def fetch_many(urls):
    async with aiohttp.ClientSession() as session:
        results = []
        for url in urls:
            async with session.get(url) as response:
                results.append(await response.text())
        return results

# BEST: Pass session as parameter
async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["https://example.com"] * 10
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

Request Methods

async def http_methods_example():
    async with aiohttp.ClientSession() as session:
        # GET
        async with session.get('https://api.example.com/items') as r:
            data = await r.json()

        # POST
        async with session.post('https://api.example.com/items', 
                                json={'name': 'item'}) as r:
            result = await r.json()

        # PUT
        async with session.put('https://api.example.com/items/1',
                               json={'name': 'updated'}) as r:
            result = await r.json()

        # DELETE
        async with session.delete('https://api.example.com/items/1') as r:
            print(r.status)

        # PATCH
        async with session.patch('https://api.example.com/items/1',
                                 json={'name': 'patched'}) as r:
            result = await r.json()

Request Parameters

Query Parameters

async def with_params():
    async with aiohttp.ClientSession() as session:
        params = {'key': 'value', 'page': 1}
        async with session.get('https://api.example.com/search', 
                               params=params) as r:
            # URL becomes: https://api.example.com/search?key=value&page=1
            return await r.json()

Headers

async def with_headers():
    async with aiohttp.ClientSession() as session:
        headers = {
            'Authorization': 'Bearer token123',
            'Accept': 'application/json'
        }
        async with session.get('https://api.example.com', 
                               headers=headers) as r:
            return await r.json()

# Or set default headers for session
async def with_default_headers():
    headers = {'Authorization': 'Bearer token123'}
    async with aiohttp.ClientSession(headers=headers) as session:
        # All requests use these headers
        async with session.get('https://api.example.com') as r:
            return await r.json()

POST Data

async def post_data():
    async with aiohttp.ClientSession() as session:
        # JSON data
        async with session.post('https://api.example.com',
                                json={'key': 'value'}) as r:
            return await r.json()

        # Form data
        async with session.post('https://api.example.com',
                                data={'key': 'value'}) as r:
            return await r.text()

        # Form data with file
        data = aiohttp.FormData()
        data.add_field('file', open('file.txt', 'rb'),
                       filename='file.txt',
                       content_type='text/plain')
        async with session.post('https://api.example.com/upload',
                                data=data) as r:
            return await r.json()

Response Handling

async def handle_response():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com') as r:
            # Status
            print(r.status)        # 200
            print(r.reason)        # 'OK'

            # Headers
            print(r.headers)
            print(r.headers['Content-Type'])

            # URL (after redirects)
            print(r.url)

            # Response body
            text = await r.text()          # As string
            data = await r.json()          # As JSON
            binary = await r.read()        # As bytes

            # Streaming
            async for chunk in r.content.iter_chunked(1024):
                process_chunk(chunk)

Timeouts

async def with_timeout():
    # Per-request timeout
    timeout = aiohttp.ClientTimeout(total=30)

    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get('https://slow-api.example.com') as r:
                return await r.json()
        except asyncio.TimeoutError:
            print("Request timed out")

# More granular timeouts
timeout = aiohttp.ClientTimeout(
    total=60,        # Total timeout
    connect=10,      # Connection timeout
    sock_read=30,    # Read timeout
    sock_connect=10  # Socket connect timeout
)

Error Handling

async def safe_fetch(url):
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url) as response:
                response.raise_for_status()  # Raise on 4xx/5xx
                return await response.json()

        except aiohttp.ClientConnectionError:
            print("Connection failed")
        except aiohttp.ClientResponseError as e:
            print(f"HTTP error: {e.status}")
        except aiohttp.ClientError as e:
            print(f"Client error: {e}")
        except asyncio.TimeoutError:
            print("Request timed out")

Concurrent Requests

Using gather()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        async def fetch_one(url):
            async with session.get(url) as r:
                return await r.json()

        tasks = [fetch_one(url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(100)]
    results = await fetch_all(urls)

Rate Limiting with Semaphore

async def fetch_with_rate_limit(urls, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(session, url):
        async with semaphore:
            async with session.get(url) as r:
                return await r.json()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

With Retry Logic

async def fetch_with_retry(session, url, max_retries=3, backoff=1.0):
    for attempt in range(max_retries):
        try:
            async with session.get(url) as r:
                r.raise_for_status()
                return await r.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                raise

            wait = backoff * (2 ** attempt)
            print(f"Retry {attempt + 1} in {wait}s: {e}")
            await asyncio.sleep(wait)

Practical Examples

1. API Client Class

class APIClient:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.api_key = api_key
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            headers={'Authorization': f'Bearer {self.api_key}'}
        )
        return self

    async def __aexit__(self, *args):
        await self.session.close()

    async def get(self, endpoint, **params):
        url = f"{self.base_url}/{endpoint}"
        async with self.session.get(url, params=params) as r:
            r.raise_for_status()
            return await r.json()

    async def post(self, endpoint, data):
        url = f"{self.base_url}/{endpoint}"
        async with self.session.post(url, json=data) as r:
            r.raise_for_status()
            return await r.json()

# Usage
async def main():
    async with APIClient("https://api.example.com", "secret") as client:
        users = await client.get("users", page=1)
        new_user = await client.post("users", {"name": "Alice"})

2. Parallel API Fetcher

async def parallel_api_fetch(endpoints, base_url, max_concurrent=5):
    semaphore = asyncio.Semaphore(max_concurrent)
    results = {}

    async def fetch_endpoint(session, endpoint):
        async with semaphore:
            try:
                async with session.get(f"{base_url}/{endpoint}") as r:
                    results[endpoint] = await r.json()
            except Exception as e:
                results[endpoint] = {"error": str(e)}

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_endpoint(session, ep) for ep in endpoints]
        await asyncio.gather(*tasks)

    return results

# Usage
async def main():
    endpoints = ["users", "posts", "comments", "todos"]
    data = await parallel_api_fetch(endpoints, "https://jsonplaceholder.typicode.com")
    print(data)

3. Streaming Download

async def download_file(url, filepath, chunk_size=8192):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            with open(filepath, 'wb') as f:
                async for chunk in response.content.iter_chunked(chunk_size):
                    f.write(chunk)

# With progress
async def download_with_progress(url, filepath):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            total = int(response.headers.get('content-length', 0))
            downloaded = 0

            with open(filepath, 'wb') as f:
                async for chunk in response.content.iter_chunked(8192):
                    f.write(chunk)
                    downloaded += len(chunk)
                    progress = (downloaded / total * 100) if total else 0
                    print(f"\rProgress: {progress:.1f}%", end='')

4. WebSocket Client

async def websocket_client():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('wss://echo.websocket.org') as ws:
            # Send message
            await ws.send_str('Hello!')

            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received: {msg.data}")
                    if msg.data == 'close':
                        await ws.close()
                        break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break

Connection Pooling

# Configure connector for connection pooling
connector = aiohttp.TCPConnector(
    limit=100,           # Total connection limit
    limit_per_host=10,   # Per-host limit
    ttl_dns_cache=300,   # DNS cache TTL
    keepalive_timeout=30 # Keep-alive timeout
)

async with aiohttp.ClientSession(connector=connector) as session:
    # All requests share the connection pool
    pass

Key Takeaways

  • Always use async with for sessions and responses
  • Reuse sessions across multiple requests
  • Use semaphores for rate limiting
  • Implement retry logic for reliability
  • Handle timeouts and errors gracefully
  • Use connection pooling for high-throughput applications
  • Stream large responses with iter_chunked()