Solr+Spark构建高维AB测试数据中枢
1. 项目概述:当搜索架构撞上大数据计算,AB测试就不再只是“改个按钮看点击率”
“Solr + Spark = AB Testing on Steroids”——这个标题不是营销噱头,而是我在电商中台团队落地真实业务场景后,亲手写进季度技术复盘报告里的一行结论。它背后解决的,是传统AB测试在搜索、推荐、广告等强实时性、高维度、多变量场景下长期被忽视的三大硬伤:实验粒度粗、归因链条断、决策周期长。我们不是在给AB测试加一个“加速器”,而是在重建它的数据基础设施底座。核心关键词非常明确:Solr(高性能、可扩展、带丰富分析能力的搜索与检索引擎),Spark(大规模批流一体计算框架),以及最关键的AB Testing(但绝非网页按钮A/B那种轻量级实验,而是面向搜索排序策略、个性化召回规则、商品打标逻辑等后端核心算法模块的深度实验)。这个组合真正服务的对象,是搜索算法工程师、推荐系统负责人、数据产品PM,以及那些每天盯着“搜索转化率提升0.3%是否显著”而反复跑T检验的分析师。它不面向前端开发,也不面向纯业务运营;它要求你既懂查询DSL怎么写,也明白RDD和DataFrame的区别,更清楚p值背后的假设检验前提是否成立。我试过用纯Spark做全量日志回溯分析,结果是任务跑8小时,等出结果时策略已经迭代了两版;我也试过只用Solr的facet统计做实时分流监控,但一遇到“用户同时参与多个实验”或“策略灰度分层嵌套”的情况,聚合口径立刻崩塌。直到把Solr从“结果展示层”拉到“实验数据中枢”的位置,再让Spark承担起“归因建模”和“效应剥离”的重担,整个AB测试才真正从“经验驱动”转向“证据驱动”。下面我会拆解这个组合为什么不是简单拼凑,而是产生了1+1>5的化学反应。
2. 整体设计思路:为什么必须是Solr和Spark联手?单点工具为何必然失败
2.1 传统AB测试架构的“三座大山”与失效现场
要理解Solr+Spark为何是“类固醇级”的升级,得先看清旧体系在哪几个关键环节卡住了脖子。我画了一张我们上线前的真实故障日志时间线,不是为了甩锅,而是为了说明问题根植于架构本身:
2023年Q3某次搜索排序策略升级:前端埋点上报点击/加购/下单行为,日志经Kafka入Hive。分析师用SQL跑7天窗口聚合,发现新策略组GMV+1.2%,但P值=0.08,不显著。两周后复盘才发现,埋点漏掉了“搜索页曝光未点击即离开”的负样本,而这类用户恰恰是排序质量低的最敏感指标。问题根源:数据采集口径与实验定义脱节,日志层无法承载“曝光-交互”原子事件对。
2024年春节大促前的个性化召回AB实验:算法同学配置了5个召回通道的权重组合,每个通道又分3个版本,形成45种实验组。运营同学想看“年轻女性用户在美妆类目下的加购率”,但Hive表里用户ID是脱敏的,类目ID是宽表关联的,一次SQL JOIN要扫10TB数据,响应时间超15分钟。问题根源:高维交叉分析性能崩溃,OLAP层无法支撑实时下钻。
某次跨渠道归因实验(搜索+信息流联合投放):需要判断用户是因搜索点击还是信息流曝光最终完成下单。传统方案用归因窗口(如7天点击归因)粗暴分配,但实际路径中存在大量“搜索曝光→信息流点击→搜索下单”的混合路径,单一窗口模型完全失真。问题根源:事件时序关系丢失,缺乏支持复杂路径建模的底层数据结构。
这三座大山,本质是数据生产、存储、计算三个环节的错配。任何单点优化——比如换更快的OLAP引擎、加更多埋点字段、写更复杂的SQL——都只是在给漏水的桶补洞。我们必须重构数据流的DNA。
2.2 Solr的核心价值:不只是搜索引擎,更是实验元数据与实时特征的“活体数据库”
很多人第一反应是:“Solr不是搜商品的吗?跟AB测试有啥关系?” 这恰恰是最大的认知偏差。Solr在此架构中承担的是实验注册中心(Experiment Registry)和实时特征快照库(Real-time Feature Snapshot Store)的双重角色,其不可替代性体现在三个硬核能力上:
第一,Schema-less + Dynamic Field的实验元数据弹性管理。
传统AB平台用MySQL存实验配置(名称、开始时间、流量比例、分桶规则),一旦要支持“按用户LTV分层+地域+设备类型+实时行为标签”四维正交实验,配置表就得疯狂加字段,甚至要动态建表。Solr用dynamicField(如*_s存字符串,*_i存整型)完美解决。例如,一个实验配置文档长这样:
{ "id": "exp_search_rank_v2", "exp_name_s": "搜索排序V2策略", "start_time_dt": "2024-05-01T00:00:00Z", "traffic_ratio_f": 0.3, "dimension_rules_json_s": "{\"user_ltv\":\"high\",\"region\":\"shanghai\",\"device\":\"mobile\"}", "created_by_s": "algo-team" }新增一个“用户最近7天搜索频次”维度?只需在索引文档里加一个search_freq_7d_i字段,无需改Schema。我实测过,在10万实验配置文档规模下,按exp_name_s模糊搜索+start_time_dt范围过滤,响应稳定在80ms内。这是MySQL做不到的。
第二,Near Real-time (NRT) indexing + Search-as-a-Feature的实时分流与监控。
Solr的NRT能力(默认commit间隔1秒)让它能成为“实验流量的实时水表”。当用户发起一次搜索请求,网关服务会构造一个包含用户ID、设备指纹、地理位置、实时行为标签(如“刚浏览过iPhone”)的JSON文档,直接POST到Solr的/update/json/docs端点。Solr在毫秒级完成索引,并立即支持查询。分流逻辑不再是代码里的if-else,而是用Solr的{!func}div(sum(${user_id}), ${exp_traffic_ratio})函数查询实现哈希分桶。更重要的是,监控大盘不再是T+1的报表,而是直接用Solr的Facet API查:
# 查当前1小时内各实验组的曝光量、点击量、平均停留时长 q=*:*&fq=timestamp_dt:[NOW-1HOUR TO NOW]& facet=true& facet.field=exp_id_s& facet.field=click_flag_b& stats=true&stats.field=stay_time_s这个查询返回的就是一张实时作战地图,算法同学盯着屏幕就能看到新策略的“心跳”。
第三,Join + Block Join Query支持复杂实验关系建模。
这是Solr被严重低估的能力。当一个用户同时参与“搜索排序实验”和“商品详情页推荐实验”时,传统方案只能用用户ID关联两张宽表,性能灾难。Solr的Block Join允许我们将用户作为父文档(Parent Doc),其参与的所有实验作为子文档(Child Doc)嵌套存储。查询“所有参与搜索实验且点击率>5%的用户,在详情页推荐实验中的转化率”时,用{!parent which="type_s:parent"}{!child of="type_s:parent"}click_rate_f:[0.05 TO *]即可精准命中,避免笛卡尔积。我们线上用此能力支撑了12个并行实验的交叉分析,QPS 200时延迟<150ms。
2.3 Spark的核心价值:超越ETL,成为归因建模与效应剥离的“手术刀”
如果说Solr是实验的“神经末梢”(感知、分流、快照),那么Spark就是它的“大脑皮层”(建模、推理、决策)。Spark在此处的价值,远不止于“把日志从HDFS读出来算个PV”。它的不可替代性在于统一的批流一体计算范式和丰富的机器学习生态,直击AB测试的归因痛点。
第一,Structured Streaming + Event Time Window实现精准路径归因。
用户行为是乱序的:可能先下单,5分钟后才补上报一次搜索曝光。传统基于Processing Time的窗口(如“每5分钟跑一次”)会把这两个事件切到不同窗口,导致归因断裂。Spark Structured Streaming强制使用Event Time,并内置Watermark机制处理乱序。我们的归因Job核心逻辑如下:
# 定义事件模式:曝光(exposure)、点击(click)、下单(order) events_df = spark.readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "kafka:9092") \ .option("subscribe", "user_events") \ .load() \ .select(from_json(col("value").cast("string"), event_schema).alias("event")) \ .select("event.*") # 按用户ID分组,用Event Time窗口聚合1小时内的完整行为链 path_df = events_df \ .withWatermark("event_time", "10 minutes") \ .groupBy( window(col("event_time"), "1 hour"), col("user_id") ) \ .agg( collect_list(struct("event_type", "event_time", "exp_id")).alias("path") )这个path数组就是用户的“数字足迹”,后续可输入到图神经网络或序列模型中,识别“搜索曝光→详情页浏览→加购→下单”的黄金路径,并精确计算各环节的归因权重。我们对比过,相比固定窗口归因,路径归因将搜索策略对GMV的贡献度评估误差从±23%降低到±6%。
第二,MLlib + Delta Lake实现协变量平衡与效应无偏估计。
AB测试最大的陷阱是“选择偏差”:新策略组用户天然更活跃,即使策略无效,转化率也可能更高。统计学上要用协变量平衡(Covariate Balancing)来消除混杂因素。Spark MLlib的StringIndexer+VectorAssembler可快速构建用户特征向量(LTV、历史点击率、设备类型等),再用WeightedRandomForestClassifier训练倾向得分模型(Propensity Score),最后用Inverse Probability Weighting (IPW)为每个用户赋予权重,使实验组与对照组在协变量分布上达到统计平衡。整个流程在Delta Lake上完成,保证了数据版本可追溯、模型训练可复现。我们曾用此方法重新评估一个被判定“无效”的排序策略,发现加权后其对高价值用户的加购率提升达2.1%(P<0.01),直接推动了全量。
第三,Delta Live Tables (DLT) 实现实验分析Pipeline的声明式编排。
过去写Spark Job全是spark-submit脚本,依赖关系靠文档维护,出错难定位。DLT让我们用Python声明数据质量规则和依赖:
@dlt.table( comment="AB test exposure and conversion events" ) def ab_events(): return spark.readStream.format("cloudFiles") \ .option("cloudFiles.format", "json") \ .load("/mnt/raw/ab_events/") @dlt.expect_or_drop("valid_user_id", "user_id IS NOT NULL AND LENGTH(user_id) > 5") @dlt.table def clean_ab_events(): return dlt.read("ab_events")当clean_ab_events表的数据质量不满足valid_user_id规则时,Pipeline自动中断并告警,而不是产出一堆脏数据让下游分析师踩坑。这种工程化保障,是AB测试从“玩具”走向“生产级”的分水岭。
2.4 架构全景图:数据如何在Solr与Spark之间“呼吸”
现在把Solr和Spark放回整个数据流,看它们如何协同工作。整个架构不是线性的“Solr → Spark”,而是形成一个闭环的“呼吸系统”:
吸气(Ingestion):用户请求到达网关,网关调用Solr的
/updateAPI,将本次请求的上下文(用户ID、设备、地理位置、实时标签、分配的实验ID)作为文档索引。Solr在1秒内完成NRT索引,并触发postCommit钩子,将新文档的ID推送到Kafka Topicsolr-updates。呼气(Enrichment & Modeling):Spark Streaming消费
solr-updates,通过JOIN关联Hive中的用户画像宽表(LTV、兴趣标签等),生成 enriched event。此事件同时写入两个地方:- 写入Delta Lake的
enriched_events表,供离线归因模型训练; - 写入Solr的另一个Collection
user_features,作为实时特征库,供下一次请求的分桶逻辑调用(例如,“对高LTV用户优先分配新策略”)。
- 写入Delta Lake的
循环(Feedback Loop):归因模型训练完成后,其输出(如各策略的ROI预测值、用户分层效果)被导出为Parquet文件。一个独立的Spark Job定期读取此文件,生成新的实验配置文档(JSON),再批量
POST到Solr的experimentsCollection。这就实现了“数据驱动实验设计”的闭环——模型说“对Z世代用户,策略B比A好”,系统就自动创建一个针对该人群的新实验。
这个架构的关键在于:Solr负责毫秒级的“当下决策”,Spark负责分钟级的“未来优化”,两者通过Kafka和Delta Lake松耦合,互不阻塞。我们压测过,当Solr集群承受5000 QPS写入时,Spark Streaming消费延迟仍稳定在2秒内;当Spark归因Job占用80%集群资源时,Solr的查询P99延迟仅上升3ms。这种韧性,是单点工具永远无法提供的。
3. 核心细节解析:从零搭建Solr+Spark AB测试中枢的实操要点
3.1 Solr端:如何设计一个能支撑百个实验、千万QPS的索引Schema
Schema设计是Solr性能的基石。一个糟糕的Schema,会让再好的硬件也变成瓶颈。我们线上运行的ab_eventsCollection Schema,是经过3轮迭代、27次压测后沉淀下来的,核心原则是:一切为查询服务,而非为存储服务。下面是关键字段设计与 rationale:
| 字段名 | 类型 | 是否索引 | 是否存储 | 是否DocValues | Rationale |
|---|---|---|---|---|---|
id | string | true | true | false | 主键,必须唯一。我们用{user_id}_{timestamp_ms}_{rand}生成,避免热点。 |
user_id_s | string | true | true | true | 用户标识。设为DocValues,用于Facet和Grouping。不设为stored,因为ID本身不需返回,只用于聚合。 |
exp_id_s | string | true | true | true | 实验ID。高频Facet字段,必须DocValues。注意:一个文档只属于一个实验,避免多值字段。 |
event_type_s | string | true | true | true | 事件类型(exposure/click/order)。用枚举值(而非text)提升查询效率。 |
event_time_dt | pdate | true | false | true | 事件时间。设为DocValues用于DateRange Facet和排序,不stored节省空间。 |
click_flag_b | boolean | true | true | true | 点击标记。布尔值用DocValues比stored更省内存,Facet速度更快。 |
stay_time_s | int | true | false | true | 停留时长(秒)。数值型,必须DocValues支持Stats和Range Facet。 |
features_json_s | string | false | true | false | 原始特征JSON。不索引,只存储,供调试用。 |
关键避坑点:
- 绝对不要用
text_general类型存user_id或exp_id!我们初期犯过这个错,导致facets查询慢如蜗牛。text类型会分词、小写、去停用词,而ID是精确匹配,必须用string类型。 DocValues是性能命脉,但不是越多越好。每个DocValues字段会增加索引大小和内存占用。我们线上10亿文档,DocValues总大小占索引的65%,但换来的是Facet查询从3s降到80ms。权衡点在于:凡是要用于facet、group、stats、sort的字段,必须开DocValues;凡是要在结果中highlight或return的字段,才开stored。- 动态字段要克制。虽然
*_s很爽,但每种动态字段类型都会增加Lucene的Segment开销。我们约定:只有实验配置类元数据(如exp_param_alpha_f)才用动态字段,用户行为数据一律走预定义字段。
分片(Shard)与副本(Replica)策略:
- 数据量预估:日均5亿事件,保留90天,总数据量≈450亿文档。
- 分片数:我们采用
numShards=12。理由:Solr Cloud的路由是hash(id) % numShards,12是2、3、4、6的公倍数,方便后续扩容(加节点时能均匀rebalance)。实测12分片下,单节点CPU峰值<70%,GC压力可控。 - 副本数:
replicationFactor=2。一个Leader处理读写,一个Replica专供备份和读扩展。我们禁止客户端直连Replica做读,因为NRT索引在Replica上有毫秒级延迟,会导致监控数据短暂不一致。所有读请求必须打到Leader。 - 路由键(Routing Key):不用默认
_route_,而是显式指定?_route_=user_id_s。这样相同用户的全部事件会落到同一分片,极大提升按用户ID聚合的性能。压测显示,开启路由后,group by user_id_s的QPS从1200提升到4500。
3.2 Spark端:如何构建一个抗干扰、可复现的归因分析Pipeline
Spark Pipeline的健壮性,直接决定AB测试结论的可信度。我们线上Pipeline的SOP(标准操作流程)包含四个强制环节,缺一不可:
环节一:原始事件清洗(Raw Event Cleansing)
目标:剔除明显脏数据,建立数据质量基线。
- 必检规则:
user_id长度在5-32位,且匹配正则^[a-zA-Z0-9_-]+$(排除空格、特殊字符);event_time必须在[NOW-30D, NOW+5M]范围内(防时钟错误、未来时间戳);exp_id必须存在于Solr的experimentsCollection中(调用Solr API实时校验,缓存10分钟);
- 处理方式:不丢弃,而是打上
quality_flag_s标签(clean/suspect/invalid),并写入单独的bad_events表供审计。我们发现,约0.7%的事件因SDK版本bug导致event_time为0,这些都被标记为suspect,后续分析时可选择性排除。
环节二:用户行为路径重构(User Journey Reconstruction)
目标:将离散事件还原为有序、完整的用户旅程。
- 核心技术:使用Spark的
Window函数,按user_id分区,按event_time排序,生成row_number()和lead(event_time, 1):
from pyspark.sql.window import Window from pyspark.sql.functions import * window_spec = Window.partitionBy("user_id").orderBy("event_time") journey_df = raw_df \ .withColumn("seq_num", row_number().over(window_spec)) \ .withColumn("next_event_time", lead("event_time", 1).over(window_spec)) \ .withColumn("time_to_next_sec", (col("next_event_time").cast("long") - col("event_time").cast("long")))- 关键参数:
time_to_next_sec是路径分析的黄金字段。我们设定阈值300(5分钟),若time_to_next_sec > 300,则认为旅程在此处“断裂”,后续事件归入新旅程。这个阈值是通过分析100万真实用户会话时长分布后确定的P95值,确保95%的自然会话不被误切。
环节三:实验效应量化(Experiment Effect Quantification)
目标:计算各实验组相对于对照组的增量效果,并评估统计显著性。
- 核心公式(ITT Estimator):
其中Effect = (Mean(Outcome|Treatment) - Mean(Outcome|Control)) / Mean(Outcome|Control)Outcome可以是click_rate_f、order_value_s、stay_time_s等。 - 显著性检验:我们放弃传统的t-test(假设正态分布),改用Permutation Test(置换检验)。因为AB数据常呈长尾分布(少数高价值用户贡献大部分GMV),t-test的P值会严重失真。Spark实现极其简洁:
# 将所有用户随机打乱1000次,每次计算"假"的Effect null_effects = [] for _ in range(1000): shuffled = full_df.select("user_id", "outcome", when(rand() < 0.5, "treatment").otherwise("control").alias("fake_group")) effect = shuffled.groupBy("fake_group").agg(avg("outcome")).rdd.map(lambda r: r[1]).collect() null_effects.append(abs(effect[0] - effect[1])) # 真实Effect的P值 = null_effects中大于真实Effect的数量 / 1000 p_value = sum(1 for e in null_effects if e >= real_effect) / 1000实测显示,置换检验对长尾数据的鲁棒性远超t-test,且Spark分布式执行1000次置换仅需47秒(vs 单机Python需23分钟)。
环节四:协变量平衡验证(Covariate Balance Check)
目标:确认实验组与对照组在关键用户属性上无系统性差异。
- 必检维度:
user_ltv_bucket_s(LTV分桶)、region_s(地域)、device_s(设备)、first_order_month_s(首单月份)。 - 平衡度指标:使用Standardized Mean Difference (SMD):
SMD < 0.1 视为良好平衡,0.1~0.2 为可接受,>0.2 则需警告。SMD = |Mean_T - Mean_C| / sqrt((Var_T + Var_C)/2) - 自动化:我们将SMD计算封装成UDF,Pipeline每运行一次,自动生成一份
balance_report.json,包含所有维度的SMD值和可视化图表(用Plotly生成HTML,邮件自动发送)。上线半年,共触发12次SMD>0.2的告警,其中8次是因CDN节点故障导致某地域流量异常涌入,及时止损。
3.3 Solr与Spark的“握手协议”:如何确保数据一致性与低延迟
两个系统间的集成,是整个架构最脆弱的环节。我们制定了严格的“握手协议”,确保数据在流动中不失真、不延迟、不重复。
协议一:Exactly-Once语义保障(基于Kafka事务)
Solr的postCommit钩子会将新索引文档的ID推送到Kafka。但Kafka Producer默认是At-Least-Once,可能重复。我们的解决方案:
- Solr端:在
solrconfig.xml中配置postCommit为同步调用,并启用Kafka Producer的enable.idempotence=true和transactional.id=solr-transaction。 - Spark端:Structured Streaming消费时,设置
startingOffsets="earliest",并启用foreachBatch写入Delta Lake,利用Delta的ACID事务保证写入幂等。 - 验证:我们注入了10万条测试事件,强制Kafka Broker重启,最终Delta Lake中记录数严格等于10万,无重复、无丢失。
协议二:Schema演化协同(Schema Evolution Sync)
Solr Schema变更(如新增search_query_length_i字段)必须与Spark的DataFrame Schema同步,否则Job会报AnalysisException。我们的做法:
- 所有Solr Schema变更,必须提交PR到Git仓库,PR模板强制要求填写:
- 变更字段名、类型、是否DocValues;
- 对应的Spark ETL Job名称;
- Schema更新的SQL DDL(用于Delta Lake
ALTER TABLE);
- CI流水线自动执行:
- 解析Solr Schema XML,提取字段定义;
- 检查对应Spark Job的
schema.py文件是否包含该字段; - 若缺失,流水线失败并提示“请更新schema.py”。
这套机制上线后,Schema不一致导致的Job失败率为0。
协议三:延迟监控与熔断(Latency SLA Dashboard)
我们定义了两条黄金SLA:
- Solr写入到Kafka推送延迟 < 1.5秒(P95);
- Kafka到Spark Streaming处理完成延迟 < 3秒(P95)。
- 监控方案:
- Solr端:在
postCommit钩子里埋点,记录System.currentTimeMillis()与事件event_time_dt的差值,写入solr_latency_metricsCollection; - Spark端:在
foreachBatch中计算batch processing time,并写入spark_latency_metrics表;
- Solr端:在
- 熔断:当连续5分钟P95延迟超过SLA的150%,自动触发告警,并暂停Solr的
postCommit钩子(通过ZooKeeper开关),防止雪崩。上线以来,共触发2次熔断,均在12分钟内人工介入恢复。
4. 实操过程:从环境部署到首个实验上线的完整手把手指南
4.1 环境准备:最小可行集群的资源配置与安装清单
别被“Solr+Spark”吓到,一个能跑通全流程的最小集群,成本远低于想象。我们用的是云厂商的通用型实例,非GPU,所有组件均开源免费。
| 组件 | 版本 | 实例规格 | 数量 | 关键配置 | 成本估算(月) |
|---|---|---|---|---|---|
| Solr Cloud | 9.3 | 8C16G SSD 500GB | 3节点 | solrconfig.xml:autoSoftCommit/maxTime=1000,maxWarmingSearchers=4;zoo.cfg:tickTime=2000 | $420 |
| Spark Cluster | 3.4.1 | Master: 4C8G; Worker: 8C16G×2 | 3节点 | spark-defaults.conf:spark.sql.adaptive.enabled=true,spark.sql.adaptive.coalescePartitions.enabled=true; YARN ResourceManager高可用 | $380 |
| Kafka Cluster | 3.4 | 4C8G×3 | 3节点 | server.properties:log.retention.hours=168,num.partitions=12 | $210 |
| Delta Lake Storage | — | AWS S3 / Azure Blob | 1 | 启用Versioning和Lifecycle Policy(30天转IA) | $85 |
安装顺序与依赖检查(务必按此顺序):
- ZooKeeper集群(Solr Cloud的基石):3节点,
myid文件正确,zkServer.sh status全部显示Mode: follower或Mode: leader。 - Solr Cloud:解压后,
bin/solr start -c -z zk1:2181,zk2:2181,zk3:2181 -m 8g。启动后访问http://solr1:8983/solr/#/~cloud,确认3个节点在线,Live Nodes显示3。 - Kafka集群:
bin/kafka-server-start.sh config/server.properties,然后创建Topic:bin/kafka-topics.sh --create --topic solr-updates --partitions 12 --replication-factor 3 --bootstrap-server kafka1:9092 - Spark Cluster:Master节点
sbin/start-master.sh,Worker节点sbin/start-worker.sh spark://master:7077。访问http://master:8080,确认2个Worker在线。 - Delta Lake依赖:在Spark的
jars/目录下放入delta-core_2.12-2.4.0.jar,并在spark-defaults.conf中添加:spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
关键验证命令(执行后必须成功):
- Solr健康检查:
curl "http://solr1:8983/solr/ab_events/select?q=*:*&rows=0"返回"numFound":0。 - Kafka生产消费测试:
# 生产一条消息 echo "test-message" | bin/kafka-console-producer.sh --topic solr-updates --bootstrap-server kafka1:9092 # 消费验证 bin/kafka-console-consumer.sh --topic solr-updates --from-beginning --bootstrap-server kafka1:9092 --max-messages 1 - Spark读写Delta Lake:
若能看到10行数据,则Delta集成成功。# PySpark shell中执行 df = spark.range(10).toDF("id") df.write.format("delta").mode("overwrite").save("s3a://my-bucket/delta-test/") spark.read.format("delta").load("s3a://my-bucket/delta-test/").show()
4.2 Solr端实战:创建AB测试专用Collection与实验配置
现在,我们动手创建第一个Collectionab_events。这不是简单的bin/solr create,而是包含Schema定制和性能调优的完整流程。
步骤1:创建Collection
# 在Solr服务器上执行 bin/solr create -c ab_events -n data_driven_schema_configs -shards 12 -replicationFactor 2注意:
-n data_driven_schema_configs表示使用Solr内置的动态Schema模板,适合快速启动。后续再迁移到手动Schema。
步骤2:上传自定义Schema(关键!)
创建managed-schema文件,内容如下(精简核心字段):
<?xml version="1.0" encoding="UTF-8" ?> <schema name="ab_events" version="1.6"> <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" /> <field name="user_id_s" type="string" indexed="true" stored="false" docValues="true" /> <field name="exp_id_s" type="string" indexed="true" stored="false" docValues="true" /> <field name="event_type_s" type="string" indexed="true" stored="false" docValues="true" /> <field name="event_time_dt" type="pdate" indexed="true" stored="false" docValues="true" /> <field name="click_flag_b" type="booleans" indexed="true" stored="false" docValues="true" /> <field name="stay_time_s" type="pint" indexed="true" stored="false" docValues="true" /> <field name="features_json_s" type="string" indexed="false" stored="true" /> <!-- 复制字段,用于灵活查询 --> <copyField source="user_id_s" dest="text" /> <copyField source="exp_id_s" dest="text" /> <field name="text" type="text_general" indexed="true" stored="false" multiValued="true"/> </schema>上传命令:
curl -X POST "http://localhost:8983/solr/ab_events/config" \ --data-binary @managed-schema \ -H "Content-type:application/xml"提示:上传后Solr会自动reload core,无需手动重启。
步骤3:创建实验配置Collectionexperiments
bin/solr create -c experiments -n data_driven_schema_configs然后上传其Schema(experiments-managed-schema),核心字段:
<field name="id" type="string" indexed="true" stored="true" required="true" /> <field name="exp_name_s" type="string" indexed="true" stored="true" /> <field name="start_time_dt" type="pdate" indexed="true" stored="true" docValues="true" /> <field name="traffic_ratio_f" type="pfloat" indexed="true" stored="true" docValues="true" /> <field name="status_s" type="string" indexed="true" stored="true" /> <!-- active/inactive -->步骤4:插入首个实验配置
curl -X POST "http://localhost:8983/solr/experiments/update/json/docs" \ -H "Content-type:application/json" \ --data-binary '[ { "id": "exp_search_baseline", "exp_name_s": "搜索基线策略", "start_time_dt": "2024-05-01T00:00:00Z", "traffic_ratio_f": 1.0, "status_s": "active" } ]' curl "http://localhost:8