当前位置: 首页 > news >正文

PHP数据湖与数据联邦查询

PHP数据湖与数据联邦查询

数据湖集中存储各种格式的原始数据，数据联邦则允许在一次查询中跨越多个数据源。本文介绍如何用PHP实现简化版的数据湖和数据联邦查询。

数据湖的核心是存储原始数据，不要求预先定义Schema（即“读时模式”，schema-on-read）。下面的示例把数据按来源和日期分目录写入湖中，并提供统一的查询接口：

```php
/**
 * Minimal file-based data lake.
 *
 * Each ingested payload is stored unchanged (no schema required) at
 * `{storageDir}/{source}/Y/m/d/{unique}.{format}` and recorded in an
 * in-memory catalog. The catalog of a source is rebuilt from disk the first
 * time that source is queried or counted, so files written by earlier
 * processes / instances are included as well.
 */
class DataLake
{
    private string $storageDir;

    /** @var array<string, list<array{path: string, format: string, size: int, ingested_at: string}>> */
    private array $catalogs = [];

    /** @var array<string, true> Sources whose catalog has already been loaded from disk. */
    private array $scanned = [];

    /**
     * @param string $storageDir Lake root directory; created (recursively, mode 0755) if missing.
     * @throws RuntimeException If the directory cannot be created.
     */
    public function __construct(string $storageDir = '/var/data_lake')
    {
        $this->storageDir = rtrim($storageDir, '/');
        $this->ensureDirectory($this->storageDir);
    }

    /**
     * Write one payload into the lake and register it in the catalog.
     *
     * @param string $source Logical source name, used verbatim as a directory name.
     *                       NOTE(review): not sanitised - do not pass untrusted input.
     * @param string $format File extension / encoding: 'json', 'csv' (expects a list of rows),
     *                       'txt' (strings as-is, other values JSON-encoded); anything else
     *                       is written with serialize().
     * @param mixed  $data   Payload to store.
     * @return string Path of the written file.
     * @throws RuntimeException If the payload cannot be encoded or the file cannot be written.
     */
    public function ingest(string $source, string $format, mixed $data): string
    {
        $fullDir = "{$this->storageDir}/{$source}/" . date('Y/m/d');
        $this->ensureDirectory($fullDir);

        // uniqid() keeps file names roughly time-ordered; the random suffix avoids
        // collisions between processes writing within the same microsecond.
        $filepath = "{$fullDir}/" . uniqid() . bin2hex(random_bytes(4)) . ".{$format}";

        $content = match ($format) {
            'json' => json_encode($data, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
            'csv' => $this->toCsv($data),
            'txt' => is_string($data) ? $data : json_encode($data),
            default => serialize($data),
        };
        if ($content === false) {
            // json_encode() fails on e.g. invalid UTF-8; writing '' would lose the data silently.
            throw new RuntimeException("Cannot encode payload for source '{$source}' as {$format}");
        }

        $written = file_put_contents($filepath, $content);
        if ($written === false) {
            throw new RuntimeException("Cannot write data lake file {$filepath}");
        }

        // Register the new file in the catalog.
        $this->catalogs[$source][] = [
            'path' => $filepath,
            'format' => $format,
            'size' => $written,
            'ingested_at' => date('Y-m-d H:i:s'),
        ];

        return $filepath;
    }

    /**
     * Return the payloads of one source that match $filters.
     *
     * Each result element is one file's decoded payload. When a payload is a list
     * of rows (e.g. a JSON array of objects, or a CSV file), the filters are applied
     * row by row and only matching rows are kept; a file with no matching row is
     * omitted. Any other payload is kept or dropped as a whole.
     *
     * @param array<string, mixed> $filters Field => expected value (strict ===), or
     *        field => ['operator' => '='|'!='|'>'|'>='|'<'|'<=', 'value' => mixed].
     * @return list<mixed>
     * @throws InvalidArgumentException On a malformed or unsupported operator filter.
     */
    public function query(string $source, array $filters = []): array
    {
        $this->ensureScanned($source);

        $results = [];
        foreach ($this->catalogs[$source] ?? [] as $entry) {
            if (!is_file($entry['path'])) {
                continue; // removed from disk since it was catalogued
            }
            $data = $this->readFile($entry);

            if (is_array($data) && $this->isRecordList($data)) {
                $rows = array_values(array_filter(
                    $data,
                    fn(array $row): bool => $this->matchFilters($row, $filters),
                ));
                if ($rows !== []) {
                    $results[] = $rows;
                }
            } elseif ($this->matchFilters($data, $filters)) {
                $results[] = $data;
            }
        }

        return $results;
    }

    /**
     * Run query() against every source directory under the lake root.
     *
     * Each returned payload is cast to an array and prefixed with a `_source`
     * key naming the source it came from.
     *
     * @return list<array<int|string, mixed>>
     */
    public function queryAll(array $filters = []): array
    {
        $results = [];
        foreach ($this->listSources() as $source) {
            foreach ($this->query($source, $filters) as $payload) {
                $results[] = array_merge(['_source' => $source], (array) $payload);
            }
        }

        return $results;
    }

    /**
     * Summarise the lake: file count and byte size per source, plus totals
     * (total size formatted in MB). All sources on disk are catalogued first.
     *
     * @return array{sources: array<string, array{files: int, size: int}>, total_files: int, total_size: string}
     */
    public function getStats(): array
    {
        foreach ($this->listSources() as $source) {
            $this->ensureScanned($source);
        }

        $totalFiles = 0;
        $totalSize = 0;
        $sources = [];

        foreach ($this->catalogs as $source => $entries) {
            $sourceFiles = count($entries);
            $sourceSize = array_sum(array_column($entries, 'size'));
            $sources[$source] = ['files' => $sourceFiles, 'size' => $sourceSize];
            $totalFiles += $sourceFiles;
            $totalSize += $sourceSize;
        }

        return [
            'sources' => $sources,
            'total_files' => $totalFiles,
            'total_size' => round($totalSize / 1024 / 1024, 2) . 'MB',
        ];
    }

    /** Create $dir (recursively) unless it already exists. */
    private function ensureDirectory(string $dir): void
    {
        // Re-check is_dir() after a failed mkdir(): another process may have created it concurrently.
        if (!is_dir($dir) && !mkdir($dir, 0755, true) && !is_dir($dir)) {
            throw new RuntimeException("Cannot create directory {$dir}");
        }
    }

    /** @return list<string> Names of the source directories currently under the lake root. */
    private function listSources(): array
    {
        $dirs = glob("{$this->storageDir}/*", GLOB_ONLYDIR);

        return array_map(static fn(string $dir): string => basename($dir), $dirs === false ? [] : $dirs);
    }

    /** Load a source's catalog from disk once per instance. */
    private function ensureScanned(string $source): void
    {
        if (isset($this->scanned[$source])) {
            return;
        }
        $this->scanSource($source);
        $this->scanned[$source] = true;
    }

    /** Rebuild the catalog of $source from the files under its directory. */
    private function scanSource(string $source): void
    {
        $sourceDir = "{$this->storageDir}/{$source}";
        if (!is_dir($sourceDir)) {
            return;
        }

        $entries = [];
        $files = new RecursiveIteratorIterator(
            new RecursiveDirectoryIterator($sourceDir, FilesystemIterator::SKIP_DOTS)
        );
        foreach ($files as $file) {
            if ($file->isFile()) {
                $entries[] = [
                    'path' => $file->getPathname(),
                    'format' => $file->getExtension(),
                    'size' => $file->getSize(),
                    'ingested_at' => date('Y-m-d H:i:s', $file->getMTime()),
                ];
            }
        }

        // Directory iteration order is filesystem-dependent; sorting by path follows the Y/m/d layout.
        usort($entries, static fn(array $a, array $b): int => strcmp($a['path'], $b['path']));

        // Disk is the source of truth, so this also covers files this instance ingested.
        $this->catalogs[$source] = $entries;
    }

    /**
     * Decode one catalogued file: 'json' via json_decode (assoc arrays), 'csv' into
     * header-keyed rows; any other format is returned as raw text.
     * NOTE(review): payloads that ingest() wrote with serialize() come back as the
     * raw serialized string; unserialize() on lake files is deliberately avoided.
     *
     * @return mixed Decoded payload, or null if the file cannot be read.
     */
    private function readFile(array $entry): mixed
    {
        $content = file_get_contents($entry['path']);
        if ($content === false) {
            return null;
        }

        return match ($entry['format']) {
            'json' => json_decode($content, true),
            'csv' => $this->parseCsv($content),
            default => $content,
        };
    }

    /** True when every filter matches the corresponding field of the record $data. */
    private function matchFilters(mixed $data, array $filters): bool
    {
        if (empty($filters)) {
            return true;
        }
        if (!is_array($data)) {
            return false;
        }

        foreach ($filters as $field => $expected) {
            // array_key_exists (not isset) so a filter can match a field whose value is null.
            if (!array_key_exists($field, $data) || !$this->matchesFilter($data[$field], $expected)) {
                return false;
            }
        }

        return true;
    }

    /**
     * Evaluate one filter against a field value.
     *
     * @throws InvalidArgumentException On a missing 'value' or an unknown operator.
     */
    private function matchesFilter(mixed $actual, mixed $expected): bool
    {
        // A plain value (including an array without an 'operator' key) means strict equality.
        if (!is_array($expected) || !array_key_exists('operator', $expected)) {
            return $actual === $expected;
        }
        if (!array_key_exists('value', $expected)) {
            throw new InvalidArgumentException("Operator filter is missing its 'value'");
        }

        $target = $expected['value'];

        return match ($expected['operator']) {
            '=' => $actual === $target,
            '!=' => $actual !== $target,
            '>' => $actual > $target,
            '>=' => $actual >= $target,
            '<' => $actual < $target,
            '<=' => $actual <= $target,
            default => throw new InvalidArgumentException(
                'Unsupported filter operator: ' . var_export($expected['operator'], true)
            ),
        };
    }

    /** True when $data is a non-empty 0..n-1 list whose every element is an array (a table of rows). */
    private function isRecordList(array $data): bool
    {
        if ($data === []) {
            return false;
        }

        $expectedKey = 0;
        foreach ($data as $key => $row) {
            if ($key !== $expectedKey++ || !is_array($row)) {
                return false;
            }
        }

        return true;
    }

    /**
     * Encode rows as CSV; the header is taken from the keys of the first row.
     * Values are written in header order so rows with differently ordered keys stay aligned.
     */
    private function toCsv(array $data): string
    {
        if (empty($data)) {
            return '';
        }

        $headers = array_keys((array) reset($data));
        $output = fopen('php://temp', 'r+');
        fputcsv($output, $headers);
        foreach ($data as $row) {
            $row = (array) $row;
            fputcsv($output, array_map(static fn($header) => $row[$header] ?? '', $headers));
        }
        rewind($output);
        $csv = stream_get_contents($output);
        fclose($output);

        return $csv === false ? '' : $csv;
    }

    /**
     * Parse CSV text (first line = header) into header-keyed rows.
     * Blank lines and rows whose width differs from the header are skipped.
     */
    private function parseCsv(string $content): array
    {
        // Read through a stream so quoted fields containing newlines stay intact.
        $stream = fopen('php://temp', 'r+');
        fwrite($stream, $content);
        rewind($stream);

        $data = [];
        $headers = fgetcsv($stream);
        if ($headers !== false) {
            while (($row = fgetcsv($stream)) !== false) {
                if ($row !== [null] && count($row) === count($headers)) {
                    $data[] = array_combine($headers, $row);
                }
            }
        }
        fclose($stream);

        return $data;
    }
}

// Demo: write two small data sets into the default lake location, then print the catalog stats.
$lake = new DataLake();

$orders = [
    ['order_id' => 1, 'amount' => 100, 'status' => 'paid'],
    ['order_id' => 2, 'amount' => 200, 'status' => 'unpaid'],
];
$users = [
    ['user_id' => 1, 'name' => '张三'],
    ['user_id' => 2, 'name' => '李四'],
];

$lake->ingest('orders', 'json', $orders);
$lake->ingest('users', 'csv', $users);

$stats = $lake->getStats();
print_r($stats);
?>
```

