Practical Guide to Asynchronous Programming in Python
Python gives you powerful tools for asynchronous programming. This lets you write concurrent code without dealing with the headaches of traditional threading.
The `asyncio` library, added in Python 3.4 and improved in later versions, offers a clean way to write single-threaded concurrent code using coroutines, event loops, and Future objects.
In this guide, I'll show you how to create and use effective asynchronous patterns in your Python applications.
Prerequisites
Before you start, make sure you have a recent version of Python (3.7 or higher) installed on your computer.
Step 1 — Understanding the basics of asynchronous programming
For the best experience, create a new Python project to try out these concepts as you read.
Start by making a new directory and setting up a virtual environment:
mkdir python-async && cd python-async
python3 -m venv venv
Activate the virtual environment:
source venv/bin/activate
Let's start with a simple example that shows the main problem asynchronous programming solves: operations that block execution. Create a file named `main.py` with the following code:
```python
import time

def fetch_data(source):
    print(f"Fetching data from {source}...")
    time.sleep(2)  # Simulating an I/O operation
    print(f"Done fetching from {source}")
    return f"Data from {source}"

def main():
    start_time = time.time()

    # Sequential execution of three operations
    data1 = fetch_data("source_1")
    data2 = fetch_data("source_2")
    data3 = fetch_data("source_3")
    results = [data1, data2, data3]

    end_time = time.time()
    print(f"Total execution time: {end_time - start_time:.2f} seconds")
    print(f"Results: {results}")

if __name__ == "__main__":
    main()
```
In this example, you fetch data from three sources using the `fetch_data` function, which includes a 2-second delay to simulate an I/O operation. The `main` function runs these calls one after the other, so each one waits for the previous to finish before starting. As a result, the total execution time is about 6 seconds (2 seconds per call). In the next step, you'll see how asynchronous programming solves this by running the tasks concurrently.
Run your script with this command:
python main.py
You'll see output like this:
```
Fetching data from source_1...
Done fetching from source_1
Fetching data from source_2...
Done fetching from source_2
Fetching data from source_3...
Done fetching from source_3
Total execution time: 6.01 seconds
Results: ['Data from source_1', 'Data from source_2', 'Data from source_3']
```
Notice that the total time is about 6 seconds (3 operations × 2 seconds each). This happens because each function call blocks execution until it finishes, forcing everything to run in sequence even though these operations don't depend on each other.
Asynchronous programming fixes this problem by letting your program work on other tasks while waiting for I/O operations to finish.
Step 2 — Your first async program with asyncio
Now that you understand the problem with blocking operations, let's solve it using Python's `asyncio` library. `asyncio` allows you to write asynchronous code that runs concurrently, so tasks don't block each other. This means you can perform I/O operations (like fetching data) without waiting for one task to finish before starting the next.

In this step, you'll rewrite the previous example to use asynchronous programming. The goal is to run all the data-fetching operations concurrently so the program finishes faster.

Update your `main.py` file with the following code:
```python
import asyncio
import time

async def fetch_data(source):
    print(f"Fetching data from {source}...")
    await asyncio.sleep(2)  # Simulating an I/O operation
    print(f"Done fetching from {source}")
    return f"Data from {source}"

async def main():
    start_time = time.time()

    # Schedule the three operations to run concurrently
    data1_task = asyncio.create_task(fetch_data("source_1"))
    data2_task = asyncio.create_task(fetch_data("source_2"))
    data3_task = asyncio.create_task(fetch_data("source_3"))

    # Wait for all tasks to complete
    results = await asyncio.gather(data1_task, data2_task, data3_task)

    end_time = time.time()
    print(f"Total execution time: {end_time - start_time:.2f} seconds")
    print(f"Results: {results}")

if __name__ == "__main__":
    asyncio.run(main())
```
The `async` keyword before a function definition marks it as a coroutine, a special type of function that can pause and resume execution. The `await` keyword pauses the coroutine until the awaited operation finishes. `asyncio.sleep(2)` is the async counterpart of `time.sleep(2)`: it yields control back to the event loop during the delay instead of blocking the thread.

`asyncio.create_task()` schedules a coroutine to run concurrently as a task, while `asyncio.gather()` waits for multiple awaitables to finish and returns their results in order. Finally, `asyncio.run()` starts the event loop, runs the `main` coroutine to completion, and closes the loop.
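One note on `asyncio.gather()`: it also accepts raw coroutine objects and wraps them in tasks for you, so the explicit `asyncio.create_task()` calls above are optional when all you need is the combined results. A minimal sketch (with a shortened delay so the demo runs quickly):

```python
import asyncio

async def fetch_data(source):
    await asyncio.sleep(0.1)  # Shortened delay for a quick demo
    return f"Data from {source}"

async def main():
    # gather() wraps raw coroutine objects in tasks itself,
    # so no create_task() calls are needed here
    results = await asyncio.gather(
        fetch_data("source_1"),
        fetch_data("source_2"),
        fetch_data("source_3"),
    )
    print(results)

asyncio.run(main())
```

Using `create_task()` explicitly is still useful when you want a handle to a task before awaiting it, for example to cancel it or check its status.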
Run this updated script:
python main.py
You'll see output like this:
```
Fetching data from source_1...
Fetching data from source_2...
Fetching data from source_3...
Done fetching from source_1
Done fetching from source_2
Done fetching from source_3
Total execution time: 2.00 seconds
Results: ['Data from source_1', 'Data from source_2', 'Data from source_3']
```
The most significant difference is the execution time. Instead of 6 seconds in the blocking version, the async version finishes in just about 2 seconds. This happens because all three operations run concurrently, waiting for their delays in parallel instead of one after another.
Notice how all three "Fetching data" messages appear before any "Done fetching" messages. This shows that the operations are indeed running at the same time.
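As a side note, `asyncio.gather()` returns results in the order you passed the awaitables, even if they finish in a different order. If you instead want to handle each result as soon as it's ready, `asyncio.as_completed()` is one option. A minimal sketch with short, randomized delays so completion order varies:

```python
import asyncio
import random

async def fetch_data(source):
    await asyncio.sleep(random.uniform(0.05, 0.2))  # Variable short delay
    return f"Data from {source}"

async def main():
    tasks = [asyncio.create_task(fetch_data(f"source_{i}")) for i in range(1, 4)]
    # as_completed() yields awaitables in completion order,
    # not the order the tasks were created
    for finished in asyncio.as_completed(tasks):
        result = await finished
        print(result)

asyncio.run(main())
```

This is handy when later work (logging, writing to a database) shouldn't wait for the slowest task.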
Step 3 — Working with async generators and for loops
Async programming becomes even more powerful when combined with async generators and `async for` loops, which let you produce and consume values asynchronously.

Update your example to collect data sources from an async generator, then process them concurrently:
```python
import asyncio
import time

async def fetch_data(source):
    print(f"Fetching data from {source}...")
    await asyncio.sleep(2)  # Simulating an I/O operation
    print(f"Done fetching from {source}")
    return f"Data from {source}"

async def process_sources(sources):
    tasks = []
    for source in sources:
        task = asyncio.create_task(fetch_data(source))
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    return results

async def data_generator(count):
    for i in range(1, count + 1):
        yield f"async_source_{i}"

async def main():
    start_time = time.time()

    # Collect sources from the async generator
    sources = []
    async for source in data_generator(5):
        sources.append(source)

    # Process all sources concurrently
    results = await process_sources(sources)

    end_time = time.time()
    print(f"\nProcessed {len(results)} sources")
    print(f"Total execution time: {end_time - start_time:.2f} seconds")
    print(f"Results: {results}")

if __name__ == "__main__":
    asyncio.run(main())
```
In this example, an async generator function, `data_generator`, yields the data source names, and you consume it with an `async for` loop instead of a regular `for` loop. This simple generator doesn't await anything between yields, but a real one could, for example, read source names from a file or network stream without blocking other tasks.

The `process_sources` function then handles all the data sources concurrently. It creates a task for each source with `asyncio.create_task()` and waits for them all with `asyncio.gather()`, so every fetch runs concurrently rather than sequentially.
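To see what a genuinely asynchronous generator looks like, here's a minimal sketch where the generator awaits between yields; the short sleep is a stand-in for real I/O, and the delay value is arbitrary:

```python
import asyncio

async def data_generator(count):
    for i in range(1, count + 1):
        # An async generator can await real I/O between yields;
        # here a short sleep stands in for e.g. reading a stream
        await asyncio.sleep(0.05)
        yield f"async_source_{i}"

async def main():
    # The async for loop yields control to the event loop
    # each time the generator awaits
    async for source in data_generator(3):
        print(source)

asyncio.run(main())
```

While the generator is sleeping, the event loop is free to run other tasks, which is the whole point of making it async.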
Run the updated script:
python main.py
You'll see output similar to this:
```
Fetching data from async_source_1...
Fetching data from async_source_2...
Fetching data from async_source_3...
Fetching data from async_source_4...
Fetching data from async_source_5...
Done fetching from async_source_1
Done fetching from async_source_2
Done fetching from async_source_3
Done fetching from async_source_4
Done fetching from async_source_5
Total execution time: 2.00 seconds
Results: ['Data from async_source_1', 'Data from async_source_2', 'Data from async_source_3', 'Data from async_source_4', 'Data from async_source_5']
```
The power of this approach becomes clear as you scale up. Even though you’ve added more sources (5 instead of 3), the execution time stays around 2 seconds because all operations run concurrently.
Step 4 — Managing many async tasks with semaphores
When you have many concurrent tasks, limiting how many run simultaneously is often necessary. This prevents overloading system resources and ensures your program runs efficiently. Asyncio provides `Semaphore` objects, which control the maximum number of concurrent operations.
In this example, you’ll modify your code to limit the number of simultaneous tasks using a semaphore. This allows you to control concurrency, so only a certain number of tasks can run in parallel at any given time.
Remove all the code in `main.py` and add the following:
```python
import asyncio
import time
import random

async def fetch_data(source, semaphore):
    # The semaphore limits how many coroutines can enter this block at once
    async with semaphore:
        print(f"Fetching data from {source}...")
        # Random delay between 1 and 3 seconds to simulate variable I/O time
        delay = random.uniform(1, 3)
        await asyncio.sleep(delay)
        print(f"Done fetching from {source} (took {delay:.2f}s)")
        return f"Data from {source}"

async def main():
    # Generate 10 data sources
    sources = [f"limited_source_{i}" for i in range(1, 11)]

    # Limit concurrency to 3 simultaneous operations
    semaphore = asyncio.Semaphore(3)

    start_time = time.time()

    # Run all fetches concurrently, sharing the semaphore
    tasks = [fetch_data(source, semaphore) for source in sources]
    results = await asyncio.gather(*tasks)

    end_time = time.time()
    print(f"\nProcessed {len(results)} sources")
    print(f"Total execution time: {end_time - start_time:.2f} seconds")

if __name__ == "__main__":
    asyncio.run(main())
```
In this example:

- You create a `Semaphore` with a value of 3, limiting concurrency to three tasks at a time.
- Each `fetch_data` coroutine acquires the semaphore before doing its work and releases it when done.
- Variable response times are simulated with `random.uniform()` and `asyncio.sleep()`.

This ensures that no more than 3 operations are active at any time, regardless of how many total sources you're processing.
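For clarity, `async with semaphore:` is shorthand for acquiring the semaphore and releasing it in a `finally` block. The same pattern can be sketched with the explicit calls (shortened delay for a quick demo):

```python
import asyncio

async def fetch_data(source, semaphore):
    # Equivalent to `async with semaphore:`, but with the
    # acquire/release made explicit
    await semaphore.acquire()
    try:
        await asyncio.sleep(0.05)  # Shortened delay for a quick demo
        return f"Data from {source}"
    finally:
        # The finally block guarantees the slot is freed
        # even if the fetch raises an exception
        semaphore.release()

async def main():
    semaphore = asyncio.Semaphore(3)
    results = await asyncio.gather(
        *(fetch_data(f"s{i}", semaphore) for i in range(10))
    )
    print(len(results))

asyncio.run(main())
```

The context-manager form is preferred in practice precisely because it makes forgetting the `release()` impossible.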
Run the script:
python main.py
You'll see output similar to this:
```
Fetching data from limited_source_1...
Fetching data from limited_source_2...
Fetching data from limited_source_3...
...
Done fetching from limited_source_8 (took 1.91s)
Done fetching from limited_source_10 (took 1.43s)
Done fetching from limited_source_9 (took 2.25s)

Processed 10 sources
Total execution time: 7.55 seconds
```
Notice that exactly 3 operations run at the start, and as each one finishes, the next begins. This pattern continues until all sources are processed, so the semaphore maintains a steady level of concurrency. The timing matches: with 10 tasks averaging about 2 seconds each and at most 3 running at once, a total of roughly 7 seconds is what you'd expect. Without a semaphore, all 10 tasks would start simultaneously; with it, you get a controlled execution pattern that prevents resource overload.
Final thoughts
In this guide, you've learned how to use Python's `asyncio` library to write concurrent code efficiently. By using async functions, async generators, and semaphores, you can handle multiple tasks concurrently without blocking your program. This is especially useful for I/O-bound workloads such as network requests or file operations.

Asynchronous programming can significantly improve performance, and `asyncio` makes concurrency practical in a single-threaded environment. For more details, check the official Python documentation on `asyncio`.