语言基础 — asyncio 包的使用方法及典型例子

asyncio 包的使用方法及典型例子

This article is extracted from the chat log with AI. Please identify it with caution.

asyncio 是 Python 的异步 I/O 框架,用于编写并发代码,特别适合处理 I/O 密集型和高并发任务。它基于事件循环和协程构建,提供了丰富的工具和 API。

核心概念 #

1. 事件循环 (Event Loop) #

import asyncio

# 获取事件循环
loop = asyncio.get_event_loop()

# 创建异步任务
async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 运行直到完成
loop.run_until_complete(main())

# 关闭事件循环
loop.close()

2. 协程任务 (Coroutine Tasks) #

import asyncio

async def task(name, delay):
    print(f"任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {name} 完成")
    return f"{name} 的结果"

async def main():
    # 创建任务
    task1 = asyncio.create_task(task("A", 2))
    task2 = asyncio.create_task(task("B", 1))
    task3 = asyncio.create_task(task("C", 3))
    
    # 等待任务完成
    results = await asyncio.gather(task1, task2, task3)
    print(f"所有任务完成: {results}")

asyncio.run(main())

核心功能 #

3. 并发执行 #

import asyncio
import time

async def worker(name, delay):
    print(f"{name} 开始工作")
    await asyncio.sleep(delay)
    print(f"{name} 工作完成")
    return f"{name} 的结果"

async def main():
    start_time = time.time()
    
    # 使用 gather 并发执行多个任务
    results = await asyncio.gather(
        worker("工人1", 2),
        worker("工人2", 1),
        worker("工人3", 3)
    )
    
    end_time = time.time()
    print(f"所有工作完成,耗时: {end_time - start_time:.2f}秒")
    print(f"结果: {results}")

asyncio.run(main())

4. 超时控制 #

import asyncio

async def long_running_task():
    await asyncio.sleep(5)  # 模拟长时间运行的任务
    return "任务完成"

async def main():
    try:
        # 设置超时时间为3秒
        result = await asyncio.wait_for(long_running_task(), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("任务超时")

asyncio.run(main())

5. 同步原语 #

import asyncio

async def worker(lock, name):
    async with lock:  # 获取锁
        print(f"{name} 获取了锁")
        await asyncio.sleep(1)  # 模拟工作
        print(f"{name} 释放了锁")

async def main():
    lock = asyncio.Lock()  # 创建锁
    
    # 创建多个任务竞争锁
    await asyncio.gather(
        worker(lock, "任务A"),
        worker(lock, "任务B"),
        worker(lock, "任务C")
    )

asyncio.run(main())

实际应用场景 #

6. 异步HTTP请求 #

import aiohttp
import asyncio

async def fetch_url(session, url):
    try:
        async with session.get(url) as response:
            return await response.text()
    except Exception as e:
        return f"错误: {e}"

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/3",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2"
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, content in zip(urls, results):
            print(f"{url}: 响应长度 {len(content)}")

asyncio.run(main())

7. 异步文件操作 #

import asyncio
import aiofiles

async def async_write_file(filename, content):
    async with aiofiles.open(filename, 'w') as f:
        await f.write(content)
    print(f"文件 {filename} 写入完成")

async def async_read_file(filename):
    async with aiofiles.open(filename, 'r') as f:
        content = await f.read()
    print(f"文件 {filename} 读取完成,内容长度: {len(content)}")
    return content

async def main():
    # 并发写入文件
    await asyncio.gather(
        async_write_file("file1.txt", "Hello, World!" * 100),
        async_write_file("file2.txt", "Python asyncio is awesome!" * 100)
    )
    
    # 并发读取文件
    contents = await asyncio.gather(
        async_read_file("file1.txt"),
        async_read_file("file2.txt")
    )
    
    print(f"读取到的内容总长度: {sum(len(c) for c in contents)}")

asyncio.run(main())

8. 异步Web服务器 #

from aiohttp import web
import asyncio

async def handle(request):
    name = request.match_params.get('name', 'World')
    text = f"Hello, {name}!"
    return web.Response(text=text)

async def init_app():
    app = web.Application()
    app.router.add_get('/', handle)
    app.router.add_get('/{name}', handle)
    return app

async def main():
    app = await init_app()
    runner = web.AppRunner(app)
    await runner.setup()
    
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    
    print("服务器运行在 http://localhost:8080")
    
    # 保持服务器运行
    await asyncio.Event().wait()

try:
    asyncio.run(main())
except KeyboardInterrupt:
    print("服务器已停止")

9. 与多线程/多进程结合 #

import asyncio
import concurrent.futures
import time

def cpu_intensive_task(n):
    """CPU密集型任务(模拟)"""
    print(f"开始CPU密集型任务 {n}")
    time.sleep(2)  # 模拟CPU密集型工作
    return f"CPU任务 {n} 完成"

async def main():
    # 创建线程池执行器
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        loop = asyncio.get_event_loop()
        
        # 将CPU密集型任务提交到线程池
        tasks = [
            loop.run_in_executor(executor, cpu_intensive_task, i)
            for i in range(5)
        ]
        
        # 等待所有任务完成
        results = await asyncio.gather(*tasks)
        print(f"所有CPU任务完成: {results}")

asyncio.run(main())

高级特性 #

10. 异步队列 #

import asyncio
import random

async def producer(queue, name):
    for i in range(5):
        item = f"{name}-产品{i}"
        await queue.put(item)
        print(f"{name} 生产了: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue, name):
    while True:
        item = await queue.get()
        print(f"{name} 消费了: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.7))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=3)
    
    # 创建生产者和消费者
    producers = [
        asyncio.create_task(producer(queue, f"生产者{i}"))
        for i in range(2)
    ]
    
    consumers = [
        asyncio.create_task(consumer(queue, f"消费者{i}"))
        for i in range(3)
    ]
    
    # 等待所有生产者完成
    await asyncio.gather(*producers)
    
    # 等待队列清空
    await queue.join()
    
    # 取消消费者任务
    for c in consumers:
        c.cancel()

asyncio.run(main())

注意事项 #

  1. 使用 asyncio.run() 运行最外层的异步函数
  2. 避免在异步代码中使用阻塞操作,使用异步替代方案
  3. 合理设置并发任务数量,避免资源耗尽
  4. 使用适当的同步原语保护共享资源
  5. 注意异常处理,使用 try/except 捕获异步操作中的异常

asyncio 提供了强大的异步编程能力,特别适合处理 I/O 密集型和高并发场景。通过合理使用其提供的各种工具和 API,可以编写出高效、可扩展的异步应用程序。

本文共 1564 字,创建于 Aug 26, 2025

相关标签: Python, 并发编程, ByAI, DeepSeek