PHP Data Pipelines and a Stream-Processing Framework

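The data pipeline is a core pattern in data processing. PHP is enough to build a simple stream-processing framework that handles a data stream one item at a time. This post walks through the design of such a pipeline in PHP.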

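The heart of a pipeline is its chain of processing nodes, one per stage. Data enters at an input node, passes through a series of processing nodes, and leaves through an output node.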

```php
<?php

interface PipelineNode
{
    // Returns the processed item, or null to drop it from the pipeline.
    public function process(mixed $data): mixed;
    public function setName(string $name): void;
    public function getName(): string;
}

class InputNode implements PipelineNode
{
    private string $name = 'input';

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function process(mixed $data): mixed
    {
        // Log the raw item and pass it through unchanged.
        echo "Input: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
        return $data;
    }
}

class FilterNode implements PipelineNode
{
    private string $name = 'filter';
    // `callable` is not a valid property type in PHP, so store a Closure.
    private \Closure $predicate;

    public function __construct(callable $predicate)
    {
        $this->predicate = \Closure::fromCallable($predicate);
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function process(mixed $data): mixed
    {
        $result = ($this->predicate)($data);
        echo "Filter: " . ($result ? 'pass' : 'drop') . "\n";
        // null tells the pipeline to stop processing this item.
        return $result ? $data : null;
    }
}

class TransformNode implements PipelineNode
{
    private string $name = 'transform';
    // `callable` is not a valid property type in PHP, so store a Closure.
    private \Closure $transformer;

    public function __construct(callable $transformer)
    {
        $this->transformer = \Closure::fromCallable($transformer);
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function process(mixed $data): mixed
    {
        $result = ($this->transformer)($data);
        echo "Transform: " . json_encode($data, JSON_UNESCAPED_UNICODE)
            . " -> " . json_encode($result, JSON_UNESCAPED_UNICODE) . "\n";
        return $result;
    }
}

class AggregateNode implements PipelineNode
{
    private string $name = 'aggregate';
    private array $buffer = [];
    private int $batchSize;
    // `callable` is not a valid property type in PHP, so store a Closure.
    private \Closure $aggregator;

    public function __construct(int $batchSize, callable $aggregator)
    {
        $this->batchSize = $batchSize;
        $this->aggregator = \Closure::fromCallable($aggregator);
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function process(mixed $data): mixed
    {
        $this->buffer[] = $data;

        // Emit one aggregated result per full batch; until then return null.
        if (count($this->buffer) >= $this->batchSize) {
            $result = ($this->aggregator)($this->buffer);
            $this->buffer = [];
            echo "Aggregate: " . json_encode($result, JSON_UNESCAPED_UNICODE) . "\n";
            return $result;
        }

        return null;
    }

    // Aggregate whatever is left in the buffer (a partial batch), if anything.
    public function flush(): mixed
    {
        if (empty($this->buffer)) {
            return null;
        }
        $result = ($this->aggregator)($this->buffer);
        $this->buffer = [];
        return $result;
    }
}

class OutputNode implements PipelineNode
{
    private string $name = 'output';

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function process(mixed $data): mixed
    {
        // Log the final item and return it so the pipeline can collect it.
        echo "Output: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
        return $data;
    }
}

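class DataPipeline
{
    /** @var PipelineNode[] */
    private array $nodes = [];

    public function addNode(PipelineNode $node): void
    {
        $this->nodes[] = $node;
    }

    // Accepts any iterable (array or generator) of items.
    public function process(iterable $data): array
    {
        $results = [];

        foreach ($data as $item) {
            $current = $this->runFrom(0, $item);
            if ($current !== null) {
                $results[] = $current;
            }
        }

        // Flush partial batches left in aggregate nodes and send each one
        // through the nodes that come after that aggregate node.
        foreach ($this->nodes as $index => $node) {
            if ($node instanceof AggregateNode) {
                $remaining = $node->flush();
                if ($remaining !== null) {
                    $remaining = $this->runFrom($index + 1, $remaining);
                }
                if ($remaining !== null) {
                    $results[] = $remaining;
                }
            }
        }

        return $results;
    }

    // Run one item through the nodes starting at $start; null means "dropped".
    private function runFrom(int $start, mixed $data): mixed
    {
        $current = $data;
        for ($i = $start, $n = count($this->nodes); $i < $n; $i++) {
            $current = $this->nodes[$i]->process($current);
            if ($current === null) {
                return null;
            }
        }
        return $current;
    }
}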

// Build the pipeline
$pipeline = new DataPipeline();
$pipeline->addNode(new InputNode());
$pipeline->addNode(new FilterNode(fn($item) => $item['status'] === 'paid'));
$pipeline->addNode(new TransformNode(fn($item) => [
    'order_id' => $item['order_id'],
    'amount' => $item['amount'],
    'user' => $item['user'],
    'final_amount' => $item['amount'] * 0.95, // 95% of the original amount
]));
$pipeline->addNode(new OutputNode());

$orders = [
    ['order_id' => 1, 'amount' => 100, 'user' => '张三', 'status' => 'paid'],
    ['order_id' => 2, 'amount' => 200, 'user' => '李四', 'status' => 'unpaid'],
    ['order_id' => 3, 'amount' => 300, 'user' => '王五', 'status' => 'paid'],
];

echo "Processing orders...\n";
$results = $pipeline->process($orders);
echo "\nDone: " . count($results) . " result(s)\n";
```
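The pipeline above takes its whole input at once and returns an array of results. For larger inputs, the same filter, transform and aggregate stages can be written lazily with PHP generators, so that each item is pulled through every stage one at a time. The following is only a minimal sketch of that alternative, not part of the framework above; the function names `filterStage`, `mapStage` and `batchStage` and the fourth sample order are made up for illustration.

```php
<?php

// Lazily keep only the items for which $predicate returns true.
function filterStage(iterable $items, callable $predicate): Generator
{
    foreach ($items as $item) {
        if ($predicate($item)) {
            yield $item;
        }
    }
}

// Lazily apply $transform to every item.
function mapStage(iterable $items, callable $transform): Generator
{
    foreach ($items as $item) {
        yield $transform($item);
    }
}

// Group items into batches of $size; a final partial batch is emitted too.
function batchStage(iterable $items, int $size): Generator
{
    $batch = [];
    foreach ($items as $item) {
        $batch[] = $item;
        if (count($batch) === $size) {
            yield $batch;
            $batch = [];
        }
    }
    if ($batch !== []) {
        yield $batch;
    }
}

// Hypothetical sample data in the same shape as the orders above.
$orders = [
    ['order_id' => 1, 'amount' => 100, 'status' => 'paid'],
    ['order_id' => 2, 'amount' => 200, 'status' => 'unpaid'],
    ['order_id' => 3, 'amount' => 300, 'status' => 'paid'],
    ['order_id' => 4, 'amount' => 450, 'status' => 'paid'],
];

$paid    = filterStage($orders, fn(array $o): bool => $o['status'] === 'paid');
$amounts = mapStage($paid, fn(array $o): int => $o['amount']);
$totals  = mapStage(batchStage($amounts, 2), fn(array $batch): int => array_sum($batch));

// No stage has run yet; iterating pulls items through the chain on demand.
$batchTotals = iterator_to_array($totals, false);
print_r($batchTotals);
```

Because each stage accepts any `iterable`, the input could just as well be a generator that reads lines from a file, and `batchStage` plays the role that `AggregateNode` and its `flush()` play in the class-based version.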

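Windowed computation is a common pattern in stream processing. The class below keeps a count-based sliding window over the most recent values and evicts the oldest value once the window is full: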

```php
<?php

class WindowCalculator
{
    private array $window = [];
    private int $windowSize;

    public function __construct(int $windowSize = 5)
    {
        if ($windowSize < 1) {
            throw new InvalidArgumentException('Window size must be at least 1.');
        }
        $this->windowSize = $windowSize;
    }

    public function add(int|float $value): void
    {
        $this->window[] = $value;
        // Evict the oldest value once the window is over capacity.
        if (count($this->window) > $this->windowSize) {
            array_shift($this->window);
        }
    }

    public function sum(): float
    {
        return array_sum($this->window);
    }

    public function average(): float
    {
        if (empty($this->window)) {
            return 0;
        }
        return array_sum($this->window) / count($this->window);
    }

    // max() and min() throw a ValueError on an empty array in PHP 8,
    // so an empty window yields null instead.
    public function max(): ?float
    {
        return $this->window === [] ? null : max($this->window);
    }

    public function min(): ?float
    {
        return $this->window === [] ? null : min($this->window);
    }

    public function count(): int
    {
        return count($this->window);
    }

    public function reset(): void
    {
        $this->window = [];
    }
}

$calculator = new WindowCalculator(3);

// Simulated stream of values
$data = [10, 20, 30, 40, 50, 60, 70, 80];

echo "Sliding-window calculation:\n";
foreach ($data as $i => $value) {
    $calculator->add($value);
    echo "  add {$value}: average={$calculator->average()}, sum={$calculator->sum()}\n";
}
```
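`WindowCalculator` recomputes the sum on every call and uses `array_shift`, which reindexes the array each time a value is evicted. For small windows that is fine. For larger windows, one possible variation is to keep a running sum and store the values in an `SplQueue`, so that adding a value and reading the sum or average are constant-time. The sketch below is an assumption about how that could look, with a hypothetical class name `RunningWindow`; it does not track max or min, which cannot be maintained with a single running total.

```php
<?php

// Count-based sliding window that maintains a running sum,
// so add(), sum() and average() do not rescan the window.
final class RunningWindow
{
    private SplQueue $values;
    private float $sum = 0.0;

    public function __construct(private int $size)
    {
        if ($size < 1) {
            throw new InvalidArgumentException('Window size must be at least 1.');
        }
        $this->values = new SplQueue();
    }

    public function add(int|float $value): void
    {
        $this->values->enqueue($value);
        $this->sum += $value;

        // Over capacity: remove the oldest value and subtract it from the sum.
        if (count($this->values) > $this->size) {
            $this->sum -= $this->values->dequeue();
        }
    }

    public function sum(): float
    {
        return $this->sum;
    }

    public function average(): float
    {
        $n = count($this->values);
        return $n === 0 ? 0.0 : $this->sum / $n;
    }
}

$window = new RunningWindow(3);
$averages = [];
foreach ([10, 20, 30, 40, 50] as $value) {
    $window->add($value);
    $averages[] = $window->average();
}
print_r($averages);
```

With non-integer inputs, repeatedly adding and subtracting can accumulate floating-point rounding error in the running sum, so recomputing the sum from the stored values now and then may be worthwhile.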

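A data pipeline written in PHP is not as capable as a dedicated engine such as Kafka Streams or Flink, but it is entirely adequate when data volumes are modest. The pipeline pattern keeps processing logic modular: each node does one job, which makes the code easier to test and maintain. It suits scenarios such as log processing, data cleaning, and real-time computation.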
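As a rough illustration of the log-processing case and of checking stages in isolation, the sketch below parses log lines, drops malformed ones, keeps warnings and errors, and counts them by level. The log format (`<LEVEL> <message>`) and the function names `parseLine` and `isProblem` are assumptions made up for this example.

```php
<?php

// Parse one line of the hypothetical "<LEVEL> <message>" format.
// Returns null for malformed lines so they can be dropped.
function parseLine(string $line): ?array
{
    if (!preg_match('/^(INFO|WARN|ERROR)\s+(.+)$/', trim($line), $m)) {
        return null;
    }
    return ['level' => $m[1], 'message' => $m[2]];
}

// Keep only entries that need attention.
function isProblem(array $entry): bool
{
    return $entry['level'] !== 'INFO';
}

$lines = [
    "INFO  service started",
    "ERROR database timeout",
    "not a log line",
    "WARN  disk usage at 91%",
];

// Parse, drop malformed lines (null), then keep warnings and errors.
$parsed   = array_filter(array_map('parseLine', $lines));
$problems = array_values(array_filter($parsed, 'isProblem'));

// Count the remaining entries by level.
$counts = array_count_values(array_column($problems, 'level'));
print_r($counts);
```

Since each stage is a small function with no shared state, it can be exercised directly, for example by checking that `parseLine('garbage')` returns `null`, without constructing a full pipeline.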
