当前位置: 首页 > news >正文

PHP数据同步与CDC变更数据捕获

PHP数据同步与CDC变更数据捕获

CDC(Change Data Capture)是一种跟踪数据库变更的技术。PHP可以通过多种方式实现CDC和数据同步。今天说说PHP中CDC的实现方案。

CDC的核心是捕获数据的插入、更新和删除操作,并将变更传播到其他系统。

```php
class ChangeTracker
{
private PDO $pdo;
private string $table;

public function __construct(PDO $pdo, string $table)
{
$this->pdo = $pdo;
$this->table = $table;
$this->initTrackingTable();
}

private function initTrackingTable(): void
{
$this->pdo->exec("
CREATE TABLE IF NOT EXISTS cdc_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(100) NOT NULL,
operation ENUM('insert', 'update', 'delete') NOT NULL,
entity_id INT NOT NULL,
old_data JSON,
new_data JSON,
changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT FALSE,
INDEX idx_unprocessed (processed, changed_at)
)
");
}

public function logInsert(int $entityId, array $data): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, new_data)
VALUES (?, 'insert', ?, ?)
");
$stmt->execute([$this->table, $entityId, json_encode($data, JSON_UNESCAPED_UNICODE)]);
}

public function logUpdate(int $entityId, array $oldData, array $newData): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, old_data, new_data)
VALUES (?, 'update', ?, ?, ?)
");
$stmt->execute([
$this->table,
$entityId,
json_encode($oldData, JSON_UNESCAPED_UNICODE),
json_encode($newData, JSON_UNESCAPED_UNICODE),
]);
}

public function logDelete(int $entityId, array $oldData): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, old_data)
VALUES (?, 'delete', ?, ?)
");
$stmt->execute([$this->table, $entityId, json_encode($oldData, JSON_UNESCAPED_UNICODE)]);
}

public function getUnprocessed(int $limit = 100): array
{
$stmt = $this->pdo->prepare("
SELECT * FROM cdc_log
WHERE processed = FALSE
ORDER BY id ASC
LIMIT ?
");
$stmt->execute([$limit]);
return $stmt->fetchAll();
}

public function markProcessed(int $id): void
{
$this->pdo->prepare("UPDATE cdc_log SET processed = TRUE WHERE id = ?")->execute([$id]);
}
}

class UserServiceWithCDC
{
private PDO $pdo;
private ChangeTracker $tracker;

public function __construct(PDO $pdo, ChangeTracker $tracker)
{
$this->pdo = $pdo;
$this->tracker = $tracker;
}

public function createUser(string $name, string $email): int
{
$stmt = $this->pdo->prepare("INSERT INTO users (name, email) VALUES (?, ?)");
$stmt->execute([$name, $email]);
$id = (int)$this->pdo->lastInsertId();

$this->tracker->logInsert($id, ['name' => $name, 'email' => $email]);
return $id;
}

public function updateUser(int $id, array $data): void
{
$oldStmt = $this->pdo->prepare("SELECT * FROM users WHERE id = ?");
$oldStmt->execute([$id]);
$oldData = $oldStmt->fetch(PDO::FETCH_ASSOC);

$sets = [];
$params = [];
foreach ($data as $key => $value) {
$sets[] = "{$key} = ?";
$params[] = $value;
}
$params[] = $id;

$sql = "UPDATE users SET " . implode(', ', $sets) . " WHERE id = ?";
$this->pdo->prepare($sql)->execute($params);

$this->tracker->logUpdate($id, $oldData, $data);
}

public function deleteUser(int $id): void
{
$stmt = $this->pdo->prepare("SELECT * FROM users WHERE id = ?");
$stmt->execute([$id]);
$oldData = $stmt->fetch(PDO::FETCH_ASSOC);

$this->pdo->prepare("DELETE FROM users WHERE id = ?")->execute([$id]);
$this->tracker->logDelete($id, $oldData);
}
}
?>

CDC消费者将变更同步到其他系统:

```php
class CdcConsumer
{
private ChangeTracker $tracker;
private array $handlers = [];

public function __construct(ChangeTracker $tracker)
{
$this->tracker = $tracker;
}

public function registerHandler(string $operation, callable $handler): void
{
$this->handlers[$operation][] = $handler;
}

public function process(): int
{
$processed = 0;
$changes = $this->tracker->getUnprocessed(50);

foreach ($changes as $change) {
try {
$handlers = $this->handlers[$change['operation']] ?? [];
foreach ($handlers as $handler) {
$handler($change);
}
$this->tracker->markProcessed($change['id']);
$processed++;
} catch (\Exception $e) {
error_log("CDC处理失败: {$e->getMessage()}");
}
}

return $processed;
}
}
?>