数据联邦查询把同一条查询分发给多个数据源，并按数据源汇总各自的结果（失败的数据源会单独记录错误信息）：

```php
/**
 * A queryable backend that can take part in a federated query.
 */
interface DataSource
{
    /**
     * Describe this backend, e.g. its type and the tables it exposes.
     *
     * @return array<string, mixed>
     */
    public function getSchema(): array;

    /**
     * Run a query against this backend and return the result rows.
     *
     * @param string $sql Query text; how much SQL is understood is up to the implementation.
     * @return array Result rows.
     */
    public function query(string $sql): array;
}

/**
 * DataSource backed by a PDO connection to MySQL.
 *
 * The SQL string is executed as-is, so it must come from trusted code.
 */
class MySqlSource implements DataSource
{
    private PDO $pdo;

    public function __construct(PDO $pdo)
    {
        $this->pdo = $pdo;
    }

    /**
     * @return list<array<string, mixed>> All result rows as associative arrays.
     * @throws PDOException|RuntimeException If the statement fails.
     */
    public function query(string $sql): array
    {
        return $this->run($sql)->fetchAll(PDO::FETCH_ASSOC);
    }

    /**
     * @return array{type: string, tables: list<string>}
     * @throws PDOException|RuntimeException If SHOW TABLES fails.
     */
    public function getSchema(): array
    {
        $tables = $this->run('SHOW TABLES')->fetchAll(PDO::FETCH_COLUMN);

        return ['type' => 'mysql', 'tables' => $tables];
    }

    /**
     * Execute $sql and return the statement.
     *
     * With ERRMODE_SILENT / ERRMODE_WARNING, PDO::query() returns false on failure;
     * calling ->fetchAll() on that would raise an Error (not an Exception), which
     * callers catching \Exception would miss. Turn it into a RuntimeException instead.
     */
    private function run(string $sql): PDOStatement
    {
        $statement = $this->pdo->query($sql);
        if ($statement === false) {
            $info = $this->pdo->errorInfo();
            throw new RuntimeException(sprintf(
                'Query failed [%s]: %s',
                $info[0] ?? '?',
                $info[2] ?? 'unknown error'
            ));
        }

        return $statement;
    }
}

/**
 * DataSource that serves `{dir}/{table}.csv` files.
 *
 * Simplified implementation: only the table name after FROM is honoured;
 * column lists, WHERE, ORDER BY, LIMIT etc. are ignored and every row of the
 * file is returned. The first CSV line is the header and all values are strings.
 */
class CsvSource implements DataSource
{
    private string $dir;

    public function __construct(string $dir)
    {
        $this->dir = $dir;
    }

    /**
     * @return list<array<string, string|null>> One header-keyed row per CSV record; empty when
     *         no table name is found, the file does not exist, or it has no header.
     * @throws RuntimeException If the file exists but cannot be opened.
     */
    public function query(string $sql): array
    {
        $table = $this->extractTable($sql);
        if ($table === null) {
            return [];
        }

        $file = "{$this->dir}/{$table}.csv";
        if (!is_file($file)) {
            return [];
        }

        $handle = fopen($file, 'r');
        if ($handle === false) {
            throw new RuntimeException("Cannot open CSV file {$file}");
        }

        try {
            // fgetcsv() (unlike file() + str_getcsv()) keeps quoted fields that span lines intact.
            $headers = fgetcsv($handle);
            if ($headers === false || $headers === [null]) {
                return []; // empty file or blank header line
            }

            $results = [];
            while (($row = fgetcsv($handle)) !== false) {
                // Skip blank lines and malformed rows whose width differs from the header.
                if ($row !== [null] && count($row) === count($headers)) {
                    $results[] = array_combine($headers, $row);
                }
            }

            return $results;
        } finally {
            fclose($handle);
        }
    }

    /** @return array{type: string, tables: list<string>} Table names = CSV file names without extension. */
    public function getSchema(): array
    {
        $files = glob("{$this->dir}/*.csv") ?: [];

        return ['type' => 'csv', 'tables' => array_map(static fn(string $f): string => basename($f, '.csv'), $files)];
    }

    /**
     * Extract the table name following FROM, optionally wrapped in backticks or double quotes.
     *
     * The name is restricted to letters, digits, '_', '-' and '.', so it can never
     * contain a path separator and reach files outside $this->dir.
     */
    private function extractTable(string $sql): ?string
    {
        if (preg_match('/\bFROM\s+[`"]?([\p{L}\p{N}_.\-]+)/iu', $sql, $matches) !== 1) {
            return null;
        }

        return $matches[1];
    }
}

