元素码农
基础
UML建模
数据结构
算法
设计模式
网络
TCP/IP协议
HTTPS安全机制
WebSocket实时通信
数据库
sqlite
postgresql
clickhouse
后端
rust
go
java
php
mysql
redis
mongodb
etcd
nats
zincsearch
前端
浏览器
javascript
typescript
vue3
react
游戏
unity
unreal
C++
C#
Lua
App
android
ios
flutter
react-native
安全
Web安全
测试
软件测试
自动化测试 - Playwright
人工智能
Python
langChain
langGraph
运维
linux
docker
工具
git
svn
🌞
🌙
目录
▶
算法基础
▶
复杂度分析
时间复杂度
空间复杂度
渐进符号
▶
算法思想
分治法
贪心算法
回溯法
概率算法
枚举算法
递推算法
递归算法
动态规划
分支限界法
模拟算法
▶
算法分析
正确性证明
性能优化
算法比较
▶
搜索算法
▶
基本搜索
顺序搜索
二分搜索
插值搜索
▶
图搜索
深度优先搜索
广度优先搜索
启发式搜索
▶
高级搜索
双向搜索
A*算法
模式匹配
▶
排序算法
▶
比较类排序
冒泡排序
快速排序
归并排序
插入排序
选择排序
▶
非比较类排序
计数排序
基数排序
桶排序
▶
高级排序
堆排序
希尔排序
外部排序
拓扑排序
▶
图论算法
▶
图的表示
邻接矩阵
邻接表
边集数组
▶
最短路径
Dijkstra算法
Floyd算法
Bellman-Ford算法
最短路径概述
▶
生成树
Prim算法
Kruskal算法
并查集
最小生成树
▶
图的连通性
强连通分量
割点与桥
双连通分量
欧拉路径
▶
动态规划
▶
基础概念
最优子结构
重叠子问题
状态转移方程
▶
经典问题
背包问题
最长公共子序列
编辑距离
▶
优化技巧
空间优化
状态压缩
区间动态规划
▶
字符串算法
▶
字符串匹配
KMP算法
Boyer-Moore算法
Sunday算法
▶
字符串处理
字典树
后缀数组
字符串哈希
▶
压缩算法
游程编码
Huffman编码
LZ77算法
发布时间:
2025-03-21 18:45
↑
☰
# 桶排序 桶排序(Bucket Sort)是一种分布式排序算法,它通过将元素分配到不同的桶中,然后对每个桶中的元素进行排序,最后将所有桶中的元素按顺序合并。这种算法特别适用于输入数据均匀分布的情况。 ## 基本原理 1. 创建一定数量的桶(通常是空的数组) 2. 遍历原始数组,根据某种映射函数将元素分配到对应的桶中 3. 对每个非空桶中的元素进行排序 4. 按顺序遍历桶,将元素放回原数组 ## 动画演示 ``` 原始数组:[ 0.78, 0.17, 0.39, 0.26, 0.72, 0.94, 0.21, 0.12, 0.23, 0.68 ] 创建桶(这里使用5个桶): Bucket 0 (0.0-0.2): [ 0.17, 0.12 ] Bucket 1 (0.2-0.4): [ 0.26, 0.39, 0.21, 0.23 ] Bucket 2 (0.4-0.6): [ ] Bucket 3 (0.6-0.8): [ 0.78, 0.72, 0.68 ] Bucket 4 (0.8-1.0): [ 0.94 ] 对每个桶排序: Bucket 0: [ 0.12, 0.17 ] Bucket 1: [ 0.21, 0.23, 0.26, 0.39 ] Bucket 2: [ ] Bucket 3: [ 0.68, 0.72, 0.78 ] Bucket 4: [ 0.94 ] 合并结果:[ 0.12, 0.17, 0.21, 0.23, 0.26, 0.39, 0.68, 0.72, 0.78, 0.94 ] ``` ## 代码实现 ### 基本实现 ```python def bucket_sort(arr): if not arr: return arr # 找出最大值和最小值 max_val = max(arr) min_val = min(arr) # 确定桶的数量(这里使用数组长度的平方根) bucket_count = int(len(arr) ** 0.5) # 创建桶 buckets = [[] for _ in range(bucket_count)] # 计算每个桶的范围 bucket_range = (max_val - min_val) / bucket_count # 将元素分配到桶中 for num in arr: if num == max_val: bucket_index = bucket_count - 1 else: bucket_index = int((num - min_val) / bucket_range) buckets[bucket_index].append(num) # 对每个桶进行排序 for bucket in buckets: bucket.sort() # 合并桶中的元素 result = [] for bucket in buckets: result.extend(bucket) return result # 使用示例 arr = [0.78, 0.17, 0.39, 0.26, 0.72, 0.94, 0.21, 0.12, 0.23, 0.68] print("排序前:", arr) sorted_arr = bucket_sort(arr) print("排序后:", sorted_arr) ``` ### 整数版本 ```python def bucket_sort_int(arr): if not arr: return arr # 找出最大值和最小值 max_val = max(arr) min_val = min(arr) # 创建桶 bucket_range = (max_val - min_val) // 10 + 1 # 每个桶的范围 buckets = [[] for _ in range(10)] # 使用10个桶 # 将元素分配到桶中 for num in arr: bucket_index = (num - min_val) // bucket_range buckets[bucket_index].append(num) # 对每个桶进行排序 for bucket in buckets: if bucket: # 只对非空桶排序 bucket.sort() # 合并桶中的元素 result = [] for bucket in buckets: result.extend(bucket) return result # 使用示例 arr = [29, 15, 28, 45, 39, 89, 92, 11, 12, 42] print("排序前:", arr) sorted_arr = bucket_sort_int(arr) print("排序后:", sorted_arr) ``` ### 优化版本 ```python class AdaptiveBucketSort: def __init__(self): self.insertion_sort_threshold = 32 def insertion_sort(self, arr): for i in range(1, len(arr)): key = arr[i] j = i - 1 while j >= 0 and arr[j] > key: arr[j + 1] = arr[j] j -= 1 arr[j + 1] = key return arr def estimate_bucket_count(self, n, data_range): # 根据数据范围和元素数量动态调整桶的数量 if n < 50: return max(int(n/2), 5) elif data_range < n: return int(data_range ** 0.5) else: return int(n ** 0.5) def sort(self, arr): if not arr: return arr # 找出最大值和最小值 max_val = max(arr) min_val = min(arr) data_range = max_val - min_val # 确定桶的数量 bucket_count = self.estimate_bucket_count(len(arr), data_range) bucket_range = data_range / bucket_count + 1 # 创建桶 buckets = [[] for _ in range(bucket_count)] # 将元素分配到桶中 for num in arr: if num == max_val: bucket_index = bucket_count - 1 else: bucket_index = int((num - min_val) / bucket_range) buckets[bucket_index].append(num) # 对每个桶进行排序 result = [] for bucket in buckets: if len(bucket) <= self.insertion_sort_threshold: # 小数组使用插入排序 result.extend(self.insertion_sort(bucket)) else: # 大数组递归使用桶排序 result.extend(self.sort(bucket)) return result # 使用示例 sorter = AdaptiveBucketSort() arr = [0.78, 0.17, 0.39, 0.26, 0.72, 0.94, 0.21, 0.12, 0.23, 0.68] print("排序前:", arr) sorted_arr = sorter.sort(arr) print("排序后:", sorted_arr) ``` ## 性能分析 ### 时间复杂度 - 最好情况:O(n + k),当数据均匀分布时 - 最坏情况:O(n²),当数据分布极不均匀时 - 平均情况:O(n + k),其中k是桶的数量 ### 空间复杂度 - O(n + k),需要额外的桶空间 ### 稳定性 - 稳定排序算法(取决于桶内排序算法的选择) ## 优缺点 ### 优点 1. 当输入数据均匀分布时,效率很高 2. 可以并行化实现 3. 适合外部排序 4. 可以与其他排序算法结合 ### 缺点 1. 需要额外的空间 2. 对数据分布敏感 3. 桶的数量需要合理选择 ## 应用场景 1. 数据分布均匀的场景 2. 外部排序 3. 并行计算环境 4. 数值范围有限的数据排序 ## 实际应用示例 ### 1. 成绩分组统计 ```python class GradeAnalyzer: def __init__(self): self.grades = [] self.buckets = [[] for _ in range(10)] # 0-9, 10-19, ..., 90-100 def add_grade(self, student_name, grade): self.grades.append((student_name, grade)) def analyze(self): # 将成绩分配到不同的分数段 for name, grade in self.grades: bucket_index = min(grade // 10, 9) self.buckets[bucket_index].append((name, grade)) # 对每个分数段内的成绩排序 for bucket in self.buckets: bucket.sort(key=lambda x: x[1], reverse=True) # 生成分析报告 report = {} ranges = ["0-9", "10-19", "20-29", "30-39", "40-49", "50-59", "60-69", "70-79", "80-89", "90-100"] for i, bucket in enumerate(self.buckets): if bucket: report[ranges[i]] = { "count": len(bucket), "average": sum(grade for _, grade in bucket) / len(bucket), "students": bucket } return report # 使用示例 analyzer = GradeAnalyzer() grades = [ ("Alice", 85), ("Bob", 92), ("Charlie", 78), ("David", 95), ("Eve", 88), ("Frank", 73), ("Grace", 68), ("Henry", 91), ("Ivy", 82) ] for name, grade in grades: analyzer.add_grade(name, grade) report = analyzer.analyze() print("成绩分析报告:") for range_str, data in report.items(): print(f"\n分数段 {range_str}:") print(f"人数: {data['count']}") print(f"平均分: {data['average']:.2f}") print("学生列表:") for name, grade in data['students']: print(f" {name}: {grade}") ``` ### 2. 数据分布分析 ```python class DataDistributionAnalyzer: def __init__(self, bucket_count=10): self.bucket_count = bucket_count self.data = [] def add_data(self, value): self.data.append(value) def analyze(self): if not self.data: return {} # 找出数据范围 min_val = min(self.data) max_val = max(self.data) data_range = max_val - min_val # 创建桶 buckets = [[] for _ in range(self.bucket_count)] bucket_range = data_range / self.bucket_count # 分配数据 for value in self.data: if value == max_val: bucket_index = self.bucket_count - 1 else: bucket_index = int((value - min_val) / bucket_range) buckets[bucket_index].append(value) # 生成分析报告 report = {} for i, bucket in enumerate(buckets): if bucket: lower_bound = min_val + i * bucket_range upper_bound = lower_bound + bucket_range report[f"{lower_bound:.2f}-{upper_bound:.2f}"] = { "count": len(bucket), "percentage": len(bucket) / len(self.data) * 100, "average": sum(bucket) / len(bucket), "min": min(bucket), "max": max(bucket) } return report # 使用示例 analyzer = DataDistributionAnalyzer() data = [ 23.5, 45.6, 78.9, 34.2, 56.7, 89.1, 12.3, 67.8, 90.4, 43.2, 55.6, 88.7 ] for value in data: analyzer.add_data(value) report = analyzer.analyze() print("数据分布分析报告:") for range_str, stats in report.items(): print(f"\n区间 {range_str}:") print(f"数量: {stats['count']}") print(f"占比: {stats['percentage']:.2f}%") print(f"平均值: {stats['average']:.2f}") print(f"最小值: {stats['min']:.2f}") print(f"最大值: {stats['max']:.2f}") ``` ## 总结