HTTPX — Async Patterns
AsyncClient Basics
Same API as sync Client, but with await and async with.
import httpx
async def fetch_users() -> list[dict]:
    """Fetch the user list from the example API.

    A short-lived AsyncClient is used so the connection pool is
    released as soon as the context exits.
    """
    async with httpx.AsyncClient(
        base_url="https://api.example.com",
        timeout=10,
    ) as api:
        response = await api.get("/users")
        response.raise_for_status()  # surface 4xx/5xx as exceptions
        return response.json()
async with ensures the connection pool is properly closed on exit.
Concurrent Requests
Send multiple requests in parallel with asyncio.gather:
import asyncio
import httpx
async def fetch_all(urls: list[str]) -> list[dict]:
    """Fetch multiple URLs concurrently using a single connection pool."""
    async with httpx.AsyncClient(timeout=10) as client:
        # one coroutine per URL, all sharing the same pool
        pending = (client.get(u) for u in urls)
        responses = await asyncio.gather(*pending)
        # keep only 2xx responses, decoded as JSON
        return [resp.json() for resp in responses if resp.is_success]
async def main():
    """Demonstrate fetch_all against three user endpoints."""
    user_urls = [
        "https://api.example.com/users/1",
        "https://api.example.com/users/2",
        "https://api.example.com/users/3",
    ]
    results = await fetch_all(user_urls)
Limiting Concurrency
Use asyncio.Semaphore to avoid overwhelming the server:
import asyncio
import httpx
async def fetch_with_limit(
    client: httpx.AsyncClient,
    url: str,
    sem: asyncio.Semaphore,
) -> httpx.Response:
    """GET *url*, waiting on *sem* so only a bounded number of
    requests are in flight at any moment."""
    # the semaphore slot is held for the duration of the request
    async with sem:
        return await client.get(url)
async def fetch_all_limited(urls: list[str], max_concurrent: int = 10) -> list:
    """Fetch every URL in *urls* with at most *max_concurrent* requests
    running concurrently."""
    gate = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient(timeout=10) as client:
        return await asyncio.gather(
            *(fetch_with_limit(client, u, gate) for u in urls)
        )
Streaming Responses
For large downloads — process data without loading everything into memory.
Binary Streaming
import httpx
async def download_file(url: str, dest: str) -> None:
    """Stream *url* to the file *dest* without buffering the whole
    body in memory."""
    # both context managers close in reverse order on exit
    async with httpx.AsyncClient() as client, client.stream("GET", url) as resp:
        resp.raise_for_status()
        with open(dest, "wb") as out:
            # 8 KiB chunks keep peak memory flat for any body size
            async for chunk in resp.aiter_bytes(chunk_size=8192):
                out.write(chunk)
Line-by-Line Streaming (NDJSON, Logs, SSE)
async with client.stream("GET", "/events") as r:
async for line in r.aiter_lines():
if line.strip():
process_event(line)
Streaming Methods
| Method | Returns |
|---|---|
| `r.aiter_bytes()` | Chunks of bytes |
| `r.aiter_text()` | Chunks of decoded text |
| `r.aiter_lines()` | Lines of text |
| `r.aiter_raw()` | Raw bytes (no content decoding) |
Event Hooks
Execute callbacks on every request/response — great for logging and metrics.
import logging
import httpx
log = logging.getLogger(__name__)
def log_request(request: httpx.Request) -> None:
    """Event hook: record the outgoing method and URL at DEBUG level."""
    log.debug("Request: %s %s", request.method, request.url)
def log_response(response: httpx.Response) -> None:
    """Event hook: log status, URL and elapsed time for each response.

    The body must be read first — ``.elapsed`` is only populated once
    the response has been read or closed.
    """
    response.read()
    elapsed_ms = response.elapsed.total_seconds() * 1000
    log.debug(
        "Response: %s %s (%dms)",
        response.status_code,
        response.url,
        elapsed_ms,
    )
# Shared client: the hooks fire on every request/response it makes.
client = httpx.Client(
    timeout=10,
    event_hooks={
        "request": [log_request],
        "response": [log_response],
    },
)
For AsyncClient, use async def hooks — call await response.aread() before accessing .elapsed.
Async Fixtures (pytest)
import httpx
import pytest
import pytest_asyncio
@pytest_asyncio.fixture
async def api_client():
    """Async httpx client for API tests."""
    test_client = httpx.AsyncClient(
        base_url="https://api.example.com",
        timeout=10,
    )
    # the context manager guarantees the pool is closed after the test
    async with test_client:
        yield test_client
@pytest.mark.asyncio
async def test_list_users(api_client: httpx.AsyncClient):
    """The user listing endpoint responds 2xx with a non-empty array."""
    response = await api_client.get("/users")
    assert response.is_success
    assert len(response.json()) > 0
Note: @pytest_asyncio.fixture is required in strict mode (default). In auto mode, @pytest.fixture works for async fixtures too.
Best Practices
| Practice | Why |
|---|---|
| `async with AsyncClient()` | Ensures connection pool cleanup |
| One client per service | Reuse connections, avoid pool exhaustion |
| `asyncio.gather` for fan-out | Parallel I/O without threads |
| `Semaphore` for rate limiting | Prevent server overload |
| Event hooks for logging | Centralized, no per-call boilerplate |
| Stream large responses | Avoid OOM on big downloads |
| `raise_for_status()` in hook | Auto-fail on errors globally |