PHP数据同步与CDC变更数据捕获
PHP数据同步与CDC变更数据捕获
CDC(Change Data Capture)是一种跟踪数据库变更的技术。PHP可以通过多种方式实现CDC和数据同步。今天说说PHP中CDC的实现方案。
CDC的核心是捕获数据的插入、更新和删除操作,并将变更传播到其他系统。
```php
class ChangeTracker
{
private PDO $pdo;
private string $table;
public function __construct(PDO $pdo, string $table)
{
$this->pdo = $pdo;
$this->table = $table;
$this->initTrackingTable();
}
private function initTrackingTable(): void
{
$this->pdo->exec("
CREATE TABLE IF NOT EXISTS cdc_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(100) NOT NULL,
operation ENUM('insert', 'update', 'delete') NOT NULL,
entity_id INT NOT NULL,
old_data JSON,
new_data JSON,
changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT FALSE,
INDEX idx_unprocessed (processed, changed_at)
)
");
}
public function logInsert(int $entityId, array $data): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, new_data)
VALUES (?, 'insert', ?, ?)
");
$stmt->execute([$this->table, $entityId, json_encode($data, JSON_UNESCAPED_UNICODE)]);
}
public function logUpdate(int $entityId, array $oldData, array $newData): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, old_data, new_data)
VALUES (?, 'update', ?, ?, ?)
");
$stmt->execute([
$this->table,
$entityId,
json_encode($oldData, JSON_UNESCAPED_UNICODE),
json_encode($newData, JSON_UNESCAPED_UNICODE),
]);
}
public function logDelete(int $entityId, array $oldData): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, old_data)
VALUES (?, 'delete', ?, ?)
");
$stmt->execute([$this->table, $entityId, json_encode($oldData, JSON_UNESCAPED_UNICODE)]);
}
public function getUnprocessed(int $limit = 100): array
{
$stmt = $this->pdo->prepare("
SELECT * FROM cdc_log
WHERE processed = FALSE
ORDER BY id ASC
LIMIT ?
");
$stmt->execute([$limit]);
return $stmt->fetchAll();
}
public function markProcessed(int $id): void
{
$this->pdo->prepare("UPDATE cdc_log SET processed = TRUE WHERE id = ?")->execute([$id]);
}
}
class UserServiceWithCDC
{
private PDO $pdo;
private ChangeTracker $tracker;
public function __construct(PDO $pdo, ChangeTracker $tracker)
{
$this->pdo = $pdo;
$this->tracker = $tracker;
}
public function createUser(string $name, string $email): int
{
$stmt = $this->pdo->prepare("INSERT INTO users (name, email) VALUES (?, ?)");
$stmt->execute([$name, $email]);
$id = (int)$this->pdo->lastInsertId();
$this->tracker->logInsert($id, ['name' => $name, 'email' => $email]);
return $id;
}
public function updateUser(int $id, array $data): void
{
$oldStmt = $this->pdo->prepare("SELECT * FROM users WHERE id = ?");
$oldStmt->execute([$id]);
$oldData = $oldStmt->fetch(PDO::FETCH_ASSOC);
$sets = [];
$params = [];
foreach ($data as $key => $value) {
$sets[] = "{$key} = ?";
$params[] = $value;
}
$params[] = $id;
$sql = "UPDATE users SET " . implode(', ', $sets) . " WHERE id = ?";
$this->pdo->prepare($sql)->execute($params);
$this->tracker->logUpdate($id, $oldData, $data);
}
public function deleteUser(int $id): void
{
$stmt = $this->pdo->prepare("SELECT * FROM users WHERE id = ?");
$stmt->execute([$id]);
$oldData = $stmt->fetch(PDO::FETCH_ASSOC);
$this->pdo->prepare("DELETE FROM users WHERE id = ?")->execute([$id]);
$this->tracker->logDelete($id, $oldData);
}
}
?>
CDC消费者将变更同步到其他系统:
```php
class CdcConsumer
{
private ChangeTracker $tracker;
private array $handlers = [];
public function __construct(ChangeTracker $tracker)
{
$this->tracker = $tracker;
}
public function registerHandler(string $operation, callable $handler): void
{
$this->handlers[$operation][] = $handler;
}
public function process(): int
{
$processed = 0;
$changes = $this->tracker->getUnprocessed(50);
foreach ($changes as $change) {
try {
$handlers = $this->handlers[$change['operation']] ?? [];
foreach ($handlers as $handler) {
$handler($change);
}
$this->tracker->markProcessed($change['id']);
$processed++;
} catch (\Exception $e) {
error_log("CDC处理失败: {$e->getMessage()}");
}
}
return $processed;
}
}
?>
CDC是数据同步和事件驱动架构的基础技术。通过捕获数据库变更日志,可以将数据同步到缓存、搜索引擎或数据仓库。CDC的关键是变更的顺序性和幂等性,确保数据最终一致。PHP实现的CDC适合轻量级的同步场景,大规模场景建议使用Debezium等专业工具。
