当前位置: 首页 > news >正文

6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

(基于 8.11 源码,可直接拷贝到org.example.es包下跑通)


0. 目标

给出一个“开箱即用”的 Maven 模块,一次性把下面三件事全部做完:

  1. 暴露自定义 REST 端点(RestHandler)。
  2. 注册 TransportAction,让协调节点→数据节点走内部 RPC(ActionPlugin)。
  3. 在集群状态里持久化自己的配置(ClusterPluginPersistentTasksExecutor)。

代码全部单文件即可编译,无额外依赖(除org.elasticsearch.plugin:elasticsearch8.11.0)。


1. 模块骨架
es-write-plugin ├── pom.xml └── src └── main ├── java │ └── org │ └── example │ └── es │ ├── WritePlugin.java │ ├── RestWriteAction.java │ ├── WriteTransportAction.java │ ├── WriteClusterService.java │ └── WritePersistentTaskExecutor.java └── resources └── META-INF └── plugin-descriptor.properties

pom.xml 关键片段

<properties><elasticsearch.version>8.11.0</elasticsearch.version></properties><dependencies><dependency><groupId>org.elasticsearch.plugin</groupId><artifactId>elasticsearch</artifactId><version>${elasticsearch.version}</version><scope>provided</scope></dependency></dependencies>

plugin-descriptor.properties

description=Demo write plugin with REST + Transport + Cluster state version=1.0.0 name=write-plugin classname=org.example.es.WritePlugin java.version=17 elasticsearch.version=8.11.0

2. 统一入口:WritePlugin.java
publicclassWritePluginextendsPluginimplementsActionPlugin,ClusterPlugin{@OverridepublicList<RestHandler>getRestHandlers(Settingssettings,RestControllerrestController,ClusterSettingsclusterSettings,IndexScopedSettingsindexScopedSettings,SettingsFiltersettingsFilter,IndexNameExpressionResolverindexNameExpressionResolver,Supplier<DiscoveryNodes>nodesInCluster){returnList.of(newRestWriteAction());}@OverridepublicList<ActionHandler<?extendsActionRequest,?extendsActionResponse>>getActions(){returnList.of(newActionHandler<>(WriteAction.INSTANCE,WriteTransportAction.class));}@OverridepublicList<PersistentTasksExecutor<?>>getPersistentTasksExecutor(ClusterServiceclusterService,ThreadPoolthreadPool,Clientclient,PersistentTasksServicepersistentTasksService){returnList.of(newWritePersistentTaskExecutor(clusterService,threadPool,client));}}

3. REST 层:RestWriteAction.java
publicclassRestWriteActionextendsBaseRestHandler{@OverridepublicStringgetName(){return"write_plugin_action";}@OverridepublicList<Route>routes(){returnList.of(newRoute(RestRequest.Method.POST,"/_write/{index}"),newRoute(RestRequest.Method.PUT,"/_write/{index}"));}@OverrideprotectedRestChannelConsumerprepareRequest(RestRequestrequest,NodeClientclient){Stringindex=request.param("index");Stringbody=request.content().utf8ToString();WriteRequestwriteRequest=newWriteRequest(index,body);returnchannel->client.execute(WriteAction.INSTANCE,writeRequest,newRestToXContentListener<>(channel));}}

4. 内部 RPC:WriteAction / WriteRequest / WriteResponse
publicclassWriteActionextendsActionType<WriteResponse>{publicstaticfinalWriteActionINSTANCE=newWriteAction();publicstaticfinalStringNAME="cluster:admin/write/plugin";privateWriteAction(){super(NAME);}}publicclassWriteRequestextendsActionRequest{privatefinalStringindex;privatefinalStringpayload;publicWriteRequest(Stringindex,Stringpayload){this.index=index;this.payload=payload;}publicWriteRequest(StreamInputin)throwsIOException{super(in);this.index=in.readString();this.payload=in.readString();}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{super.writeTo(out);out.writeString(index);out.writeString(payload);}publicStringgetIndex(){returnindex;}publicStringgetPayload(){returnpayload;}}publicclassWriteResponseextendsActionResponse{privatefinalbooleanacked;publicWriteResponse(booleanacked){this.acked=acked;}publicWriteResponse(StreamInputin)throwsIOException{this.acked=in.readBoolean();}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{out.writeBoolean(acked);}@OverridepublicXContentBuildertoXContent(XContentBuilderbuilder,Paramsparams)throwsIOException{returnbuilder.startObject().field("acked",acked).endObject();}}

