元素码农
基础
UML建模
数据结构
算法
设计模式
网络
TCP/IP协议
HTTPS安全机制
WebSocket实时通信
数据库
sqlite
postgresql
clickhouse
后端
rust
go
java
php
mysql
redis
mongodb
etcd
nats
zincsearch
前端
浏览器
javascript
typescript
vue3
react
游戏
unity
unreal
C++
C#
Lua
App
android
ios
flutter
react-native
安全
Web安全
测试
软件测试
自动化测试 - Playwright
人工智能
Python
langChain
langGraph
运维
linux
docker
工具
git
svn
🌞
🌙
目录
▶
Redis核心
▶
数据结构
字符串实现
哈希表实现
列表实现
集合实现
有序集合实现
▶
内存管理
内存分配策略
淘汰算法
▶
持久化
▶
RDB机制
快照生成原理
文件格式解析
▶
AOF机制
命令追加策略
重写过程分析
▶
高可用
▶
主从复制
SYNC原理
增量复制
▶
哨兵机制
故障检测
领导选举
▶
高级特性
▶
事务系统
ACID实现
WATCH原理
▶
Lua脚本
沙盒环境
执行管道
▶
实战问题
▶
缓存问题
缓存雪崩
缓存穿透
缓存击穿
缓存预热
▶
数据一致性
读写一致性
双写一致性
▶
性能优化
大key处理
热点key优化
发布时间:
2025-03-22 10:48
↑
☰
# Redis事务ACID实现原理 ## 引言 Redis事务提供了一种将多个命令打包、按顺序执行的机制。虽然Redis的事务实现与传统数据库有所不同,但它仍然保证了一定程度的ACID特性。本文将深入分析Redis事务的ACID实现原理。 ## 基本概念 ### 1. 事务命令 ```redis # 开始事务 MULTI # 执行命令 EXEC # 取消事务 DISCARD # 监视键 WATCH key [key ...] # 取消监视 UNWATCH ``` ### 2. 事务状态 ```c // 事务状态 typedef struct redisClient { multiState mstate; // 事务状态 int flags; // 标志 list *watched_keys; // 监视键链表 } redisClient; // 事务状态结构 typedef struct multiState { multiCmd *commands; // 事务队列 int count; // 命令计数 int minreplicas; // 最小从节点数 time_t minreplicas_timeout; // 超时时间 } multiState; ``` ## ACID特性实现 ### 1. 原子性(Atomicity) ```c // 事务执行 void execCommand(redisClient *c) { int j; robj **orig_argv; int orig_argc; struct redisCommand *orig_cmd; int must_propagate = 0; int was_master = server.masterhost == NULL; /* 检查是否在事务状态 */ if (!(c->flags & CLIENT_MULTI)) { addReplyError(c,"EXEC without MULTI"); return; } /* 检查是否有命令入队错误 */ if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) { addReply(c,shared.nullmultibulk); discardTransaction(c); return; } /* 执行事务队列中的命令 */ unwatchAllKeys(c); orig_argv = c->argv; orig_argc = c->argc; orig_cmd = c->cmd; addReplyMultiBulkLen(c,c->mstate.count); for (j = 0; j < c->mstate.count; j++) { c->argc = c->mstate.commands[j].argc; c->argv = c->mstate.commands[j].argv; c->cmd = c->mstate.commands[j].cmd; call(c,CMD_CALL_FULL); } /* 清理事务状态 */ discardTransaction(c); } ``` ### 2. 一致性(Consistency) ```c // 命令入队检查 void queueMultiCommand(redisClient *c) { multiCmd *mc; int j; /* 检查命令语法 */ if (c->argc == 0) { addReplyError(c,"wrong number of arguments for empty command"); return; } /* 分配事务命令空间 */ mc = zmalloc(sizeof(multiCmd)); mc->cmd = c->cmd; mc->argc = c->argc; mc->argv = zmalloc(sizeof(robj*)*c->argc); for (j = 0; j < c->argc; j++) { mc->argv[j] = c->argv[j]; incrRefCount(mc->argv[j]); } /* 将命令添加到事务队列 */ c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.count+1)); c->mstate.commands[c->mstate.count] = *mc; c->mstate.count++; zfree(mc); } ``` ### 3. 隔离性(Isolation) ```c // WATCH命令实现 void watchCommand(redisClient *c) { int j; /* 检查是否已经在事务中 */ if (c->flags & CLIENT_MULTI) { addReplyError(c,"WATCH inside MULTI is not allowed"); return; } /* 监视键 */ for (j = 1; j < c->argc; j++) watchForKey(c,c->argv[j]); addReply(c,shared.ok); } // 监视键实现 void watchForKey(redisClient *c, robj *key) { list *clients = NULL; listIter li; listNode *ln; watchedKey *wk; /* 检查是否已经监视了该键 */ listRewind(c->watched_keys,&li); while((ln = listNext(&li))) { wk = listNodeValue(ln); if (wk->db == c->db && equalStringObjects(key,wk->key)) return; } /* 将键添加到监视列表 */ clients = dictFetchValue(c->db->watched_keys,key); if (!clients) { clients = listCreate(); dictAdd(c->db->watched_keys,key,clients); incrRefCount(key); } listAddNodeTail(clients,c); /* 将监视信息添加到客户端 */ wk = zmalloc(sizeof(watchedKey)); wk->key = key; wk->db = c->db; incrRefCount(key); listAddNodeTail(c->watched_keys,wk); } ``` ### 4. 持久性(Durability) ```c // AOF持久化 void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { sds buf = sdsempty(); robj *tmpargv[3]; /* 如果是事务命令 */ if (dictid != server.aof_selected_db) { char seldb[64]; snprintf(seldb,sizeof(seldb),"*2\r\n$6\r\nSELECT\r\n$%d\r\n%d\r\n", (int)strlen(sdsfromlonglong(dictid)),dictid); buf = sdscatlen(buf,seldb,strlen(seldb)); server.aof_selected_db = dictid; } /* 将命令转换为RESP格式 */ buf = catAppendOnlyGenericCommand(buf,argc,argv); /* 追加到AOF缓冲区 */ if (server.aof_state == AOF_ON) server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf)); } ``` ## 性能优化 ### 1. 命令队列优化 ```c // 命令队列管理 void discardTransaction(redisClient *c) { freeClientMultiState(c); initClientMultiState(c); c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC); unwatchAllKeys(c); } ``` ### 2. 内存优化 ```c // 内存回收 void freeClientMultiState(redisClient *c) { int j; for (j = 0; j < c->mstate.count; j++) { int i; multiCmd *mc = c->mstate.commands+j; for (i = 0; i < mc->argc; i++) { decrRefCount(mc->argv[i]); } zfree(mc->argv); } zfree(c->mstate.commands); } ``` ### 3. 监视键优化 ```c // 取消监视优化 void unwatchAllKeys(redisClient *c) { listIter li; listNode *ln; if (listLength(c->watched_keys) == 0) return; listRewind(c->watched_keys,&li); while((ln = listNext(&li))) { watchedKey *wk = listNodeValue(ln); list *clients = dictFetchValue(wk->db->watched_keys,wk->key); listNode *ln; ln = listSearchKey(clients,c); listDelNode(clients,ln); if (listLength(clients) == 0) { dictDelete(wk->db->watched_keys,wk->key); } } listRelease(c->watched_keys); c->watched_keys = listCreate(); } ``` ## 监控和维护 ### 1. 事务状态监控 ```redis # 查看事务状态 INFO commandstats | grep multi # 查看监视键状态 INFO stats | grep watch ``` ### 2. 性能监控 ```redis # 监控事务执行时间 INFO commandstats | grep exec # 监控内存使用 INFO memory | grep multi ``` ### 3. 错误处理 ```redis # 检查事务错误 INFO stats | grep rejected_exec # 查看命令错误 INFO stats | grep rejected_commands ``` ## 最佳实践 ### 1. 事务使用建议 - 合理使用WATCH机制 - 控制事务大小 - 避免长时间事务 ### 2. 性能优化建议 - 减少监视键数量 - 及时清理监视状态 - 优化命令顺序 ### 3. 错误处理建议 - 做好错误检查 - 实现重试机制 - 记录错误日志 ### 4. 监控建议 - 监控事务状态 - 跟踪性能指标 - 设置告警阈值 ## 常见问题 ### 1. 事务失败 - 命令语法错误 - 监视键被修改 - 内存不足 ### 2. 性能问题 - 事务队列过大 - 监视键过多 - 命令执行慢 ### 3. 并发问题 - 键值冲突 - 死锁风险 - 超时处理 ## 总结 Redis事务通过简单而高效的方式实现了ACID特性,虽然与传统数据库的事务有所不同,但它能满足大多数应用场景的需求。在实际使用中,需要根据业务特点选择合适的事务策略,并通过合理的监控和优化来确保事务的可靠性和性能。