跳转至

协程

gather 并发运行

并发 运行序列中的 可等待对象。

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        await asyncio.sleep(0.1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    c = {"A":1, "B":2, "C":3, "D":4, "E":5, "F":6}

    L = await asyncio.gather(
        # 使用 * tasks 是确保每个协程对象都作为单独的参数传递的一种方式。
        *[factorial(name, number) for name, number in c.items()]
    )
    # 等价于下面的写法
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

可等待对象

print(await factorial("A",4))  # 输出24 

run 主入口

asyncio.run(main())

timeout

用于限制等待某个操作所耗费时间的 异步上下文管理器
超时写法1

async def main():
    try:
        async with asyncio.timeout(1):
            L = await asyncio.gather(
                factorial("A", 2),
                factorial("B", 3),
                factorial("C", 4),
            )
            print(L)
    except asyncio.TimeoutError:
        print("Timed out!")
超时写法2
1
2
3
4
try:
    await asyncio.wait_for(factorial("A", 8), timeout=1.0)
except TimeoutError:
    print('timeout!')

锁 Lock

下面运行完大致需要7秒

lock = asyncio.Lock()
async def main():
    print(f"st = {datetime.datetime.now()}")
    async with lock:
        await asyncio.sleep(2)
        print('hello')
    await asyncio.sleep(1)
    print(f"end = {datetime.datetime.now()}")
async def runs():

    await asyncio.gather(main(), main(),  main())

asyncio.run(runs())

事件继续 wait

async def waiter(event):
    print('waiting for it ...')
    await event.wait() # 等待事件
    await asyncio.sleep(1)
    print('... got it!')

async def main():
    event = asyncio.Event()
    waiter_task = asyncio.create_task(waiter(event))
    await asyncio.sleep(3)

    event.set() # 唤醒等待任务
    await waiter_task  # 等待任务结束 保证任务不会提前结束  必须在事件发生后才

    print('done')

asyncio.run(main())

Semaphore,Condition( Event + Lock )

import asyncio
lock = asyncio.Lock()
condition = asyncio.Condition(lock)

# 当 acquire() 发现其值为零时,它将保持阻塞直到有某个任务调用了 release()。
sem = asyncio.Semaphore(2) # 次数限制

async def producer(condition, items):
    for item in items:
        await asyncio.sleep(0.2)

        async with condition:
            await sem.acquire()# 结合上面的设置,只能生产两个产品
            print(f"生产 {item}")
            condition.notify() # 没有这个消费者不知道生产出来了,不消费
            print(f"完成后通知 {item}")

async def consumer(condition):
    index = 0

    while True:
        await condition.acquire()
        index+=1
        print(f"🐱{index}提出订单")

        await condition.wait() # 等待产品生产结束的通知
        print("🐱拿到物品")

        print(f"🐱结束订单{index}")
        print("="*20)
        condition.release() # 没有这个,生产一轮结束后就不会再生产了
async def main():
    producer_task = asyncio.create_task(producer(condition, ["猫", "狗", "猪","老虎","狮子","大象"])) # 生产者
    consumer_task = asyncio.create_task(consumer(condition)) # 消费者

    await asyncio.gather(producer_task, consumer_task)

# 运行主程序
asyncio.run(main())