asyncio
是 Python 的异步 I/O 框架,用于编写并发代码,特别适合处理 I/O 密集型和高并发任务。它基于事件循环和协程构建,提供了丰富的工具和 API。
核心概念 #
1. 事件循环 (Event Loop) #
import asyncio
# 获取事件循环
loop = asyncio.get_event_loop()
# 创建异步任务
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
# 运行直到完成
loop.run_until_complete(main())
# 关闭事件循环
loop.close()
2. 协程任务 (Coroutine Tasks) #
import asyncio
async def task(name, delay):
print(f"任务 {name} 开始")
await asyncio.sleep(delay)
print(f"任务 {name} 完成")
return f"{name} 的结果"
async def main():
# 创建任务
task1 = asyncio.create_task(task("A", 2))
task2 = asyncio.create_task(task("B", 1))
task3 = asyncio.create_task(task("C", 3))
# 等待任务完成
results = await asyncio.gather(task1, task2, task3)
print(f"所有任务完成: {results}")
asyncio.run(main())
核心功能 #
3. 并发执行 #
import asyncio
import time
async def worker(name, delay):
print(f"{name} 开始工作")
await asyncio.sleep(delay)
print(f"{name} 工作完成")
return f"{name} 的结果"
async def main():
start_time = time.time()
# 使用 gather 并发执行多个任务
results = await asyncio.gather(
worker("工人1", 2),
worker("工人2", 1),
worker("工人3", 3)
)
end_time = time.time()
print(f"所有工作完成,耗时: {end_time - start_time:.2f}秒")
print(f"结果: {results}")
asyncio.run(main())
4. 超时控制 #
import asyncio
async def long_running_task():
await asyncio.sleep(5) # 模拟长时间运行的任务
return "任务完成"
async def main():
try:
# 设置超时时间为3秒
result = await asyncio.wait_for(long_running_task(), timeout=3.0)
print(result)
except asyncio.TimeoutError:
print("任务超时")
asyncio.run(main())
5. 同步原语 #
import asyncio
async def worker(lock, name):
async with lock: # 获取锁
print(f"{name} 获取了锁")
await asyncio.sleep(1) # 模拟工作
print(f"{name} 释放了锁")
async def main():
lock = asyncio.Lock() # 创建锁
# 创建多个任务竞争锁
await asyncio.gather(
worker(lock, "任务A"),
worker(lock, "任务B"),
worker(lock, "任务C")
)
asyncio.run(main())
实际应用场景 #
6. 异步HTTP请求 #
import aiohttp
import asyncio
async def fetch_url(session, url):
try:
async with session.get(url) as response:
return await response.text()
except Exception as e:
return f"错误: {e}"
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, content in zip(urls, results):
print(f"{url}: 响应长度 {len(content)}")
asyncio.run(main())
7. 异步文件操作 #
import asyncio
import aiofiles
async def async_write_file(filename, content):
async with aiofiles.open(filename, 'w') as f:
await f.write(content)
print(f"文件 {filename} 写入完成")
async def async_read_file(filename):
async with aiofiles.open(filename, 'r') as f:
content = await f.read()
print(f"文件 {filename} 读取完成,内容长度: {len(content)}")
return content
async def main():
# 并发写入文件
await asyncio.gather(
async_write_file("file1.txt", "Hello, World!" * 100),
async_write_file("file2.txt", "Python asyncio is awesome!" * 100)
)
# 并发读取文件
contents = await asyncio.gather(
async_read_file("file1.txt"),
async_read_file("file2.txt")
)
print(f"读取到的内容总长度: {sum(len(c) for c in contents)}")
asyncio.run(main())
8. 异步Web服务器 #
from aiohttp import web
import asyncio
async def handle(request):
name = request.match_params.get('name', 'World')
text = f"Hello, {name}!"
return web.Response(text=text)
async def init_app():
app = web.Application()
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)
return app
async def main():
app = await init_app()
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
print("服务器运行在 http://localhost:8080")
# 保持服务器运行
await asyncio.Event().wait()
try:
asyncio.run(main())
except KeyboardInterrupt:
print("服务器已停止")
9. 与多线程/多进程结合 #
import asyncio
import concurrent.futures
import time
def cpu_intensive_task(n):
"""CPU密集型任务(模拟)"""
print(f"开始CPU密集型任务 {n}")
time.sleep(2) # 模拟CPU密集型工作
return f"CPU任务 {n} 完成"
async def main():
# 创建线程池执行器
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
loop = asyncio.get_event_loop()
# 将CPU密集型任务提交到线程池
tasks = [
loop.run_in_executor(executor, cpu_intensive_task, i)
for i in range(5)
]
# 等待所有任务完成
results = await asyncio.gather(*tasks)
print(f"所有CPU任务完成: {results}")
asyncio.run(main())
高级特性 #
10. 异步队列 #
import asyncio
import random
async def producer(queue, name):
for i in range(5):
item = f"{name}-产品{i}"
await queue.put(item)
print(f"{name} 生产了: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
async def consumer(queue, name):
while True:
item = await queue.get()
print(f"{name} 消费了: {item}")
await asyncio.sleep(random.uniform(0.2, 0.7))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=3)
# 创建生产者和消费者
producers = [
asyncio.create_task(producer(queue, f"生产者{i}"))
for i in range(2)
]
consumers = [
asyncio.create_task(consumer(queue, f"消费者{i}"))
for i in range(3)
]
# 等待所有生产者完成
await asyncio.gather(*producers)
# 等待队列清空
await queue.join()
# 取消消费者任务
for c in consumers:
c.cancel()
asyncio.run(main())
注意事项 #
- 使用
asyncio.run()
运行最外层的异步函数 - 避免在异步代码中使用阻塞操作,使用异步替代方案
- 合理设置并发任务数量,避免资源耗尽
- 使用适当的同步原语保护共享资源
- 注意异常处理,使用
try/except
捕获异步操作中的异常
asyncio
提供了强大的异步编程能力,特别适合处理 I/O 密集型和高并发场景。通过合理使用其提供的各种工具和 API,可以编写出高效、可扩展的异步应用程序。