元素码农
基础
UML建模
数据结构
算法
设计模式
网络
TCP/IP协议
HTTPS安全机制
WebSocket实时通信
数据库
sqlite
postgresql
clickhouse
后端
rust
go
java
php
mysql
redis
mongodb
etcd
nats
zincsearch
前端
浏览器
javascript
typescript
vue3
react
游戏
unity
unreal
C++
C#
Lua
App
android
ios
flutter
react-native
安全
Web安全
测试
软件测试
自动化测试 - Playwright
人工智能
Python
langChain
langGraph
运维
linux
docker
工具
git
svn
🌞
🌙
目录
▶
Python基础语法
Python环境安装与配置
第一个Python程序
变量与数据类型
字面量详解
基本运算符
流程控制语句
包管理与虚拟环境
▶
Python数据结构
列表(List)详解
元组(Tuple)使用指南
字典(Dict)完全解析
集合(Set)操作大全
▶
函数与模块
函数定义与参数传递
Lambda表达式
模块导入与使用
常用内置函数
▶
面向对象编程
类与对象
继承与多态
魔术方法解析
装饰器原理与应用
▶
Python类型系统
类型注解(Type Hints)
Pydantic基础
Pydantic高级特性
typing模块基础
泛型类型详解
泛型类详解
Callable类型详解
Awaitable类型详解
类型变量与约束
类型别名与Protocol
TypedDict详解
Annotated类型
Reducer类型
类型检查工具使用
类型注解最佳实践
▶
关键字
pass关键字
raise关键字
global关键字
nonlocal关键字
yield关键字
assert关键字
with关键字
async/await关键字
▶
包管理
pip包管理基础
虚拟环境管理
包管理工具对比
requirements.txt规范
依赖管理与requirements.txt
setup.py配置说明
Poetry项目管理工具
Conda包管理系统
打包与发布Python包
PyPI发布流程
私有PyPI仓库
▶
Python高级特性
迭代器与生成器
多线程编程
协程与异步IO
元编程入门
反射机制详解
描述符协议
上下文管理器协议
垃圾回收机制
内存管理深度解析
性能优化指南
▶
文件与异常处理
文件读写操作
JSON数据解析
异常处理机制
上下文管理器
发布时间:
2025-03-29 10:20
↑
☰
# Python协程与异步IO详解 本文将详细介绍Python中的协程(Coroutine)和异步IO编程,包括asyncio库的使用、异步编程模式以及实际应用场景,帮助你掌握这一强大的编程特性。 ## 协程基础 ### 什么是协程 协程是一种用户态的轻量级线程,它允许在一个线程中实现多任务协作。协程通过yield和async/await语法实现任务的挂起和恢复,从而实现非阻塞的并发编程。 ### 基本概念 ```python import asyncio async def hello(): print('Hello') await asyncio.sleep(1) # 模拟IO操作 print('World') # 运行协程 asyncio.run(hello()) ``` ### async/await语法详解 #### async关键字 async关键字用于定义一个协程函数。被async修饰的函数具有以下特点: 1. 函数内部可以使用await语句 2. 调用时不会立即执行,而是返回一个协程对象 3. 必须在事件循环中运行 ```python # 定义协程函数 async def fetch_data(): print('开始获取数据') await asyncio.sleep(2) # 模拟网络请求 return {'data': 'hello'} # 调用协程函数会返回协程对象,不会立即执行 coro = fetch_data() # 这里不会打印'开始获取数据' # 必须在事件循环中运行 await coro # 或者使用asyncio.run(coro) ``` #### await关键字 await关键字用于等待一个协程执行完成。它的作用是: 1. 暂停当前协程的执行 2. 将控制权交给事件循环 3. 等待被等待的协程完成后继续执行 ```python async def main(): print('主函数开始') # await会暂停当前协程,等待fetch_data完成 result = await fetch_data() print(f'获取到数据: {result}') # 可以等待多个协程 results = await asyncio.gather( fetch_data(), fetch_data() ) print(f'获取到多个数据: {results}') # 运行主函数 asyncio.run(main()) ``` ## asyncio库详解 ### 事件循环 ```python async def task1(): await asyncio.sleep(1) return 'task1 完成' async def task2(): await asyncio.sleep(2) return 'task2 完成' async def main(): # 创建任务 tasks = [ asyncio.create_task(task1()), asyncio.create_task(task2()) ] # 等待所有任务完成 results = await asyncio.gather(*tasks) print(results) # 运行事件循环 asyncio.run(main()) ``` ### 任务和Future Future是Python异步编程中的一个重要概念,它代表一个异步操作的最终结果。Future对象是一个低层级的可等待对象,具有以下特点: 1. **状态管理**:Future对象可以处于以下状态 - Pending:初始状态,操作尚未完成 - Done:操作已完成(成功或失败) - Cancelled:操作被取消 2. **结果处理**: - 可以通过`result()`方法获取操作结果 - 如果操作失败,会抛出相应的异常 - 支持添加回调函数在操作完成时执行 3. **Task是Future的子类**: - Task继承自Future,专门用于封装和管理协程 - 当我们使用`asyncio.create_task()`创建任务时,实际上创建了一个Task对象 下面是一个展示Future和Task使用的完整示例: ```python import asyncio async def long_operation(): print('开始长时间操作') await asyncio.sleep(3) print('长时间操作完成') return '操作结果' async def main(): # 创建任务(Task是Future的子类) task = asyncio.create_task(long_operation()) # 检查任务状态 print(f'任务状态: {task.done()}') # 添加回调函数 def callback(future): print(f'回调函数获取结果: {future.result()}') task.add_done_callback(callback) # 等待任务完成 try: result = await asyncio.wait_for(task, timeout=5.0) print(f'主函数获取结果: {result}') except asyncio.TimeoutError: print('操作超时') # 取消任务 task.cancel() except Exception as e: print(f'操作失败: {e}') asyncio.run(main()) ``` ## 实际应用示例 ### 1. 异步Web请求 ```python import aiohttp import asyncio async def fetch_url(session, url): async with session.get(url) as response: return await response.text() async def main(): urls = [ 'https://api.github.com/events', 'https://api.github.com/emojis', 'https://api.github.com/meta' ] async with aiohttp.ClientSession() as session: tasks = [fetch_url(session, url) for url in urls] results = await asyncio.gather(*tasks) for url, html in zip(urls, results): print(f'{url}: {len(html)} bytes') # 运行异步请求 asyncio.run(main()) ``` ### 2. 异步文件操作 ```python import aiofiles import asyncio async def write_file(filename, content): async with aiofiles.open(filename, 'w') as f: await f.write(content) async def read_file(filename): async with aiofiles.open(filename, 'r') as f: return await f.read() async def main(): # 写入文件 await write_file('test.txt', 'Hello, Async IO!') # 读取文件 content = await read_file('test.txt') print(f'文件内容: {content}') asyncio.run(main()) ``` ### 3. 异步数据库操作 ```python import asyncpg async def db_operations(): # 连接数据库 conn = await asyncpg.connect( user='user', password='password', database='database', host='localhost' ) # 创建表 await conn.execute(''' CREATE TABLE IF NOT EXISTS users ( id serial PRIMARY KEY, name text, email text ) ''') # 插入数据 await conn.execute(''' INSERT INTO users(name, email) VALUES($1, $2) ''', 'John Doe', 'john@example.com') # 查询数据 rows = await conn.fetch('SELECT * FROM users') for row in rows: print(row) await conn.close() # 运行数据库操作 asyncio.run(db_operations()) ``` ### 4. 异步Web服务器 ```python from aiohttp import web routes = web.RouteTableDef() @routes.get('/') async def hello(request): return web.Response(text='Hello, Async World!') @routes.get('/api/data') async def get_data(request): # 模拟异步数据处理 await asyncio.sleep(1) return web.json_response({'message': 'Data processed'}) async def init_app(): app = web.Application() app.add_routes(routes) return app if __name__ == '__main__': app = init_app() web.run_app(app) ``` ### 5. 异步爬虫 ```python import aiohttp import asyncio from bs4 import BeautifulSoup class AsyncCrawler: def __init__(self, urls): self.urls = urls self.results = [] async def fetch_page(self, session, url): async with session.get(url) as response: return await response.text() async def parse_page(self, html): # 使用BeautifulSoup解析HTML soup = BeautifulSoup(html, 'html.parser') title = soup.title.string if soup.title else 'No title' return title async def process_url(self, session, url): try: html = await self.fetch_page(session, url) title = await self.parse_page(html) return {'url': url, 'title': title} except Exception as e: return {'url': url, 'error': str(e)} async def crawl(self): async with aiohttp.ClientSession() as session: tasks = [self.process_url(session, url) for url in self.urls] self.results = await asyncio.gather(*tasks) return self.results # 使用示例 urls = [ 'https://python.org', 'https://github.com', 'https://stackoverflow.com' ] crawler = AsyncCrawler(urls) asyncio.run(crawler.crawl()) ``` ## 最佳实践 1. **合理使用异步上下文管理器** ```python # 不好的做法 async def bad_practice(): session = aiohttp.ClientSession() response = await session.get('http://example.com') await session.close() # 好的做法 async def good_practice(): async with aiohttp.ClientSession() as session: async with session.get('http://example.com') as response: return await response.text() ``` 2. **避免在协程中使用阻塞操作** ```python # 不好的做法 async def bad_practice(): time.sleep(1) # 阻塞操作 # 好的做法 async def good_practice(): await asyncio.sleep(1) # 非阻塞操作 ``` 3. **正确处理异常** ```python async def handle_errors(): try: async with aiohttp.ClientSession() as session: async with session.get('http://example.com') as response: return await response.text() except aiohttp.ClientError as e: print(f'网络错误: {e}') except Exception as e: print(f'其他错误: {e}') ``` 4. **使用asyncio.gather处理多任务** ```python async def process_multiple_tasks(): async with aiohttp.ClientSession() as session: tasks = [ fetch_data(session, 'url1'), fetch_data(session, 'url2'), fetch_data(session, 'url3') ] return await asyncio.gather(*tasks, return_exceptions=True) ``` 通过本文的学习,你应该已经掌握了Python中协程和异步IO的基本概念和使用方法。这些特性可以帮助你编写高效的并发程序,特别是在处理I/O密集型任务时。继续练习和探索,你会发现更多异步编程的应用场景!