实时日志采集与统计分析平台
一、项目概述
本项目基于 Flume + Kafka + Spark Streaming 构建了一套完整的实时日志采集与统计分析系统,实现了从日志生成、实时采集、消息队列传输到流式计算的完整数据链路,端到端延迟控制在 15 秒以内。
二、前置准备
下载并配置flume和kafka环境
1、Kafka 安装配置步骤(QM131节点)
1.下载 Kafka
切换到模块目录
cd /opt/module
使用华为云镜像下载(速度快)
wget https://mirrors.huaweicloud.com/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz
2.解压并重命名
tar -zxvf kafka_2.13-3.0.0.tgz
mv kafka_2.13-3.0.0 kafka
3.配置环境变量
echo 'export KAFKA_HOME=/opt/module/kafka' >> /etc/profile
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile
source /etc/profile
2、验证安装
kafka-topics.sh --version # 应输出 3.0.0
1.优化内存配置(2GB节点)
vi /opt/module/kafka/bin/kafka-server-start.sh
修改内存参数:
export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
2.配置 server.properties
vi /opt/module/kafka/config/server.properties
修改以下配置:
properties
broker.id=1
listeners=PLAINTEXT://QM131:9092
advertised.listeners=PLAINTEXT://QM131:9092
log.dirs=/opt/module/kafka/logs
zookeeper.connect=QM131:2181
3.启动 Kafka
cd /opt/module/kafka
3、启动 ZooKeeper(后台)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
4、启动 Kafka(后台)
bin/kafka-server-start.sh -daemon config/server.properties
5、验证进程
jps # 应看到 QuorumPeerMain 和 Kafka
创建 Topic
在 QM131 执行 cd /opt/module/kafka
bin/kafka-topics.sh --create \
--bootstrap-server QM131:9092 \
--replication-factor 1 \
--partitions 2 \
--topic user_log_topic
6、验证 Topic
bin/kafka-topics.sh --list --bootstrap-server QM131:9092
二、Flume 安装配置步骤(QM130节点)
1.下载 Flume
cd /opt/module
使用华为云镜像下载
wget https://mirrors.huaweicloud.com/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
2.解压并重命名
tar -zxvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume
3.配置环境变量
echo 'export FLUME_HOME=/opt/module/flume' >> /etc/profile
echo 'export PATH=$PATH:$FLUME_HOME/bin' >> /etc/profile
source /etc/profile
7、验证安装
flume-ng version # 应显示 1.9.0
1.创建项目目录和配置
创建目录
mkdir -p /opt/project/realtime/{data,conf,logs}
8、创建 Flume 配置文件
vi /opt/project/realtime/conf/flume_kafka.conf
配置文件内容:
properties agent.sources = tail_source agent.channels = memory_channel agent.sinks = kafka_sink agent.sources.tail_source.type = exec agent.sources.tail_source.command = tail -F /opt/project/realtime/data/click.log agent.sources.tail_source.shell = /bin/sh -c agent.channels.memory_channel.type = memory agent.channels.memory_channel.capacity = 1000 agent.channels.memory_channel.transactionCapacity = 100 agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafka_sink.kafka.bootstrap.servers = QM131:9092 agent.sinks.kafka_sink.kafka.topic = user_log_topic agent.sources.tail_source.channels = memory_channel agent.sinks.kafka_sink.channel = memory_channel1.创建日志生成脚本
vi /opt/project/realtime/data/generate_log.py
python #!/usr/bin/env python3 import time import random import datetime actions = ['browse', 'add_to_cart', 'collect', 'pay'] categories = ['家电', '数码', '服装', '美妆', '食品'] log_file = "/opt/project/realtime/data/click.log" while True: timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") user_id = random.randint(1000, 9999) action = random.choice(actions) category = random.choice(categories) log_line = f"{timestamp}|{user_id}|{action}|{category}\n" with open(log_file, "a") as f: f.write(log_line) print(log_line.strip()) time.sleep(1)chmod +x /opt/project/realtime/data/generate_log.py
三、启动命令汇总
启动顺序
顺序 节点 命令
1 QM131 Kafka启动(ZooKeeper + Kafka)
2 QM130 日志生成脚本
3 QM130 Flume采集
4 QM131 Kafka消费者验证(可选)
具体命令
QM131 - 启动Kafka:
cd /opt/module/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
QM130 - 启动日志脚本:
python3 /opt/project/realtime/data/generate_log.py
QM130 - 启动Flume:
cd /opt/module/flume
bin/flume-ng agent \
--name agent \
--conf conf \
--conf-file /opt/project/realtime/conf/flume_kafka.conf \
-Dflume.root.logger=INFO,console
QM131 - Kafka消费者验证:
cd /opt/module/kafka
bin/kafka-console-consumer.sh --bootstrap-server QM131:9092 --topic user_log_topic
二、技术架构
技术栈详情
| 组件 | 版本 | 作用 | 部署节点 |
| Flume | 1.9.0 | 日志采集,监控文件变化并发送至Kafka | QM130 |
| Kafka | 3.0.0 | 消息队列,解耦采集与计算,缓冲数据 | QM131 |
| Spark | 3.0.0 | 流式计算,实时消费Kafka进行PV统计 | QM130 |
| ZooKeeper | 3.8.0 | Kafka集群协调 | QM131 |
| Python | 3.x | 模拟用户行为日志生成 | QM130 |
三、集群环境
节点分配
| 节点 | 角色 | 部署组件 | 内存 |
| QM130 | 主节点 | Flume + 日志生成脚本 + Spark提交 | 2GB |
| QM131 | Kafka节点 | ZooKeeper + Kafka Broker | 2GB |
| QM132-133 | 从节点 | Hadoop DataNode | 2GB |
环境配置
操作系统:CentOS 7
JDK版本:Java 1.8.0_281
Hadoop版本:3.1.4
Spark路径:/opt/module/spark-local
四、核心功能实现
4.1 日志生成模块
文件位置:/opt/project/realtime/data/generate_log.py
功能:模拟用户行为日志,每秒生成一条记录
日志格式:
2026-04-29 20:14:01|5394|browse|家电
字段说明:
timestamp:行为发生时间user_id:用户ID(1000-9999随机)action:行为类型(browse/add_to_cart/collect/pay)category:商品品类(家电/数码/服装/美妆/食品)
4.2 Flume采集模块
配置文件:/opt/project/realtime/conf/flume_kafka.conf
# Source:监控日志文件变化 agent.sources.tail_source.type = exec agent.sources.tail_source.command = tail -F /opt/project/realtime/data/click.log # Channel:内存通道(容量1000) agent.channels.memory_channel.type = memory agent.channels.memory_channel.capacity = 1000 # Sink:输出到Kafka agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafka_sink.kafka.bootstrap.servers = QM131:9092 agent.sinks.kafka_sink.kafka.topic = user_log_topic
Flume 启动 [root@QM130 flume]# bin/flume-ng agent \
--name agent \
--conf conf \
--conf-file /opt/project/realtime/conf/flume_kafka.conf \
-Dflume.root.logger=INFO,console
4.3 Kafka消息队列模块
节点:QM131
配置优化(2GB内存节点):
bash
export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
创建的Topic:
名称:
user_log_topic分区数:2
副本因子:1
4.4 Spark Streaming实时计算模块
脚本:/opt/project/realtime/scripts/streaming_file.py
核心逻辑:
读取Kafka中的日志流
按
|分割解析日志按分钟聚合计算PV
每10秒输出统计结果
提交命令:
bin/spark-submit \ --master local[2] \ --driver-memory 512m \ /opt/project/realtime/scripts/streaming_file.py五、项目成果
5.1 运行效果
5.2 性能指标
| 指标 | 数值 |
| 日志生成速率 | 1条/秒 |
| Flume→Kafka延迟 | < 1秒 |
| Spark处理延迟 | < 10秒 |
| 端到端总延迟 | < 15秒 |
| 单日处理能力 | 10万+条 |
5.3 已完成功能
模拟用户行为日志持续生成
Flume实时监控日志文件变化
Kafka消息队列数据缓冲与传输
Spark Streaming实时消费与PV统计
每10秒输出分钟级统计数据
2GB小内存节点参数调优
六、踩坑与解决方案
| 问题 | 解决方案 |
| 国外源下载过慢 | 换华为云镜像:mirrors.huaweicloud.com |
| Kafka依赖包下载失败 | 换用文件流方式(file://协议) |
| Spark读取HDFS而非本地 | 使用 file:// 前缀指定本地路径 |
| Flume连不上Kafka | 关闭防火墙:systemctl stop firewalld |
| 2GB内存OOM | 调小Kafka和Spark堆内存至512M |
七、项目亮点
完整的实时链路:从数据产生、采集、传输到计算全流程打通
资源受限环境优化:2GB节点下通过参数调优保障稳定运行
真实模拟数据:模拟用户行为日志,贴近生产环境
模块化配置:各组件独立配置,易于扩展和维护
可观测性强:每10秒输出统计结果,实时监控数据流
八、扩展
增加UV统计:
uv_df = parsed_df.groupBy("process_minute").agg( approx_count_distinct("user_id").alias("uv") )品类热度排行:按category分组统计
category_df = parsed_df.groupBy("process_minute", "category") \ .count() \ .orderBy("process_minute", col("count").desc())行为分布
action_df = parsed_df.groupBy("process_minute", "action") \ .count() \ .orderBy("process_minute", col("count").desc())增强版代码:
from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.sql.types import * def main(): spark = SparkSession.builder \ .appName("RealTimeLogAnalysis_FileStream") \ .config("spark.sql.shuffle.partitions", "2") \ .getOrCreate() print("=" * 70) print("Spark Streaming 文件流增强版已启动") print("监控目录: /opt/project/realtime/data") print("=" * 70) # 读取文件流 df = spark.readStream \ .format("text") \ .option("pathGlobFilter", "*.log") \ .load("file:///opt/project/realtime/data") # 定义解析函数 def parse_log(line): try: parts = line.split('|') if len(parts) == 4 and parts[0].startswith('20'): return (parts[0], int(parts[1]), parts[2], parts[3]) except: pass return None schema = StructType([ StructField("timestamp", StringType(), True), StructField("user_id", IntegerType(), True), StructField("action", StringType(), True), StructField("category", StringType(), True) ]) parse_udf = udf(parse_log, schema) # 解析并过滤 parsed_df = df.select(parse_udf(col("value")).alias("data")) \ .filter(col("data").isNotNull()) \ .select( col("data.timestamp").alias("timestamp"), col("data.user_id").alias("user_id"), col("data.action").alias("action"), col("data.category").alias("category") ) # 添加处理时间 parsed_df = parsed_df.withColumn("process_minute", date_format(current_timestamp(), "yyyy-MM-dd HH:mm")) # ============================================================ # 1. PV统计(每分钟) # ============================================================ pv_df = parsed_df.groupBy("process_minute").count() query_pv = pv_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("PV统计") \ .start() # ============================================================ # 2. UV统计(使用近似去重) # ============================================================ uv_df = parsed_df.groupBy("process_minute").agg( approx_count_distinct("user_id").alias("uv") ) query_uv = uv_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("UV统计") \ .start() # ============================================================ # 3. 品类热度统计 # ============================================================ category_df = parsed_df.groupBy("process_minute", "category") \ .count() \ .orderBy("process_minute", col("count").desc()) query_category = category_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("品类热度") \ .start() # ============================================================ # 4. 行为分布统计 # ============================================================ action_df = parsed_df.groupBy("process_minute", "action") \ .count() \ .orderBy("process_minute", col("count").desc()) query_action = action_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("行为分布") \ .start() print("功能列表:") print(" PV统计 - 每10秒输出") print(" UV统计(近似) - 每10秒输出") print(" 品类热度 - 每10秒输出") print(" 行为分布 - 每10秒输出") print("=" * 70) print("等待数据流入...") print("=" * 70) query_pv.awaitTermination() if __name__ == "__main__": main()结果截图:
1.实时日志
2.日志采集与传输
3.实时计算
九、后续可拓展方向
结果持久化:写入MySQL或Hive供可视化展示
接入Grafana:构建实时监控大屏
增加告警机制:PV突增时触发报警