/**
 * Sends one query to several named DataSources and collects a per-source
 * outcome. A failing source is recorded and does not abort the others.
 */
class FederationEngine
{
    /** @var array<string, DataSource> */
    private array $sources = [];

    /** Register (or replace) a source under $name. */
    public function addSource(string $name, DataSource $source): void
    {
        $this->sources[$name] = $source;
    }

    /** @return list<string> Registered source names, in registration order. */
    public function getSources(): array
    {
        return array_keys($this->sources);
    }

    /**
     * Run $query against every registered source, or only those named in $sourceFilter.
     *
     * @param list<string> $sourceFilter Source names to include; empty means all sources.
     * @return array<string, array{success: true, count: int, data: array}|array{success: false, error: string}>
     *         Keyed by source name.
     */
    public function federatedQuery(string $query, array $sourceFilter = []): array
    {
        // Key lookup uses the same key normalisation as $this->sources, and is O(1) per source.
        $wanted = array_flip($sourceFilter);
        $results = [];

        foreach ($this->sources as $name => $source) {
            if ($sourceFilter !== [] && !isset($wanted[$name])) {
                continue;
            }

            try {
                $data = $source->query($query);
                $results[$name] = [
                    'success' => true,
                    'count' => count($data),
                    'data' => $data,
                ];
            } catch (\Throwable $e) {
                // Isolate failures per source: Errors (TypeError, method call on false, ...)
                // are reported like Exceptions instead of aborting the whole federated query.
                $results[$name] = [
                    'success' => false,
                    'error' => $e->getMessage(),
                ];
            }
        }

        return $results;
    }
}

