当前位置: 首页 > news >正文

从零实现一个分布式锁:Redis与Zookeeper

前言

你有没有想过:在分布式系统中,多个服务同时操作同一份数据时,怎么保证数据一致性?

比如秒杀系统中,1000个人同时抢1个商品,怎么保证不会超卖?

分布式锁是解决分布式环境下资源竞争的核心方案。

今天我们用C语言从零实现两种分布式锁:

1. 基于Redis的分布式锁(Redlock算法)
2. 基于Zookeeper的分布式锁(临时顺序节点)

---

一、分布式锁核心原理

1. Redis分布式锁

```
┌─────────┐ SET lock_key value NX PX 10000 ┌─────────┐
│ 客户端A │ ───────────────────────────────────────→ │ Redis │
└─────────┘ └─────────┘
│ │
│ 执行业务逻辑 │
│ │
▼ ▼
┌─────────┐ DEL lock_key ┌─────────┐
│ 客户端A │ ───────────────────────────────────────→ │ Redis │
└─────────┘ └─────────┘
```

2. Zookeeper分布式锁

```
┌─────────┐ 创建临时顺序节点 ┌─────────────┐
│ 客户端A │ ──────────────────────────────────────→ │ Zookeeper │
└─────────┘ │ /locks/ │
│ │ /lock-001│
│ 检查是否最小节点 │ /lock-002│
▼ └─────────────┘
┌─────────┐ 是 → 获得锁
│ 客户端A │ ─── 否 → 监听前一个节点
└─────────┘
```

3. 核心要求

要求 说明
互斥性 同一时刻只有一个客户端持有锁
防死锁 锁有超时机制,客户端异常时自动释放
可重入 同一客户端可重复获取同一把锁
高可用 锁服务本身不能单点故障

---

二、完整代码实现

1. 通用数据结构

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>

#define MAX_KEY_LEN 128
#define MAX_VALUE_LEN 256
#define MAX_HOST_LEN 64
#define MAX_RETRY 3

// 分布式锁统一接口
typedef struct distributed_lock {
char lock_key[MAX_KEY_LEN];
char lock_value[MAX_VALUE_LEN];
int acquired;
time_t expire_time;
void *backend_data;
int (*lock)(struct distributed_lock *self, int timeout_ms);
int (*unlock)(struct distributed_lock *self);
int (*renew)(struct distributed_lock *self);
void (*destroy)(struct distributed_lock *self);
} distributed_lock_t;
```

2. Redis客户端基础

```c
// Redis连接结构
typedef struct redis_connection {
char host[MAX_HOST_LEN];
int port;
int sock_fd;
} redis_conn_t;

