Back to Scaling Python Applications guides

Practical Guide to Asynchronous Programming in Python

Stanley Ulili
Updated on April 15, 2025

Python gives you powerful tools for asynchronous programming. This lets you write concurrent code without dealing with the headaches of traditional threading.

The asyncio library, added in Python 3.4 and improved in later versions, offers a clean way to write single-threaded concurrent code using coroutines, event loops, and Future objects.

In this guide, I'll show you how to create and use effective asynchronous patterns in your Python applications.

Prerequisites

Before you start, make sure you have a recent version of Python (3.7 or higher) installed on your computer.

Step 1 — Understanding the basics of asynchronous programming

For the best experience, create a new Python project to try out these concepts as you read.

Start by making a new directory and setting up a virtual environment:

 
mkdir python-async && cd python-async
 
python3 -m venv venv

Activate the virtual environment:

 
source venv/bin/activate

Let's start with a simple example that shows the main problem asynchronous programming solves - operations that block execution. Create a file named main.py with this code:

main.py
import time

def fetch_data(source):
    print(f"Fetching data from {source}...")
    time.sleep(2)  # Simulating I/O operation
    print(f"Done fetching from {source}")
    return f"Data from {source}"

def main():
    start_time = time.time()

    # Sequential execution of three operations
    data1 = fetch_data("source_1")
    data2 = fetch_data("source_2")
    data3 = fetch_data("source_3")

    results = [data1, data2, data3]

    end_time = time.time()
    print(f"Total execution time: {end_time - start_time:.2f} seconds")
    print(f"Results: {results}")

if __name__ == "__main__":
    main()

In this example, you fetch data from three sources using the fetch_data function, which has a 2-second delay to simulate I/O operations. The main function runs these tasks one after the other, so each one waits for the previous to finish before starting.

This means the total execution time is about 6 seconds (2 seconds per task). This shows the problem with blocking operations—each task delays the next. In the next step, you'll see how asynchronous programming solves this by running tasks at the same time.

Run your script with this command:

 
python main.py

You'll see output like this:

Output
Fetching data from source_1...
Done fetching from source_1
Fetching data from source_2...
Done fetching from source_2
Fetching data from source_3...
Done fetching from source_3
Total execution time: 6.01 seconds
Results: ['Data from source_1', 'Data from source_2', 'Data from source_3']

Notice that the total time is about 6 seconds (3 operations × 2 seconds each). This happens because each function call blocks execution until it finishes, forcing everything to run in sequence even though these operations don't depend on each other.

Asynchronous programming fixes this problem by letting your program work on other tasks while waiting for I/O operations to finish.

Step 2 — Your first async program with asyncio

Now that you understand the problem with blocking operations, let’s solve it using Python’s asyncio library. asyncio allows you to write asynchronous code that runs concurrently, so tasks don’t block each other. This means you can perform I/O operations (like fetching data) without waiting for one task to finish before starting the next.

In this step, you'll rewrite the previous example to use asynchronous programming. The goal is to run all the data-fetching operations concurrently, making the program run faster by handling multiple tasks simultaneously.

Update your main.py file with the highlighted code below:

main.py
import asyncio
import time
async def fetch_data(source):
print(f"Fetching data from {source}...")
await asyncio.sleep(2) # Simulating I/O operation
print(f"Done fetching from {source}")
return f"Data from {source}"
async def main():
start_time = time.time() # Concurrent execution of three operations
# Create tasks with the coroutine objects directly
data1_task = asyncio.create_task(fetch_data("source_1"))
data2_task = asyncio.create_task(fetch_data("source_2"))
data3_task = asyncio.create_task(fetch_data("source_3"))
# Wait for all tasks to complete
results = await asyncio.gather(data1_task, data2_task, data3_task)
end_time = time.time() print(f"Total execution time: {end_time - start_time:.2f} seconds") print(f"Results: {results}") if __name__ == "__main__":
asyncio.run(main())

