Producer–consumer example 1: finite producer, one consumer, bounded queue

import asyncio
 
async def producer(q):
    """Put the integers 0 through 4 on ``q``, pausing briefly after each one.

    ``q.put`` is awaited, so when ``q`` is bounded and currently full the
    producer suspends until a slot is free.
    """
    total_items = 5
    next_value = 0
    while next_value < total_items:
        print(f"Producer putting {next_value}")
        await q.put(next_value)
        # Stand-in for the time real work would take between items.
        await asyncio.sleep(0.1)
        next_value += 1
 
async def consumer(q):
    """Process items from ``q`` forever; the caller stops it by cancelling.

    ``q.task_done()` is called only after the simulated work for an item has
    finished. Acknowledging earlier would let ``q.join()`` return while the
    last item is still being processed, and a caller that cancels this task
    right after ``join()`` would then interrupt that work.
    """
    while True:
        item = await q.get()
        print(f"Consumer got {item}")
        await asyncio.sleep(0.5)  # Simulate some work
        # Acknowledge the item now that its work is complete.
        q.task_done()
 
async def main():
    """Run one producer and one consumer over a queue bounded to two items."""
    # maxsize=2 caps how many items may sit in the queue at once.
    q = asyncio.Queue(maxsize=2)

    producing = asyncio.create_task(producer(q))
    consuming = asyncio.create_task(consumer(q))

    # Order matters: once the producer has returned no further items can
    # arrive, so when join() returns afterwards every item that was ever
    # queued has been acknowledged with task_done().
    await producing
    await q.join()

    # The consumer loops forever, so it is stopped by cancellation.
    consuming.cancel()
    try:
        await consuming
    except asyncio.CancelledError:
        print("Consumer cancelled.")
 
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

Producer–consumer example 2: long-running producer, several consumers, sentinel-based shutdown

  • asyncio.Queue
import asyncio
import random
 
# Unique sentinel placed on the queue to signal shutdown. Consumers compare
# items against it by identity (`item is SHUTDOWN`).
SHUTDOWN = object()
 
async def producer(queue):
    """Produce random work items forever; the caller stops it by cancelling.

    Each item is a float drawn by ``random.uniform(0.1, 1.0)``. Before an
    item is queued the producer sleeps for half of its value, and after
    roughly one item in ten it pauses for a further two seconds.
    """
    while True:
        duration = random.uniform(0.1, 1.0)
        # Simulated I/O cost of producing the item.
        await asyncio.sleep(duration / 2)

        # put() is awaited because a bounded queue may be full.
        await queue.put(duration)
        print(f'Produced {duration:.2f}')

        # About 10% of the time, take a longer break before the next item.
        wants_break = random.random() < 0.1
        if not wants_break:
            continue
        print("Producer taking a longer break...")
        await asyncio.sleep(2)
 
 
async def consumer(name, queue):
    """Consume items from ``queue`` until the ``SHUTDOWN`` sentinel arrives.

    Args:
        name: Label printed with each consumed item.
        queue: Queue yielding either numeric work items or ``SHUTDOWN``.

    On receiving ``SHUTDOWN`` the consumer acknowledges that ``get()``,
    puts the sentinel back so another consumer can see it, and returns.
    """
    while True:
        # get() is awaited because the queue may be empty.
        item = await queue.get()
        if item is SHUTDOWN:
            # Every get() must be matched by task_done(); without this the
            # queue's unfinished-task count would grow by one per consumer.
            queue.task_done()
            # Put the shutdown signal back in the queue for other consumers.
            await queue.put(SHUTDOWN)
            break  # Exit the loop

        # "Consume" the work item: its value is how long the work takes.
        await asyncio.sleep(item)
        # Acknowledge only after the work is finished so join() is accurate.
        queue.task_done()
        print(f'{name} consumed {item:.2f}')
 
async def main():
    """Run one producer and three consumers for ten seconds, then stop them in order."""
    queue = asyncio.Queue()  # Unbounded here; a maximum size can be given at construction.
    num_consumers = 3

    # Start the consumers first, then the producer.
    workers = []
    for idx in range(num_consumers):
        worker_coro = consumer(f'consumer-{idx}', queue)
        workers.append(asyncio.create_task(worker_coro))

    feeder = asyncio.create_task(producer(queue))

    # Let everything run for a fixed time.
    print("System running for 10 seconds...")
    await asyncio.sleep(10)

    # --- Graceful shutdown ---
    print("\n--- Initiating shutdown ---")

    # Step 1: cancel the producer so no new items are queued.
    feeder.cancel()
    try:
        await feeder
    except asyncio.CancelledError:
        print("Producer has been shut down.")

    # Step 2: wait until every queued item has been acknowledged.
    await queue.join()
    print("All items in queue have been processed.")

    # Step 3: queue a single SHUTDOWN sentinel. One is enough because each
    # consumer that receives it puts it back for the next one.
    await queue.put(SHUTDOWN)

    # Step 4: wait for every consumer task to return.
    await asyncio.gather(*workers)

    print("All consumer tasks have been shut down. System halted.")
 
 
# Run the example only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(main())

Using asyncio with concurrent.futures.ThreadPoolExecutor

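Note the pattern: get the running loop with loop = asyncio.get_running_loop(), then

  • future = loop.run_in_executor(thread_pool, <slow-function>, "task name") to schedule the blocking function on the thread pool, and
  • result = await future to wait for its return value without blocking the event loop.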
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
 
def blocking_io_task(name, delay=2):
    """Simulate a synchronous, blocking I/O operation.

    Args:
        name: Label used in the progress messages and in the returned string.
        delay: Seconds to block for. Defaults to 2, the previously
            hard-coded duration, so existing callers are unaffected.

    Returns:
        The string ``"Result from <name>"``.
    """
    print(f"[{name}] Starting blocking I/O...")
    # Stand-in for a blocking call (e.g. reading a large file, a network request).
    time.sleep(delay)
    print(f"[{name}] Finished blocking I/O.")
    return f"Result from {name}"
 
async def main():
    """Run two blocking tasks on a thread pool while the coroutine does other work."""
    print("Main: Starting...")

    # run_in_executor is called on the loop that is running this coroutine.
    loop = asyncio.get_running_loop()

    # A dedicated pool limits the worker threads to three and is shut down
    # when the with-block exits. Passing None as the executor would use the
    # loop's default executor instead.
    with ThreadPoolExecutor(max_workers=3) as thread_pool:
        # Each call returns an awaitable future for the task running on the pool.
        pending = [
            loop.run_in_executor(thread_pool, blocking_io_task, label)
            for label in ("Task A", "Task B")
        ]

        # The coroutine keeps running while the pool threads are blocked.
        print("Main: Doing other async work...")
        await asyncio.sleep(0.5)
        print("Main: Still doing other async work...")

        # Collect both results, in submission order, before reporting any.
        outcomes = [await fut for fut in pending]

        for outcome in outcomes:
            print(f"Main: Received result: {outcome}")

    print("Main: All done.")
 
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())