Redis高可用与分布式:构建坚不可摧的Redis集群
单台Redis服务器虽然很快,但在生产环境中,我们需要考虑高可用性和扩展性。这就像是从开小轿车升级到开飞机——需要更复杂的系统来保证稳定飞行。
1. 主从复制:Redis的"分身术"
主从复制就像孙悟空的分身术,一个主库(Master)可以变出多个从库(Slave),实现数据的备份和读写分离。
主从复制配置
bash
# 主库配置(redis-master.conf)
port 6379
bind 0.0.0.0
requirepass master_password
masterauth master_password
# 从库配置(redis-slave.conf)
port 6380
bind 0.0.0.0
slaveof 127.0.0.1 6379 # 连接主库
requirepass slave_password
masterauth master_password
# 启动主库
redis-server redis-master.conf
# 启动从库
redis-server redis-slave.conf主从复制命令
bash
# 在从库上查看复制状态
INFO replication
# 动态设置主从关系(Redis 5.0前)
SLAVEOF master_ip master_port
# 动态设置主从关系(Redis 5.0+)
REPLICAOF master_ip master_port
# 断开主从关系
SLAVEOF NO ONE # Redis 5.0前
REPLICAOF NO ONE # Redis 5.0+
# 查看主库连接的从库
INFO replication复制原理
- 全量复制:从库首次连接主库时,主库生成RDB文件发送给从库
- 增量复制:主库将写命令同步到从库(基于复制偏移量)
- 心跳检测:主从库定期发送心跳包检测连接状态
读写分离实现
python
# Python示例:简单的读写分离
import redis
class RedisCluster:
def __init__(self):
# 主库(写操作)
self.master = redis.Redis(host='localhost', port=6379, password='master_password')
# 从库(读操作)
self.slave = redis.Redis(host='localhost', port=6380, password='slave_password')
def write(self, key, value):
"""写操作到主库"""
return self.master.set(key, value)
def read(self, key):
"""读操作从从库"""
return self.slave.get(key)
# 使用示例
cluster = RedisCluster()
cluster.write('user:1001', '张三')
name = cluster.read('user:1001')2. 哨兵(Sentinel):Redis的"守护天使"
哨兵系统就像Redis集群的守护天使,时刻监控主从库的健康状态,并在主库宕机时自动进行故障转移。
哨兵配置
bash
# 哨兵配置文件(sentinel.conf)
port 26379
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel auth-pass mymaster master_password
sentinel down-after-milliseconds mymaster 5000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 10000
# 启动哨兵
redis-sentinel sentinel.conf
# 或者
redis-server sentinel.conf --sentinel哨兵配置项说明
bash
# 监控主库
sentinel monitor <master-name> <ip> <port> <quorum>
# <master-name>: 主库名称
# <ip> <port>: 主库地址
# <quorum>: 判定主库下线所需的哨兵数量
# 主库密码
sentinel auth-pass <master-name> <password>
# 判定主库下线的时间(毫秒)
sentinel down-after-milliseconds <master-name> 5000
# 故障转移时,同时进行同步的从库数量
sentinel parallel-syncs <master-name> 1
# 故障转移超时时间(毫秒)
sentinel failover-timeout <master-name> 10000哨兵命令
bash
# 连接哨兵
redis-cli -p 26379
# 查看监控的主库
SENTINEL masters
# 查看主库的从库
SENTINEL slaves mymaster
# 查看哨兵信息
SENTINEL sentinels mymaster
# 手动故障转移
SENTINEL failover mymaster故障转移流程
- 哨兵检测到主库下线
- 多个哨兵确认主库下线
- 哨兵选举出领导者
- 领导者选择新的主库(从库中选举)
- 其他从库切换到新主库
- 通知客户端新的主库地址
3. 集群(Cluster):Redis的"分布式军团"
Redis集群是Redis的分布式解决方案,将数据分散到多个节点上,实现水平扩展和高可用。
集群搭建
bash
# 创建集群配置文件(redis-cluster-7000.conf)
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 5000
appendonly yes
# 其他节点配置(7001-7005.conf)
port 7001 # 端口依次递增
cluster-enabled yes
cluster-config-file nodes-7001.conf # 文件名对应端口
cluster-node-timeout 5000
appendonly yes
# 启动所有节点
redis-server redis-cluster-7000.conf
redis-server redis-cluster-7001.conf
redis-server redis-cluster-7002.conf
redis-server redis-cluster-7003.conf
redis-server redis-cluster-7004.conf
redis-server redis-cluster-7005.conf
# 创建集群(至少3个主节点)
redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1集群命令
bash
# 连接集群节点
redis-cli -c -p 7000 # -c参数启用集群模式
# 查看集群信息
CLUSTER INFO
# 查看集群节点
CLUSTER NODES
# 查看键属于哪个槽
CLUSTER KEYSLOT mykey
# 添加节点
redis-cli --cluster add-node 127.0.0.1:7006 127.0.0.1:7000
# 删除节点
redis-cli --cluster del-node 127.0.0.1:7000 <node-id>
# 重新分片
redis-cli --cluster reshard 127.0.0.1:7000数据分布原理
Redis集群使用哈希槽(Hash Slot)来分布数据:
- 默认16384个哈希槽
- 每个键通过CRC16算法计算槽位:
CRC16(key) % 16384 - 每个节点负责一部分槽位
bash
# 查看键属于哪个槽
redis-cli -p 7000 cluster keyslot mykey
# 手动分配槽位(在redis-cli中)
CLUSTER ADDSLOTS 0 1 2 3 ... 5460 # 节点1负责槽位0-54604. 分布式锁与限流:并发控制专家
分布式锁实现
bash
# 简单的分布式锁实现
# 获取锁(NX:不存在才设置,EX:过期时间)
SET lock:resource_name unique_value NX EX 30
# 释放锁(需要Lua脚本保证原子性)
EVAL "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" 1 lock:resource_name unique_valuepython
# Python实现Redis分布式锁
import redis
import time
import uuid
class RedisLock:
def __init__(self, redis_client):
self.redis = redis_client
def acquire(self, lock_name, expire_time=30):
"""获取锁"""
lock_value = str(uuid.uuid4())
result = self.redis.set(
f"lock:{lock_name}",
lock_value,
nx=True, # 不存在才设置
ex=expire_time # 过期时间
)
if result:
return lock_value
return None
def release(self, lock_name, lock_value):
"""释放锁"""
lua_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
return self.redis.eval(lua_script, 1, f"lock:{lock_name}", lock_value)
# 使用示例
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
lock = RedisLock(r)
# 获取锁
lock_value = lock.acquire("order_12345")
if lock_value:
try:
# 执行业务逻辑
print("处理订单...")
time.sleep(2)
finally:
# 释放锁
lock.release("order_12345", lock_value)
else:
print("获取锁失败")限流实现
bash
# 计数器限流(简单但有临界问题)
# 每分钟最多100次请求
SET rate_limit:user:1001 1 EX 60 NX
INCR rate_limit:user:1001
GET rate_limit:user:1001
# 滑动窗口限流(基于Sorted Set)
# 记录请求时间戳
ZADD rate_limit:api 1640995200 "request_1"
ZADD rate_limit:api 1640995201 "request_2"
# 统计最近60秒的请求数
ZCOUNT rate_limit:api 1640995140 1640995200
# 删除过期的请求记录
ZREMRANGEBYSCORE rate_limit:api 0 1640995140python
# Python实现滑动窗口限流
import redis
import time
class SlidingWindowRateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
def is_allowed(self, key, limit=100, window=60):
"""检查是否允许请求"""
now = int(time.time())
window_start = now - window
# 移除窗口外的记录
self.redis.zremrangebyscore(key, 0, window_start)
# 获取当前窗口内的请求数
current_count = self.redis.zcard(key)
if current_count < limit:
# 添加当前请求
self.redis.zadd(key, {str(now): now})
return True
else:
return False
# 使用示例
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
limiter = SlidingWindowRateLimiter(r)
# 检查API请求是否被允许
if limiter.is_allowed("api:user:1001", limit=100, window=60):
print("请求被允许")
else:
print("请求被限流")5. 实践项目:动手搭建高可用Redis
一主二从读写分离架构
bash
# 1. 创建配置文件
# master.conf
port 6379
bind 0.0.0.0
daemonize yes
pidfile /var/run/redis-master.pid
# slave1.conf
port 6380
bind 0.0.0.0
daemonize yes
pidfile /var/run/redis-slave1.pid
replicaof 127.0.0.1 6379
# slave2.conf
port 6381
bind 0.0.0.0
daemonize yes
pidfile /var/run/redis-slave2.pid
replicaof 127.0.0.1 6379
# 2. 启动所有实例
redis-server master.conf
redis-server slave1.conf
redis-server slave2.conf
# 3. 验证复制状态
redis-cli -p 6379 INFO replication
redis-cli -p 6380 INFO replication
redis-cli -p 6381 INFO replication三节点哨兵集群
bash
# 1. 创建哨兵配置文件
# sentinel1.conf
port 26379
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 10000
# sentinel2.conf
port 26380
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 10000
# sentinel3.conf
port 26381
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 10000
# 2. 启动哨兵
redis-sentinel sentinel1.conf
redis-sentinel sentinel2.conf
redis-sentinel sentinel3.conf
# 3. 测试故障转移
# 停止主库
redis-cli -p 6379 shutdown
# 查看哨兵状态
redis-cli -p 26379 SENTINEL masters分布式锁解决超卖问题
python
# 模拟商品库存扣减
import redis
import threading
import time
class InventoryService:
def __init__(self, redis_client):
self.redis = redis_client
self.lock = RedisLock(redis_client)
def purchase(self, product_id, user_id):
"""购买商品"""
lock_key = f"lock:product:{product_id}"
lock_value = None
try:
# 获取分布式锁
lock_value = self.lock.acquire(lock_key, 10)
if not lock_value:
return {"success": False, "message": "系统繁忙,请稍后重试"}
# 查询库存
stock = self.redis.get(f"stock:product:{product_id}")
if stock is None:
return {"success": False, "message": "商品不存在"}
stock = int(stock)
if stock <= 0:
return {"success": False, "message": "商品已售罄"}
# 扣减库存
self.redis.decr(f"stock:product:{product_id}")
# 记录购买记录
order_id = f"order_{int(time.time()*1000)}_{user_id}"
self.redis.hset(f"order:{order_id}", mapping={
"user_id": user_id,
"product_id": product_id,
"timestamp": time.time()
})
return {"success": True, "message": "购买成功", "order_id": order_id}
finally:
# 释放锁
if lock_value:
self.lock.release(lock_key, lock_value)
# 初始化库存
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
r.set("stock:product:1001", 100) # 商品1001库存100件
# 测试并发购买
service = InventoryService(r)
def worker(user_id):
result = service.purchase("1001", user_id)
print(f"用户{user_id}: {result}")
# 模拟多个用户同时购买
threads = []
for i in range(10):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 检查最终库存
final_stock = r.get("stock:product:1001")
print(f"最终库存: {final_stock}")总结
本章节介绍了Redis的高可用与分布式技术:
- 主从复制实现数据备份和读写分离
- 哨兵系统提供自动故障转移
- 集群模式实现水平扩展
- 分布式锁和限流解决并发问题
掌握这些技术后,你可以构建生产级别的Redis应用,确保系统的高可用性和扩展性。在下一章节中,我们将学习Redis的实战应用和性能优化技巧。
记住,高可用架构就像建房子,地基打得好,房子才能稳固。Redis的高可用技术就是为你的应用打下坚实的基础,让系统在面对各种故障时依然能够稳定运行。