从零实现一个分布式锁:Redis与Zookeeper
前言
你有没有想过:在分布式系统中,多个服务同时操作同一份数据时,怎么保证数据一致性?
比如秒杀系统中,1000个人同时抢1个商品,怎么保证不会超卖?
分布式锁是解决分布式环境下资源竞争的核心方案。
今天我们用C语言从零实现两种分布式锁:
1. 基于Redis的分布式锁(Redlock算法)
2. 基于Zookeeper的分布式锁(临时顺序节点;本文用进程内的模拟服务器演示原理,并未连接真实的Zookeeper集群)
---
一、分布式锁核心原理
1. Redis分布式锁
```
┌─────────┐ SET lock_key value NX PX 10000 ┌─────────┐
│ 客户端A │ ───────────────────────────────────────→ │ Redis │
└─────────┘ └─────────┘
│ │
│ 执行业务逻辑 │
│ │
▼ ▼
┌─────────┐ DEL lock_key ┌─────────┐
│ 客户端A │ ───────────────────────────────────────→ │ Redis │
└─────────┘ └─────────┘
```
2. Zookeeper分布式锁
```
┌─────────┐ 创建临时顺序节点 ┌─────────────┐
│ 客户端A │ ──────────────────────────────────────→ │ Zookeeper │
└─────────┘ │ /locks/ │
│ │ /lock-001│
│ 检查是否最小节点 │ /lock-002│
▼ └─────────────┘
┌─────────┐ 是 → 获得锁
│ 客户端A │ ─── 否 → 监听前一个节点
└─────────┘
```
3. 核心要求
要求 说明
互斥性 同一时刻只有一个客户端持有锁
防死锁 锁有超时机制,客户端异常时自动释放
可重入 同一客户端可重复获取同一把锁(本文示例代码未实现可重入,思路见第五节)
高可用 锁服务本身不能单点故障
---
二、完整代码实现
1. 通用数据结构
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#define MAX_KEY_LEN 128
#define MAX_VALUE_LEN 256
#define MAX_HOST_LEN 64
#define MAX_RETRY 3
// Backend-agnostic distributed-lock interface. The concrete lock types in this
// file (redis_lock_t, zk_lock_t) embed this struct as their first member and
// are cast to and from distributed_lock_t*.
typedef struct distributed_lock {
char lock_key[MAX_KEY_LEN]; // name of the resource being locked
char lock_value[MAX_VALUE_LEN]; // holder token; filled by the Redis factory via generate_lock_value()
int acquired; // set to 1 by a successful lock(), reset to 0 by unlock()
time_t expire_time; // expiry estimate in time() seconds; written by the Redis backend
void *backend_data; // not referenced by the code visible in this file
int (*lock)(struct distributed_lock *self, int timeout_ms); // returns 0 on success, -1 on failure
int (*unlock)(struct distributed_lock *self); // returns 0 on success, -1 if the lock is not held
int (*renew)(struct distributed_lock *self); // may be NULL (the ZooKeeper factory sets it to NULL)
void (*destroy)(struct distributed_lock *self); // may be NULL; check before calling
} distributed_lock_t;
```
2. Redis客户端基础
```c
// Connection state for a single Redis node.
typedef struct redis_connection {
char host[MAX_HOST_LEN]; // host name recorded by redis_connect() after a successful connect
int port; // TCP port recorded by redis_connect()
int sock_fd; // connected socket descriptor; -1 marks a closed or failed connection
} redis_conn_t;
/*
 * Open a TCP connection to a Redis node.
 *
 * conn  - connection object to fill in; on failure conn->sock_fd is set to -1
 *         and host/port are left untouched.
 * host  - host name or dotted IPv4 address; must fit in conn->host.
 * port  - TCP port, 1..65535.
 *
 * Returns 0 on success, -1 on failure (bad arguments, name resolution
 * failure, socket() or connect() error).
 */