The async keyword before a function marks it as a coroutine, which is a special type of function that can pause and resume execution.

The await keyword pauses the execution of the coroutine until the awaited task finishes. asyncio.sleep(2) is the async version of time.sleep(2) and allows control to be given back to the event loop during the delay.

asyncio.create_task() is used to schedule coroutines to run concurrently, while asyncio.gather() waits for multiple tasks to finish and returns their results.

Finally, asyncio.run() runs the main coroutine and manages the event loop, ensuring the proper execution of the asynchronous tasks.

Run this updated script:

 
python main.py

You'll see output like this:

Output
Fetching data from source_1...
Fetching data from source_2...
Fetching data from source_3...
Done fetching from source_1
Done fetching from source_2
Done fetching from source_3
Total execution time: 2.00 seconds
Results: ['Data from source_1', 'Data from source_2', 'Data from source_3']

The most significant difference is the execution time. Instead of 6 seconds in the blocking version, the async version finishes in just about 2 seconds. This happens because all three operations run concurrently, waiting for their delays in parallel instead of one after another.

Gantt chart comparing blocking vs asynchronous execution

Notice how all three "Fetching data" messages appear before any "Done fetching" messages. This shows that the operations are indeed running at the same time.

Step 3 — Working with async generators and for loops

Async programming becomes even more powerful when combined with async generators and async for loops. This allows you to efficiently generate and process a collection of data sources concurrently.

Update your example to process a collection of data sources using an async for loop:

main.py
import asyncio
import time

async def fetch_data(source):
    print(f"Fetching data from {source}...")
    await asyncio.sleep(2)  # Simulating I/O operation
    print(f"Done fetching from {source}")
    return f"Data from {source}"

async def process_sources(sources):
tasks = []
for source in sources:
task = asyncio.create_task(fetch_data(source))
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
async def data_generator(count):
for i in range(1, count + 1):
yield f"async_source_{i}"
async def main(): start_time = time.time() # Generate sources asynchronously
sources = []
async for source in data_generator(5):
sources.append(source)
# Process all sources concurrently
results = await process_sources(sources)
end_time = time.time() print(f"\nProcessed {len(results)} sources") print(f"Total execution time: {end_time - start_time:.2f} seconds") print(f"Results: {results}") if __name__ == "__main__": asyncio.run(main())

In this example, an async generator function, data_generator, is used to yield data source names. Instead of iterating over a regular list, you use an async for loop to collect these sources asynchronously. This allows the source list to be generated concurrently, without blocking other tasks.

The process_sources function handles all the data sources concurrently. It creates and manages tasks for each source using asyncio.create_task() and waits for them to complete with asyncio.gather(). This ensures all tasks run in parallel, improving efficiency compared to sequential execution.

The program generates the data sources asynchronously and processes them concurrently, speeding up execution and enabling you to handle multiple tasks at once.

Run the updated script:

 
python main.py

You'll see output similar to this:

Output
Fetching data from async_source_1...
Fetching data from async_source_2...
Fetching data from async_source_3...
Fetching data from async_source_4...
Fetching data from async_source_5...
Done fetching from async_source_1
Done fetching from async_source_2
Done fetching from async_source_3
Done fetching from async_source_4
Done fetching from async_source_5
Total execution time: 2.00 seconds
Results: ['Data from async_source_1', 'Data from async_source_2', 'Data from async_source_3', 'Data from async_source_4', 'Data from async_source_5']

The power of this approach becomes clear as you scale up. Even though you’ve added more sources (5 instead of 3), the execution time stays around 2 seconds because all operations run concurrently.

Step 4 — Managing many async tasks with semaphores

When you have many concurrent tasks running, limiting how many tasks can run simultaneously is often necessary. This prevents overloading system resources and ensures your program runs efficiently. Asyncio provides Semaphore objects, which are useful for controlling the maximum number of concurrent operations.

In this example, you’ll modify your code to limit the number of simultaneous tasks using a semaphore. This allows you to control concurrency, so only a certain number of tasks can run in parallel at any given time.

