# PHP数据管道与流式计算框架
数据管道是数据处理中的一种核心模式。用PHP也可以实现一个简单的流式计算框架,对数据流进行逐条处理。本文介绍PHP数据管道的设计。
数据管道的核心是各阶段的处理节点。数据从输入节点流入,依次经过多个处理节点,最终从输出节点流出;任一节点返回 null 即表示当前这条数据不再继续往下传递(被过滤掉,或被聚合节点暂存)。
```php
/**
 * Contract for a single stage of a DataPipeline.
 *
 * A stage receives one value and returns the value that should be handed to
 * the next stage. DataPipeline treats a null return as "stop passing this
 * item along".
 */
interface PipelineNode
{
    /**
     * Handle one value flowing through the pipeline.
     *
     * @param mixed $data Output of the previous stage (or the raw item for the first stage).
     * @return mixed Value for the next stage, or null to stop processing this item.
     */
    public function process(mixed $data): mixed;

    /**
     * Assign a human-readable label to this stage.
     */
    public function setName(string $name): void;

    /**
     * Return the label of this stage.
     */
    public function getName(): string;
}
/**
 * Entry stage: logs each incoming item and passes it through unchanged.
 */
class InputNode implements PipelineNode
{
    private string $name = 'input';

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    /**
     * Log the item as JSON and return it as-is.
     */
    public function process(mixed $data): mixed
    {
        // JSON_UNESCAPED_UNICODE keeps non-ASCII text (e.g. Chinese names) readable
        // in the log instead of printing \uXXXX escapes.
        echo "输入: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
        return $data;
    }
}
/**
 * Stage that keeps or drops an item based on a predicate.
 *
 * A rejected item is turned into null, which DataPipeline interprets as
 * "do not pass this item to later stages".
 */
class FilterNode implements PipelineNode
{
    private string $name = 'filter';

    /**
     * The predicate, normalised to a Closure: PHP does not allow `callable`
     * as a property type, so `private callable $predicate` is a fatal error.
     */
    private \Closure $predicate;

    /**
     * @param callable $predicate Receives the item; a truthy return value keeps it.
     */
    public function __construct(callable $predicate)
    {
        $this->predicate = \Closure::fromCallable($predicate);
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    /**
     * @return mixed The unchanged item if the predicate passes, otherwise null.
     */
    public function process(mixed $data): mixed
    {
        $result = ($this->predicate)($data);
        echo "过滤: " . ($result ? '通过' : '丢弃') . "\n";
        return $result ? $data : null;
    }
}
/**
 * Stage that maps each item to a new value via a user-supplied function.
 */
class TransformNode implements PipelineNode
{
    private string $name = 'transform';

    /**
     * The transformer, normalised to a Closure: PHP does not allow `callable`
     * as a property type, so `private callable $transformer` is a fatal error.
     */
    private \Closure $transformer;

    /**
     * @param callable $transformer Receives the item and returns its replacement.
     *                              Returning null stops the item in DataPipeline.
     */
    public function __construct(callable $transformer)
    {
        $this->transformer = \Closure::fromCallable($transformer);
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    /**
     * Apply the transformer and log "before -> after" as JSON.
     */
    public function process(mixed $data): mixed
    {
        $result = ($this->transformer)($data);
        // JSON_UNESCAPED_UNICODE keeps non-ASCII text readable in the log.
        echo "转换: " . json_encode($data, JSON_UNESCAPED_UNICODE)
            . " -> " . json_encode($result, JSON_UNESCAPED_UNICODE) . "\n";
        return $result;
    }
}
/**
 * Stage that buffers items and emits one aggregated value per batch.
 *
 * While a batch is incomplete, process() returns null, so DataPipeline stops the
 * current item there. Once $batchSize items are buffered, the aggregator is applied
 * to the buffered list, the buffer is cleared, and the aggregate is returned.
 * flush() aggregates whatever partial batch is left.
 */
class AggregateNode implements PipelineNode
{
    private string $name = 'aggregate';

    /** @var list<mixed> Items collected since the last aggregation. */
    private array $buffer = [];

    private int $batchSize;

    /**
     * The aggregator, normalised to a Closure: PHP does not allow `callable`
     * as a property type, so `private callable $aggregator` is a fatal error.
     */
    private \Closure $aggregator;

    /**
     * @param int      $batchSize  Number of items per batch; must be at least 1.
     * @param callable $aggregator Receives the buffered items as a list and returns the aggregate.
     * @throws \InvalidArgumentException If $batchSize is less than 1.
     */
    public function __construct(int $batchSize, callable $aggregator)
    {
        if ($batchSize < 1) {
            throw new \InvalidArgumentException("batchSize must be >= 1, got {$batchSize}");
        }
        $this->batchSize = $batchSize;
        $this->aggregator = \Closure::fromCallable($aggregator);
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    /**
     * Buffer one item; return the batch aggregate when the batch is full, else null.
     */
    public function process(mixed $data): mixed
    {
        $this->buffer[] = $data;
        if (count($this->buffer) < $this->batchSize) {
            return null;
        }
        return $this->aggregateBuffer();
    }

    /**
     * Aggregate the remaining partial batch, if any.
     *
     * @return mixed The aggregate of the leftover items, or null if the buffer is empty.
     */
    public function flush(): mixed
    {
        if ($this->buffer === []) {
            return null;
        }
        return $this->aggregateBuffer();
    }

    /**
     * Apply the aggregator to the buffer, clear it, and log the result.
     * Shared by process() and flush() so both paths behave and log identically.
     */
    private function aggregateBuffer(): mixed
    {
        $result = ($this->aggregator)($this->buffer);
        $this->buffer = [];
        echo "聚合: " . json_encode($result, JSON_UNESCAPED_UNICODE) . "\n";
        return $result;
    }
}
/**
 * Final stage: logs each outgoing item and passes it through unchanged.
 */
class OutputNode implements PipelineNode
{
    private string $name = 'output';

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    /**
     * Log the item as JSON and return it as-is.
     */
    public function process(mixed $data): mixed
    {
        // JSON_UNESCAPED_UNICODE keeps non-ASCII text (e.g. Chinese names) readable
        // in the log instead of printing \uXXXX escapes.
        echo "输出: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
        return $data;
    }
}
/**
 * Runs every item of a data set through an ordered list of PipelineNode stages.
 *
 * A stage returning null stops the current item (it was filtered out, or an
 * AggregateNode is still buffering it). After all items are processed, every
 * AggregateNode with a partial batch is flushed, and the flushed value is sent
 * through the stages that follow that node, just like a full batch would be.
 */
class DataPipeline
{
    /** @var list<PipelineNode> Stages in execution order. */
    private array $nodes = [];

    /**
     * Append a stage to the end of the pipeline.
     */
    public function addNode(PipelineNode $node): void
    {
        $this->nodes[] = $node;
    }

    /**
     * Process all items and collect what comes out of the last stage.
     *
     * @param mixed $data Expected to be iterable (array or Traversable) of items.
     * @return list<mixed> Non-null outputs of the final stage, including results
     *                     produced from flushed partial batches.
     */
    public function process(mixed $data): array
    {
        $results = [];
        foreach ($data as $item) {
            $output = $this->runFrom(0, $item);
            if ($output !== null) {
                $results[] = $output;
            }
        }

        // Flush partial batches in pipeline order. A flushed value continues through the
        // downstream stages (previously it skipped them), and may itself be buffered by a
        // later AggregateNode, which is then flushed further along in this same loop.
        foreach ($this->nodes as $index => $node) {
            if (!$node instanceof AggregateNode) {
                continue;
            }
            $remaining = $node->flush();
            if ($remaining === null) {
                continue;
            }
            $output = $this->runFrom($index + 1, $remaining);
            if ($output !== null) {
                $results[] = $output;
            }
        }

        return $results;
    }

    /**
     * Feed $value through the stages starting at index $start.
     *
     * @return mixed The last stage's output, or null as soon as any stage returns null.
     */
    private function runFrom(int $start, mixed $value): mixed
    {
        $total = count($this->nodes);
        for ($i = $start; $i < $total; $i++) {
            $value = $this->nodes[$i]->process($value);
            if ($value === null) {
                return null;
            }
        }
        return $value;
    }
}
// Build the pipeline: log input -> keep paid orders -> apply a 5% discount -> log output.
$pipeline = new DataPipeline();
$pipeline->addNode(new InputNode());
// `?? null` avoids an "undefined array key" warning for records without a status.
$pipeline->addNode(new FilterNode(fn($item) => ($item['status'] ?? null) === 'paid'));
$pipeline->addNode(new TransformNode(fn($item) => [
    'order_id' => $item['order_id'],
    'amount' => $item['amount'],
    'user' => $item['user'],
    // NOTE(review): float arithmetic on money is fine for a demo; use integer cents or BCMath in real billing.
    'final_amount' => $item['amount'] * 0.95,
]));
$pipeline->addNode(new OutputNode());
$orders = [
    ['order_id' => 1, 'amount' => 100, 'user' => '张三', 'status' => 'paid'],
    ['order_id' => 2, 'amount' => 200, 'user' => '李四', 'status' => 'unpaid'],
    ['order_id' => 3, 'amount' => 300, 'user' => '王五', 'status' => 'paid'],
];
echo "开始处理订单数据...\n";
$results = $pipeline->process($orders);
echo "\n处理完成,共 " . count($results) . " 条结果\n";
```
滑动窗口计算是流式计算的常见模式:窗口只保留最近 N 个数据,每加入一个新数据,超出容量时就移出最旧的一个,再对窗口内的数据做求和、平均等统计:
```php
/**
 * Count-based sliding window over the most recent values.
 *
 * Holds at most $windowSize values; adding one more evicts the oldest.
 * Every aggregate (sum, average, max, min) returns 0.0 on an empty window.
 */
class WindowCalculator
{
    /** @var list<mixed> Values in the window, oldest first (expected to be numeric). */
    private array $window = [];

    private int $windowSize;

    /**
     * @param int $windowSize Maximum number of retained values; must be at least 1.
     * @throws \InvalidArgumentException If $windowSize is less than 1, which would
     *                                   otherwise yield a window that is always empty.
     */
    public function __construct(int $windowSize = 5)
    {
        if ($windowSize < 1) {
            throw new \InvalidArgumentException("windowSize must be >= 1, got {$windowSize}");
        }
        $this->windowSize = $windowSize;
    }

    /**
     * Append a value, evicting the oldest one once the window is over capacity.
     */
    public function add(mixed $value): void
    {
        $this->window[] = $value;
        if (count($this->window) > $this->windowSize) {
            // array_shift reindexes in O(windowSize); acceptable for small windows.
            array_shift($this->window);
        }
    }

    /**
     * Sum of the values in the window (0.0 when empty).
     */
    public function sum(): float
    {
        return array_sum($this->window);
    }

    /**
     * Arithmetic mean of the window, or 0.0 when empty.
     */
    public function average(): float
    {
        if ($this->window === []) {
            return 0.0;
        }
        return array_sum($this->window) / count($this->window);
    }

    /**
     * Largest value in the window, or 0.0 when empty.
     * The guard is required because max([]) throws ValueError in PHP 8.
     */
    public function max(): float
    {
        return $this->window === [] ? 0.0 : max($this->window);
    }

    /**
     * Smallest value in the window, or 0.0 when empty.
     * The guard is required because min([]) throws ValueError in PHP 8.
     */
    public function min(): float
    {
        return $this->window === [] ? 0.0 : min($this->window);
    }

    /**
     * Number of values currently in the window.
     */
    public function count(): int
    {
        return count($this->window);
    }

    /**
     * Remove all values from the window.
     */
    public function reset(): void
    {
        $this->window = [];
    }
}
$calculator = new WindowCalculator(3);
// Simulated stream: values arrive one at a time.
$data = [10, 20, 30, 40, 50, 60, 70, 80];
echo "滑动窗口计算:\n";
foreach ($data as $value) {
    $calculator->add($value);
    echo " 添加 {$value}: 平均={$calculator->average()}, 和={$calculator->sum()}\n";
}
```
PHP的数据管道实现虽然不如Kafka Streams或Flink专业(本文的实现是单进程、同步逐条处理,没有分布式、容错和状态持久化能力),但在数据量不大的场景下完全够用。管道模式让数据处理逻辑模块化,每个节点各司其职,便于测试和维护,适合日志处理、数据清洗、轻量级实时计算等场景。