int redis_connect(redis_conn_t *conn, const char *host, int port) {
    if (!conn) return -1;
    conn->sock_fd = -1;
    if (!host || port <= 0 || port > 65535) return -1;

    /* Reject names that cannot be stored rather than overflowing conn->host. */
    size_t host_len = strlen(host);
    if (host_len >= sizeof(conn->host)) return -1;

    struct hostent *server = gethostbyname(host);
    if (!server || server->h_addrtype != AF_INET ||
        server->h_length != (int)sizeof(struct in_addr) ||
        !server->h_addr_list || !server->h_addr_list[0]) {
        return -1;
    }

    /* Zero the whole structure so sin_zero / padding is well defined. */
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons((unsigned short)port);
    memcpy(&addr.sin_addr, server->h_addr_list[0], sizeof(addr.sin_addr));

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return -1;

    if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        close(fd);
        return -1;
    }

    /* Publish the connection only once everything has succeeded. */
    memcpy(conn->host, host, host_len + 1);
    conn->port = port;
    conn->sock_fd = fd;
    return 0;
}
/*
 * Send one inline command (cmd followed by CRLF) and read the reply.
 *
 * conn      - connection to use; a connection whose sock_fd is negative is
 *             rejected without touching the network.
 * cmd       - NUL-terminated command text without the trailing CRLF.
 * response  - buffer receiving the reply, always NUL-terminated on success.
 * resp_size - size of response in bytes (at least 2).
 *
 * Returns the number of reply bytes stored (> 0), or -1 on any error or when
 * the peer closed the connection.
 *
 * Note: the reply is read with a single recv(), so a reply longer than the
 * buffer, or one delivered in several segments, is returned only partially.
 */
int redis_command(redis_conn_t *conn, const char *cmd, char *response, int resp_size) {
    if (!conn || conn->sock_fd < 0 || !cmd || !response || resp_size < 2) {
        return -1;
    }

    /* send() may accept fewer bytes than requested, so loop until each part
     * is fully written and fail fast on a real error. */
    const char *parts[2] = { cmd, "\r\n" };
    for (int i = 0; i < 2; i++) {
        const char *p = parts[i];
        size_t left = strlen(p);
        while (left > 0) {
            ssize_t sent = send(conn->sock_fd, p, left, 0);
            if (sent < 0 && errno == EINTR) continue;
            if (sent <= 0) return -1;
            p += sent;
            left -= (size_t)sent;
        }
    }

    ssize_t n;
    do {
        n = recv(conn->sock_fd, response, (size_t)resp_size - 1, 0);
    } while (n < 0 && errno == EINTR);

    if (n <= 0) return -1;
    response[n] = '\0';
    return (int)n;
}
/*
 * Close the connection's socket and mark the connection as closed.
 * Descriptors that are zero or negative are treated as "not open" and are
 * left untouched.
 */
void redis_disconnect(redis_conn_t *conn) {
    if (conn->sock_fd <= 0) {
        return; /* nothing to close */
    }
    close(conn->sock_fd);
    conn->sock_fd = -1;
}
```
3. Redis分布式锁实现
```c
// Redis-backed lock state. `base` is the first member; the functions in this
// file cast between distributed_lock_t* and redis_lock_t*.
typedef struct redis_lock {
distributed_lock_t base;
redis_conn_t *connections; // one connection per Redis node (Redlock)
int node_count; // number of entries in connections
int quorum; // majority threshold; create_redis_lock() sets node_count / 2 + 1
long long expire_ms; // lock TTL in milliseconds (sent as the PX / PEXPIRE argument)
} redis_lock_t;
/*
 * Build the token that identifies this client as the lock holder.
 *
 * The token has the form "<pid>-<unix time>-<rand()>" and is truncated (but
 * still NUL-terminated) if it does not fit.
 *
 * buf  - destination buffer; nothing is written when buf is NULL.
 * size - size of buf in bytes; nothing is written when size <= 0.
 *
 * NOTE(review): uniqueness depends on rand(); two tokens generated in the same
 * process within the same second differ only in that component.
 */
