Published: 2025-03-24 13:28
# Python Multithreading Explained

This article walks through multithreading in Python: basic thread concepts, ways to create threads, synchronization mechanisms, and practical use cases, to help you get a working command of multithreaded programming.

## Multithreading Basics

### What Is a Thread

A thread is the smallest unit of program execution; one process can contain many threads. Threads share the process's resources but each has its own stack and program counter. Python provides the `threading` module for multithreaded programming. Note that in CPython the Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time, so threads mainly speed up I/O-bound work rather than CPU-bound work.

### Creating a Thread

```python
import threading
import time

def worker():
    print(f'Thread {threading.current_thread().name} starting')
    time.sleep(2)
    print(f'Thread {threading.current_thread().name} finished')

# Create the thread
thread = threading.Thread(target=worker, name='WorkerThread')
# Start it
thread.start()
# Wait for it to finish
thread.join()
```

### Subclassing Thread

```python
class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)  # let Thread manage the name attribute

    def run(self):
        print(f'Thread {self.name} starting')
        time.sleep(2)
        print(f'Thread {self.name} finished')

# Create and run a custom thread
thread = MyThread('CustomThread')
thread.start()
thread.join()
```

## Thread Synchronization

### Locks (Lock)

```python
import threading

lock = threading.Lock()
counter = 0

def increment():
    global counter
    for _ in range(100000):
        with lock:  # the context manager acquires and releases the lock
            counter += 1

# Start several competing threads
threads = []
for _ in range(5):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

# Wait for all of them
for t in threads:
    t.join()

print(f'Final count: {counter}')
```

### Condition Variables (Condition)

```python
class Buffer:
    def __init__(self, size):
        self.buffer = []
        self.size = size
        self.condition = threading.Condition()

    def produce(self, item):
        with self.condition:
            while len(self.buffer) >= self.size:
                self.condition.wait()   # buffer full: wait for a consumer
            self.buffer.append(item)
            self.condition.notify()

    def consume(self):
        with self.condition:
            while len(self.buffer) == 0:
                self.condition.wait()   # buffer empty: wait for a producer
            item = self.buffer.pop(0)
            self.condition.notify()
            return item

# Usage example
def producer(buffer):
    for i in range(5):
        buffer.produce(f'item-{i}')
        print(f'Produced: item-{i}')
        time.sleep(1)

def consumer(buffer):
    for _ in range(5):
        item = buffer.consume()
        print(f'Consumed: {item}')
        time.sleep(2)

buffer = Buffer(2)
p = threading.Thread(target=producer, args=(buffer,))
c = threading.Thread(target=consumer, args=(buffer,))
p.start()
c.start()
p.join()
c.join()
```

## Thread Pools

### The concurrent.futures Thread Pool

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def download_url(url):
    response = requests.get(url)
    return f'Downloaded {url}, status: {response.status_code}'

urls = [
    'https://api.github.com/events',
    'https://api.github.com/emojis',
    'https://api.github.com/meta'
]

# Use the thread pool
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit the tasks and keep the Future objects
    future_to_url = {executor.submit(download_url, url): url for url in urls}

    # Collect results as the futures finish
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            result = future.result()
            print(result)
        except Exception as e:
            print(f'{url} generated an exception: {e}')
```

## Practical Examples

### 1. Concurrent File Downloader

```python
import threading
import requests
import os

class FileDownloader:
    def __init__(self, urls, output_dir):
        self.urls = urls
        self.output_dir = output_dir

    def download_file(self, url):
        try:
            response = requests.get(url, stream=True)
            filename = os.path.join(self.output_dir, url.split('/')[-1])
            # Each thread writes to its own file, so no lock is needed here
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            print(f'Downloaded: {filename}')
        except Exception as e:
            print(f'Error downloading {url}: {e}')

    def start_downloads(self):
        threads = []
        for url in self.urls:
            thread = threading.Thread(target=self.download_file, args=(url,))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()

# Usage example
urls = [
    'https://example.com/file1.pdf',
    'https://example.com/file2.pdf',
    'https://example.com/file3.pdf'
]
downloader = FileDownloader(urls, './downloads')
downloader.start_downloads()
```
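The downloader above starts one thread per URL, which can open far too many connections at once when the URL list is long. A `threading.Semaphore` caps how many threads do real work at the same time. A minimal sketch of the idea, in which the `fetch` function and its 0.1-second sleep are hypothetical stand-ins for a real download:

```python
import threading
import time

MAX_CONCURRENT = 2
semaphore = threading.Semaphore(MAX_CONCURRENT)
results = []
results_lock = threading.Lock()

def fetch(url):
    # At most MAX_CONCURRENT threads get past this point at once
    with semaphore:
        time.sleep(0.1)          # stand-in for network I/O
        with results_lock:       # results list is shared, so guard it
            results.append(url)

urls = [f'https://example.com/file{i}' for i in range(6)]
threads = [threading.Thread(target=fetch, args=(u,)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(results))  # all 6 URLs processed, at most 2 concurrently
```

All threads still start immediately, but only `MAX_CONCURRENT` of them hold the semaphore at any moment; the rest block cheaply until a slot frees up.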
### 2. Multi-threaded Web Crawler

```python
import threading
import requests
from queue import Queue, Empty
from bs4 import BeautifulSoup

class WebCrawler:
    def __init__(self, base_url, max_threads=5):
        self.base_url = base_url
        self.queue = Queue()
        self.results = []
        self.visited = set()
        self.lock = threading.Lock()
        self.max_threads = max_threads

    def crawl_page(self):
        while True:
            try:
                url = self.queue.get(timeout=3)
            except Empty:          # no work left for 3 seconds: exit the thread
                break
            try:
                if url in self.visited:
                    continue

                response = requests.get(url)
                soup = BeautifulSoup(response.text, 'html.parser')

                with self.lock:
                    self.visited.add(url)
                    self.results.append({
                        'url': url,
                        'title': soup.title.string if soup.title else 'No title'
                    })

                # Collect new links on the same site
                for link in soup.find_all('a'):
                    new_url = link.get('href')
                    if new_url and new_url.startswith(self.base_url):
                        self.queue.put(new_url)
            except Exception as e:
                print(f'Error crawling {url}: {e}')
            finally:
                self.queue.task_done()   # exactly once per successful get()

    def start_crawling(self):
        self.queue.put(self.base_url)
        threads = []
        for _ in range(self.max_threads):
            t = threading.Thread(target=self.crawl_page)
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        return self.results

# Usage example
crawler = WebCrawler('https://example.com')
results = crawler.start_crawling()
for result in results:
    print(f'URL: {result["url"]}, Title: {result["title"]}')
```
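The crawler relies on `get(timeout=3)` so workers exit once the queue stays empty. Another common shutdown pattern is to enqueue one sentinel value (here `None`) per worker; each worker exits when it pulls a sentinel, with no timeout guesswork. A minimal sketch where doubling the item stands in for real page processing:

```python
import threading
from queue import Queue

NUM_WORKERS = 3
task_queue = Queue()
processed = []
lock = threading.Lock()

def worker():
    while True:
        item = task_queue.get()
        if item is None:              # sentinel: no more work
            task_queue.task_done()
            break
        with lock:
            processed.append(item * 2)   # stand-in for real processing
        task_queue.task_done()

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for i in range(10):
    task_queue.put(i)
for _ in range(NUM_WORKERS):
    task_queue.put(None)              # one sentinel per worker

task_queue.join()                     # blocks until every task_done() call
for t in threads:
    t.join()

print(sorted(processed))
```

Unlike the timeout approach, sentinels shut workers down deterministically, but they only work when you know producing has finished before you enqueue them.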
### 3. Multi-threaded Image Processing

```python
import threading
import os
from queue import Queue, Empty
from PIL import Image

class ImageProcessor:
    def __init__(self, input_dir, output_dir, max_threads=4):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.max_threads = max_threads
        self.queue = Queue()

    def process_image(self):
        while True:
            try:
                image_file = self.queue.get(timeout=3)
            except Empty:              # queue drained: exit the thread
                break
            try:
                input_path = os.path.join(self.input_dir, image_file)
                output_path = os.path.join(self.output_dir, f'processed_{image_file}')

                # Open the image
                with Image.open(input_path) as img:
                    # Resize
                    resized = img.resize((800, 600))
                    # Convert to grayscale
                    gray = resized.convert('L')
                    # Save the processed image
                    gray.save(output_path)

                print(f'Processed: {image_file}')
            except Exception as e:
                print(f'Error processing {image_file}: {e}')
            finally:
                self.queue.task_done()

    def start_processing(self):
        # Make sure the output directory exists
        os.makedirs(self.output_dir, exist_ok=True)

        # Collect all image files
        image_files = [f for f in os.listdir(self.input_dir)
                       if f.endswith(('.png', '.jpg', '.jpeg'))]

        # Fill the queue
        for image_file in image_files:
            self.queue.put(image_file)

        # Create and start the threads
        threads = []
        for _ in range(min(self.max_threads, len(image_files))):
            t = threading.Thread(target=self.process_image)
            t.start()
            threads.append(t)

        # Wait for all threads to finish
        for t in threads:
            t.join()

# Usage example
processor = ImageProcessor('./input_images', './output_images')
processor.start_processing()
```

## Best Practices

1. **Size the thread count sensibly**

```python
import multiprocessing

# Number of CPU cores
num_cores = multiprocessing.cpu_count()
# A common starting point for I/O-bound work: twice the core count
num_threads = num_cores * 2
```

2. **Avoid deadlocks**

```python
# Bad: lock order depends on argument order
def transfer(account1, account2, amount):
    with account1.lock:
        with account2.lock:   # two opposite transfers can deadlock here
            account1.withdraw(amount)
            account2.deposit(amount)

# Good: always acquire the locks in a fixed (here: id-based) order
def transfer(account1, account2, amount):
    first = min(account1, account2, key=id)
    second = max(account1, account2, key=id)
    with first.lock:
        with second.lock:
            account1.withdraw(amount)
            account2.deposit(amount)
```
3. **Use a thread pool instead of creating threads by hand**

```python
# Tedious: manual thread management
threads = []
for url in urls:
    t = threading.Thread(target=download, args=(url,))
    threads.append(t)
    t.start()

# Better: let the pool manage the threads
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(download, urls)
```

4. **Handle exceptions inside the thread**

```python
def worker():
    try:
        # Do the work
        process_data()
    except Exception as e:
        print(f'Error in thread: {e}')
    finally:
        # Release resources
        cleanup_resources()
```

With this article you should have a grasp of the basic concepts and techniques of Python multithreading. Multithreading can noticeably improve a program's throughput, especially for I/O-bound tasks. Keep practicing and exploring, and you will find many more places where it applies!
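As a quick sanity check of that I/O-bound claim, sleeping tasks (a stand-in for network or disk waits) finish much faster under a thread pool than sequentially, because a sleeping thread releases the GIL. A small sketch using `time.perf_counter`:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_task(_):
    time.sleep(0.2)   # simulated I/O wait; releases the GIL

# Run the four tasks one after another
start = time.perf_counter()
for i in range(4):
    io_task(i)
sequential = time.perf_counter() - start

# Run the same four tasks on four pool threads
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(io_task, range(4)))
threaded = time.perf_counter() - start

print(f'sequential: {sequential:.2f}s, threaded: {threaded:.2f}s')
```

Expect roughly 0.8 s sequentially versus roughly 0.2 s with the pool; a CPU-bound loop in place of `time.sleep` would show little or no speedup, which is the GIL at work.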