5. Transport 层:WriteTransportAction.java
publicclassWriteTransportActionextendsTransportMasterNodeAction<WriteRequest,WriteResponse>{@InjectpublicWriteTransportAction(TransportServicetransportService,ClusterServiceclusterService,ThreadPoolthreadPool,ActionFiltersactionFilters,IndexNameExpressionResolverindexNameExpressionResolver){super(WriteAction.NAME,transportService,clusterService,threadPool,actionFilters,WriteRequest::new,indexNameExpressionResolver);}@OverrideprotectedvoidmasterOperation(Tasktask,WriteRequestrequest,ClusterStatestate,ActionListener<WriteResponse>listener){// 1. 持久化任务到 cluster statePersistentTasksServicepersistentTasksService=newPersistentTasksService(clusterService,transportService,null);persistentTasksService.sendStartRequest(UUIDs.base64UUID(),"write_task",newWriteTaskParams(request.getIndex(),request.getPayload()),ActionListener.wrap(r->listener.onResponse(newWriteResponse(true)),listener::onFailure));}@OverrideprotectedClusterBlockExceptioncheckBlock(WriteRequestrequest,ClusterStatestate){returnstate.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);}}

6. 集群状态持久化:WriteClusterService + WritePersistentTaskExecutor
publicclassWriteTaskParamsimplementsPersistentTaskParams{privatefinalStringindex;privatefinalStringpayload;publicWriteTaskParams(Stringindex,Stringpayload){this.index=index;this.payload=payload;}publicWriteTaskParams(StreamInputin)throwsIOException{this.index=in.readString();this.payload=in.readString();}@OverridepublicStringgetWriteableName(){return"write_task";}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{out.writeString(index);out.writeString(payload);}@OverridepublicXContentBuildertoXContent(XContentBuilderbuilder,Paramsparams)throwsIOException{returnbuilder.startObject().field("index",index).field("payload",payload).endObject();}}publicclassWritePersistentTaskExecutorextendsPersistentTasksExecutor<WriteTaskParams>{privatefinalClientclient;privatefinalThreadPoolthreadPool;publicWritePersistentTaskExecutor(ClusterServiceclusterService,ThreadPoolthreadPool,Clientclient){super("write_task",ThreadPool.Names.GENERIC);this.client=client;this.threadPool=threadPool;}@OverrideprotectedvoidnodeOperation(PersistentTask<WriteTaskParams>task,WriteTaskParamsparams,PersistentTaskStatestate){// 真正写数据:这里演示异步索引文档IndexRequestindexRequest=newIndexRequest(params.index).source("payload",params.payload,"timestamp",System.currentTimeMillis()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);client.index(indexRequest,ActionListener.wrap(r->logger.info("Write task {} done, docId={}",task.getId(),r.getId()),e->logger.warn("Write task "+task.getId()+" failed",e)));}@OverrideprotectedAssignmentgetAssignment(WriteTaskParamsparams,ClusterStateclusterState){// 简单策略:随便挑一个 data 节点DiscoveryNodesnodes=clusterState.nodes();List<DiscoveryNode>dataNodes=nodes.getDataNodes().values().stream().toList();returndataNodes.isEmpty()?Assignment.NO_VALID_NODE_ASSIGNMENT:newAssignment(dataNodes.get(Randomness.get().nextInt(dataNodes.size())).getId(),"ok");}}

7. 安装 & 验证
mvn clean package# 得到 target/write-plugin-1.0.0.zipbin/elasticsearch-plugininstallfile:///full/path/write-plugin-1.0.0.zip# 重启节点
# 1. 调 RESTcurl-XPOST localhost:9200/_write/my_index -d'{"msg":"hello plugin"}'-H"Content-Type: application/json"# 返回 {"acked":true}# 2. 看任务curl-XGET localhost:9200/_cluster/pending_tasks# 3. 看结果curllocalhost:9200/my_index/_search?q=*:*

8. 可继续扩展的 5 个方向
  1. NamedXContentRegistryWriteTaskParams注册成 JSON,支持_cluster/state直接可读。
  2. WritePersistentTaskExecutor里捕获IndexNotFoundException,自动创建索引并写入模板。
  3. WriteTaskParams做成AckedRequest,实现POST /_write/{index}?wait_for_active_shards=2语义。
  4. 通过Plugin.createComponents注入自定义线程池,让大批量写任务走独立队列。
  5. PersistentTaskState存储重试次数,结合BackoffPolicy实现断点续写。

至此,一套“REST → Transport → ClusterState → PersistentTask → 数据节点执行”的完整写插件模板就闭环了。直接复制即可编译,二次开发只需替换WriteTaskParamsnodeOperation里的业务逻辑。```
推荐阅读:
PyCharm 2018–2024使用指南

更多技术文章见公众号: 大城市小农民

http://www.jsqmd.com/news/258367/

相关文章:

  • 全屋定制制造厂哪家技术强?这些厂家别错过 - 工业品牌热点
  • springboot-java会议室租赁系统
  • 2026年预制舱厂家推荐:2026年度横向对比评测与用户评价排名报告 - 品牌推荐
  • 【大气】模拟地球气候的Ghil-Sellers能量平衡模型【含Matlab源码 14973期】
  • 【电力系统】混合粒子群算法优化禁忌搜索算法在光伏丰富的配电网络中优化电池储能系统的位置、容量和调度【含Matlab源码 14974期】
  • 【开题答辩全过程】以 基于java的医院床位管理系统的设计与开发 为例,包含答辩的问题和答案
  • springboot-java健康体检健身饮食搭配管理系统
  • 杭州拼多多代运营公司推荐:2026年值得关注的服务商清单 - 前沿公社
  • AI辅助审查系统:让合规审核告别“人海战术”
  • 如何选择工业设计公司?2026年最新评测与用户评价排名推荐 - 品牌推荐
  • 2026年工业设计公司推荐:基于权威资质与千项案例的TOP5排名与深度评测 - 品牌推荐
  • 基于贾子智慧“势‑道‑术”框架的AI战略
  • 2026年工业设计公司选购看什么?这份对比评测与口碑排名推荐给你答案 - 品牌推荐
  • Delphi里用ListView实现PDF左边页面选择功能
  • 英语_阅读_argument with computer
  • 2026年预制舱厂家推荐:基于行业权威数据的TOP10口碑排名与深度评测 - 品牌推荐
  • 2026年工业设计公司推荐:2026年度横向对比评测及综合实力排名报告 - 品牌推荐
  • 2026年智慧移动厕所厂家权威推荐榜单:不锈钢移动厕所/员工通道岗亭/学校旗杆/不锈钢旗杆/户外旗杆源头厂家精选 - 品牌推荐官
  • 【开题答辩全过程】以 高校新生报到管理系统的设计与实现为例,包含答辩的问题和答案
  • Teanary支付,物流扩展开发文档
  • 雅思党必看!全网首发2026商务英语培训机构深度测评,从权威到实用这份榜单全都有! - 老周说教育
  • 2026雅思高端商务英语培训红黑榜:权威出国雅思课程中心学校口碑排行榜 - 老周说教育
  • 一篇搞懂 Flink 常用数据源与连接器从 PyFlink 环境变量到 Kafka 实战
  • 【ASPICE】中包含哪些测试?
  • Linux内nano和vim的^真实含义的
  • 2026年北京欧标电缆厂专业服务厂家排名,哪家口碑好 - 工业品牌热点
  • 导师严选2026 AI论文网站TOP9:继续教育必用测评
  • 分布式锁,etcd,redis,ZooKeeper - 指南
  • ‌羁侯所是清代司法体系中用于临时关押嫌疑人以等待审讯的场所‌,常见于历史文献如《红楼梦》及地方志记载。‌‌1‌‌2
  • ​史湘云的最终结局:流落到烟花巷也没等回丈夫,却得一人陪她到老君笺雅侃红楼​