// Redis命令执行
int redis_connect(redis_conn_t *conn, const char *host, int port) {
conn->sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (conn->sock_fd < 0) return -1;

struct hostent *server = gethostbyname(host);
if (!server) {
close(conn->sock_fd);
return -1;
}

struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
memcpy(&addr.sin_addr.s_addr, server->h_addr, server->h_length);

if (connect(conn->sock_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
close(conn->sock_fd);
return -1;
}

strcpy(conn->host, host);
conn->port = port;
return 0;
}

int redis_command(redis_conn_t *conn, const char *cmd, char *response, int resp_size) {
send(conn->sock_fd, cmd, strlen(cmd), 0);
send(conn->sock_fd, "\r\n", 2, 0);

int n = recv(conn->sock_fd, response, resp_size - 1, 0);
if (n > 0) {
response[n] = '\0';
return n;
}
return -1;
}

void redis_disconnect(redis_conn_t *conn) {
if (conn->sock_fd > 0) {
close(conn->sock_fd);
conn->sock_fd = -1;
}
}
```

3. Redis分布式锁实现

```c
// Redis锁结构
typedef struct redis_lock {
distributed_lock_t base;
redis_conn_t *connections; // 多Redis节点(Redlock)
int node_count;
int quorum; // 多数派
long long expire_ms;
} redis_lock_t;

// 生成唯一锁值(客户端标识)
void generate_lock_value(char *buf, int size) {
pid_t pid = getpid();
time_t now = time(NULL);
snprintf(buf, size, "%d-%ld-%d", pid, now, rand());
}

// 单个Redis节点加锁
int redis_node_lock(redis_conn_t *conn, const char *key, const char *value,
long long ttl_ms, char *error) {
char cmd[512];
snprintf(cmd, sizeof(cmd),
"SET %s %s NX PX %lld", key, value, ttl_ms);

char response[256];
if (redis_command(conn, cmd, response, sizeof(response)) < 0) {
strcpy(error, "redis command failed");
return -1;
}

// Redis返回+OK表示成功
if (strncmp(response, "+OK", 3) == 0) {
return 0;
}

strcpy(error, response);
return -1;
}

// Redis节点解锁
int redis_node_unlock(redis_conn_t *conn, const char *key, const char *value) {
// 使用Lua脚本保证原子性(检查value匹配才删除)
char cmd[512];
snprintf(cmd, sizeof(cmd),
"EVAL \"if redis.call('get', KEYS[1]) == ARGV[1] then "
"return redis.call('del', KEYS[1]) else return 0 end\" 1 %s %s",
key, value);

char response[256];
if (redis_command(conn, cmd, response, sizeof(response)) < 0) {
return -1;
}

return (strstr(response, ":1") != NULL) ? 0 : -1;
}

// Redlock加锁(多Redis节点)
int redlock_lock(distributed_lock_t *base, int timeout_ms) {
redis_lock_t *lock = (redis_lock_t*)base;
time_t start = time(NULL);
int acquired_count = 0;

// 计算每个节点的超时时间(总超时/节点数)
int per_node_timeout = timeout_ms / lock->node_count;
if (per_node_timeout < 10) per_node_timeout = 10;

// 尝试在所有节点上加锁
for (int i = 0; i < lock->node_count; i++) {
char error[256];
int ret = redis_node_lock(&lock->connections[i],
lock->base.lock_key,
lock->base.lock_value,
lock->expire_ms, error);
if (ret == 0) {
acquired_count++;
}

// 检查是否超时
time_t now = time(NULL);
if ((now - start) * 1000 > timeout_ms) {
break;
}
}

// 判断是否获得多数派
if (acquired_count >= lock->quorum) {
lock->base.acquired = 1;
lock->base.expire_time = time(NULL) + lock->expire_ms / 1000;
return 0;
}

// 加锁失败,释放已获取的锁
for (int i = 0; i < lock->node_count; i++) {
redis_node_unlock(&lock->connections[i],
lock->base.lock_key,
lock->base.lock_value);
}

return -1;
}

// Redlock解锁
int redlock_unlock(distributed_lock_t *base) {
redis_lock_t *lock = (redis_lock_t*)base;
if (!lock->base.acquired) return -1;

for (int i = 0; i < lock->node_count; i++) {
redis_node_unlock(&lock->connections[i],
lock->base.lock_key,
lock->base.lock_value);
}

lock->base.acquired = 0;
return 0;
}

// 锁续期
int redlock_renew(distributed_lock_t *base) {
redis_lock_t *lock = (redis_lock_t*)base;
if (!lock->base.acquired) return -1;

// 在大部分节点上续期
int renewed = 0;
for (int i = 0; i < lock->node_count; i++) {
// 使用PEXPIRE续期
char cmd[256];
snprintf(cmd, sizeof(cmd), "PEXPIRE %s %lld",
lock->base.lock_key, lock->expire_ms);
char response[64];
if (redis_command(&lock->connections[i], cmd, response, sizeof(response)) >= 0) {
if (strstr(response, ":1")) renewed++;
}
}

if (renewed >= lock->quorum) {
lock->base.expire_time = time(NULL) + lock->expire_ms / 1000;
return 0;
}
return -1;
}

// 创建Redis分布式锁
distributed_lock_t *create_redis_lock(const char *key,
const char **hosts, const int *ports,
int node_count, long long expire_ms) {
redis_lock_t *lock = malloc(sizeof(redis_lock_t));
memset(lock, 0, sizeof(redis_lock_t));

strcpy(lock->base.lock_key, key);
generate_lock_value(lock->base.lock_value, sizeof(lock->base.lock_value));
lock->base.acquired = 0;
lock->base.expire_time = 0;
lock->node_count = node_count;
lock->quorum = node_count / 2 + 1;
lock->expire_ms = expire_ms;

// 连接所有Redis节点
lock->connections = malloc(sizeof(redis_conn_t) * node_count);
for (int i = 0; i < node_count; i++) {
if (redis_connect(&lock->connections[i], hosts[i], ports[i]) < 0) {
// 连接失败处理
lock->connections[i].sock_fd = -1;
}
}

lock->base.lock = redlock_lock;
lock->base.unlock = redlock_unlock;
lock->base.renew = redlock_renew;
lock->base.destroy = NULL; // 会在外部处理

return (distributed_lock_t*)lock;
}
```

4. Zookeeper分布式锁实现

```c
// Zookeeper节点结构(模拟)
typedef struct zk_node {
char path[256];
char data[256];
int ephemeral;
int sequential;
int seq_num;
struct zk_node *children;
struct zk_node *next;
} zk_node_t;

// Zookeeper模拟服务器
typedef struct zk_server {
zk_node_t *root;
pthread_mutex_t mutex;
} zk_server_t;

zk_server_t *g_zk_server = NULL;

// 初始化模拟ZK
void zk_init() {
g_zk_server = malloc(sizeof(zk_server_t));
g_zk_server->root = malloc(sizeof(zk_node_t));
strcpy(g_zk_server->root->path, "/");
g_zk_server->root->children = NULL;
pthread_mutex_init(&g_zk_server->mutex, NULL);
}

// 创建ZNode
zk_node_t *zk_create_node(const char *path, int ephemeral, int sequential) {
zk_node_t *node = malloc(sizeof(zk_node_t));
strcpy(node->path, path);
node->ephemeral = ephemeral;
node->sequential = sequential;
node->seq_num = 0;
node->children = NULL;
node->next = NULL;

if (sequential) {
node->seq_num = rand() % 10000;
}

return node;
}

// 创建顺序节点
int zk_create_sequential(const char *base_path, const char *data, char *result_path) {
pthread_mutex_lock(&g_zk_server->mutex);

// 查找父节点
zk_node_t *parent = g_zk_server->root;
char *path_copy = strdup(base_path);
char *token = strtok(path_copy, "/");
while (token) {
zk_node_t *child = parent->children;
int found = 0;
while (child) {
if (strcmp(child->path + 1, token) == 0) {
parent = child;
found = 1;
break;
}
child = child->next;
}
if (!found) {
// 创建中间节点
char new_path[256];
snprintf(new_path, sizeof(new_path), "%s/%s", parent->path, token);
zk_node_t *new_node = zk_create_node(new_path, 0, 0);
new_node->next = parent->children;
parent->children = new_node;
parent = new_node;
}
token = strtok(NULL, "/");
}
free(path_copy);

// 创建顺序节点
int seq = parent->children ? parent->children->seq_num + 1 : 1;
char seq_path[256];
snprintf(seq_path, sizeof(seq_path), "%s/lock-%05d", base_path, seq);

zk_node_t *node = zk_create_node(seq_path, 1, 1);
node->seq_num = seq;
strcpy(node->data, data);
node->next = parent->children;
parent->children = node;

strcpy(result_path, seq_path);

pthread_mutex_unlock(&g_zk_server->mutex);
return seq;
}

// 获取子节点列表(按顺序)
void zk_get_children(const char *path, char ***children, int *count) {
pthread_mutex_lock(&g_zk_server->mutex);

// 查找节点
zk_node_t *node = g_zk_server->root;
char *path_copy = strdup(path);
char *token = strtok(path_copy, "/");
while (token) {
zk_node_t *child = node->children;
int found = 0;
while (child) {
if (strcmp(child->path + 1, token) == 0) {
node = child;
found = 1;
break;
}
child = child->next;
}
if (!found) {
free(path_copy);
*count = 0;
pthread_mutex_unlock(&g_zk_server->mutex);
return;
}
token = strtok(NULL, "/");
}
free(path_copy);

// 收集子节点
*count = 0;
zk_node_t *child = node->children;
while (child) {
(*count)++;
child = child->next;
}

*children = malloc(sizeof(char*) * (*count));
child = node->children;
int idx = 0;
while (child) {
(*children)[idx++] = strdup(child->path);
child = child->next;
}

pthread_mutex_unlock(&g_zk_server->mutex);
}

// 删除节点
void zk_delete(const char *path) {
pthread_mutex_lock(&g_zk_server->mutex);

// 查找父节点
zk_node_t *parent = g_zk_server->root;
char *path_copy = strdup(path);
char *last_token = NULL;
char *token = strtok(path_copy, "/");
while (token) {
last_token = token;
zk_node_t *child = parent->children;
int found = 0;
while (child) {
if (strcmp(child->path + 1, token) == 0) {
parent = child;
found = 1;
break;
}
child = child->next;
}
if (!found) {
free(path_copy);
pthread_mutex_unlock(&g_zk_server->mutex);
return;
}
token = strtok(NULL, "/");
}

// 删除节点
zk_node_t *prev = NULL;
zk_node_t *child = parent->children;
while (child) {
if (strcmp(child->path, path) == 0) {
if (prev) {
prev->next = child->next;
} else {
parent->children = child->next;
}
free(child);
break;
}
prev = child;
child = child->next;
}

free(path_copy);
pthread_mutex_unlock(&g_zk_server->mutex);
}

// Zookeeper锁结构
typedef struct zk_lock {
distributed_lock_t base;
char lock_path[256];
char node_path[256];
char watch_path[256];
int node_seq;
} zk_lock_t;

// Zookeeper加锁
int zk_lock(distributed_lock_t *base, int timeout_ms) {
zk_lock_t *lock = (zk_lock_t*)base;
time_t start = time(NULL);

// 创建临时顺序节点
snprintf(lock->lock_path, sizeof(lock->lock_path), "/locks/%s",
lock->base.lock_key);

char data[256];
snprintf(data, sizeof(data), "client-%d", getpid());
char result_path[256];
int seq = zk_create_sequential(lock->lock_path, data, result_path);
if (seq < 0) return -1;

strcpy(lock->node_path, result_path);
lock->node_seq = seq;

// 检查是否是最小节点
while (1) {
char **children;
int count;
zk_get_children(lock->lock_path, &children, &count);

// 找到最小序号
int min_seq = 99999;
for (int i = 0; i < count; i++) {
int s;
sscanf(children[i], "%*[^-]-%d", &s);
if (s < min_seq) min_seq = s;
free(children[i]);
}
free(children);

if (lock->node_seq == min_seq) {
// 获得锁
lock->base.acquired = 1;
return 0;
}

// 监听前一个节点(简化:轮询)
usleep(100 * 1000);

time_t now = time(NULL);
if ((now - start) * 1000 > timeout_ms) {
zk_delete(lock->node_path);
return -1;
}
}
}

// Zookeeper解锁
int zk_unlock(distributed_lock_t *base) {
zk_lock_t *lock = (zk_lock_t*)base;
if (!lock->base.acquired) return -1;

zk_delete(lock->node_path);
lock->base.acquired = 0;
return 0;
}

// 创建Zookeeper分布式锁
distributed_lock_t *create_zk_lock(const char *key) {
zk_lock_t *lock = malloc(sizeof(zk_lock_t));
memset(lock, 0, sizeof(zk_lock_t));

strcpy(lock->base.lock_key, key);
lock->base.acquired = 0;
lock->base.lock = zk_lock;
lock->base.unlock = zk_unlock;
lock->base.renew = NULL; // ZK锁不需要续期(会话保持)

// 确保锁路径存在
char path[256];
snprintf(path, sizeof(path), "/locks/%s", key);
strcpy(lock->lock_path, path);

return (distributed_lock_t*)lock;
}
```

5. 使用示例

```c
// 模拟共享资源
int shared_counter = 0;
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void *worker_thread(void *arg) {
distributed_lock_t *lock = (distributed_lock_t*)arg;

for (int i = 0; i < 10; i++) {
// 加锁
if (lock->lock(lock, 5000) == 0) {
// 临界区
pthread_mutex_lock(&counter_mutex);
shared_counter++;
int current = shared_counter;
pthread_mutex_unlock(&counter_mutex);

printf("Thread %lu: counter = %d\n", pthread_self(), current);
usleep(10000);

// 解锁
lock->unlock(lock);
} else {
printf("Thread %lu: 获取锁失败\n", pthread_self());
}
usleep(5000);
}
return NULL;
}

int main() {
printf("=== 分布式锁测试 ===\n\n");
srand(time(NULL));

// 初始化Zookeeper模拟服务器
zk_init();

// 测试Redis锁
printf("--- Redis分布式锁 ---\n");
const char *hosts[] = {"127.0.0.1", "127.0.0.1", "127.0.0.1"};
int ports[] = {6379, 6380, 6381};

distributed_lock_t *redis_lock = create_redis_lock("my_resource",
hosts, ports, 3, 10000);

// 创建多个线程竞争锁
pthread_t threads[5];
for (int i = 0; i < 5; i++) {
pthread_create(&threads[i], NULL, worker_thread, redis_lock);
}

for (int i = 0; i < 5; i++) {
pthread_join(threads[i], NULL);
}

// 测试Zookeeper锁
printf("\n--- Zookeeper分布式锁 ---\n");
shared_counter = 0;

distributed_lock_t *zk_lock = create_zk_lock("my_resource");

for (int i = 0; i < 5; i++) {
pthread_create(&threads[i], NULL, worker_thread, zk_lock);
}

for (int i = 0; i < 5; i++) {
pthread_join(threads[i], NULL);
}

return 0;
}
```

---

三、编译和运行

```bash
gcc -o distributed_lock distributed_lock.c -lpthread
./distributed_lock
```

---

四、Redis vs Zookeeper分布式锁

特性 Redis Zookeeper
一致性 最终一致 强一致(ZAB协议)
性能 高(内存操作) 低(需要选举)
可用性 高(主从切换) 高(集群)
死锁预防 超时自动释放 临时节点自动删除
实现复杂度 简单 复杂

---

五、常见问题

问题 解决方案
锁过期业务未完成 锁续期(看门狗机制)
Redis主从切换锁丢失 Redlock算法
死锁 设置合理的超时时间
可重入 在锁值中记录持有者信息

---

六、总结

通过这篇文章,你学会了:

· 分布式锁的核心要求(互斥、防死锁、高可用)
· Redis分布式锁(SET NX PX + Redlock)
· Zookeeper分布式锁(临时顺序节点)
· 锁续期、超时处理
· 完整的测试示例

分布式锁是分布式系统的必备工具。掌握它,你就理解了秒杀系统、分布式任务调度的核心设计。

下一篇预告:《从零实现一个分布式事务:TCC与Saga模式》

---

评论区分享一下你用分布式锁解决过什么场景~

http://www.jsqmd.com/news/1064679/

相关文章:

  • 2026年当前,上海专业的开式冷却塔优质厂商如何选?深度解析绍兴联冷深度冷却塔有限公司 - 品牌鉴赏官2026
  • Mac鼠标终极优化指南:5分钟让普通鼠标媲美苹果触控板
  • 如何快速解决AMD GPU驱动兼容性问题:终极ROCm版本管理指南
  • 3D点云检测:多尺度注意力机制如何解决稀疏与无序挑战
  • 深圳离婚纠纷律师联系方式推荐 专业处理大额财产抚养权纠纷 - 外贸老黄
  • 2026遵义防水补漏避坑指南:卫生间/厨房/阳台/屋顶/地下室漏水检测维修全攻略,正规施工+透明报价+口碑榜靠谱服务商推荐 - 安佳防水
  • PinWin窗口置顶工具:Windows多任务处理的终极解决方案
  • UniCon:基于谱更新的高效对比学习对齐方法解析与实践
  • 2026年东莞板材推荐榜单:高定抗菌/生态阻燃/防虫抗蚁/环保防火/家装别墅板材十大品牌深度解析 - 品牌发掘
  • 5分钟快速上手:yuzu Switch模拟器零基础完整教程
  • 深圳离婚律师联系方式推荐 资深婚家律师许阿赛执业服务指南 - 外贸老黄
  • 2026邯郸防水补漏避坑指南:卫生间/厨房/阳台/屋顶/地下室漏水检测维修全攻略,正规施工+透明报价+口碑榜靠谱服务商推荐 - 安佳防水
  • 终极网盘直链下载指南:免费解锁九大网盘高速下载的秘密
  • 2026遂宁防水补漏避坑指南:卫生间/厨房/阳台/屋顶/地下室漏水检测维修全攻略,正规施工+透明报价+口碑榜靠谱服务商推荐 - 安佳防水
  • 天津遗产分割律师联系方式推荐 深耕家事领域十六载熟悉本地司法实践 - 外贸老黄
  • 配置claude code(命令行)并接入deepseek
  • 图谱RAG太笨重,SAG轻量多跳性能暴涨30%
  • 2026行业内靠谱的税务犯罪刑事律师口碑推荐 - 品牌排行榜
  • GRIP模型:动态规划提升问答系统性能
  • 2026鹰潭漏水检测维修精选优质服务商TOP5推荐!卫生间漏水/厨房漏水/屋顶天花板漏水/阳台漏水/地下室漏水防水补漏检测维修-正规防水补漏公司优选口碑榜测评推荐 - 即刻修防水
  • 天津遗产继承律所联系方式推荐 京津冀跨区域继承纠纷处理参考指南 - 外贸老黄
  • Switch注入终极指南:TegraRcmGUI让复杂操作变简单
  • 2026年近期如何选择新疆的50吨地磅生产厂商:专业推荐与电话指南 - 品牌鉴赏官2026
  • 日语金融文本嵌入基准JFinTEB:构建、评估与实战指南
  • 恶劣天气下多模态全景分割技术:原理、挑战与URVIS 2026实战解析
  • 2026国内比较好的pvdf管优质厂家排行 - 品牌排行榜
  • 5个实战技巧:快速掌握awesome-math数学资源宝库的完整指南
  • 2026邵阳防水补漏避坑指南:卫生间/厨房/阳台/屋顶/地下室漏水检测维修全攻略,正规施工+透明报价+口碑榜靠谱服务商推荐 - 安佳防水
  • 深圳遗嘱咨询律师联系方式推荐 家理许阿赛专业遗嘱法律服务指引 - 外贸老黄
  • AVR64DU系列MCU:集成USB的8位微控制器开发实战指南