The Mental Model
AsyncIO is cooperative concurrency — not parallelism. One thread, multiple tasks. Tasks yield control to each other at await points.
Task A: start → make API call (waiting...) → yield
Task B: start → make API call (waiting...) → yield
Task A: ← receive response → finish
Task B: ← receive response → finish
Both tasks overlap in time — but only one runs at any given instant. The CPU isn't busy during network waits; AsyncIO fills that idle time with other tasks.
When asyncio wins: I/O-bound work (HTTP requests, database queries, file operations)
When asyncio doesn't win: CPU-bound work (image processing, ML inference) — use multiprocessing instead
The Basics
import asyncio
async def greet(name, delay):
    """Pause for *delay* seconds without blocking the loop, then greet *name*.

    Returns the name so callers (e.g. gather) can collect results.
    """
    # asyncio.sleep yields control to the event loop, unlike time.sleep
    await asyncio.sleep(delay)
    print(f"Hello, {name}!")
    return name
# Run a single coroutine from synchronous code: asyncio.run starts an
# event loop, runs the coroutine to completion, then closes the loop.
asyncio.run(greet("World", 1))
# Run multiple coroutines concurrently
async def main():
    """Contrast sequential awaits with concurrent execution via gather."""
    # Sequential: each await finishes before the next starts, so 1s + 2s = 3s total
    await greet("Alice", 1)
    await greet("Bob", 2)

    # Concurrent: gather overlaps the waits, so total time is the longest
    # single task (~2 seconds), not the sum
    await asyncio.gather(
        greet("Alice", 1),
        greet("Bob", 2),
    )
asyncio.run(main())  # Entry point: starts the event loop and runs main() to completion
async def and await
`async def` — defines a coroutine function (calling it returns a coroutine object; it doesn't execute yet). `await` — suspends the current coroutine until the awaited thing completes. `asyncio.run()` — runs the top-level coroutine and starts the event loop.
async def fetch_user(user_id: int) -> dict:
    """Fetch one user record as a dict via an async HTTP GET.

    Fix: aiohttp rejects a relative URL like '/api/users/1' with InvalidURL
    unless the session was created with a base_url — so set one here and
    let session.get() take the relative path.
    """
    async with aiohttp.ClientSession(base_url='https://api.example.com') as session:
        async with session.get(f'/api/users/{user_id}') as response:
            # .json() also awaits: the body streams in asynchronously
            return await response.json()
# ❌ Can't call directly from sync code — it's a coroutine
result = fetch_user(1) # Returns a coroutine object, not the data
# ✅ Must await it:
result = await fetch_user(1) # Inside async function
# or run it as the top-level:
result = asyncio.run(fetch_user(1))
asyncio.gather — Run Tasks Concurrently
import aiohttp
import asyncio
async def fetch(session, url):
    """GET *url* using the shared aiohttp session and decode the JSON body."""
    async with session.get(url) as response:
        return await response.json()
async def fetch_all(urls):
    """Fetch every URL concurrently over one shared session.

    gather preserves input order, so results[i] corresponds to urls[i].
    """
    async with aiohttp.ClientSession() as session:
        # One session for all requests — connection pooling is per-session
        pending = [fetch(session, target) for target in urls]
        return await asyncio.gather(*pending)
urls = [
'https://api.example.com/users/1',
'https://api.example.com/users/2',
'https://api.example.com/users/3',
]
# All 3 requests run concurrently instead of one after another
results = asyncio.run(fetch_all(urls))
Sequential (sync): 3 × 200ms = 600ms. Concurrent (asyncio): ~200ms (all run simultaneously).
asyncio.gather vs asyncio.create_task
async def main():
    """Show the two ways to start concurrent work: gather vs create_task."""
    # gather: schedules all three coroutines and waits for every one to finish
    results = await asyncio.gather(task1(), task2(), task3())

    # create_task: schedules background_work() to start running now,
    # without waiting for it yet
    task = asyncio.create_task(background_work())
    # ... do other work ...
    result = await task  # Collect the result only when it's needed
Use create_task when you want to start a task now but continue doing other things before collecting the result.
Error Handling
async def might_fail(url):
    """GET *url* and return its JSON; raises on HTTP error statuses."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            # Turn 4xx/5xx responses into exceptions instead of silent bad data
            resp.raise_for_status()
            return await resp.json()
# NOTE: these snippets use `await`, so they must run inside an async function.
# Option 1: by default, gather propagates the first exception it sees
try:
results = await asyncio.gather(
might_fail('https://good-url.com'),
might_fail('https://bad-url.com'), # Will raise
)
except Exception as e:
print(f"One failed: {e}")
# Option 2: return_exceptions=True — collect all results, with exceptions
# returned in place of the values that failed (no result is lost)
results = await asyncio.gather(
might_fail('https://good-url.com'),
might_fail('https://bad-url.com'),
return_exceptions=True,
)
for result in results:
if isinstance(result, Exception):
print(f"Failed: {result}")
else:
print(f"Success: {result}")
Timeout Handling
async def fetch_with_timeout(url, timeout=5.0):
    """Fetch *url*, returning None instead of raising if it exceeds *timeout* seconds."""
    # NOTE(review): fetch() is called here with only a URL, but the fetch()
    # defined earlier takes (session, url) — confirm which helper is meant.
    try:
        # asyncio.timeout (Python 3.11+) cancels the enclosed await on expiry
        async with asyncio.timeout(timeout):
            return await fetch(url)
    except asyncio.TimeoutError:
        return None
# Pre-3.11 alternative:
async def fetch_with_timeout_old(url, timeout=5.0):
    """Pre-3.11 variant: asyncio.wait_for enforces the timeout; None on expiry."""
    try:
        return await asyncio.wait_for(fetch(url), timeout=timeout)
    except asyncio.TimeoutError:
        return None
Async Context Managers and Iterators
# Async context manager — for resources that need async setup/teardown
class DatabaseConnection:
    """Async context manager: opens a connection on entry, closes it on exit."""

    async def __aenter__(self):
        # Opening the connection itself awaits — a plain __enter__ couldn't do this
        self.conn = await create_connection()
        return self.conn

    async def __aexit__(self, *args):
        # Runs on both normal exit and exception, guaranteeing cleanup
        await self.conn.close()
# Usage: __aenter__ runs on entry, __aexit__ on exit (even if the body raises)
async with DatabaseConnection() as db:
result = await db.query("SELECT * FROM users")
# Async iterator — for streaming data
async def stream_results(query):
    """Async generator: yield rows one at a time instead of materializing them all."""
    async with database.transaction():
        # Each yield hands one row to the consumer while the cursor stays open
        async for row in database.iterate(query):
            yield row
# Consume the async generator with `async for` (inside a coroutine)
async for result in stream_results("SELECT * FROM users"):
process(result)
Key Takeaways
- AsyncIO is cooperative concurrency — one thread, tasks yield at `await` points
- Best for I/O-bound work (HTTP, DB, files) — not CPU-bound work
- `async def` defines a coroutine; `await` runs it and suspends until done
- `asyncio.gather()` runs multiple coroutines concurrently
- Use `return_exceptions=True` in `gather` to handle partial failures
- Use `asyncio.timeout()` (Python 3.11+) or `asyncio.wait_for()` for timeouts