元素码农
基础
UML建模
数据结构
算法
设计模式
网络
TCP/IP协议
HTTPS安全机制
WebSocket实时通信
数据库
sqlite
postgresql
clickhouse
后端
rust
go
java
php
mysql
redis
mongodb
etcd
nats
zincsearch
前端
浏览器
javascript
typescript
vue3
react
游戏
unity
unreal
C++
C#
Lua
App
android
ios
flutter
react-native
安全
Web安全
测试
软件测试
自动化测试 - Playwright
人工智能
Python
langChain
langGraph
运维
linux
docker
工具
git
svn
🌞
🌙
目录
▶
JVM架构
类加载器机制
运行时数据区
执行引擎工作原理
JIT编译器优化
▶
内存管理
垃圾回收算法
分代收集机制
内存分配策略
内存泄漏诊断
▶
并发编程
线程池实现原理
synchronized锁优化
AQS框架解析
并发集合类原理
▶
字节码技术
字节码指令集解析
ASM操作指南
动态代理实现
字节码验证机制
▶
性能调优
JVM参数解析
GC日志分析
堆内存诊断
JIT性能优化
发布时间:
2025-03-22 09:32
↑
☰
# Java线程池实现原理 Java线程池是并发编程中非常重要的组件,它可以有效地管理和复用线程,避免频繁创建和销毁线程带来的开销。本文将详细介绍Java线程池的实现原理、核心组件以及实践应用。 ## 线程池基础 ### 1. 为什么使用线程池 ```java public class ThreadCreationDemo { public static void main(String[] args) { // 不使用线程池 for (int i = 0; i < 100; i++) { new Thread(() -> { System.out.println("执行任务"); }).start(); } // 使用线程池 ExecutorService executor = Executors.newFixedThreadPool(5); for (int i = 0; i < 100; i++) { executor.execute(() -> { System.out.println("执行任务"); }); } executor.shutdown(); } } ``` 使用线程池的优势: 1. 降低资源消耗 2. 提高响应速度 3. 提高线程的可管理性 4. 提供更多更强大的功能 ### 2. 线程池的核心参数 ```java public class ThreadPoolParamsDemo { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, // 核心线程数 4, // 最大线程数 60L, // 空闲线程存活时间 TimeUnit.SECONDS, // 时间单位 new LinkedBlockingQueue<>(100), // 工作队列 Executors.defaultThreadFactory(),// 线程工厂 new ThreadPoolExecutor.AbortPolicy() // 拒绝策略 ); // 提交任务 executor.execute(() -> { System.out.println("任务执行"); }); } } ``` 核心参数说明: - corePoolSize:核心线程数 - maximumPoolSize:最大线程数 - keepAliveTime:空闲线程存活时间 - workQueue:工作队列 - threadFactory:线程工厂 - handler:拒绝策略 ## 工作原理 ### 1. 任务提交流程 ```java public class TaskSubmissionDemo { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2)); // 模拟任务提交流程 for (int i = 0; i < 10; i++) { final int taskId = i; executor.execute(() -> { System.out.println("任务" + taskId + "开始执行"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务" + taskId + "执行完成"); }); } executor.shutdown(); } } ``` 任务提交步骤: 1. 核心线程数未满,创建新线程 2. 核心线程数已满,加入队列 3. 队列已满,创建新线程(未超过最大线程数) 4. 达到最大线程数,执行拒绝策略 ### 2. 线程池状态 ```java public class ThreadPoolStateDemo { public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)); // 提交任务 executor.execute(() -> { System.out.println("任务1执行"); }); // 关闭线程池 executor.shutdown(); // 等待任务完成 if (executor.awaitTermination(10, TimeUnit.SECONDS)) { System.out.println("所有任务执行完成"); } } } ``` 线程池状态: 1. RUNNING:接收新任务并处理队列中的任务 2. SHUTDOWN:不接收新任务,但处理队列中的任务 3. STOP:不接收新任务,不处理队列中的任务 4. TIDYING:所有任务已终止,线程池为空 5. TERMINATED:terminated()方法执行完成 ## 任务队列 ### 1. 常用队列类型 ```java public class QueueTypeDemo { public static void main(String[] args) { // 1. 无界队列 ExecutorService linkedQueue = new ThreadPoolExecutor( 2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); // 2. 有界队列 ExecutorService arrayQueue = new ThreadPoolExecutor( 2, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100)); // 3. 同步队列 ExecutorService syncQueue = new ThreadPoolExecutor( 2, 4, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>()); // 4. 优先级队列 ExecutorService priorityQueue = new ThreadPoolExecutor( 2, 4, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>()); } } ``` 队列特点: 1. LinkedBlockingQueue:无界队列,可能导致OOM 2. ArrayBlockingQueue:有界队列,需要指定容量 3. SynchronousQueue:不存储元素的阻塞队列 4. PriorityBlockingQueue:优先级队列 ### 2. 队列选择策略 ```java public class QueueStrategyDemo { public static void main(String[] args) { // 1. CPU密集型任务 ThreadPoolExecutor cpuPool = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100)); // 2. IO密集型任务 ThreadPoolExecutor ioPool = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1000)); } } ``` 选择建议: 1. CPU密集型:使用有界队列 2. IO密集型:使用无界队列 3. 混合型:根据实际情况选择 ## 拒绝策略 ### 1. 内置拒绝策略 ```java public class RejectionPolicyDemo { public static void main(String[] args) { // 1. AbortPolicy ThreadPoolExecutor abortPool = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy()); // 2. CallerRunsPolicy ThreadPoolExecutor callerRunsPool = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy()); // 3. DiscardPolicy ThreadPoolExecutor discardPool = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy()); // 4. DiscardOldestPolicy ThreadPoolExecutor discardOldestPool = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardOldestPolicy()); } } ``` 拒绝策略说明: 1. AbortPolicy:抛出异常 2. CallerRunsPolicy:由调用线程执行任务 3. DiscardPolicy:直接丢弃任务 4. DiscardOldestPolicy:丢弃最旧的任务 ### 2. 自定义拒绝策略 ```java public class CustomRejectionPolicyDemo { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), (r, e) -> { // 自定义拒绝策略 System.out.println("任务被拒绝"); // 可以保存到数据库 saveTask(r); // 或者重试 retryTask(r); }); } private static void saveTask(Runnable r) { // 保存任务到数据库 } private static void retryTask(Runnable r) { // 重试任务 } } ``` 自定义策略场景: 1. 任务持久化 2. 任务重试 3. 任务降级 4. 告警通知 ## 线程池监控 ### 1. 监控指标 ```java public class ThreadPoolMonitorDemo { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)); // 定期监控线程池状态 ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1); monitor.scheduleAtFixedRate(() -> { System.out.println("==========线程池状态=========="); System.out.println("核心线程数:" + executor.getCorePoolSize()); System.out.println("活动线程数:" + executor.getActiveCount()); System.out.println("最大线程数:" + executor.getMaximumPoolSize()); System.out.println("线程池大小:" + executor.getPoolSize()); System.out.println("队列任务数:" + executor.getQueue().size()); System.out.println("已完成任务数:" + executor.getCompletedTaskCount()); System.out.println("总任务数:" + executor.getTaskCount()); System.out.println("=============================="); }, 0, 1, TimeUnit.SECONDS); } } ``` 监控指标: 1. 线程数量指标 2. 任务数量指标 3. 队列大小指标 4. 任务执行指标 ### 2. 扩展线程池 ```java public class ExtendedThreadPoolDemo { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("任务执行前:" + r.toString()); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("任务执行后:" + r.toString()); if (t != null) { System.out.println("任务执行异常:" + t.getMessage()); } } @Override protected void terminated() { System.out.println("线程池已关闭"); } }; // 提交任务 executor.execute(() -> { System.out.println("任务执行中"); }); executor.shutdown(); } } ``` 扩展点: 1. beforeExecute:任务执行前的处理 2. afterExecute:任务执行后的处理 3. terminated:线程池终止时的处理 ## 最佳实践 ### 1. 线程池配置 ```java public class ThreadPoolConfigDemo { public static void main(String[] args) { // 1. CPU密集型任务配置 ThreadPoolExecutor cpuIntensivePool = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() + 1, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy()); // 2. IO密集型任务配置 ThreadPoolExecutor ioIntensivePool = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); // 3. 混合型任务配置 ThreadPoolExecutor mixedPool = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors() + 1, Runtime.getRuntime().available