当前位置: 首页 > news >正文

实时日志采集与统计分析平台

一、项目概述

本项目基于 Flume + Kafka + Spark Streaming 构建了一套完整的实时日志采集与统计分析系统,实现了从日志生成、实时采集、消息队列传输到流式计算的完整数据链路,端到端延迟控制在 15 秒以内。


二、前置准备

下载并配置flume和kafka环境

1、Kafka 安装配置步骤(QM131节点)

1.下载 Kafka

切换到模块目录

cd /opt/module

使用华为云镜像下载(速度快)

wget https://mirrors.huaweicloud.com/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz

2.解压并重命名

tar -zxvf kafka_2.13-3.0.0.tgz

mv kafka_2.13-3.0.0 kafka

3.配置环境变量

echo 'export KAFKA_HOME=/opt/module/kafka' >> /etc/profile

echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile

source /etc/profile

2、验证安装

kafka-topics.sh --version # 应输出 3.0.0

1.优化内存配置(2GB节点)

vi /opt/module/kafka/bin/kafka-server-start.sh

修改内存参数:

export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"

2.配置 server.properties

vi /opt/module/kafka/config/server.properties

修改以下配置:

properties

broker.id=1

listeners=PLAINTEXT://QM131:9092

advertised.listeners=PLAINTEXT://QM131:9092

log.dirs=/opt/module/kafka/logs

zookeeper.connect=QM131:2181

3.启动 Kafka

cd /opt/module/kafka

3、启动 ZooKeeper(后台)

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

4、启动 Kafka(后台)

bin/kafka-server-start.sh -daemon config/server.properties

5、验证进程

jps # 应看到 QuorumPeerMain 和 Kafka

  1. 创建 Topic

在 QM131 执行 cd /opt/module/kafka

bin/kafka-topics.sh --create \

--bootstrap-server QM131:9092 \

--replication-factor 1 \

--partitions 2 \

--topic user_log_topic

6、验证 Topic

bin/kafka-topics.sh --list --bootstrap-server QM131:9092

二、Flume 安装配置步骤(QM130节点)

1.下载 Flume

cd /opt/module

使用华为云镜像下载

wget https://mirrors.huaweicloud.com/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

2.解压并重命名

tar -zxvf apache-flume-1.9.0-bin.tar.gz

mv apache-flume-1.9.0-bin flume

3.配置环境变量

echo 'export FLUME_HOME=/opt/module/flume' >> /etc/profile

echo 'export PATH=$PATH:$FLUME_HOME/bin' >> /etc/profile

source /etc/profile

7、验证安装

flume-ng version # 应显示 1.9.0

1.创建项目目录和配置

创建目录

mkdir -p /opt/project/realtime/{data,conf,logs}

8、创建 Flume 配置文件

vi /opt/project/realtime/conf/flume_kafka.conf

配置文件内容:

properties agent.sources = tail_source agent.channels = memory_channel agent.sinks = kafka_sink agent.sources.tail_source.type = exec agent.sources.tail_source.command = tail -F /opt/project/realtime/data/click.log agent.sources.tail_source.shell = /bin/sh -c agent.channels.memory_channel.type = memory agent.channels.memory_channel.capacity = 1000 agent.channels.memory_channel.transactionCapacity = 100 agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafka_sink.kafka.bootstrap.servers = QM131:9092 agent.sinks.kafka_sink.kafka.topic = user_log_topic agent.sources.tail_source.channels = memory_channel agent.sinks.kafka_sink.channel = memory_channel

1.创建日志生成脚本

vi /opt/project/realtime/data/generate_log.py

python #!/usr/bin/env python3 import time import random import datetime actions = ['browse', 'add_to_cart', 'collect', 'pay'] categories = ['家电', '数码', '服装', '美妆', '食品'] log_file = "/opt/project/realtime/data/click.log" while True: timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") user_id = random.randint(1000, 9999) action = random.choice(actions) category = random.choice(categories) log_line = f"{timestamp}|{user_id}|{action}|{category}\n" with open(log_file, "a") as f: f.write(log_line) print(log_line.strip()) time.sleep(1)

