producer consumer 1
import asyncio

async def producer(q):
    for i in range(5):
        print(f"Producer putting {i}")
        await q.put(i)
        await asyncio.sleep(0.1)  # Simulate some work

async def consumer(q):
    while True:
        item = await q.get()
        print(f"Consumer got {item}")
        await asyncio.sleep(0.5)  # Simulate some work
        # Mark the item as done only after the work, so q.join() waits for it
        q.task_done()
async def main():
    # Create a queue with a maximum size of 2
    q = asyncio.Queue(maxsize=2)

    producer_task = asyncio.create_task(producer(q))
    consumer_task = asyncio.create_task(consumer(q))

    # Important: once this await returns, the producer has finished putting all
    # of its items, so the q.join() below really does mean "everything that was
    # produced has been processed".
    await producer_task
    await q.join()  # Wait until all items in the queue have been processed

    consumer_task.cancel()  # Cancel the consumer, since it runs an infinite loop
    try:
        await consumer_task
    except asyncio.CancelledError:
        print("Consumer cancelled.")

if __name__ == "__main__":
    asyncio.run(main())
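
One detail the example above relies on: q.join() only unblocks after task_done() has been called once for every item that was put. If the work between get() and task_done() can raise, join() could hang forever, so a common safeguard is to call task_done() in a finally block. A minimal sketch of that variant (safe_consumer and the 0.1 s placeholder work are illustrative, not part of the original example):

import asyncio

async def safe_consumer(q):
    while True:
        item = await q.get()
        try:
            await asyncio.sleep(0.1)   # placeholder for real work; it may raise
            print(f"Processed {item}")
        except Exception as exc:
            print(f"Failed on {item}: {exc}")  # keep the consumer loop alive
        finally:
            q.task_done()              # always called, so q.join() cannot hang

async def main():
    q = asyncio.Queue()
    worker = asyncio.create_task(safe_consumer(q))
    for i in range(3):
        await q.put(i)
    await q.join()   # returns even if processing an item raised
    worker.cancel()

if __name__ == "__main__":
    asyncio.run(main())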
producer consumer 2 (asyncio.Queue with a sentinel for graceful shutdown)
import asyncio
import random

# A sentinel object used to signal shutdown to the consumers
SHUTDOWN = object()

async def producer(queue):
    """Producer that keeps producing until it is cancelled."""
    while True:
        item = random.uniform(0.1, 1.0)
        # Simulate some I/O needed to produce the item
        await asyncio.sleep(item / 2)
        # put() must be awaited: it suspends whenever the queue is full
        await queue.put(item)
        print(f'Produced {item:.2f}')
        # Randomly decide to take a longer break
        if random.random() < 0.1:
            print("Producer taking a longer break...")
            await asyncio.sleep(2)

async def consumer(name, queue):
    """Consumer that runs until it sees the shutdown signal."""
    while True:
        # get() must be awaited: it suspends while the queue is empty
        item = await queue.get()
        if item is SHUTDOWN:
            # Put the shutdown signal back in the queue for the other consumers
            await queue.put(SHUTDOWN)
            break  # Exit the loop
        # "Consume" the work item
        await asyncio.sleep(item)
        queue.task_done()
        print(f'{name} consumed {item:.2f}')
async def main():
    """Run the system for a fixed duration, then shut it down gracefully."""
    queue = asyncio.Queue()  # A maxsize can be passed here to bound the queue
    num_consumers = 3

    # Start the consumers
    consumers = [
        asyncio.create_task(consumer(f'consumer-{i}', queue))
        for i in range(num_consumers)
    ]
    # Start the producer
    producer_task = asyncio.create_task(producer(queue))

    # Let the system run for 10 seconds
    print("System running for 10 seconds...")
    await asyncio.sleep(10)

    # --- Graceful shutdown ---
    print("\n--- Initiating shutdown ---")

    # 1. Cancel the producer task so no new items are produced.
    producer_task.cancel()
    try:
        await producer_task
    except asyncio.CancelledError:
        print("Producer has been shut down.")

    # 2. Wait for the queue to drain; the consumers finish the remaining items.
    await queue.join()
    print("All items in the queue have been processed.")

    # 3. Signal the consumers to shut down by putting the SHUTDOWN object in the
    #    queue. Putting it once is enough: the first consumer to get it puts it
    #    back for the next one.
    await queue.put(SHUTDOWN)

    # 4. Wait for all consumers to finish.
    await asyncio.gather(*consumers)
    print("All consumer tasks have been shut down. System halted.")

if __name__ == '__main__':
    asyncio.run(main())
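
The maxsize note in main() above is easy to see in action: a bounded queue makes put() suspend once the queue is full, so a fast producer is automatically throttled to the consumers' pace (backpressure). A minimal sketch, with the sizes and delays picked purely for illustration:

import asyncio

async def fast_producer(q):
    for i in range(6):
        await q.put(i)             # suspends once two items are already waiting
        print(f"put {i}")

async def slow_consumer(q):
    while True:
        item = await q.get()
        await asyncio.sleep(0.3)   # slower than the producer
        print(f"got {item}")
        q.task_done()

async def main():
    q = asyncio.Queue(maxsize=2)   # the bound is what creates the backpressure
    consumer = asyncio.create_task(slow_consumer(q))
    await fast_producer(q)
    await q.join()
    consumer.cancel()

if __name__ == "__main__":
    asyncio.run(main())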
Use with concurrent.futures.ThreadPoolExecutor
Note the pattern:
loop = asyncio.get_running_loop()
future = loop.run_in_executor(thread_pool, <slow-function>, "task name")
result = await future
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io_task(name):
    """A synchronous, blocking I/O operation."""
    print(f"[{name}] Starting blocking I/O...")
    time.sleep(2)  # Simulate a blocking call (e.g., reading a large file, network request)
    print(f"[{name}] Finished blocking I/O.")
    return f"Result from {name}"

async def main():
    print("Main: Starting...")
    # Get the current event loop
    loop = asyncio.get_running_loop()

    # Create a custom ThreadPoolExecutor (optional; passing None uses the default executor).
    # A custom pool lets you limit the number of threads and manage their lifecycle.
    with ThreadPoolExecutor(max_workers=3) as thread_pool:
        # Offload blocking_io_task to the thread pool.
        # run_in_executor returns an awaitable Future.
        future1 = loop.run_in_executor(thread_pool, blocking_io_task, "Task A")
        future2 = loop.run_in_executor(thread_pool, blocking_io_task, "Task B")

        # Other async work can run concurrently while the blocking tasks execute
        print("Main: Doing other async work...")
        await asyncio.sleep(0.5)
        print("Main: Still doing other async work...")

        # Await the results from the blocking tasks
        result1 = await future1
        result2 = await future2

        print(f"Main: Received result: {result1}")
        print(f"Main: Received result: {result2}")

    print("Main: All done.")

if __name__ == "__main__":
    asyncio.run(main())
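
On Python 3.9+, asyncio.to_thread is a convenient shorthand for the same offloading: it runs the function in the default thread pool executor, so it fits whenever a custom ThreadPoolExecutor is not needed. A minimal sketch reusing the same kind of blocking function:

import asyncio
import time

def blocking_io_task(name):
    time.sleep(2)                  # simulate a blocking call
    return f"Result from {name}"

async def main():
    # Each to_thread() call runs the blocking function in the default executor
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io_task, "Task A"),
        asyncio.to_thread(blocking_io_task, "Task B"),
    )
    print(results)

if __name__ == "__main__":
    asyncio.run(main())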