8 minutes
Understanding concurrency with asyncio
Introduction
I/O is slow. In fact, in most cases, it’s orders of magnitude slower than anything else your program is doing. Here’s a visual representation of typical operation speeds:
gantt title Operation Speed Comparison (Log Scale) dateFormat X axisFormat %s section Memory L1 Cache (0.5ns) :0, 1 RAM (100ns) :0, 200 section Storage SSD (16μs) :0, 32000 section Network Network (150ms) :0, 15000000
This latency can cause a program that executes sequentially to waste resources waiting on I/O operations. Consider this common example:
import requests
resp = requests.get('https://www.example.com/')
# program is blocked here until request is completed
print(resp.status_code)
In this example, the program is blocked from continuing its flow of execution until the https request is completed.This post is an attempt at understanding how Python3’s asyncio
tackles this problem using concurrency. There’s multiple ways to minimize the execution time of such programs, by allowing them to execute other tasks while waiting for the request to respond. And they generally fall into two major categories: concurrency and parallelism. You can think of concurrency as dealing with multiple things at the same time and parallelism doing multiple things at the same time. The former has overlapping time periods of execution while the latter does simultaneous execution. There are tons of articles and blog posts about this so I won’t go into detail here. Parallelism is implemented in Python with the multiprocessing
module and concurrency with threading
and asyncio
. Asyncio’s design is particularly interesting because multiprocessing and multithreading have large resource overheads while asyncio is a single-threaded, single-process design which uses cooperative multitasking. In cooperative multitasking, tasks yield to the scheduler as opposed to preemptive multitasking where the scheduler interrupts tasks. If you want to know how asyncio works under the hood, you can check out this detailed article written by A. Jesse Jiryu Davis and Guido van Rossum, and another one by Brett Cannon.
Example
The asyncio
module provides tools for building concurrent applications using an event loop and coroutines. Coroutines are functions that can be suspended and resumed while being executed. On await they release control back to the event loop. The event loop schedules concurrent tasks and manages their execution. Let’s have a look at a simple example to illustrate how this pattern can be implemented at a high level:
import asyncio
import datetime
import random
import time
def run_task(task_id):
start = datetime.datetime.now()
print(f"Starting task{task_id} at {start.strftime('%H:%M:%S')}")
# simulate i/o operation using sleep
time.sleep(random.random())
finish = datetime.datetime.now()
timer = finish - start
print(f"Task{task_id} completed at {finish.strftime('%H:%M:%S')}, in {timer.total_seconds():.2f} seconds.")
async def coro(task_id):
start = datetime.datetime.now()
print(f"Starting task{task_id} at {start.strftime('%H:%M:%S')}")
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
finish = datetime.datetime.now()
timer = finish - start
print(f"Task{task_id} completed at {finish.strftime('%H:%M:%S')}, in {timer.total_seconds():.2f} seconds.")
def run_all_tasks():
print("Running tasks sequentially... ")
start = time.time()
for i in range(1, 11):
run_task(i)
print(f"Process took: {time.time() - start : .2f} seconds. \n")
async def main():
print("Running tasks concurrently... ")
start = time.time()
tasks = [asyncio.create_task(coro(i)) for i in range(1, 11)]
await asyncio.gather(*tasks)
print(f"Process took: {time.time() - start : .2f} seconds.")
run_all_tasks()
asyncio.run(main())
Output:
$ python example.py
Running tasks sequentially...
Starting task1 at 21:48:30
Task1 completed at 21:48:31, in 0.89 seconds.
Starting task2 at 21:48:31
Task2 completed at 21:48:31, in 0.26 seconds.
Starting task3 at 21:48:31
Task3 completed at 21:48:31, in 0.21 seconds.
Starting task4 at 21:48:31
Task4 completed at 21:48:32, in 0.59 seconds.
Starting task5 at 21:48:32
Task5 completed at 21:48:33, in 0.86 seconds.
Starting task6 at 21:48:33
Task6 completed at 21:48:33, in 0.04 seconds.
Starting task7 at 21:48:33
Task7 completed at 21:48:33, in 0.29 seconds.
Starting task8 at 21:48:33
Task8 completed at 21:48:34, in 0.58 seconds.
Starting task9 at 21:48:34
Task9 completed at 21:48:34, in 0.28 seconds.
Starting task10 at 21:48:34
Task10 completed at 21:48:34, in 0.38 seconds.
Process took: 4.37 seconds.
Running tasks concurrently...
Starting task1 at 21:48:34
Starting task2 at 21:48:34
Starting task3 at 21:48:34
Starting task4 at 21:48:34
Starting task5 at 21:48:34
Starting task6 at 21:48:34
Starting task7 at 21:48:34
Starting task8 at 21:48:34
Starting task9 at 21:48:34
Starting task10 at 21:48:34
Task3 completed at 21:48:35, in 0.37 seconds.
Task5 completed at 21:48:35, in 0.40 seconds.
Task9 completed at 21:48:35, in 0.61 seconds.
Task4 completed at 21:48:35, in 0.64 seconds.
Task1 completed at 21:48:35, in 0.70 seconds.
Task8 completed at 21:48:35, in 0.70 seconds.
Task7 completed at 21:48:35, in 0.76 seconds.
Task6 completed at 21:48:35, in 0.79 seconds.
Task10 completed at 21:48:35, in 0.79 seconds.
Task2 completed at 21:48:35, in 0.90 seconds.
Process took: 0.90 seconds.
The first thing to note here is that when we run the tasks asynchronously, even though we’re starting all of the tasks at the same time the order of execution and completion depend on the random number of seconds in each coroutine, i.e. the latency of i/o in each coroutine. The second (and more important) thing to note is that when running sequentially, the process took at least as much time as all 10 tasks combined while in asynchronous execution the process took about as long as the slowest task. As the number of tasks increases, running them asynchronously becomes exponentially faster. To demonstrate this, let’s run the same function and coroutine using 100 tasks, while reducing the sleep time by a factor of 10.
import asyncio
import datetime
import random
import time
def run_task(task_id, factor):
start = datetime.datetime.now()
print(f"Starting task{task_id} at {start.strftime('%H:%M:%S')}")
# simulate i/o operation using sleep
time.sleep(random.random()*factor)
finish = datetime.datetime.now()
timer = finish - start
print(f"Task{task_id} completed at {finish.strftime('%H:%M:%S')}, in {timer.total_seconds():.3f} seconds.")
async def coro(task_id, factor):
start = datetime.datetime.now()
print(f"Starting task{task_id} at {start.strftime('%H:%M:%S')}")
# simulate i/o operation using sleep
await asyncio.sleep(random.random()*factor)
finish = datetime.datetime.now()
timer = finish - start
print(f"Task{task_id} completed at {finish.strftime('%H:%M:%S')}, in {timer.total_seconds():.3f} seconds.")
def run_all_tasks(how_many):
print("Running tasks sequentially... ")
start = time.time()
for i in range(1, how_many + 1):
run_task(i, 10/how_many)
print(f"Process took: {time.time() - start : .2f} seconds. \n")
async def main(how_many):
print("Running tasks concurrently... ")
start = time.time()
tasks = [asyncio.create_task(coro(i, 10/how_many)) for i in range(1, how_many + 1)]
await asyncio.gather(*tasks)
print(f"Process took: {time.time() - start : .2f} seconds.")
run_all_tasks(100)
asyncio.run(main(100))
Output:
$ python example.py
Running tasks sequentially...
Starting task1 at 00:15:21
Task1 completed at 00:15:21, in 0.025 seconds.
Starting task2 at 00:15:21
Task2 completed at 00:15:21, in 0.050 seconds.
Starting task3 at 00:15:21
Task3 completed at 00:15:21, in 0.049 seconds.
Starting task4 at 00:15:21
Task4 completed at 00:15:22, in 0.069 seconds.
Starting task5 at 00:15:22
Task5 completed at 00:15:22, in 0.006 seconds.
Starting task6 at 00:15:22
Task6 completed at 00:15:22, in 0.067 seconds.
Starting task7 at 00:15:22
Task7 completed at 00:15:22, in 0.090 seconds.
.
.
.
Task94 completed at 00:15:26, in 0.026 seconds.
Starting task95 at 00:15:26
Task95 completed at 00:15:26, in 0.001 seconds.
Starting task96 at 00:15:26
Task96 completed at 00:15:26, in 0.096 seconds.
Starting task97 at 00:15:26
Task97 completed at 00:15:26, in 0.034 seconds.
Starting task98 at 00:15:26
Task98 completed at 00:15:26, in 0.065 seconds.
Starting task99 at 00:15:26
Task99 completed at 00:15:26, in 0.038 seconds.
Starting task100 at 00:15:26
Task100 completed at 00:15:26, in 0.055 seconds.
Process took: 4.61 seconds.
Running tasks concurrently...
Starting task1 at 00:15:26
Starting task2 at 00:15:26
Starting task3 at 00:15:26
Starting task4 at 00:15:26
Starting task5 at 00:15:26
Starting task6 at 00:15:26
Starting task7 at 00:15:26
Starting task8 at 00:15:26
Starting task9 at 00:15:26
Starting task10 at 00:15:26
Starting task11 at 00:15:26
Starting task12 at 00:15:26
.
.
.
Task81 completed at 00:15:26, in 0.087 seconds.
Task30 completed at 00:15:26, in 0.090 seconds.
Task80 completed at 00:15:26, in 0.087 seconds.
Task88 completed at 00:15:26, in 0.089 seconds.
Task62 completed at 00:15:26, in 0.092 seconds.
Task63 completed at 00:15:26, in 0.094 seconds.
Task60 completed at 00:15:26, in 0.096 seconds.
Task55 completed at 00:15:26, in 0.098 seconds.
Task31 completed at 00:15:26, in 0.100 seconds.
Process took: 0.10 seconds.
The benchmarks demonstrate the dramatic scaling benefits of asynchronous programming, with performance improvements jumping from 5x with 10 tasks to 46x with 100 tasks. While larger numbers of tasks introduce some overhead from context switching - meaning the total process time may exceed the longest individual task - the asynchronous approach still significantly outperforms sequential execution. This pattern shows that asyncio excels at handling many I/O-bound tasks, though like all tools, it has practical limits that depend on your specific use case.
When NOT to Use asyncio
Not every situation benefits from asyncio. Here’s when to avoid it:
- CPU-Bound Tasks
# Bad use of asyncio - CPU intensive
async def compute_fibonacci(n):
if n <= 1: return n
return await compute_fibonacci(n-1) + await compute_fibonacci(n-2)
# Better to use multiprocessing
from multiprocessing import Pool
def compute_fibonacci(n):
if n <= 1: return n
return compute_fibonacci(n-1) + compute_fibonacci(n-2)
with Pool() as pool:
results = pool.map(compute_fibonacci, [30, 31, 32, 33])
- Simple Sequential Tasks
# Overkill for asyncio
async def process_small_list():
items = [1, 2, 3, 4, 5]
results = []
for item in items:
results.append(item * 2)
return results
# Just use regular functions
def process_small_list():
return [item * 2 for item in [1, 2, 3, 4, 5]]
- When Code Readability is Critical
# Async code can be harder to reason about
async def complex_business_logic():
async with context_manager() as ctx:
try:
result = await operation1()
if result:
await operation2()
else:
await operation3()
except Exception:
await cleanup()
# Simple synchronous code might be clearer
def complex_business_logic():
with context_manager() as ctx:
try:
result = operation1()
operation2() if result else operation3()
except Exception:
cleanup()
Conclusion
That’s it for this post. Some key things to keep in mind:
- Simply calling a coroutine will not schedule it to be executed
- Everything you call from a coroutine should be non blocking or else you risk stalling the entire system
- asyncio does not magically make things non-blocking. You have to explicitly yield to the event loop