chmod +x /opt/project/realtime/data/generate_log.py

三、启动命令汇总

启动顺序

顺序 节点 命令

1 QM131 Kafka启动(ZooKeeper + Kafka)

2 QM130 日志生成脚本

3 QM130 Flume采集

4 QM131 Kafka消费者验证(可选)

具体命令

QM131 - 启动Kafka:

cd /opt/module/kafka

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

bin/kafka-server-start.sh -daemon config/server.properties

QM130 - 启动日志脚本:

python3 /opt/project/realtime/data/generate_log.py

QM130 - 启动Flume:

cd /opt/module/flume

bin/flume-ng agent \

--name agent \

--conf conf \

--conf-file /opt/project/realtime/conf/flume_kafka.conf \

-Dflume.root.logger=INFO,console

QM131 - Kafka消费者验证:

cd /opt/module/kafka

bin/kafka-console-consumer.sh --bootstrap-server QM131:9092 --topic user_log_topic

二、技术架构

技术栈详情

组件版本作用部署节点
Flume1.9.0日志采集,监控文件变化并发送至KafkaQM130
Kafka3.0.0消息队列,解耦采集与计算,缓冲数据QM131
Spark3.0.0流式计算,实时消费Kafka进行PV统计QM130
ZooKeeper3.8.0Kafka集群协调QM131
Python3.x模拟用户行为日志生成QM130

三、集群环境

节点分配

节点角色部署组件内存
QM130主节点Flume + 日志生成脚本 + Spark提交2GB
QM131Kafka节点ZooKeeper + Kafka Broker2GB
QM132-133从节点Hadoop DataNode2GB

环境配置

  • 操作系统:CentOS 7

  • JDK版本:Java 1.8.0_281

  • Hadoop版本:3.1.4

  • Spark路径:/opt/module/spark-local


四、核心功能实现

4.1 日志生成模块

文件位置:/opt/project/realtime/data/generate_log.py

功能:模拟用户行为日志,每秒生成一条记录

日志格式:

2026-04-29 20:14:01|5394|browse|家电

字段说明:

  • timestamp:行为发生时间

  • user_id:用户ID(1000-9999随机)

  • action:行为类型(browse/add_to_cart/collect/pay)

  • category:商品品类(家电/数码/服装/美妆/食品)

4.2 Flume采集模块

配置文件:/opt/project/realtime/conf/flume_kafka.conf

# Source:监控日志文件变化 agent.sources.tail_source.type = exec agent.sources.tail_source.command = tail -F /opt/project/realtime/data/click.log # Channel:内存通道(容量1000) agent.channels.memory_channel.type = memory agent.channels.memory_channel.capacity = 1000 # Sink:输出到Kafka agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafka_sink.kafka.bootstrap.servers = QM131:9092 agent.sinks.kafka_sink.kafka.topic = user_log_topic

Flume 启动 [root@QM130 flume]# bin/flume-ng agent \

--name agent \

--conf conf \

--conf-file /opt/project/realtime/conf/flume_kafka.conf \

-Dflume.root.logger=INFO,console

4.3 Kafka消息队列模块

节点:QM131

配置优化(2GB内存节点):

bash

export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"

创建的Topic:

  • 名称:user_log_topic

  • 分区数:2

  • 副本因子:1

4.4 Spark Streaming实时计算模块

脚本:/opt/project/realtime/scripts/streaming_file.py

核心逻辑:

  1. 读取Kafka中的日志流

  2. |分割解析日志

  3. 按分钟聚合计算PV

  4. 每10秒输出统计结果

提交命令:

bin/spark-submit \ --master local[2] \ --driver-memory 512m \ /opt/project/realtime/scripts/streaming_file.py

五、项目成果

5.1 运行效果

5.2 性能指标