void generate_lock_value(char *buf, int size) {
    if (!buf || size <= 0) return;

    /* pid_t and time_t have implementation-defined widths, so convert them to
     * types that match the conversion specifiers exactly. */
    snprintf(buf, (size_t)size, "%ld-%lld-%d",
             (long)getpid(), (long long)time(NULL), rand());
}
/*
 * Try to take the lock on a single Redis node with
 * "SET key value NX PX ttl_ms".
 *
 * conn   - connection to the node.
 * key    - lock key.
 * value  - holder token stored as the key's value.
 * ttl_ms - key lifetime in milliseconds.
 * error  - optional buffer of at least 256 bytes that receives a description
 *          of the failure (the raw server reply when the node refused the
 *          lock); may be NULL. Set to "" on success.
 *
 * Returns 0 when the node answered +OK, -1 otherwise.
 */
int redis_node_lock(redis_conn_t *conn, const char *key, const char *value,
                    long long ttl_ms, char *error) {
    enum { ERROR_BUF_SIZE = 256 }; /* size callers are required to provide */

    if (error) error[0] = '\0';

    char cmd[512];
    int len = snprintf(cmd, sizeof(cmd),
                       "SET %s %s NX PX %lld", key, value, ttl_ms);
    /* A truncated command would lock the wrong key or value; refuse it. */
    if (len < 0 || (size_t)len >= sizeof(cmd)) {
        if (error) snprintf(error, ERROR_BUF_SIZE, "%s", "redis command too long");
        return -1;
    }

    char response[256];
    if (redis_command(conn, cmd, response, sizeof(response)) < 0) {
        if (error) snprintf(error, ERROR_BUF_SIZE, "%s", "redis command failed");
        return -1;
    }

    /* Redis answers +OK when the key was set. */
    if (strncmp(response, "+OK", 3) == 0) {
        return 0;
    }

    if (error) snprintf(error, ERROR_BUF_SIZE, "%s", response);
    return -1;
}
/*
 * Release the lock on a single Redis node.
 *
 * A Lua script compares the stored value with `value` and deletes the key
 * only when they match, so the check and the delete happen atomically.
 *
 * Returns 0 when the reply contains ":1", -1 otherwise (including when the
 * command could not be executed).
 */
int redis_node_unlock(redis_conn_t *conn, const char *key, const char *value) {
    char request[512];
    char reply[256];

    snprintf(request, sizeof(request),
             "EVAL \"if redis.call('get', KEYS[1]) == ARGV[1] then "
             "return redis.call('del', KEYS[1]) else return 0 end\" 1 %s %s",
             key, value);

    if (redis_command(conn, request, reply, sizeof(reply)) < 0) {
        return -1;
    }
    if (strstr(reply, ":1") == NULL) {
        return -1;
    }
    return 0;
}
/*
 * Redlock acquisition across all configured Redis nodes.
 *
 * The key is set on every node in turn. The lock counts as acquired only when
 * a majority (lock->quorum) of nodes accepted it AND the time spent acquiring
 * is still shorter than the key TTL; otherwise every node is asked to release
 * the key again.
 *
 * base       - must point to a redis_lock_t created by create_redis_lock().
 * timeout_ms - budget for this single acquisition round; no further nodes are
 *              contacted once it is exceeded. The function does not retry.
 *
 * Returns 0 when the lock was acquired, -1 otherwise.
 *
 * Note: elapsed time is measured with time(), so it has one-second
 * granularity.
 */
