I’ve recently had to figure out how a service that isn’t a web service actually works in asyncio. These notes are heavily based on the BBC Cloudfit docs.

Synchronous Concepts

Before jumping into the asyncio concepts, we need to understand how the synchronous parts work.

Standard Method Invocation

A function is invoked, something is returned, and everything moves iteratively. In older style languages this would be called a subroutine.

def do_thing(i: int):
    return i

def do_things():
    for i in range(10):
        print(f'-- {i}')
        print(do_thing(i))
        print('--')

do_things()

Call Stack

Most high level programming languages rely on the call stack abstraction. Starting a program initializes a call stack, and each method call tends to push a new stack frame that keeps track of local variables and the last evaluated instruction. When a return is evaluated, the frame is complete: it is popped, and its return value is handed to the previous frame as the result of its current instruction.

def do_thing(x,y):
   return x+y
 
def main():
   print("in main")
   return do_thing(1, 2)
 
if __name__ == "__main__":
   print("calling main")
   print(main())

The first stack frame is actually the module itself, triggered by the if conditional at the bottom. Then:

  1. Push the print call, which returns nothing and can be popped immediately
  2. Step forward and push the second print call, which in turn needs main and pushes a third frame
  3. The print in main pushes and immediately pops a fourth frame
  4. Step forward to the call to do_thing, which pushes a new fourth frame including references to the two ints
  5. Fourth frame evaluates to 3 and pops, returning to the final instruction of the third frame
  6. Same thing happens in the third frame, popping the 3 and passing it to the print waiting in the second frame
  7. After printing, second frame is popped and the first frame realizes it is done and also pops

This isn’t actually how this works - variables are generally allocated in heap memory, and Python actually stores the stack frames themselves in heap memory. But it is a simplified model that is good enough for explaining how the other styles work.

Traditional multithreaded programs effectively allocate a stack per thread. Each thread then can run through its own instructions on that stack.
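As a rough sketch (work here is a hypothetical function, and the interleaving of the output depends on the scheduler):

import threading

def work(name):
    # each thread walks through these instructions on its own call stack
    for i in range(3):
        print(f'{name}: {i}')

t1 = threading.Thread(target=work, args=('t1',))
t2 = threading.Thread(target=work, args=('t2',))
t1.start()
t2.start()
t1.join()
t2.join()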

Coroutine Pattern

Coroutines stop and hold state, yielding to other coroutines. That way, two coroutines can work cooperatively.

The generator pattern is a subset of this pattern. It holds state and can stop, yielding multiple times. But it does not allow specifying where execution resumes after a yield. Instead, it just returns to the caller.

This is still synchronous. A subroutine like above is effectively just a coroutine that has a single return and whose state is discarded between invocations.

def do_thing(i:int):
    return i*10
 
def do_things():
	for i in range(3):
		t = do_thing(i)
		yield t
		print(f'just yielded {t}')
	
for x in do_things():
	print(x)
	print('--')

The magic here is that a coroutine is aware of its own stack frame and can keep track of the last instruction, allowing us to return to the stack frame as needed and continue evaluation.
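We can see this by stepping the do_things generator from above manually instead of using a for loop:

gen = do_things()
print(next(gen))  # runs do_things up to its first yield and pauses there
print(next(gen))  # resumes after the yield, prints 'just yielded 0', then pauses at the next yield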

In Python, calling a generator function always returns an iterator. The above generator could have been written as:

def do_things():
	t0 = do_thing(0)
	yield t0
	print(f'just yielded {t0}')
	t1 = do_thing(1)
	yield t1
	print(f'just yielded {t1}')
	t2 = do_thing(2)
	yield t2
	print(f'just yielded {t2}')

Even with a function containing a single yield, we could still iterate over the results:

def do_fewer_things():
	yield 0

In addition, we can use comprehension shorthand to create generators.

do_things = (do_thing(i) for i in range(3))
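Iterating over it works just like the function-based generator:

for t in do_things:
    print(t)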

Asynchronous Concepts

Now that we have some fundamentals from synchronous concepts, we can add the new asynchronous concepts.

Event Loops and Tasks

Even when using a generator to run cooperatively, a program works through instructions iteratively, with each instruction blocking the process until it completes so the program can move to the next instruction. At any given time a single thread is focusing on a single call stack, even if the current instruction is not computationally intensive and is waiting on something else (like I/O).

The fundamental premise of asyncio is that we can write more efficient programs by ceding control of a call stack in those situations. When a program reaches an instruction that would normally block while waiting, it can instead cede control, pausing that call stack and moving to another call stack. Once the thing being waited for is done, the paused call stack can be resumed.

To do this, we’d need to split our program into tasks, where each task has its own call stack, and then use a task queue to manage the tasks. Our main thread would then poll the task queue in a loop, taking the next task and running it until it cedes or completes. This event loop and task queue form the basis of the cooperative multitasking model used in asyncio.
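As a very rough sketch, here is a toy version built on the synchronous generators from earlier. This is not how asyncio is implemented, but it shows the loop-plus-queue shape:

import collections

def task_a():
    for i in range(3):
        print(f'a{i}')
        yield  # cede control back to the loop

def task_b():
    for i in range(3):
        print(f'b{i}')
        yield

# the "event loop": pop a task, run it until it cedes, re-enqueue it
queue = collections.deque([task_a(), task_b()])
while queue:
    task = queue.popleft()
    try:
        next(task)          # run until the next yield
        queue.append(task)  # not finished; put it back on the queue
    except StopIteration:
        pass                # task completed; drop it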

Thinking about it this way, we can draw a few conclusions.

  1. asyncio is not natively faster than synchronous code. It requires more setup to run a single task and is therefore likely slower for a single task.
  2. asyncio is not inherently multithreaded nor does it benefit from a higher core count since (generally speaking) each event loop would run on a single core in a single thread.
  3. asyncio performance gains come solely from better CPU utilization — we are still doing a single thing at a time but can switch between tasks when the CPU isn’t needed. The hope is that the overhead of context switching is negated by efficient use of resources.
  4. Tasks that hold or block the event loop without ceding control while waiting lose these gains and actually slow the entire event loop. While some tasks can be converted (e.g. switching a DB lookup to an asynchronous one), computationally intensive tasks (e.g. serializing JSON) will always bog down the loop.
  5. The time a task takes is affected by three main factors:
    1. how much time the computation takes (time awake)
    2. how much time the I/O takes (time asleep)
    3. the time spent waiting for the event loop to prioritize the task (time waiting)
    The first is generally consistent for a given task assuming no external dependencies. The second is variable relative to compute time, but ideally also mostly consistent if we can assume certain SLAs. The third purely depends on the current state of the loop.
  6. The waiting time reveals both the best case and worst case scenarios. In the best case scenario, there are exactly enough tasks in the loop that one task is always available while all other tasks are sleeping, and that task finishes just in time for another task to wake up or be enqueued with no delay. In this case, the CPU is always 100% utilized but nothing is delayed. The worst case is when the event loop becomes overburdened, unable to keep up with newly enqueued tasks, until system resources saturate and the program fails.
  7. The delay time also gives us two potential optimization strategies, each with its own downside. If we aim to use less than 100% of compute resources, we are guaranteed no real delay, leading to relatively consistent task completion times but wasting compute resources. If we aim to use more than 100% of compute resources, we are guaranteed that every task will be delayed, leading to inconsistent latency and the risk of over-saturating the event loop.

Actual Implementation

Now that we’ve worked through the theory, we can focus on the actual implementation in Python.

asyncio currently has effectively two different APIs:

  • the high level API intended for most users that centers on async/await patterns
  • the low level API intended for library and framework developers to manage event loops, implement protocols, etc.

EventLoop is part of the low level API, while Task is part of the high level API. I’ll use EventLoop and Task to refer to the Python classes or their instances; unformatted loop and task will still refer to the concepts introduced above.

Low Level API

The low level API is mostly meant for frameworks and libraries, not for general usage. For example, uvloop, an event loop based on the same libuv that backs node.js, implements its own EventLoop that matches the low level API and can be used as a drop-in replacement.
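Swapping it in looks roughly like this (a sketch; uvloop.install() sets the event loop policy so that asyncio.run builds a uvloop event loop):

import asyncio
import uvloop

async def main():
    ...

uvloop.install()  # every event loop created after this comes from uvloop
asyncio.run(main())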

As a result, it makes sense to focus on the high level API instead.

The distinction between high and low level APIs is a relatively new change. In earlier versions of asyncio it was necessary to initialize an EventLoop and then enqueue tasks on it manually. This is an example from the Python 3.6 docs:

import asyncio
 
async def hello_world():
    print("Hello World!")
 
loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world())
loop.close()

Since Python 3.7, there is an equivalent high level API we can use instead:

import asyncio
 
async def hello_world():
    print("Hello World!")
 
asyncio.run(hello_world())

This is a good reminder that the API for asyncio changes iteratively with each subsequent Python release. Best practices and examples online may be out of date.

High Level API

As mentioned before, the high level API focuses on async/await patterns. As a result, we need to understand async, await, and awaitable objects, the types that can be used in an await expression.

async is the keyword that defines native Python coroutines.

async def hello_world():
    print("Hello World!")

Coroutines, along with Task and Future, are the three main types of awaitables. Future is a low level type, so we’ll ignore it for now.

Native coroutines are the return type when calling an async function - we can consider them a reference to an asynchronous call that has not started running yet. The tricky part is the final keyword: await.

So what happens when we await the previous async function?

await hello_world()

Since await can only be used in an async context (e.g. within asyncio.run), we can guarantee that an event loop already exists. await immediately enqueues the task for the native coroutine on the running event loop and cedes control of the current task until it is resolved.

So if we have:

async def hello():
	print("Hello")
 
async def world():
	print("World!")
 
await hello()
await world()

The program runs hello() first, waits for it to be done, and then does the same for world().

It doesn’t matter when world() or hello() is called. The coroutines are only enqueued when they are awaited.

Therefore, this is exactly the same as before.

async def hello():
	print("Hello")
 
async def world():
	print("World!")
 
h = hello()
w = world()
 
await h
await w

Since await is what enqueues a coroutine and pauses the current task, we await h, which stops our task until h is done, before doing the same with w.

So how can we run two things concurrently? Only one task is run at a time, but we can enqueue multiple tasks and let them execute cooperatively with Task.

h = asyncio.create_task(hello())
w = asyncio.create_task(world())
await h
await w

This acts just like the previous minimal examples, but technically is a little different. Unlike await, create_task adds a task to the event loop but does not cede control of the loop to it. So after h and w are created, the only reason they run is the awaits on the third and fourth lines, which cede control and let them run.

But if we didn’t await, these would run whenever they were allowed to do so. For example:

async def main():
	asyncio.create_task(hello())
	asyncio.create_task(world())
	print("main is finished")
 
 
asyncio.run(main())

The run command creates an event loop, and the tasks are run after main() has completed.

This becomes even more apparent if we use asyncio.sleep. It takes a delay parameter and creates a coroutine that instantly cedes itself, allowing other tasks to run, and only returns when the delay is over.

h = asyncio.create_task(hello())
w = asyncio.create_task(world())
await asyncio.sleep(1)
print("now we await")
await h
await w

The "Hello World" messages show up before the program ends because the tasks run in the time sleep gave them. The "now we await" message only runs after the delay is over, and the two final await functions at the end are instantaneous since their coroutines have already been run.

Like most other APIs in asyncio, the Task creation interface has evolved over releases. For simple use cases we can use asyncio.gather, which creates the necessary Tasks for us.

await asyncio.gather(hello(), world())

Or we can use TaskGroup (added in Python 3.11):

async with asyncio.TaskGroup() as tg:
    h = tg.create_task(hello())
    w = tg.create_task(world())

At the end of the await for gather, and at the end of the with block for TaskGroup, we can guarantee that the tasks are completed.

It is worth mentioning that both of these strategies allow us to sidestep a well known heisenbug where Python can garbage collect a running Task if nothing holds a reference to it.

Another benefit of the Task API is that, unlike a plain await, we can also call cancel on a Task if we no longer need it. We can also cancel the awaitable returned by gather if we no longer want any of the pending tasks to run.
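A minimal sketch of cancellation, using a hypothetical slow coroutine:

import asyncio

async def slow():
    await asyncio.sleep(10)

async def main():
    t = asyncio.create_task(slow())
    await asyncio.sleep(0)  # cede once so the task can start
    t.cancel()              # request cancellation
    try:
        await t
    except asyncio.CancelledError:
        print("task was cancelled")

asyncio.run(main())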

Async Flow Control

We can layer awaitables with regular Python flow management.

Try/Finally

The asyncio docs recommend using try/finally to handle task cancellation and other potential errors. Like KeyboardInterrupt, asyncio.CancelledError extends BaseException and therefore does not get caught by a standard except Exception block. The finally block will always be called, so it can be used for task cleanup when cancellation is triggered.
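A minimal sketch of that pattern in a cancellable task (worker is a hypothetical coroutine):

import asyncio

async def worker():
    try:
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("cancelled, cleaning up")
        raise  # re-raise so the loop sees the task as cancelled
    finally:
        print("finally runs whether we were cancelled or not")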

Context Managers

Just as we use with for regular context managers, we can create async context managers using async with. This example is from the Python documentation:

from contextlib import asynccontextmanager
 
@asynccontextmanager
async def get_connection():
    conn = await acquire_db_connection()
    try:
        yield conn
    finally:
        await release_db_connection(conn)
 
async def get_all_users():
    async with get_connection() as conn:
        return conn.query('SELECT ...')

The benefit here is that the context manager itself is its own task and can use further awaitables in setup and teardown.

Iterators and Iterables

The pattern here is very similar to the existing synchronous pattern. A regular iterable implements __iter__ and a regular iterator implements __next__. The asynchronous variants implement __aiter__ and __anext__.

Then, just as we can use for for synchronous variants, we can use async for to actually iterate through.

The following example is from PEP 492:

class Cursor:
    def __init__(self):
        self.buffer = collections.deque()
 
    async def _prefetch(self):
        ...
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if not self.buffer:
            self.buffer = await self._prefetch()
            if not self.buffer:
                raise StopAsyncIteration
        return self.buffer.popleft()
 
 
async for row in Cursor():
    print(row)

Since __anext__ returns a native coroutine, each iteration forward has its own task on the loop.

Async Generators

If adding yield to a regular function makes it a generator, what happens if we yield from a native coroutine?

async def what_happens_now():
   yield 32

Earlier we said that synchronous generators were conceptual coroutines because yield allows us to pause the running function and resume it later. We also said that native coroutines allow us to pause the running task with await.

Now we have the best of both worlds. We can pause and resume a function within a task on the event loop while also sometimes ceding control of the loop.

This allows us to work around the limitation that __anext__ on an async iterator will normally create a brand new task per iteration, meaning that state between calls needs to be shared on the iterator class itself.

In this example from PEP 525, the iterator version would look like:

class Ticker:
    """Yield numbers from 0 to `to` every `delay` seconds."""
 
    def __init__(self, delay, to):
        self.delay = delay
        self.i = 0
        self.to = to
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        i = self.i
        if i >= self.to:
            raise StopAsyncIteration
        self.i += 1
        if i:
            await asyncio.sleep(self.delay)
        return i

while the generator version is much simpler and apparently much more performant:

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

This magic does have a big downside. Despite being tagged with the async keyword, async generator functions are actually synchronous to call! They do not return a native coroutine and therefore are not awaitable. The following throws an exception:

async def oops():
	yield 1
 
await oops()
# TypeError: object async_generator can't be used in 'await' expression

Instead, they return an object that conforms to the async iterator protocol.

async for o in oops():
	print(o)

The asynchronous execution is still tucked away in __anext__. By extension, this means we can actually define asynchronous generators in synchronous code. The following is valid code:

def boring_old_function(old_gen):
	new_gen = (await x.func() async for x in old_gen if not (await x.other()))
	return new_gen

Like before, all of the asynchronous bits are hidden away and evaluated in other asynchronous contexts, not this one.

Just to make it even more confusing — while there are also asynchronous set, list, and dictionary comprehensions, these all run in place and thus can only be called inside async functions.

s = {i async for i in oops()}
l = [i async for i in oops()]
d = {i:i async for i in oops()}

Another downside is that asynchronous generators are a little trickier to clean up. They can be cancelled just like native coroutines. However, unlike cancelled coroutines, they would still have their own paused stacks at the point a finally is triggered. As a result, EventLoop provides a method, shutdown_asyncgens, that can finalize all running async generators. Luckily, the high level asyncio.run handles this for us.
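If you do manage a loop yourself, the shutdown pattern looks roughly like this (a sketch assuming a main coroutine):

import asyncio

loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # finalize any async generators that are still paused
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()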

Using asyncio

For Web

Luckily, for most web applications, we don’t even need to think about initializing our event loop at all. The ASGI specification splits responsibilities between a protocol server and an application that is hosted within it. While some frameworks (e.g. Quart or Sanic) implement both portions of the specification, most (e.g. Starlette, FastAPI, and Django) focus only on the application portion and defer the protocol server to another project (e.g. Daphne, Uvicorn, or Hypercorn). In both situations, the protocol server is responsible for creating the event loop lifecycle and initializing the application within a native coroutine. As a result, we can mostly focus on writing routes and business logic.

from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
 
async def homepage(request):
    return JSONResponse({'hello': 'world'})
 
routes = [
    Route("/", endpoint=homepage)
]
 
app = Starlette(debug=True, routes=routes)

Implementing your own Protocols

If for some reason you wanted to implement your own server, the low level API includes loop.create_server. This is used by uvicorn and aiohttp.

The majority of your network handling functionality will need to be handled by your Protocol factory, the first argument to create_server.
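A minimal sketch of an echo server, following the pattern from the Python docs (the host and port are arbitrary):

import asyncio

class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)  # echo the bytes straight back

async def main():
    loop = asyncio.get_running_loop()
    # the protocol factory (the first argument) is called once per connection
    server = await loop.create_server(EchoProtocol, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()

asyncio.run(main())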

Your Own Long Running Task

What if you don’t need a network server implementation and just want to keep your process alive for a long time?

async def main():
   while True:
      try:
         await asyncio.sleep(1)
         print("woke up")
      except asyncio.CancelledError as e:
         print("Caught the cancel before throwing again")
         raise e
 
try:
   print("Starting up")
   asyncio.run(main())
except KeyboardInterrupt:
   print("Catching shutdown")
finally:
   print("Starting shutting down")

This of course doesn’t do very much, but is a minimal example of a process that runs forever and handles a process interruption gracefully.

The while True loop blocks the main() coroutine while the asyncio.sleep within it suspends the main() coroutine for 1s at a time, creating time for other tasks on the loop to run.

It is not required for the top level coroutine to have the while True block. For example, in aiokafka, the Consumer class has the while True loop within __anext__. Iterating through the records in the consumer actually creates a new blocking loop each time we ask for another record that only breaks when a new record is received.
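The consumer usage looks roughly like this (a sketch; the broker address and topic name are placeholders):

from aiokafka import AIOKafkaConsumer

async def consume():
    consumer = AIOKafkaConsumer('my_topic', bootstrap_servers='localhost:9092')
    await consumer.start()
    try:
        # each iteration suspends this task until a record arrives
        async for msg in consumer:
            print(msg.value)
    finally:
        await consumer.stop()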

While it isn’t harmful to have multiple tasks each running their own while True loop, it is important to make sure that the tasks we enqueue do not block the entire loop or the thread. For example, the example above would work just as well with 0.1 or 1000 for the sleep. It would affect how much time is spent actively in the main() coroutine, and we would likely benefit from a higher value since that gives other tasks more time to get done.

Good Reads: