Skip to content

Redis高可用与分布式:构建坚不可摧的Redis集群

单台Redis服务器虽然很快,但在生产环境中,我们需要考虑高可用性和扩展性。这就像是从开小轿车升级到开飞机——需要更复杂的系统来保证稳定飞行。

1. 主从复制:Redis的"分身术"

主从复制就像孙悟空的分身术,一个主库(Master)可以变出多个从库(Slave),实现数据的备份和读写分离。

主从复制配置

bash
# 主库配置(redis-master.conf)
port 6379
bind 0.0.0.0
requirepass master_password
masterauth master_password

# 从库配置(redis-slave.conf)
port 6380
bind 0.0.0.0
slaveof 127.0.0.1 6379  # 连接主库
requirepass slave_password
masterauth master_password

# 启动主库
redis-server redis-master.conf

# 启动从库
redis-server redis-slave.conf

主从复制命令

bash
# 在从库上查看复制状态
INFO replication

# 动态设置主从关系(Redis 5.0前)
SLAVEOF master_ip master_port

# 动态设置主从关系(Redis 5.0+)
REPLICAOF master_ip master_port

# 断开主从关系
SLAVEOF NO ONE  # Redis 5.0前
REPLICAOF NO ONE  # Redis 5.0+

# 查看主库连接的从库
INFO replication

复制原理

  1. 全量复制:从库首次连接主库时,主库生成RDB文件发送给从库
  2. 增量复制:主库将写命令同步到从库(基于复制偏移量)
  3. 心跳检测:主从库定期发送心跳包检测连接状态

读写分离实现

python
# Python示例:简单的读写分离
import redis

class RedisCluster:
    def __init__(self):
        # 主库(写操作)
        self.master = redis.Redis(host='localhost', port=6379, password='master_password')
        # 从库(读操作)
        self.slave = redis.Redis(host='localhost', port=6380, password='slave_password')
    
    def write(self, key, value):
        """写操作到主库"""
        return self.master.set(key, value)
    
    def read(self, key):
        """读操作从从库"""
        return self.slave.get(key)

# 使用示例
cluster = RedisCluster()
cluster.write('user:1001', '张三')
name = cluster.read('user:1001')

2. 哨兵(Sentinel):Redis的"守护天使"

哨兵系统就像Redis集群的守护天使,时刻监控主从库的健康状态,并在主库宕机时自动进行故障转移。

哨兵配置

bash
# 哨兵配置文件(sentinel.conf)
port 26379
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel auth-pass mymaster master_password
sentinel down-after-milliseconds mymaster 5000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 10000

# 启动哨兵
redis-sentinel sentinel.conf
# 或者
redis-server sentinel.conf --sentinel

哨兵配置项说明

bash
# 监控主库
sentinel monitor <master-name> <ip> <port> <quorum>
# <master-name>: 主库名称
# <ip> <port>: 主库地址
# <quorum>: 判定主库下线所需的哨兵数量

# 主库密码
sentinel auth-pass <master-name> <password>

# 判定主库下线的时间(毫秒)
sentinel down-after-milliseconds <master-name> 5000

# 故障转移时,同时进行同步的从库数量
sentinel parallel-syncs <master-name> 1

# 故障转移超时时间(毫秒)
sentinel failover-timeout <master-name> 10000

哨兵命令

bash
# 连接哨兵
redis-cli -p 26379

# 查看监控的主库
SENTINEL masters

# 查看主库的从库
SENTINEL slaves mymaster

# 查看哨兵信息
SENTINEL sentinels mymaster

# 手动故障转移
SENTINEL failover mymaster

故障转移流程

  1. 哨兵检测到主库下线
  2. 多个哨兵确认主库下线
  3. 哨兵选举出领导者
  4. 领导者选择新的主库(从库中选举)
  5. 其他从库切换到新主库
  6. 通知客户端新的主库地址

3. 集群(Cluster):Redis的"分布式军团"

Redis集群是Redis的分布式解决方案,将数据分散到多个节点上,实现水平扩展和高可用。

集群搭建

bash
# 创建集群配置文件(redis-cluster-7000.conf)
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 5000
appendonly yes

# 其他节点配置(7001-7005.conf)
port 7001  # 端口依次递增
cluster-enabled yes
cluster-config-file nodes-7001.conf  # 文件名对应端口
cluster-node-timeout 5000
appendonly yes

# 启动所有节点
redis-server redis-cluster-7000.conf
redis-server redis-cluster-7001.conf
redis-server redis-cluster-7002.conf
redis-server redis-cluster-7003.conf
redis-server redis-cluster-7004.conf
redis-server redis-cluster-7005.conf

# 创建集群(至少3个主节点)
redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1

集群命令

bash
# 连接集群节点
redis-cli -c -p 7000  # -c参数启用集群模式

# 查看集群信息
CLUSTER INFO

# 查看集群节点
CLUSTER NODES

# 查看键属于哪个槽
CLUSTER KEYSLOT mykey

# 添加节点
redis-cli --cluster add-node 127.0.0.1:7006 127.0.0.1:7000

# 删除节点
redis-cli --cluster del-node 127.0.0.1:7000 <node-id>

# 重新分片
redis-cli --cluster reshard 127.0.0.1:7000

数据分布原理

Redis集群使用哈希槽(Hash Slot)来分布数据:

  • 默认16384个哈希槽
  • 每个键通过CRC16算法计算槽位:CRC16(key) % 16384
  • 每个节点负责一部分槽位
bash
# 查看键属于哪个槽
redis-cli -p 7000 cluster keyslot mykey

# 手动分配槽位(在redis-cli中)
CLUSTER ADDSLOTS 0 1 2 3 ... 5460  # 节点1负责槽位0-5460

4. 分布式锁与限流:并发控制专家

分布式锁实现

bash
# 简单的分布式锁实现
# 获取锁(NX:不存在才设置,EX:过期时间)
SET lock:resource_name unique_value NX EX 30

# 释放锁(需要Lua脚本保证原子性)
EVAL "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" 1 lock:resource_name unique_value
python
# Python实现Redis分布式锁
import redis
import time
import uuid

class RedisLock:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def acquire(self, lock_name, expire_time=30):
        """获取锁"""
        lock_value = str(uuid.uuid4())
        result = self.redis.set(
            f"lock:{lock_name}", 
            lock_value, 
            nx=True,  # 不存在才设置
            ex=expire_time  # 过期时间
        )
        if result:
            return lock_value
        return None
    
    def release(self, lock_name, lock_value):
        """释放锁"""
        lua_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        return self.redis.eval(lua_script, 1, f"lock:{lock_name}", lock_value)

# 使用示例
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
lock = RedisLock(r)

# 获取锁
lock_value = lock.acquire("order_12345")
if lock_value:
    try:
        # 执行业务逻辑
        print("处理订单...")
        time.sleep(2)
    finally:
        # 释放锁
        lock.release("order_12345", lock_value)
else:
    print("获取锁失败")

限流实现

bash
# 计数器限流(简单但有临界问题)
# 每分钟最多100次请求
SET rate_limit:user:1001 1 EX 60 NX
INCR rate_limit:user:1001
GET rate_limit:user:1001

# 滑动窗口限流(基于Sorted Set)
# 记录请求时间戳
ZADD rate_limit:api 1640995200 "request_1"
ZADD rate_limit:api 1640995201 "request_2"

# 统计最近60秒的请求数
ZCOUNT rate_limit:api 1640995140 1640995200

# 删除过期的请求记录
ZREMRANGEBYSCORE rate_limit:api 0 1640995140
python
# Python实现滑动窗口限流
import redis
import time

class SlidingWindowRateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def is_allowed(self, key, limit=100, window=60):
        """检查是否允许请求"""
        now = int(time.time())
        window_start = now - window
        
        # 移除窗口外的记录
        self.redis.zremrangebyscore(key, 0, window_start)
        
        # 获取当前窗口内的请求数
        current_count = self.redis.zcard(key)
        
        if current_count < limit:
            # 添加当前请求
            self.redis.zadd(key, {str(now): now})
            return True
        else:
            return False