int redlock_lock(distributed_lock_t *base, int timeout_ms) {
    redis_lock_t *lock = (redis_lock_t*)base;
    if (!lock || !lock->connections || lock->node_count <= 0) return -1;

    time_t start = time(NULL);
    int acquired_count = 0;

    /* Try to take the key on every node. */
    for (int i = 0; i < lock->node_count; i++) {
        char error[256];
        if (redis_node_lock(&lock->connections[i],
                            lock->base.lock_key,
                            lock->base.lock_value,
                            lock->expire_ms, error) == 0) {
            acquired_count++;
        }
        /* Stop contacting nodes once the caller's budget is used up. */
        if ((long long)(time(NULL) - start) * 1000 > timeout_ms) {
            break;
        }
    }

    /* Remaining validity: the keys started expiring when they were set, so
     * the time spent acquiring must be subtracted from the TTL. */
    long long elapsed_ms = (long long)(time(NULL) - start) * 1000;
    long long validity_ms = lock->expire_ms - elapsed_ms;

    if (acquired_count >= lock->quorum && validity_ms > 0) {
        lock->base.acquired = 1;
        /* Count the TTL from the start of acquisition, not from now, so the
         * recorded expiry is never later than the earliest key's expiry. */
        lock->base.expire_time = start + (time_t)(lock->expire_ms / 1000);
        return 0;
    }

    /* Failed: release whatever was taken (the value check in the unlock
     * script makes this safe on nodes we never locked). */
    for (int i = 0; i < lock->node_count; i++) {
        redis_node_unlock(&lock->connections[i],
                          lock->base.lock_key,
                          lock->base.lock_value);
    }
    return -1;
}
/*
 * Release a Redlock on every configured node.
 *
 * Returns -1 without contacting any node when the lock is not marked as
 * acquired; otherwise asks each node to release the key, clears the acquired
 * flag and returns 0 (per-node results are not inspected).
 */
int redlock_unlock(distributed_lock_t *base) {
    redis_lock_t *rl = (redis_lock_t *)base;

    if (rl->base.acquired == 0) {
        return -1;
    }

    int idx = 0;
    while (idx < rl->node_count) {
        redis_node_unlock(&rl->connections[idx],
                          rl->base.lock_key,
                          rl->base.lock_value);
        idx++;
    }

    rl->base.acquired = 0;
    return 0;
}
/*
 * Extend the TTL of a held Redlock.
 *
 * Each node runs a Lua script that resets the TTL only if the key still holds
 * this client's value. A plain PEXPIRE would also extend a key that expired
 * and was since taken by another client, which would break mutual exclusion.
 *
 * Returns 0 when a majority (lock->quorum) of nodes renewed the key, -1 when
 * the lock is not held, the command does not fit its buffer, or too few nodes
 * renewed.
 */
