EX25 - Asyncio
With the concept of event loops in Python (via asyncio), we can benefit from faster execution of I/O-bound tasks.
Question 1 - Threaded HTTP server
Take the example code seen in the lecture producing tasks that will be put on the event loop. Code-snippets/280/05_tasks
What happens if the entire line 9 (the await) is removed? How will the program behave, and why?
Solution
The event loop continues and finishes the main coroutine without waiting for the task. The completion of the main coroutine then ends the program, and the still-running task (the coroutine that takes 3 seconds) is cleaned up by the garbage collector.
Question 2 - Producer consumer example
Here is an implementation of the producer consumer problem with asyncio.
prod_cons.py
#!/usr/bin/env python3
# asyncq.py
import asyncio
import os
import random
import time
"""
Example of producer consumer problem solved concurrently with Python's asyncio
"""
# Constants shared by the whole program.
MAX_SLEEP_TIME = 2  # inclusive upper bound, in seconds, of the random delay in randsleep()
MAX_SLEEP_TIME_PROD = 3  # despite the name: inclusive upper bound on the number of items one producer creates
ITEM_SIZE = 2  # default number of random bytes per item in makeitem()
DEF_PROD = 5  # default number of producer tasks (--nprod)
DEF_CONS = 10  # default number of consumer tasks (--ncon)
async def makeitem(size: int = ITEM_SIZE) -> str:
    """Create a fresh random item for a producer.

    Args:
        size: Number of random bytes to request from the operating system.

    Returns:
        The random bytes rendered as a hexadecimal string.
    """
    raw = os.urandom(size)
    return raw.hex()
async def randsleep(caller=None) -> None:
    """Suspend the calling coroutine for a random whole number of seconds.

    The delay is drawn from 0 to MAX_SLEEP_TIME, both ends included.

    Args:
        caller: Optional label identifying the caller. When it is truthy, a
            message announcing the delay is printed before sleeping.
    """
    delay = random.randint(0, MAX_SLEEP_TIME)
    if caller:
        print(f"{caller} sleeping for {delay} seconds.")
    await asyncio.sleep(delay)
async def produce(name: int, q: asyncio.Queue) -> None:
    """Create a random number of items and put them on the shared queue.

    Between 1 and MAX_SLEEP_TIME_PROD items are produced. Before each item
    the producer sleeps for a random time. Every item is queued together with
    the perf_counter timestamp taken just before it is put on the queue.

    Args:
        name: ID that identifies this producer in the printed messages.
        q: Queue shared between all producers and consumers.
    """
    item_count = random.randint(1, MAX_SLEEP_TIME_PROD)
    label = f"Producer {name}"
    for _ in range(item_count):
        await randsleep(caller=label)
        item = await makeitem()
        created_at = time.perf_counter()
        await q.put((item, created_at))
        print(f"Producer {name} added <{item}> to queue.")
async def consume(name: int, q: asyncio.Queue) -> None:
    """Take items off the shared queue forever and print each one.

    Each round the consumer first sleeps for a random time, then waits for
    the next (item, timestamp) pair, prints the item together with the time
    elapsed since the timestamp, and marks the queue entry as done.
    The loop never ends on its own; the task has to be cancelled.

    Args:
        name: ID that identifies this consumer in the printed messages.
        q: Queue shared between all producers and consumers.
    """
    label = f"Consumer {name}"
    while True:
        await randsleep(caller=label)
        item, enqueued_at = await q.get()
        waited = time.perf_counter() - enqueued_at
        print(f"Consumer {name} got element <{item}> in {waited:0.5f} seconds.")
        # Lets q.join() know that this entry has been fully processed.
        q.task_done()
async def main(nprod: int, ncon: int) -> None:
    """Run nprod producer tasks and ncon consumer tasks on one shared queue.

    The coroutine returns once all producers have finished, every queued item
    has been marked as done, and all consumer tasks have been cancelled and
    have actually finished.

    Args:
        nprod: Number of producer tasks to start.
        ncon: Number of consumer tasks to start.

    Raises:
        ValueError: If producers are requested but no consumers. Every
            producer queues at least one item, so without a consumer the
            queue could never be drained and join() would block forever.
    """
    if nprod > 0 and ncon <= 0:
        raise ValueError("at least one consumer is required when nprod > 0")

    shared_item_queue = asyncio.Queue()
    producers = [
        asyncio.create_task(produce(n, shared_item_queue)) for n in range(nprod)
    ]
    consumers = [
        asyncio.create_task(consume(n, shared_item_queue)) for n in range(ncon)
    ]

    # Wait until every producer has put all of its items on the queue.
    await asyncio.gather(*producers)
    # join() returns once task_done() has been called for every queued item;
    # it does not wait for the consumer tasks themselves.
    await shared_item_queue.join()

    # The consumers loop forever, so they have to be cancelled explicitly.
    for c in consumers:
        c.cancel()
    # cancel() only requests cancellation. Awaiting the tasks makes sure they
    # have really stopped before main() returns; return_exceptions=True keeps
    # the resulting CancelledError from propagating out of main().
    await asyncio.gather(*consumers, return_exceptions=True)
# Script entry point: read the task counts from the command line, run main()
# and report the total wall-clock time.
if __name__ == "__main__":
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("-p", "--nprod", type=int, default=DEF_PROD)
    cli.add_argument("-c", "--ncon", type=int, default=DEF_CONS)
    options = cli.parse_args()

    started_at = time.perf_counter()
    asyncio.run(main(nprod=options.nprod, ncon=options.ncon))
    elapsed = time.perf_counter() - started_at
    print(f"Program completed in {elapsed:0.5f} seconds.")
Play with this script and try to understand how it works.