# PHP数据湖与数据联邦查询
数据湖集中存储各种格式的原始数据,数据联邦则允许对多个数据源发起统一查询。本文介绍如何用PHP实现简单的数据湖和数据联邦查询。
数据湖的核心是保存原始数据,不要求预先定义Schema。下面的PHP实现按“数据源/年/月/日”的目录结构把各种格式的数据写入湖中,并提供统一的查询接口:
```php
/**
 * Minimal file-based data lake.
 *
 * ingest() writes one file per call under {storageDir}/{source}/Y/m/d/ and records it in an
 * in-memory catalog. The first query() of a source in an instance rebuilds that source's
 * catalog from disk, so files written earlier (or by other instances) are included.
 */
class DataLake
{
    private string $storageDir;

    /** @var array<string, list<array{path: string, format: string, size: int, ingested_at: string}>> */
    private array $catalogs = [];

    /**
     * Sources whose catalog has already been rebuilt from disk by this instance.
     *
     * @var array<string, true>
     */
    private array $scanned = [];

    /**
     * @param string $storageDir Root directory of the lake; created (recursively) if missing.
     * @throws \RuntimeException If the directory cannot be created.
     */
    public function __construct(string $storageDir = '/var/data_lake')
    {
        $this->storageDir = rtrim($storageDir, '/');
        $this->ensureDir($this->storageDir);
    }

    /**
     * Write $data to a new file for $source and register it in the catalog.
     *
     * NOTE(review): $source and $format are interpolated into the file path without
     * validation; callers must not pass untrusted values (e.g. containing "..").
     *
     * @param string $source Logical source name; used as a directory name.
     * @param string $format 'json', 'csv' (requires a list of rows), 'txt'; any other value
     *                       stores PHP-serialize()d data.
     * @return string Path of the written file.
     * @throws \JsonException    If JSON encoding fails.
     * @throws \RuntimeException If the directory or the file cannot be written.
     */
    public function ingest(string $source, string $format, mixed $data): string
    {
        $fullDir = "{$this->storageDir}/{$source}/" . date('Y/m/d');
        $this->ensureDir($fullDir);

        $filepath = "{$fullDir}/" . uniqid() . ".{$format}";
        $content = match ($format) {
            'json' => json_encode($data, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT | JSON_THROW_ON_ERROR),
            'csv' => $this->toCsv($data),
            'txt' => is_string($data) ? $data : json_encode($data, JSON_THROW_ON_ERROR),
            default => serialize($data),
        };

        $written = file_put_contents($filepath, $content);
        if ($written === false) {
            throw new \RuntimeException("Cannot write file: {$filepath}");
        }

        // Register the file in the catalog.
        $this->catalogs[$source][] = [
            'path' => $filepath,
            'format' => $format,
            'size' => $written,
            'ingested_at' => date('Y-m-d H:i:s'),
        ];

        return $filepath;
    }

    /**
     * Return the decoded payload of every file of $source that matches $filters.
     *
     * Payloads that are lists of rows (as ingest() writes for json/csv lists) are filtered
     * row by row: only matching rows are kept, and a file with no matching row is omitted.
     * Any other payload is matched as a single record.
     *
     * Filter values: a plain value means strict equality (values read from CSV are strings),
     * or ['operator' => '=' | '!=' | '>' | '>=' | '<' | '<=', 'value' => mixed].
     *
     * @throws \InvalidArgumentException On an unsupported filter operator.
     * @throws \RuntimeException         If a catalogued file cannot be read.
     */
    public function query(string $source, array $filters = []): array
    {
        if (!isset($this->scanned[$source])) {
            $this->scanSource($source);
        }

        $results = [];
        foreach ($this->catalogs[$source] ?? [] as $entry) {
            $data = $this->readFile($entry);
            if ($this->isRowList($data)) {
                $rows = array_values(array_filter(
                    $data,
                    fn(array $row): bool => $this->matchFilters($row, $filters)
                ));
                if ($rows !== []) {
                    $results[] = $rows;
                }
            } elseif ($this->matchFilters($data, $filters)) {
                $results[] = $data;
            }
        }

        return $results;
    }

    /**
     * Run query() for every source directory under the lake root.
     *
     * @return list<array> Each payload cast to array with a '_source' key prepended.
     */
    public function queryAll(array $filters = []): array
    {
        $results = [];
        // glob() returns false on error; treat that as "no sources".
        foreach (glob("{$this->storageDir}/*", GLOB_ONLYDIR) ?: [] as $sourceDir) {
            $source = basename($sourceDir);
            foreach ($this->query($source, $filters) as $payload) {
                $results[] = array_merge(['_source' => $source], (array) $payload);
            }
        }

        return $results;
    }

    /**
     * Summarise the in-memory catalog (files ingested by, or scanned into, this instance).
     *
     * @return array{sources: array<string, array{files: int, size: int}>, total_files: int, total_size: string}
     *         total_size is in MB, rounded to 2 decimals, with an 'MB' suffix.
     */
    public function getStats(): array
    {
        $totalFiles = 0;
        $totalSize = 0;
        $sources = [];
        foreach ($this->catalogs as $source => $entries) {
            $sourceFiles = count($entries);
            $sourceSize = array_sum(array_column($entries, 'size'));
            $sources[$source] = ['files' => $sourceFiles, 'size' => $sourceSize];
            $totalFiles += $sourceFiles;
            $totalSize += $sourceSize;
        }

        return [
            'sources' => $sources,
            'total_files' => $totalFiles,
            'total_size' => round($totalSize / 1024 / 1024, 2) . 'MB',
        ];
    }

    /** Create $dir recursively if missing; throws if it still does not exist afterwards. */
    private function ensureDir(string $dir): void
    {
        // Re-check is_dir after a failed mkdir: another process may have created it meanwhile.
        if (!is_dir($dir) && !mkdir($dir, 0755, true) && !is_dir($dir)) {
            throw new \RuntimeException("Cannot create directory: {$dir}");
        }
    }

    /** Rebuild the catalog of $source from the files currently on disk. */
    private function scanSource(string $source): void
    {
        $this->scanned[$source] = true;
        $sourceDir = "{$this->storageDir}/{$source}";
        if (!is_dir($sourceDir)) {
            return;
        }

        $entries = [];
        $files = new RecursiveIteratorIterator(
            new RecursiveDirectoryIterator($sourceDir, FilesystemIterator::SKIP_DOTS)
        );
        foreach ($files as $file) {
            if ($file->isFile()) {
                $entries[] = [
                    'path' => $file->getPathname(),
                    'format' => $file->getExtension(),
                    'size' => $file->getSize(),
                    'ingested_at' => date('Y-m-d H:i:s', $file->getMTime()),
                ];
            }
        }

        // Directory iteration order is filesystem-dependent; sort for deterministic results.
        usort($entries, fn(array $a, array $b): int => strcmp($a['path'], $b['path']));

        // Replace rather than append: entries ingested by this instance are found again on disk.
        $this->catalogs[$source] = $entries;
    }

    /** Read and decode one catalogued file according to its format. */
    private function readFile(array $entry): mixed
    {
        $content = file_get_contents($entry['path']);
        if ($content === false) {
            throw new \RuntimeException("Cannot read file: {$entry['path']}");
        }

        return match ($entry['format']) {
            'json' => json_decode($content, true),
            'csv' => $this->parseCsv($content),
            default => $content,
        };
    }

    /** True when $data is a non-empty 0..n-1 list whose elements are all arrays (a table of rows). */
    private function isRowList(mixed $data): bool
    {
        if (!is_array($data) || $data === [] || array_keys($data) !== range(0, count($data) - 1)) {
            return false;
        }
        foreach ($data as $row) {
            if (!is_array($row)) {
                return false;
            }
        }

        return true;
    }

    /** Check a single record against every filter (see query() for the filter format). */
    private function matchFilters(mixed $data, array $filters): bool
    {
        if ($filters === []) {
            return true;
        }
        if (!is_array($data)) {
            return false;
        }

        foreach ($filters as $key => $expected) {
            // array_key_exists (not isset) so that a stored null can still be matched.
            if (!array_key_exists($key, $data)) {
                return false;
            }
            $matches = is_array($expected) && array_key_exists('operator', $expected)
                ? $this->compare($data[$key], (string) $expected['operator'], $expected['value'] ?? null)
                : $data[$key] === $expected;
            if (!$matches) {
                return false;
            }
        }

        return true;
    }

    /** Apply one comparison operator from a filter spec. */
    private function compare(mixed $actual, string $operator, mixed $expected): bool
    {
        return match ($operator) {
            '=' => $actual === $expected,
            '!=' => $actual !== $expected,
            '>' => $actual > $expected,
            '>=' => $actual >= $expected,
            '<' => $actual < $expected,
            '<=' => $actual <= $expected,
            default => throw new \InvalidArgumentException("Unsupported filter operator: {$operator}"),
        };
    }

    /** Encode a list of rows as CSV; the header comes from the first row's keys. */
    private function toCsv(array $data): string
    {
        if ($data === []) {
            return '';
        }

        $output = fopen('php://temp', 'r+');
        try {
            // $escape is passed explicitly (same value as the default) to avoid the PHP 8.4 deprecation.
            fputcsv($output, array_keys((array) $data[array_key_first($data)]), escape: '\\');
            foreach ($data as $row) {
                fputcsv($output, (array) $row, escape: '\\');
            }
            rewind($output);

            return stream_get_contents($output);
        } finally {
            fclose($output);
        }
    }

    /**
     * Parse CSV text (first line = header) into associative rows.
     *
     * Uses a stream plus fgetcsv() so that quoted fields containing newlines, as written by
     * toCsv(), round-trip correctly. Rows whose column count differs from the header are skipped.
     */
    private function parseCsv(string $content): array
    {
        $stream = fopen('php://temp', 'r+');
        try {
            fwrite($stream, $content);
            rewind($stream);

            $headers = fgetcsv($stream, escape: '\\');
            if ($headers === false || $headers === [null]) {
                return [];
            }

            $rows = [];
            while (($row = fgetcsv($stream, escape: '\\')) !== false) {
                if (count($row) === count($headers)) {
                    $rows[] = array_combine($headers, $row);
                }
            }

            return $rows;
        } finally {
            fclose($stream);
        }
    }
}
// Demo: ingest the same sample orders (JSON) and users (CSV), then print catalog statistics.
$orderRows = [
    ['order_id' => 1, 'amount' => 100, 'status' => 'paid'],
    ['order_id' => 2, 'amount' => 200, 'status' => 'unpaid'],
];
$userRows = [
    ['user_id' => 1, 'name' => '张三'],
    ['user_id' => 2, 'name' => '李四'],
];

$lake = new DataLake();
$lake->ingest('orders', 'json', $orderRows);
$lake->ingest('users', 'csv', $userRows);

print_r($lake->getStats());
```
数据联邦查询把同一条查询分发给多个数据源,并按数据源名称汇总各自的结果:
```php
/**
 * A queryable backend that FederationEngine can send a query to.
 */
interface DataSource
{
/**
 * Run $sql against this source and return the resulting rows.
 * How much of the SQL is honoured is implementation-specific.
 */
public function query(string $sql): array;
/**
 * Describe this source. The implementations in this file return
 * ['type' => <source type string>, 'tables' => <list of table names>].
 */
public function getSchema(): array;
}
/**
 * DataSource backed by a PDO connection; SQL is executed verbatim.
 */
class MySqlSource implements DataSource
{
    private PDO $pdo;

    public function __construct(PDO $pdo)
    {
        $this->pdo = $pdo;
    }

    /**
     * Execute $sql and return all rows as associative arrays.
     *
     * NOTE(review): $sql is executed as-is — never build it from untrusted input.
     *
     * @throws \RuntimeException When the statement fails and PDO is in silent/warning error mode
     *                           (in exception mode PDO throws PDOException itself).
     */
    public function query(string $sql): array
    {
        return $this->run($sql)->fetchAll(PDO::FETCH_ASSOC);
    }

    /**
     * List tables via MySQL's SHOW TABLES.
     *
     * @return array{type: string, tables: list<string>}
     */
    public function getSchema(): array
    {
        $tables = $this->run('SHOW TABLES')->fetchAll(PDO::FETCH_COLUMN);

        return ['type' => 'mysql', 'tables' => $tables];
    }

    /**
     * Execute $sql, converting PDO's `false` return (silent/warning error modes) into an
     * exception so callers never call fetchAll() on a bool.
     */
    private function run(string $sql): PDOStatement
    {
        $statement = $this->pdo->query($sql);
        if ($statement === false) {
            $info = $this->pdo->errorInfo();
            throw new \RuntimeException(sprintf(
                'Query failed [%s]: %s',
                $info[0] ?? '?',
                $info[2] ?? 'unknown error'
            ));
        }

        return $statement;
    }
}
/**
 * DataSource over a directory of CSV files, one file per table: {dir}/{table}.csv.
 *
 * Simplified: only the table name after FROM is taken from the SQL. The column list, WHERE,
 * ORDER BY and so on are ignored, and every well-formed row of the file is returned. Values
 * are strings, keyed by the header line.
 */
class CsvSource implements DataSource
{
    private string $dir;

    public function __construct(string $dir)
    {
        $this->dir = $dir;
    }

    /**
     * @return list<array<string, string>> Rows of the referenced table; [] if no table name
     *                                      is found or the file does not exist.
     * @throws \RuntimeException If the CSV file exists but cannot be opened.
     */
    public function query(string $sql): array
    {
        // Accept only a plain (optionally backtick-quoted) identifier; this also prevents the
        // table name from escaping $this->dir via "../".
        if (!preg_match('/\bFROM\s+`?([A-Za-z0-9_]+)`?/i', $sql, $matches)) {
            return [];
        }

        $file = "{$this->dir}/{$matches[1]}.csv";
        if (!is_file($file)) {
            return [];
        }

        $handle = fopen($file, 'r');
        if ($handle === false) {
            throw new \RuntimeException("Cannot open CSV file: {$file}");
        }

        try {
            // fgetcsv (not file() + str_getcsv) so quoted fields containing newlines parse correctly.
            $headers = fgetcsv($handle, escape: '\\');
            if ($headers === false) {
                return [];
            }

            $results = [];
            while (($row = fgetcsv($handle, escape: '\\')) !== false) {
                if (count($row) === count($headers)) {
                    $results[] = array_combine($headers, $row);
                }
            }

            return $results;
        } finally {
            fclose($handle);
        }
    }

    /**
     * @return array{type: string, tables: list<string>} Table names are the *.csv basenames.
     */
    public function getSchema(): array
    {
        // glob() returns false on error; treat that as "no tables".
        $files = glob("{$this->dir}/*.csv") ?: [];

        return ['type' => 'csv', 'tables' => array_map(fn($f) => basename($f, '.csv'), $files)];
    }
}
/**
 * Sends one query to several named DataSources and collects the results per source.
 */
class FederationEngine
{
    /** @var array<string, DataSource> */
    private array $sources = [];

    /** Register $source under $name; a source already registered under that name is replaced. */
    public function addSource(string $name, DataSource $source): void
    {
        $this->sources[$name] = $source;
    }

    /** @return list<string|int> Registered source names, in registration order. */
    public function getSources(): array
    {
        return array_keys($this->sources);
    }

    /**
     * Run $query on every registered source, or only on those named in $sourceFilter.
     *
     * A failing source does not abort the others. Its entry is
     * ['success' => false, 'error' => message]; successful entries are
     * ['success' => true, 'count' => number of rows, 'data' => rows].
     *
     * @param list<string> $sourceFilter Source names to include; empty means all sources.
     * @return array<string, array> Results keyed by source name.
     */
    public function federatedQuery(string $query, array $sourceFilter = []): array
    {
        // Key lookup normalises names exactly as $this->sources does ('1' and 1 are the same
        // key) without loose in_array() comparisons.
        $wanted = $sourceFilter === [] ? null : array_flip($sourceFilter);

        $results = [];
        foreach ($this->sources as $name => $source) {
            if ($wanted !== null && !isset($wanted[$name])) {
                continue;
            }
            try {
                $data = $source->query($query);
                $results[$name] = [
                    'success' => true,
                    'count' => count($data),
                    'data' => $data,
                ];
            } catch (\Throwable $e) {
                // Record (not swallow) any failure, including Errors such as TypeError, so one
                // broken source does not abort the whole federated query.
                $results[$name] = [
                    'success' => false,
                    'error' => $e->getMessage(),
                ];
            }
        }

        return $results;
    }
}
// Demo: register a MySQL source and a CSV directory source, list them, then query both.
$engine = new FederationEngine();

$mysqlConnection = new PDO('mysql:host=localhost;dbname=test', 'root', '');
$engine->addSource('mysql', new MySqlSource($mysqlConnection));
$engine->addSource('csv', new CsvSource('/tmp/csv_data'));

echo "数据源: ", implode(', ', $engine->getSources()), "\n";

$results = $engine->federatedQuery("SELECT * FROM users");
print_r($results);
```
数据湖和数据联邦是数据管理的进阶方案:数据湖保存原始数据、不丢失信息,数据联邦提供统一的查询入口。本文的PHP实现仅适合小规模场景(例如CsvSource只从SQL中解析表名),生产环境建议使用专业的湖仓一体平台。