int redlock_renew(distributed_lock_t *base) {
    redis_lock_t *lock = (redis_lock_t*)base;
    if (!lock || !lock->base.acquired) return -1;

    int renewed = 0;
    for (int i = 0; i < lock->node_count; i++) {
        /* Sized for the script text plus a full-length key, value and TTL. */
        char cmd[768];
        int len = snprintf(cmd, sizeof(cmd),
                           "EVAL \"if redis.call('get', KEYS[1]) == ARGV[1] then "
                           "return redis.call('pexpire', KEYS[1], ARGV[2]) "
                           "else return 0 end\" 1 %s %s %lld",
                           lock->base.lock_key, lock->base.lock_value,
                           lock->expire_ms);
        if (len < 0 || (size_t)len >= sizeof(cmd)) return -1;

        char response[64];
        if (redis_command(&lock->connections[i], cmd, response, sizeof(response)) >= 0) {
            /* Integer reply ":1" means the TTL was updated on this node. */
            if (strncmp(response, ":1", 2) == 0) renewed++;
        }
    }

    if (renewed >= lock->quorum) {
        lock->base.expire_time = time(NULL) + lock->expire_ms / 1000;
        return 0;
    }
    return -1;
}
// 创建Redis分布式锁
distributed_lock_t *create_redis_lock(const char *key,
const char **hosts, const int *ports,
int node_count, long long expire_ms) {
redis_lock_t *lock = malloc(sizeof(redis_lock_t));
memset(lock, 0, sizeof(redis_lock_t));
strcpy(lock->base.lock_key, key);
generate_lock_value(lock->base.lock_value, sizeof(lock->base.lock_value));
lock->base.acquired = 0;
lock->base.expire_time = 0;
lock->node_count = node_count;
lock->quorum = node_count / 2 + 1;
lock->expire_ms = expire_ms;
// 连接所有Redis节点
lock->connections = malloc(sizeof(redis_conn_t) * node_count);
for (int i = 0; i < node_count; i++) {
if (redis_connect(&lock->connections[i], hosts[i], ports[i]) < 0) {
// 连接失败处理
lock->connections[i].sock_fd = -1;
}
}
lock->base.lock = redlock_lock;
lock->base.unlock = redlock_unlock;
lock->base.renew = redlock_renew;
lock->base.destroy = NULL; // 会在外部处理
return (distributed_lock_t*)lock;
}
```
4. Zookeeper分布式锁实现
```c
// ZooKeeper node structure (simulation; this is not a real ZooKeeper client).
typedef struct zk_node {
char path[256]; // path string stored for this node
char data[256]; // payload; zk_create_sequential() copies its `data` argument here
int ephemeral; // flag passed to zk_create_node()
int sequential; // flag passed to zk_create_node()
int seq_num; // sequence number of a sequential node
struct zk_node *children; // head of this node's singly linked child list
struct zk_node *next; // next sibling in the parent's child list
} zk_node_t;
// Simulated ZooKeeper server: the node tree and the mutex guarding it.
typedef struct zk_server {
zk_node_t *root; // root node; zk_init() gives it the path "/"
pthread_mutex_t mutex; // taken by zk_create_sequential(), zk_get_children() and zk_delete()
} zk_server_t;
// Global simulated server instance; NULL until zk_init() allocates it.
zk_server_t *g_zk_server = NULL;
// 初始化模拟ZK
void zk_init() {
g_zk_server = malloc(sizeof(zk_server_t));
g_zk_server->root = malloc(sizeof(zk_node_t));
strcpy(g_zk_server->root->path, "/");
g_zk_server->root->children = NULL;
pthread_mutex_init(&g_zk_server->mutex, NULL);
}
/*
 * Allocate a detached node for the simulated ZooKeeper tree.
 *
 * path       - path string to store; must be shorter than the node's path
 *              buffer.
 * ephemeral  - stored in node->ephemeral.
 * sequential - stored in node->sequential; when non-zero, seq_num receives a
 *              placeholder from rand() that zk_create_sequential() overwrites.
 *
 * Returns the new node with all other fields zeroed, or NULL when path is
 * NULL or too long, or when allocation fails. The caller links the node into
 * the tree and owns it.
 */
zk_node_t *zk_create_node(const char *path, int ephemeral, int sequential) {
    if (!path) return NULL;

    /* calloc leaves data, children and next zeroed. */
    zk_node_t *node = calloc(1, sizeof(*node));
    if (!node) return NULL;

    size_t path_len = strlen(path);
    if (path_len >= sizeof(node->path)) {
        free(node);
        return NULL;
    }
    memcpy(node->path, path, path_len + 1);

    node->ephemeral = ephemeral;
    node->sequential = sequential;
    if (sequential) {
        node->seq_num = rand() % 10000;
    }
    return node;
}
// 创建顺序节点
int zk_create_sequential(const char *base_path, const char *data, char *result_path) {
pthread_mutex_lock(&g_zk_server->mutex);
// 查找父节点
zk_node_t *parent = g_zk_server->root;
char *path_copy = strdup(base_path);
char *token = strtok(path_copy, "/");
while (token) {
zk_node_t *child = parent->children;
int found = 0;
while (child) {
if (strcmp(child->path + 1, token) == 0) {
parent = child;
found = 1;
break;
}
child = child->next;
}
if (!found) {
// 创建中间节点
char new_path[256];
snprintf(new_path, sizeof(new_path), "%s/%s", parent->path, token);
zk_node_t *new_node = zk_create_node(new_path, 0, 0);
new_node->next = parent->children;
parent->children = new_node;
parent = new_node;
}
token = strtok(NULL, "/");
}
free(path_copy);
// 创建顺序节点
int seq = parent->children ? parent->children->seq_num + 1 : 1;
char seq_path[256];
snprintf(seq_path, sizeof(seq_path), "%s/lock-%05d", base_path, seq);
zk_node_t *node = zk_create_node(seq_path, 1, 1);
node->seq_num = seq;
strcpy(node->data, data);
node->next = parent->children;
parent->children = node;
strcpy(result_path, seq_path);
pthread_mutex_unlock(&g_zk_server->mutex);
return seq;
}
/*
 * List the children of a node in the simulated ZooKeeper tree.
 *
 * path     - node whose children are wanted.
 * children - receives a malloc'ed array of strdup'ed child paths, in
 *            child-list order (most recently added first; not sorted). Set to
 *            NULL when there are no children, the node does not exist, or
 *            allocation fails.
 * count    - receives the number of entries in *children (0 in the cases
 *            above).
 *
 * The caller frees every string and then the array; free(NULL) is fine for
 * the empty case.
 */