# 使用示例
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
limiter = SlidingWindowRateLimiter(r)

# 检查API请求是否被允许
if limiter.is_allowed("api:user:1001", limit=100, window=60):
    print("请求被允许")
else:
    print("请求被限流")

5. 实践项目:动手搭建高可用Redis

一主二从读写分离架构

bash
# 1. 创建配置文件
# master.conf
port 6379
bind 0.0.0.0
daemonize yes
pidfile /var/run/redis-master.pid

# slave1.conf
port 6380
bind 0.0.0.0
daemonize yes
pidfile /var/run/redis-slave1.pid
replicaof 127.0.0.1 6379

# slave2.conf
port 6381
bind 0.0.0.0
daemonize yes
pidfile /var/run/redis-slave2.pid
replicaof 127.0.0.1 6379

# 2. 启动所有实例
redis-server master.conf
redis-server slave1.conf
redis-server slave2.conf

# 3. 验证复制状态
redis-cli -p 6379 INFO replication
redis-cli -p 6380 INFO replication
redis-cli -p 6381 INFO replication

三节点哨兵集群

bash
# 1. 创建哨兵配置文件
# sentinel1.conf
port 26379
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 10000

# sentinel2.conf
port 26380
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 10000

# sentinel3.conf
port 26381
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 10000

# 2. 启动哨兵
redis-sentinel sentinel1.conf
redis-sentinel sentinel2.conf
redis-sentinel sentinel3.conf

# 3. 测试故障转移
# 停止主库
redis-cli -p 6379 shutdown

# 查看哨兵状态
redis-cli -p 26379 SENTINEL masters

分布式锁解决超卖问题

python
# 模拟商品库存扣减
import redis
import threading
import time

class InventoryService:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.lock = RedisLock(redis_client)
    
    def purchase(self, product_id, user_id):
        """购买商品"""
        lock_key = f"lock:product:{product_id}"
        lock_value = None
        
        try:
            # 获取分布式锁
            lock_value = self.lock.acquire(lock_key, 10)
            if not lock_value:
                return {"success": False, "message": "系统繁忙,请稍后重试"}
            
            # 查询库存
            stock = self.redis.get(f"stock:product:{product_id}")
            if stock is None:
                return {"success": False, "message": "商品不存在"}
            
            stock = int(stock)
            if stock <= 0:
                return {"success": False, "message": "商品已售罄"}
            
            # 扣减库存
            self.redis.decr(f"stock:product:{product_id}")
            
            # 记录购买记录
            order_id = f"order_{int(time.time()*1000)}_{user_id}"
            self.redis.hset(f"order:{order_id}", mapping={
                "user_id": user_id,
                "product_id": product_id,
                "timestamp": time.time()
            })
            
            return {"success": True, "message": "购买成功", "order_id": order_id}
            
        finally:
            # 释放锁
            if lock_value:
                self.lock.release(lock_key, lock_value)

# 初始化库存
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
r.set("stock:product:1001", 100)  # 商品1001库存100件

# 测试并发购买
service = InventoryService(r)

def worker(user_id):
    result = service.purchase("1001", user_id)
    print(f"用户{user_id}: {result}")

# 模拟多个用户同时购买
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# 检查最终库存
final_stock = r.get("stock:product:1001")
print(f"最终库存: {final_stock}")

总结

本章节介绍了Redis的高可用与分布式技术:

  • 主从复制实现数据备份和读写分离
  • 哨兵系统提供自动故障转移
  • 集群模式实现水平扩展
  • 分布式锁和限流解决并发问题

掌握这些技术后,你可以构建生产级别的Redis应用,确保系统的高可用性和扩展性。在下一章节中,我们将学习Redis的实战应用和性能优化技巧。

记住,高可用架构就像建房子,地基打得好,房子才能稳固。Redis的高可用技术就是为你的应用打下坚实的基础,让系统在面对各种故障时依然能够稳定运行。