手写一个B+树:从原理到数据库索引实战
前言
你有没有想过:MySQL为什么能用几毫秒从几亿条数据中找到你要的那一行?
答案是:B+树。
今天,我们手写一颗生产级的B+树:
· 支持百万级数据的高效存储
· 支持范围查询和分页
· 支持顺序遍历
· 完整实现,可直接用于项目
---
一、B+树的核心原理
1. 为什么是B+树?
数据结构 磁盘IO次数 适用场景
二叉搜索树 O(log N) ≈ 30次 内存
红黑树 O(log N) ≈ 30次 内存
B树 O(log_m N) ≈ 3-4次 磁盘
B+树 O(log_m N) ≈ 3-4次 磁盘索引
m = 1000时,10亿数据只需要3-4次IO!
2. B+树的三大特点
```
┌─────────────────────────────┐
│ [50, 100] │ ← 内部节点(只存key)
└─────────────┬───────────────┘
│
┌─────────────────────────┼─────────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ [10, 30] │ │ [60, 80] │ │ [120, 150] │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
┌───┴───┐ ┌───┴───┐ ┌───┴───┐
▼ ▼ ▼ ▼ ▼ ▼
┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
│10,20│ │30,40│ │60,70│ │80,90│ │120,130│150,160│
│data│ │data│ │data│ │data│ │ data │ data │
└─────┘ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘
↑ ↑ ↑ ↑ ↑ ↑
└───────┴──────────────────┴───────┴──────────────────┴───────┘
叶子节点(存data + 链表指针)
```
三大特点:
1. 内部节点只存key:不存data,每个节点能存更多key → 树更矮
2. 叶子节点存所有data:用链表串联 → 支持范围查询
3. 所有叶子深度相同:完美平衡
---
二、完整代码实现
1. 节点定义
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
// B+树阶数(每个节点最多有多少个key)
// 公式:阶数 = 页大小 / (键大小 + 指针大小)
// MySQL默认页16KB,约300-500阶
#define ORDER 4 // 先设为4方便调试,生产环境建议100+
// 键值对(叶子节点存储)
typedef struct {
int key; // 键(这里用int简化,实际可以是任意类型)
void *data; // 数据指针
} leaf_entry_t;
// B+树节点
typedef struct bplus_node {
int is_leaf; // 是否是叶子节点
int num_keys; // 当前key数量
int *keys; // key数组
union {
struct bplus_node **children; // 内部节点:子节点指针数组
leaf_entry_t *entries; // 叶子节点:键值对数组
};
struct bplus_node *next; // 叶子节点链表指针(范围查询用)
struct bplus_node *parent; // 父节点
} bplus_node_t;
// B+树结构
typedef struct {
bplus_node_t *root; // 根节点
int order; // 阶数
int node_count; // 节点数量
int key_count; // 键值对数量
pthread_rwlock_t rwlock; // 读写锁
} bplus_tree_t;
```
2. 节点初始化和销毁
```c
// 创建内部节点
bplus_node_t *create_internal_node(int order) {
bplus_node_t *node = calloc(1, sizeof(bplus_node_t));
if (!node) return NULL;
node->is_leaf = 0;
node->num_keys = 0;
node->keys = malloc(sizeof(int) * (order));
node->children = malloc(sizeof(bplus_node_t*) * (order + 1));
if (!node->keys || !node->children) {
free(node->keys);
free(node->children);
free(node);
return NULL;
}
for (int i = 0; i <= order; i++) {
node->children[i] = NULL;
}
node->next = NULL;
node->parent = NULL;
return node;
}
// 创建叶子节点
bplus_node_t *create_leaf_node(int order) {
bplus_node_t *node = calloc(1, sizeof(bplus_node_t));
if (!node) return NULL;
node->is_leaf = 1;
node->num_keys = 0;
node->keys = malloc(sizeof(int) * order);
node->entries = malloc(sizeof(leaf_entry_t) * order);
if (!node->keys || !node->entries) {
free(node->keys);
free(node->entries);
free(node);
return NULL;
}
node->children = NULL;
node->next = NULL;
node->parent = NULL;
return node;
}
// 释放节点及其子树
void free_node(bplus_node_t *node) {
if (!node) return;
free(node->keys);
if (node->is_leaf) {
for (int i = 0; i < node->num_keys; i++) {
if (node->entries[i].data) {
free(node->entries[i].data);
}
}
free(node->entries);
} else {
for (int i = 0; i <= node->num_keys; i++) {
free_node(node->children[i]);
}
free(node->children);
}
free(node);
}
```
3. 查找操作
```c
// 在节点中查找key的位置(二分查找)
int find_key_position(bplus_node_t *node, int key) {
int left = 0, right = node->num_keys - 1;
while (left <= right) {
int mid = (left + right) / 2;
if (node->keys[mid] < key) {
left = mid + 1;
} else if (node->keys[mid] > key) {
right = mid - 1;
} else {
return mid;
}
}
return left; // 返回应该插入的位置
}
// 查找key对应的数据
void *bplus_search(bplus_tree_t *tree, int key) {
if (!tree || !tree->root) return NULL;
pthread_rwlock_rdlock(&tree->rwlock);
bplus_node_t *node = tree->root;
// 走到叶子节点
while (!node->is_leaf) {
int pos = find_key_position(node, key);
node = node->children[pos];
}
// 在叶子节点中查找
int pos = find_key_position(node, key);
void *result = NULL;
if (pos < node->num_keys && node->keys[pos] == key) {
result = node->entries[pos].data;
}
pthread_rwlock_unlock(&tree->rwlock);
return result;
}
```
4. 插入和分裂
```c
// 分裂叶子节点
void split_leaf_node(bplus_tree_t *tree, bplus_node_t *leaf) {
int order = tree->order;
int mid = order / 2;
// 创建新叶子节点
bplus_node_t *new_leaf = create_leaf_node(order);
// 复制后半部分到新节点
for (int i = mid; i < leaf->num_keys; i++) {
new_leaf->keys[i - mid] = leaf->keys[i];
new_leaf->entries[i - mid] = leaf->entries[i];
}
new_leaf->num_keys = leaf->num_keys - mid;
leaf->num_keys = mid;
// 更新叶子链表
new_leaf->next = leaf->next;
leaf->next = new_leaf;
new_leaf->parent = leaf->parent;
// 将中间key提升到父节点
int up_key = new_leaf->keys[0];
bplus_node_t *parent = leaf->parent;
if (!parent) {
// 创建新根节点
parent = create_internal_node(order);
tree->root = parent;
parent->keys[0] = up_key;
parent->children[0] = leaf;
parent->children[1] = new_leaf;
parent->num_keys = 1;
leaf->parent = parent;
new_leaf->parent = parent;
tree->node_count += 2;
} else {
// 插入到父节点
insert_into_parent(tree, parent, leaf, new_leaf, up_key);
}
}
// 分裂内部节点
void split_internal_node(bplus_tree_t *tree, bplus_node_t *internal) {
int order = tree->order;
int mid = order / 2;
bplus_node_t *new_internal = create_internal_node(order);
// 复制后半部分
for (int i = mid + 1; i < internal->num_keys; i++) {
new_internal->keys[i - (mid + 1)] = internal->keys[i];
new_internal->children[i - (mid + 1)] = internal->children[i];
if (internal->children[i]) {
internal->children[i]->parent = new_internal;
}
}
new_internal->children[new_internal->num_keys] = internal->children[internal->num_keys];
if (internal->children[internal->num_keys]) {
internal->children[internal->num_keys]->parent = new_internal;
}
new_internal->num_keys = internal->num_keys - (mid + 1);
internal->num_keys = mid;
int up_key = internal->keys[mid];
bplus_node_t *parent = internal->parent;
if (!parent) {
parent = create_internal_node(order);
tree->root = parent;
parent->keys[0] = up_key;
parent->children[0] = internal;
parent->children[1] = new_internal;
parent->num_keys = 1;
internal->parent = parent;
new_internal->parent = parent;
tree->node_count += 2;
} else {
insert_into_parent(tree, parent, internal, new_internal, up_key);
}
}
// 插入到父节点
void insert_into_parent(bplus_tree_t *tree, bplus_node_t *parent,
bplus_node_t *left, bplus_node_t *right, int key) {
int pos = find_key_position(parent, key);
// 腾出空间
for (int i = parent->num_keys; i > pos; i--) {
parent->keys[i] = parent->keys[i - 1];
parent->children[i + 1] = parent->children[i];
}
parent->keys[pos] = key;
parent->children[pos + 1] = right;
parent->num_keys++;
// 检查是否需要分裂
if (parent->num_keys >= tree->order) {
split_internal_node(tree, parent);
}
}
// 插入数据到叶子节点
void insert_into_leaf(bplus_tree_t *tree, bplus_node_t *leaf, int key, void *data) {
int pos = find_key_position(leaf, key);
// 检查是否已存在
if (pos < leaf->num_keys && leaf->keys[pos] == key) {
// 更新现有数据
if (leaf->entries[pos].data) {
free(leaf->entries[pos].data);
}
leaf->entries[pos].data = data;
return;
}
// 腾出空间
for (int i = leaf->num_keys; i > pos; i--) {
leaf->keys[i] = leaf->keys[i - 1];
leaf->entries[i] = leaf->entries[i - 1];
}
leaf->keys[pos] = key;
leaf->entries[pos].key = key;
leaf->entries[pos].data = data;
leaf->num_keys++;
tree->key_count++;
// 检查是否需要分裂
if (leaf->num_keys >= tree->order) {
split_leaf_node(tree, leaf);
}
}
// 插入主函数
int bplus_insert(bplus_tree_t *tree, int key, void *data) {
if (!tree || !data) return -1;
pthread_rwlock_wrlock(&tree->rwlock);
if (!tree->root) {
// 空树,创建根节点
tree->root = create_leaf_node(tree->order);
if (!tree->root) {
pthread_rwlock_unlock(&tree->rwlock);
return -1;
}
tree->root->keys[0] = key;
tree->root->entries[0].key = key;
tree->root->entries[0].data = data;
tree->root->num_keys = 1;
tree->key_count = 1;
tree->node_count = 1;
pthread_rwlock_unlock(&tree->rwlock);
return 0;
}
// 找到要插入的叶子节点
bplus_node_t *node = tree->root;
while (!node->is_leaf) {
int pos = find_key_position(node, key);
node = node->children[pos];
}
insert_into_leaf(tree, node, key, data);
pthread_rwlock_unlock(&tree->rwlock);
return 0;
}
```
5. 范围查询
```c
// 范围查询结构
typedef struct {
int *keys;
void **values;
int count;
int capacity;
} range_result_t;
// 创建范围查询结果
range_result_t *create_range_result(int capacity) {
range_result_t *result = malloc(sizeof(range_result_t));
if (!result) return NULL;
result->keys = malloc(sizeof(int) * capacity);
result->values = malloc(sizeof(void*) * capacity);
result->count = 0;
result->capacity = capacity;
return result;
}
// 释放范围查询结果
void free_range_result(range_result_t *result) {
if (result) {
free(result->keys);
free(result->values);
free(result);
}
}
// 范围查询
range_result_t *bplus_range(bplus_tree_t *tree, int start, int end, int limit) {
if (!tree || !tree->root) return NULL;
pthread_rwlock_rdlock(&tree->rwlock);
range_result_t *result = create_range_result(limit > 0 ? limit : 1000);
if (!result) {
pthread_rwlock_unlock(&tree->rwlock);
return NULL;
}
// 找到起始叶子节点
bplus_node_t *node = tree->root;
while (!node->is_leaf) {
int pos = find_key_position(node, start);
node = node->children[pos];
}
// 遍历叶子节点
int collected = 0;
while (node && (limit == 0 || collected < limit)) {
for (int i = 0; i < node->num_keys && (limit == 0 || collected < limit); i++) {
if (node->keys[i] >= start && node->keys[i] <= end) {
if (collected >= result->capacity) {
result->capacity *= 2;
result->keys = realloc(result->keys, sizeof(int) * result->capacity);
result->values = realloc(result->values, sizeof(void*) * result->capacity);
}
result->keys[collected] = node->keys[i];
result->values[collected] = node->entries[i].data;
collected++;
} else if (node->keys[i] > end) {
// 超出范围,结束
node = NULL;
break;
}
}
node = node->next;
}
result->count = collected;
pthread_rwlock_unlock(&tree->rwlock);
return result;
}
```
6. 统计和调试
```c
// 获取树的高度
int bplus_height(bplus_node_t *node) {
if (!node || node->is_leaf) return 1;
return 1 + bplus_height(node->children[0]);
}
// 打印树(调试用)
void bplus_print(bplus_node_t *node, int level) {
if (!node) return;
for (int i = 0; i < level; i++) {
printf(" ");
}
printf("%s [", node->is_leaf ? "LEAF" : "INT");
for (int i = 0; i < node->num_keys; i++) {
printf("%d", node->keys[i]);
if (i < node->num_keys - 1) printf(",");
}
printf("]\n");
if (!node->is_leaf) {
for (int i = 0; i <= node->num_keys; i++) {
bplus_print(node->children[i], level + 1);
}
}
}
```
---
三、测试代码
基础功能测试
```c
#include <time.h>
#include <assert.h>
int main() {
printf("=== B+树基础测试 ===\n\n");
bplus_tree_t *tree = malloc(sizeof(bplus_tree_t));
tree->root = NULL;
tree->order = ORDER;
tree->node_count = 0;
tree->key_count = 0;
pthread_rwlock_init(&tree->rwlock, NULL);
// 插入数据
printf("插入100条数据...\n");
for (int i = 0; i < 100; i++) {
int *data = malloc(sizeof(int));
*data = i * 10;
bplus_insert(tree, i, data);
}
printf("树高度: %d\n", bplus_height(tree->root));
printf("节点数: %d\n", tree->node_count);
printf("键值对数: %d\n", tree->key_count);
// 查找测试
printf("\n=== 查找测试 ===\n");
int *found = bplus_search(tree, 50);
printf("key=50: %d\n", found ? *found : -1);
found = bplus_search(tree, 200);
printf("key=200: %s\n", found ? "存在" : "不存在");
// 范围查询测试
printf("\n=== 范围查询测试 ===\n");
range_result_t *range = bplus_range(tree, 20, 60, 10);
printf("范围[20,60]前10条:\n");
for (int i = 0; i < range->count; i++) {
printf(" key=%d, value=%d\n", range->keys[i], *(int*)range->values[i]);
}
free_range_result(range);
bplus_print(tree->root, 0);
// 清理
free_node(tree->root);
free(tree);
return 0;
}
```
性能测试
```c
void test_performance() {
printf("\n=== B+树性能测试 ===\n\n");
int test_sizes[] = {10000, 50000, 100000, 500000};
for (int s = 0; s < 4; s++) {
int n = test_sizes[s];
printf("--- 测试规模: %d 元素 ---\n", n);
bplus_tree_t *tree = malloc(sizeof(bplus_tree_t));
tree->root = NULL;
tree->order = ORDER;
pthread_rwlock_init(&tree->rwlock, NULL);
// 插入测试
clock_t start = clock();
for (int i = 0; i < n; i++) {
int *data = malloc(sizeof(int));
*data = i;
bplus_insert(tree, i, data);
}
clock_t end = clock();
printf("插入 %d 条: %.3f 秒\n", n, (double)(end - start) / CLOCKS_PER_SEC);
// 查找测试
start = clock();
for (int i = 0; i < n; i++) {
bplus_search(tree, i);
}
end = clock();
printf("查找 %d 次: %.3f 秒\n", n, (double)(end - start) / CLOCKS_PER_SEC);
// 统计
printf("树高度: %d\n", bplus_height(tree->root));
printf("节点数: %d\n", tree->node_count);
printf("理论IO次数: %d\n", bplus_height(tree->root));
free_node(tree->root);
free(tree);
printf("\n");
}
}
```
---
四、B+树 vs 其他数据结构
数据结构 磁盘IO 范围查询 实现复杂度 适用场景
B+树 3-4次 ✅ 极快 复杂 数据库索引
B树 3-4次 ❌ 慢 复杂 文件系统
红黑树 30次 ❌ 慢 中等 内存数据
哈希表 1次 ❌ 不支持 简单 等值查询
LSM树 多次 ✅ 支持 复杂 写多读少
---
五、总结
通过这篇文章,你学会了:
· B+树的核心原理(多路搜索 + 叶子链表)
· 完整的插入、查找、范围查询实现
· 节点分裂和合并的逻辑
· 与MySQL索引的关系
B+树是数据库最核心的数据结构。掌握它,你就掌握了数据库索引设计的底层逻辑。
下一篇预告:《LSM树:从B+树到键值存储引擎》
---
评论区分享一下你对B+树还有哪些疑问~
