Skip to content

EX25 - Asyncio

With the concept of event loops in Python, using asyncio, we can benefit of faster execution of IO-bounded tasks.

Question 1 - Threaded HTTP server

Take the example code seen in the lecture producing tasks that will be put on the event loop. Code-snippets/280/05_tasks

What happen if the entire line 9 (await) will be removed? What will be the behavior of the program? Why?

Solution

The main event loop continues and finishes the main-coroutine. The finishing of the coroutine then finishes the program and the running task (coroutine taking 3 secs) will be cleaned up by the garbage collector tasks

Question 2 - Producer consumer example

Here is an implementation of the producer consumer problem with asyncio.

prod_cons.py
#!/usr/bin/env python3
# asyncq.py

import asyncio
import os
import random
import time

"""
Example of producer consumer problem solved concurrently with Python's asyncio
"""

# constants for the entire prg
MAX_SLEEP_TIME = 2
MAX_SLEEP_TIME_PROD = 3
ITEM_SIZE = 2
DEF_PROD = 5
DEF_CONS = 10


# coroutine that creates a new random item for the producer
# args: size (int): size of the generated random value
#       returns a random value in string
async def makeitem(size: int = ITEM_SIZE) -> str:
    return os.urandom(size).hex()


# corouinte that sleeps for a random time
# args: caller (str):   string describing the caller of the coroutine
async def randsleep(caller=None) -> None:
    i = random.randint(0, MAX_SLEEP_TIME)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)


# coroutine that produces elements and puts them on the asynchronous queue
# args: name (int):     ID that describes the producer
#       q (Queue):      shared asyncio.queue between the different producers and consumers
async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(1, MAX_SLEEP_TIME_PROD)
    for _ in range(n):
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")


# coroutine that consumes elements and prints them on the screen
# args: name (int):     ID that describes the producer
#       q (Queue):      shared asyncio.queue between the different producers and consumers
async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()


# main coroutine that creates nprod producer and ncon consumer tasks
# args: nprod (int):    number of producer tasks
#       ncon (int):     number of consumer tasks
async def main(nprod: int, ncon: int) -> None:
    shared_item_queue = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, shared_item_queue)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, shared_item_queue)) for n in range(ncon)]
    await asyncio.gather(*producers)    # wait until all producer completed their tasks
    await shared_item_queue.join()      # Wait until all elements are consumed --> implicitly awaits consumers
    for c in consumers:                 # Here the consumers will be destroyed, because they work in an endless loop
        c.cancel()


# main entry point
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=DEF_PROD)
    parser.add_argument("-c", "--ncon", type=int, default=DEF_CONS)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")

Play with this script and try to understand how it is working.