$engine = new FederationEngine();

// Demo credentials. new PDO() throws when the server is unreachable; report it and
// skip that source so the remaining (CSV) source can still be queried.
try {
    $pdo = new PDO('mysql:host=localhost;dbname=test', 'root', '', [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
    ]);
    $engine->addSource('mysql', new MySqlSource($pdo));
} catch (PDOException $e) {
    echo "跳过数据源 mysql: " . $e->getMessage() . "\n";
}
$engine->addSource('csv', new CsvSource('/tmp/csv_data'));

echo "数据源: " . implode(', ', $engine->getSources()) . "\n";
$results = $engine->federatedQuery("SELECT * FROM users");
print_r($results);
?>

```

数据湖和数据联邦是数据管理的进阶方案：数据湖保留原始数据，不丢失信息；数据联邦则为多个数据源提供统一的查询入口。本文的PHP实现仅适合小规模场景，生产环境建议使用专业的湖仓一体（Lakehouse）平台。

http://www.jsqmd.com/news/940572/

相关文章:

  • 【紧急预警】Claude v3.5决策树已悄然升级:3大底层分裂准则变更,不更新分析框架将导致响应偏差率飙升214%
  • 手把手教你用uniCloud云函数搞定UniPush在线消息推送(附完整代码)
  • KUKA KRC-Nexxt 3.2.4.45 PROFINET通信功能增强安装包(含认证文件、配置工具与多语言支持)
  • 惠州市2026年黄金回收白银回收铂金回收门店指南 五家诚信店铺排行榜+联系方式电话推荐 - 大熊猫898989
  • [开源] 科研样本外送检测全链路追踪系统:面向科研协调与检验管理的五节点时间轴工具
  • Spring Boot项目里@Async注解不生效?别急,先检查这5个配置(附线程池调优建议)
  • 别再手动复制了!用Godot拖放功能5分钟搞定游戏背包系统(附完整GDScript代码)
  • ESP8266驱动WS2812B灯带:WLED固件配置与xLights灯光秀集成指南
  • 家庭创客指南:用Arduino与树莓派复刻互动科技展
  • 河源市2026年黄金回收白银回收铂金回收门店指南 五家诚信店铺排行榜+联系方式电话推荐 - 大熊猫898989
  • 避坑指南:在Ubuntu 20.04服务器上为CARLA 0.9.13手动寻找并安装正确的Python 3.8客户端whl文件
  • GTA5线上小助手:免费开源的终极游戏增强工具,彻底改变你的洛圣都体验
  • 鸣潮自动化工具终极指南:3步配置解放双手的游戏助手
  • 黑神话悟空启动无反应?一个神奇的解决方案:修改系统时间到2026.04.28
  • 用Stable Diffusion和DDIM反演搞点‘坏’事:手把手教你复现DiffAttack对抗攻击
  • LAGO优化算法在心血管健康管理中的仿真应用与效果评估
  • 生物信息学工具开发:从.NET框架到统一数据模型与算法集成
  • AI驱动云技术自主化:从自动化到预见式架构的演进与实践
  • Dev Containers与CI/CD实战:构建自动化开发环境与高效研发流程
  • 1小时上线AI日志助手:基于现有Fluentd/Kafka零代码改造的轻量级集成模板
  • PyTorch猫狗图像分类三模型实战包:含DNN/RNN/CNN完整训练推理代码与结构化目录
  • 从零开始,用GitHub Pages搭建你的个人学术主页
  • 香橙派AIpro散热风扇手动调节保姆级教程:用npu-smi命令告别过热降频
  • 从图像风格迁移到域自适应:深入浅出聊聊傅里叶变换(FFT)在CV中的神奇应用(附FDA源码解读)
  • Narwhal:连接复杂时空数据与WorldWide Telescope的可视化桥梁
  • 别急着重启!用Sysinternals RAMMap揪出VMware虚拟机偷吃内存的元凶(附定期清理脚本)
  • 告别重复输入密码:用SSH-Agent管理你的GitHub、GitLab和Hugging Face密钥
  • 为什么OpenAI从未提及Sora 2的“动态帧率蒸馏”?揭秘其视频生成延迟降低63%的核心黑箱模块,
  • 微软新方案:软硬协同让可穿戴设备续航倍增
  • BilibiliDown:跨平台B站视频下载完整解决方案与实战指南