Remove all code in main.py and add the following:

main.py
import asyncio
import time
import random

async def fetch_data(source, semaphore):
    # The semaphore limits how many coroutines can enter this block simultaneously
    async with semaphore:
        print(f"Fetching data from {source}...")
        # Random delay between 1 and 3 seconds to simulate variable I/O time
        delay = random.uniform(1, 3)
        await asyncio.sleep(delay)
        print(f"Done fetching from {source} (took {delay:.2f}s)")
        return f"Data from {source}"

async def main():
    # Generate 10 data sources
    sources = [f"limited_source_{i}" for i in range(1, 11)]

    # Limit concurrency to 3 simultaneous operations
    semaphore = asyncio.Semaphore(3)    
    start_time = time.time()

    # Create tasks with the semaphore
    tasks = [fetch_data(source, semaphore) for source in sources]
    results = await asyncio.gather(*tasks)

    end_time = time.time()
    print(f"\nProcessed {len(results)} sources")
    print(f"Total execution time: {end_time - start_time:.2f} seconds")

if __name__ == "__main__":
    asyncio.run(main())

In this example:

  1. We create a Semaphore with a value of 3, limiting concurrency to three tasks at a time.
  2. Each fetch_data coroutine acquires the semaphore before executing and releases it when done.
  3. We simulate variable response times using random.uniform() and asyncio.sleep().

This ensures that no more than 3 operations are active at any time, regardless of how many total sources we're processing.

Run the script:

 
python main.py

You'll see output similar to this:

Output
Fetching data from limited_source_1...
Fetching data from limited_source_2...
Fetching data from limited_source_3...
...
Done fetching from limited_source_8 (took 1.91s)
Done fetching from limited_source_10 (took 1.43s)
Done fetching from limited_source_9 (took 2.25s)

Processed 10 sources
Total execution time: 7.55 seconds

Notice that precisely 3 operations are running at the start, and as each one finishes, a new one begins. This pattern continues until all sources are processed. The semaphore ensures we maintain a constant level of concurrency, as illustrated in the diagram below:

Semaphore limiting concurrency to 3 tasks at a time. The top shows unlimited concurrent execution, while the bottom shows controlled execution with the semaphore.

Without a semaphore, all tasks would start simultaneously (top section), but with the semaphore limiting to 3 concurrent tasks (bottom section), we see a controlled execution pattern that prevents resource overload.

Final thoughts

In this guide, you’ve learned how to use Python's asyncio library to write concurrent code efficiently. You can handle multiple tasks concurrently without blocking your program by using async functions, async generators, and semaphores. This is especially useful for I/O-bound tasks or large datasets.

Asynchronous programming significantly improves performance, and asyncio makes it easy to implement concurrency in a single-threaded environment. Check the official Python documentation on asyncio.

Author's avatar
Article by
Stanley Ulili
Stanley Ulili is a technical educator at Better Stack based in Malawi. He specializes in backend development and has freelanced for platforms like DigitalOcean, LogRocket, and AppSignal. Stanley is passionate about making complex topics accessible to developers.
Got an article suggestion? Let us know
Next article
Getting Started with AnyIO in Python
AnyIO simplifies asynchronous programming in Python by providing a unified interface for different async backends like asyncio and trio. This guide covers key features such as task concurrency, error handling, and timeouts, helping you build efficient, non-blocking applications with ease.
Licensed under CC-BY-NC-SA

This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.

Make your mark

Join the writer's program

Are you a developer and love writing and sharing your knowledge with the world? Join our guest writing program and get paid for writing amazing technical guides. We'll get them to the right readers that will appreciate them.

Write for us
Writer of the month
Marin Bezhanov
Marin is a software engineer and architect with a broad range of experience working...
Build on top of Better Stack

Write a script, app or project on top of Better Stack and share it with the world. Make a public repository and share it with us at our email.

community@betterstack.com

or submit a pull request and help us build better products for everyone.

See the full list of amazing projects on github