void zk_get_children(const char *path, char ***children, int *count) {
    if (!children || !count) return;

    /* Always give the caller well-defined outputs, even on early exit. */
    *children = NULL;
    *count = 0;
    if (!g_zk_server || !path) return;

    char **list = NULL;
    int total = 0;
    int filled = 0;

    pthread_mutex_lock(&g_zk_server->mutex);

    /* Locate the node, matching each path component against the last
     * component of the children's stored paths. */
    zk_node_t *node = g_zk_server->root;
    const char *cursor = path;
    for (;;) {
        cursor += strspn(cursor, "/");
        size_t name_len = strcspn(cursor, "/");
        if (name_len == 0) break;

        zk_node_t *child = node->children;
        while (child) {
            const char *slash = strrchr(child->path, '/');
            const char *name = slash ? slash + 1 : child->path;
            if (strlen(name) == name_len && strncmp(name, cursor, name_len) == 0) {
                break;
            }
            child = child->next;
        }
        if (!child) goto out; /* no such node */

        node = child;
        cursor += name_len;
    }

    /* Collect the children. */
    for (zk_node_t *c = node->children; c; c = c->next) {
        total++;
    }
    if (total == 0) goto out;

    list = malloc(sizeof(*list) * (size_t)total);
    if (!list) goto out;

    for (zk_node_t *c = node->children; c; c = c->next) {
        list[filled] = strdup(c->path);
        if (!list[filled]) {
            /* Roll back so the caller never sees a half-filled array. */
            while (filled > 0) free(list[--filled]);
            free(list);
            goto out;
        }
        filled++;
    }

    *children = list;
    *count = total;

out:
    pthread_mutex_unlock(&g_zk_server->mutex);
}
/*
 * Delete a node from the simulated ZooKeeper tree.
 *
 * path - full path of the node to remove.
 *
 * Nothing happens when the server is not initialised, the path is NULL or
 * names the root, the node does not exist, or the node still has children
 * (deleting it would orphan them; real ZooKeeper also refuses to delete a
 * node that has children).
 */
void zk_delete(const char *path) {
    if (!g_zk_server || !path) return;

    pthread_mutex_lock(&g_zk_server->mutex);

    /* Walk down to the target while remembering the pointer that links it
     * into its parent's child list, so it can be unlinked in place. */
    zk_node_t *node = g_zk_server->root;
    zk_node_t **link = NULL;
    const char *cursor = path;
    for (;;) {
        cursor += strspn(cursor, "/");
        size_t name_len = strcspn(cursor, "/");
        if (name_len == 0) break;

        zk_node_t **slot = &node->children;
        while (*slot) {
            const char *slash = strrchr((*slot)->path, '/');
            const char *name = slash ? slash + 1 : (*slot)->path;
            if (strlen(name) == name_len && strncmp(name, cursor, name_len) == 0) {
                break;
            }
            slot = &(*slot)->next;
        }
        if (!*slot) {
            link = NULL; /* a component is missing: nothing to delete */
            break;
        }

        link = slot;
        node = *slot;
        cursor += name_len;
    }

    /* link stays NULL for the root and for paths that do not exist. */
    if (link && node->children == NULL) {
        *link = node->next;
        free(node);
    }

    pthread_mutex_unlock(&g_zk_server->mutex);
}
// ZooKeeper-backed lock state. `base` is the first member; the functions in
// this file cast between distributed_lock_t* and zk_lock_t*.
typedef struct zk_lock {
distributed_lock_t base;
char lock_path[256]; // parent path "/locks/<lock_key>"
char node_path[256]; // path of the sequential node created for this lock
char watch_path[256]; // not referenced by the code visible in this file
int node_seq; // sequence number returned by zk_create_sequential()
} zk_lock_t;
/*
 * Acquire the ZooKeeper-style lock.
 *
 * Creates an ephemeral sequential node under "/locks/<lock_key>" and polls
 * every 100 ms until that node has the smallest sequence number among its
 * siblings, or until timeout_ms has elapsed (measured with time(), so with
 * one-second granularity).
 *
 * base       - must point to a zk_lock_t created by create_zk_lock().
 * timeout_ms - maximum time to wait.
 *
 * Returns 0 when the lock was acquired, -1 on failure or timeout (the node
 * created for this attempt is deleted on timeout).
 *
 * The path and sequence number of the attempt are kept in local variables and
 * copied into the lock object only after the lock is won, so a waiting caller
 * never overwrites the state the current holder needs for unlock.
 */