指标数值
日志生成速率1条/秒
Flume→Kafka延迟< 1秒
Spark处理延迟< 10秒
端到端总延迟< 15秒
单日处理能力10万+条

5.3 已完成功能

  • 模拟用户行为日志持续生成

  • Flume实时监控日志文件变化

  • Kafka消息队列数据缓冲与传输

  • Spark Streaming实时消费与PV统计

  • 每10秒输出分钟级统计数据

  • 2GB小内存节点参数调优


六、踩坑与解决方案

问题解决方案
国外源下载过慢换华为云镜像:mirrors.huaweicloud.com
Kafka依赖包下载失败换用文件流方式(file://协议)
Spark读取HDFS而非本地使用 file:// 前缀指定本地路径
Flume连不上Kafka关闭防火墙:systemctl stop firewalld
2GB内存OOM调小Kafka和Spark堆内存至512M

七、项目亮点

  1. 完整的实时链路:从数据产生、采集、传输到计算全流程打通

  2. 资源受限环境优化:2GB节点下通过参数调优保障稳定运行

  3. 真实模拟数据:模拟用户行为日志,贴近生产环境

  4. 模块化配置:各组件独立配置,易于扩展和维护

  5. 可观测性强:每10秒输出统计结果,实时监控数据流


八、扩展

  • 增加UV统计:

  • uv_df = parsed_df.groupBy("process_minute").agg( approx_count_distinct("user_id").alias("uv") )
  • 品类热度排行:按category分组统计

  • category_df = parsed_df.groupBy("process_minute", "category") \ .count() \ .orderBy("process_minute", col("count").desc())
  • 行为分布

  • action_df = parsed_df.groupBy("process_minute", "action") \ .count() \ .orderBy("process_minute", col("count").desc())

    增强版代码:

  • from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.sql.types import * def main(): spark = SparkSession.builder \ .appName("RealTimeLogAnalysis_FileStream") \ .config("spark.sql.shuffle.partitions", "2") \ .getOrCreate() print("=" * 70) print("Spark Streaming 文件流增强版已启动") print("监控目录: /opt/project/realtime/data") print("=" * 70) # 读取文件流 df = spark.readStream \ .format("text") \ .option("pathGlobFilter", "*.log") \ .load("file:///opt/project/realtime/data") # 定义解析函数 def parse_log(line): try: parts = line.split('|') if len(parts) == 4 and parts[0].startswith('20'): return (parts[0], int(parts[1]), parts[2], parts[3]) except: pass return None schema = StructType([ StructField("timestamp", StringType(), True), StructField("user_id", IntegerType(), True), StructField("action", StringType(), True), StructField("category", StringType(), True) ]) parse_udf = udf(parse_log, schema) # 解析并过滤 parsed_df = df.select(parse_udf(col("value")).alias("data")) \ .filter(col("data").isNotNull()) \ .select( col("data.timestamp").alias("timestamp"), col("data.user_id").alias("user_id"), col("data.action").alias("action"), col("data.category").alias("category") ) # 添加处理时间 parsed_df = parsed_df.withColumn("process_minute", date_format(current_timestamp(), "yyyy-MM-dd HH:mm")) # ============================================================ # 1. PV统计(每分钟) # ============================================================ pv_df = parsed_df.groupBy("process_minute").count() query_pv = pv_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("PV统计") \ .start() # ============================================================ # 2. UV统计(使用近似去重) # ============================================================ uv_df = parsed_df.groupBy("process_minute").agg( approx_count_distinct("user_id").alias("uv") ) query_uv = uv_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("UV统计") \ .start() # ============================================================ # 3. 品类热度统计 # ============================================================ category_df = parsed_df.groupBy("process_minute", "category") \ .count() \ .orderBy("process_minute", col("count").desc()) query_category = category_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("品类热度") \ .start() # ============================================================ # 4. 行为分布统计 # ============================================================ action_df = parsed_df.groupBy("process_minute", "action") \ .count() \ .orderBy("process_minute", col("count").desc()) query_action = action_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("行为分布") \ .start() print("功能列表:") print(" PV统计 - 每10秒输出") print(" UV统计(近似) - 每10秒输出") print(" 品类热度 - 每10秒输出") print(" 行为分布 - 每10秒输出") print("=" * 70) print("等待数据流入...") print("=" * 70) query_pv.awaitTermination() if __name__ == "__main__": main()

    结果截图:
    1.实时日志

2.日志采集与传输

3.实时计算

九、后续可拓展方向

  • 结果持久化:写入MySQL或Hive供可视化展示

  • 接入Grafana:构建实时监控大屏

  • 增加告警机制:PV突增时触发报警

http://www.jsqmd.com/news/733524/

相关文章:

  • 三电平半桥LLC谐振变换器电路仿真研究:移相角度控制与DSP PWM生成方式探讨,输出电压优化...
  • Anthropic 推出 Claude Security,AI 漏洞扫描能否助力开发者高效修复漏洞?
  • SAA-C03备考别死记硬背!用这5个真实AWS场景串联核心服务(附避坑清单)
  • 杂谈勾股定理
  • 京东秒杀自动化工具:5步轻松实现热门商品抢购的终极指南
  • 如何快速掌握AMD Ryzen调试工具:面向初学者的完整指南
  • 2026年GEO优化公司TOP5推荐:国内主流服务商选型专业参考指南 - 商业小白条
  • 别再死记硬背Payload了!用DVWA靶场手把手教你理解SQL注入与XSS的底层原理
  • 2026年国内GEO优化服务商市场全景分析:综合实力领先的3家主流机构梳理 - 商业小白条
  • 别再瞎调间距了!手把手教你用TCAD仿真优化功率器件场限环(FLR)设计
  • VSCode 2026协作权限体系曝光:细粒度文件级/行级/语义级锁定策略(含RBAC+SCIM集成方案)
  • 基于大语言模型的游戏AI助手:ChatGPT-On-CS项目实战解析
  • Pandas数据分析避坑指南:describe()函数里藏着的5个细节,新手必看
  • 别再手动算闰年了!基于UNIX时间戳的STM32 RTC日期转换与显示实战(附完整代码)
  • 南京及周边防水补漏技术全解析 选服务商的核心逻辑 - 奔跑123
  • 优质小程序开发公司2026年权威推荐!深度测评靠谱小程序制作服务商选型指南 - 新闻快传
  • 高性能内存分配器xgmem:原理、集成与调优实战
  • SparkFun Datalogger IoT开发板:无代码传感器数据采集方案
  • 别急着把 autocast 全切成 bf16:RTX 3090 上把 GEMM、Conv2d 和 ResNet18 训练都跑完后,我的推荐顺序是这样
  • 终极LaTeX公式转换指南:3秒将网页公式完美粘贴到Word
  • 从元数据混乱到有序:用ExifToolGUI重构你的照片管理思维
  • 各行业营销推广方法速查总纲:覆盖30+行业的获客方案
  • 从 CNN 到 ViT,再到多模态大模型:计算机视觉的下一站在哪里?
  • Tidyverse 2.0报告自动化终极面试清单(23道题|11道代码实操|9道架构设计),仅剩最后200份PDF版解析可领
  • 百度网盘直链解析:5分钟掌握高速下载终极技巧
  • Taotoken 模型广场功能在 AI 应用选型阶段的实践价值
  • 2026年3月自动化设备生产厂家推荐,制冷设备管件焊接/高精度淬火机床/红冲设备,自动化设备供应商哪家权威 - 品牌推荐师
  • 别再被线阻坑了!用开尔文四线法精准测量毫欧级电阻(附Multisim仿真步骤)
  • CNN在电力负荷预测中的应用与优化实践
  • 【完整源码+数据集+部署教程】颜色分割系统源码&数据集分享 [yolov8-seg-HGNetV2&yolov8-seg-p6等50+全套改进创新点发刊_一键训练教程_Web前端展示]