CDC是数据同步和事件驱动架构的基础技术。通过捕获数据库变更日志,可以将数据同步到缓存、搜索引擎或数据仓库。CDC的关键是变更的顺序性和幂等性,确保数据最终一致。PHP实现的CDC适合轻量级的同步场景,大规模场景建议使用Debezium等专业工具。

http://www.jsqmd.com/news/937421/

相关文章:

  • 基于CircuitPython与WS2812B的温度感应可穿戴头饰制作全攻略
  • 2026新疆建筑资质/压力管道资质代办机构推荐排行 权威专业榜单 - 极欧测评
  • 5分钟掌握Translumo:Windows平台终极实时屏幕翻译工具完整指南
  • G-Helper终极指南:华硕笔记本轻量级控制中心完全教程
  • 山东喷涂工艺品牌2026最新排行:5家企业核心能力客观对比 - 奔跑123
  • 2026不锈钢桥架厂家实力排名|防火电缆桥架选型指南与工业民用口碑推荐 - 安互工业信息
  • 基于WS2812与ESP8266的动态几何灯光艺术装置设计与实现
  • ES2020七大新特性实战:构建单位价格计算器
  • MTK手机传感器驱动开发避坑指南:从FreeRTOS到CHRE的完整加载流程解析
  • 2026年GEO/SEO优化服务商选型全解:GEO优化是啥?谁是国内TOP5专业GEO/SEO公司? - 互联网科技品牌测评
  • 从AlphaZero到区块链:指数技术浪潮下的信任构建与伦理挑战
  • 中小企业如何利用云机器学习实现智能化转型:场景、成本与落地指南
  • 别再炸机了!固定翼无人机重心调试保姆级指南(从原理到实操)
  • Windows Server 2022组策略实战:10分钟搞定桌面环境标准化(附脚本)
  • 2026年微纳CT X射线在线检测系统制造商技术能力解析:选型参考指南 - 品牌推荐大师1
  • AI语音合成将如何重塑内容产业?:7大颠覆性趋势+3类已验证商业场景(附2025技术成熟度曲线)
  • 个人AI工具清单:从ChatGPT到DeepSeek,提升效率的实用指南
  • HttpContext.Connection 深度解析:从连接元数据到请求追踪与 mTLS
  • # 总氮水质在线自动监测仪源头厂家推荐榜:2026国产技术突围与选型实战全解析 - 仪表品牌榜
  • 【紧急更新】Veo 2最新连贯性Bug已确认影响4K/60fps项目交付(附临时热修复patch+Google DeepMind联合建议应对方案)
  • League Akari:你的英雄联盟智能助手终极指南 [特殊字符]
  • 别再只会用Google了!网络安全工程师的“神器”FOFA,从语法到实战一次讲透
  • 别再死磕淘宝源了!手把手教你将npm镜像切换到npmmirror.com(解决证书过期问题)
  • AI工具“免费”背后的精密算计:从Rate Limit到数据训练权,6大隐性条款如何 silently lock 你的生产力
  • Arduino温控风扇系统:从传感器到电机驱动的嵌入式实战
  • AI Agent项目立项前需要做哪些可行性分析?最详细的全景指南与高ROI实战方案
  • 终极跨平台视频查重神器:Czkawka/Krokiet 5步释放硬盘空间
  • 别再死记公式了!用LTspice仿真OP07D反相放大器,5分钟搞懂‘虚短’和‘虚断’
  • 劳力士回收也贬值?拒绝套路!6 月北京最新榜单告诉你谁家靠谱 - 合扬奢侈品交易中心
  • 不只是libxcb-cursor:盘点Qt在Linux桌面(X11/Wayland)下那些容易缺失的图形库