int zk_lock(distributed_lock_t *base, int timeout_ms) {
    zk_lock_t *lock = (zk_lock_t*)base;
    if (!lock) return -1;

    time_t start = time(NULL);

    /* Create the ephemeral sequential node for this attempt. */
    char lock_path[256];
    snprintf(lock_path, sizeof(lock_path), "/locks/%s", lock->base.lock_key);

    char data[256];
    snprintf(data, sizeof(data), "client-%d", (int)getpid());

    char node_path[256];
    int my_seq = zk_create_sequential(lock_path, data, node_path);
    if (my_seq < 0) return -1;

    while (1) {
        /* Initialised so the cleanup below is safe even if the callee
         * leaves them untouched. */
        char **children = NULL;
        int count = 0;
        zk_get_children(lock_path, &children, &count);

        /* Find the smallest sequence number; -1 means none found yet. */
        int min_seq = -1;
        for (int i = 0; i < count; i++) {
            /* Parse the digits after the last '-' of the node name only, so
             * a '-' inside the lock key cannot confuse the parser. */
            const char *name = strrchr(children[i], '/');
            name = name ? name + 1 : children[i];
            const char *dash = strrchr(name, '-');
            int s;
            if (dash && sscanf(dash + 1, "%d", &s) == 1 &&
                (min_seq < 0 || s < min_seq)) {
                min_seq = s;
            }
            free(children[i]);
        }
        free(children);

        if (my_seq == min_seq) {
            /* Lock won: publish this attempt's state for zk_unlock(). */
            snprintf(lock->lock_path, sizeof(lock->lock_path), "%s", lock_path);
            snprintf(lock->node_path, sizeof(lock->node_path), "%s", node_path);
            lock->node_seq = my_seq;
            lock->base.acquired = 1;
            return 0;
        }

        /* Watching the predecessor is simplified to polling. */
        usleep(100 * 1000);
        if ((long long)(time(NULL) - start) * 1000 > timeout_ms) {
            zk_delete(node_path);
            return -1;
        }
    }
}
/*
 * Release the ZooKeeper-style lock by deleting this lock's node.
 *
 * Returns 0 after deleting the node and clearing the acquired flag, or -1
 * when the lock is not marked as acquired.
 */
int zk_unlock(distributed_lock_t *base) {
    zk_lock_t *zl = (zk_lock_t *)base;

    if (zl->base.acquired) {
        zk_delete(zl->node_path);
        zl->base.acquired = 0;
        return 0;
    }
    return -1;
}
// 创建Zookeeper分布式锁
distributed_lock_t *create_zk_lock(const char *key) {
zk_lock_t *lock = malloc(sizeof(zk_lock_t));
memset(lock, 0, sizeof(zk_lock_t));
strcpy(lock->base.lock_key, key);
lock->base.acquired = 0;
lock->base.lock = zk_lock;
lock->base.unlock = zk_unlock;
lock->base.renew = NULL; // ZK锁不需要续期(会话保持)
// 确保锁路径存在
char path[256];
snprintf(path, sizeof(path), "/locks/%s", key);
strcpy(lock->lock_path, path);
return (distributed_lock_t*)lock;
}
```
5. 使用示例
```c
// Simulated shared resource.
int shared_counter = 0; // incremented by worker_thread() while it holds the distributed lock
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER; // held by worker_thread() around each counter update
/*
 * Thread body for the demo: ten times, take the distributed lock (5 s
 * timeout), bump shared_counter, print it, hold the lock for 10 ms and
 * release it.
 *
 * arg - the distributed_lock_t* shared by all worker threads.
 *
 * Always returns NULL.
 */
void *worker_thread(void *arg) {
    distributed_lock_t *lock = (distributed_lock_t*)arg;

    /* pthread_t is an opaque type; convert it explicitly so the argument
     * matches the %lu conversion. */
    unsigned long self_id = (unsigned long)pthread_self();

    for (int i = 0; i < 10; i++) {
        if (lock->lock(lock, 5000) == 0) {
            /* Critical section. */
            pthread_mutex_lock(&counter_mutex);
            shared_counter++;
            int current = shared_counter;
            pthread_mutex_unlock(&counter_mutex);

            printf("Thread %lu: counter = %d\n", self_id, current);
            usleep(10000);

            lock->unlock(lock);
        } else {
            printf("Thread %lu: 获取锁失败\n", self_id);
        }
        usleep(5000);
    }
    return NULL;
}
int main() {
printf("=== 分布式锁测试 ===\n\n");
srand(time(NULL));
// 初始化Zookeeper模拟服务器
zk_init();
// 测试Redis锁
printf("--- Redis分布式锁 ---\n");
const char *hosts[] = {"127.0.0.1", "127.0.0.1", "127.0.0.1"};
int ports[] = {6379, 6380, 6381};
distributed_lock_t *redis_lock = create_redis_lock("my_resource",
hosts, ports, 3, 10000);
// 创建多个线程竞争锁
pthread_t threads[5];
for (int i = 0; i < 5; i++) {
pthread_create(&threads[i], NULL, worker_thread, redis_lock);
}
for (int i = 0; i < 5; i++) {
pthread_join(threads[i], NULL);
}
// 测试Zookeeper锁
printf("\n--- Zookeeper分布式锁 ---\n");
shared_counter = 0;
distributed_lock_t *zk_lock = create_zk_lock("my_resource");
for (int i = 0; i < 5; i++) {
pthread_create(&threads[i], NULL, worker_thread, zk_lock);
}
for (int i = 0; i < 5; i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
```
---
三、编译和运行
```bash
gcc -o distributed_lock distributed_lock.c -lpthread
./distributed_lock
```
---
四、Redis vs Zookeeper分布式锁
特性 Redis Zookeeper
一致性 最终一致 强一致(ZAB协议)
性能 高(内存操作) 较低(写请求需经Leader并由多数节点确认后落盘)
可用性 高(主从切换) 高(集群)
死锁预防 超时自动释放 临时节点自动删除
实现复杂度 简单 复杂
---
五、常见问题
问题 解决方案
锁过期业务未完成 锁续期(看门狗机制)
Redis主从切换锁丢失 Redlock算法
死锁 设置合理的超时时间
可重入 在锁值中记录持有者信息,并维护重入计数
---
六、总结
通过这篇文章,你学会了:
· 分布式锁的核心要求(互斥、防死锁、高可用)
· Redis分布式锁(SET NX PX + Redlock)
· Zookeeper分布式锁(临时顺序节点)
· 锁续期、超时处理
· 完整的测试示例
分布式锁是分布式系统的必备工具。掌握它,你就理解了秒杀系统、分布式任务调度的核心设计。
下一篇预告:《从零实现一个分布式事务:TCC与Saga模式》
---
评论区分享一下你用分布式锁解决过什么场景~
