ZooKeeper 服务器动态上下线监听案例
1. 需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
2. 需求分析
核心流程:
- 服务端启动时去注册信息(创建都是临时节点)
- 客户端获取到当前在线服务器列表,并且注册监听
- 服务器节点下线(临时节点自动删除)
- 客户端收到服务器节点上下线事件通知,重新获取服务器列表
3. 具体实现
3.1 先在集群上创建/servers节点
[zk:localhost:2181(CONNECTED)10]create /servers"servers"Created /servers3.2 在Idea中创建包名
com.atguigu.zkcase13.3 服务器端向Zookeeper注册代码
packagecom.atguigu.zkcase1;importjava.io.IOException;importorg.apache.zookeeper.CreateMode;importorg.apache.zookeeper.WatchedEvent;importorg.apache.zookeeper.Watcher;importorg.apache.zookeeper.ZooKeeper;importorg.apache.zookeeper.ZooDefs.Ids;publicclassDistributeServer{privatestaticStringconnectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";privatestaticintsessionTimeout=2000;privateZooKeeperzk=null;privateStringparentNode="/servers";// 创建到zk的客户端连接publicvoidgetConnect()throwsIOException{zk=newZooKeeper(connectString,sessionTimeout,newWatcher(){@Overridepublicvoidprocess(WatchedEventevent){}});}// 注册服务器publicvoidregistServer(Stringhostname)throwsException{Stringcreate=zk.create(parentNode+"/server",hostname.getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname+" is online "+create);}// 业务功能publicvoidbusiness(Stringhostname)throwsException{System.out.println(hostname+" is working ...");Thread.sleep(Long.MAX_VALUE);}publicstaticvoidmain(String[]args)throwsException{// 1 获取zk连接DistributeServerserver=newDistributeServer();server.getConnect();// 2 利用zk连接注册服务器信息server.registServer(args[0]);// 3 启动业务功能server.business(args[0]);}}3.4 客户端代码
packagecom.atguigu.zkcase1;importjava.io.IOException;importjava.util.ArrayList;importjava.util.List;importorg.apache.zookeeper.WatchedEvent;importorg.apache.zookeeper.Watcher;importorg.apache.zookeeper.ZooKeeper;publicclassDistributeClient{privatestaticStringconnectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";privatestaticintsessionTimeout=2000;privateZooKeeperzk=null;privateStringparentNode="/servers";// 创建到zk的客户端连接publicvoidgetConnect()throwsIOException{zk=newZooKeeper(connectString,sessionTimeout,newWatcher(){@Overridepublicvoidprocess(WatchedEventevent){// 再次启动监听try{getServerList();}catch(Exceptione){e.printStackTrace();}}});}// 获取服务器列表信息publicvoidgetServerList()throwsException{// 1 获取服务器子节点信息,并且对父节点进行监听List<String>children=zk.getChildren(parentNode,true);// 2 存储服务器信息列表ArrayList<String>servers=newArrayList<>();// 3 遍历所有节点,获取节点中的主机名称信息for(Stringchild:children){byte[]data=zk.getData(parentNode+"/"+child,false,null);servers.add(newString(data));}// 4 打印服务器列表信息System.out.println(servers);}// 业务功能publicvoidbusiness()throwsException{System.out.println("client is working ...");Thread.sleep(Long.MAX_VALUE);}publicstaticvoidmain(String[]args)throwsException{// 1 获取zk连接DistributeClientclient=newDistributeClient();client.getConnect();// 2 获取servers的子节点信息,从中获取服务器信息列表client.getServerList();// 3 业务进程启动client.business();}}4. 测试
4.1 在Linux命令行上操作增加减少服务器
(1) 启动DistributeClient客户端
(2) 在hadoop102上zk的客户端/servers目录上创建临时带序号节点
[zk:localhost:2181(CONNECTED)5]create-e-s/servers/hadoop102"hadoop102"Created /servers/hadoop1020000000000[zk:localhost:2181(CONNECTED)6]create-e-s/servers/hadoop103"hadoop103"Created /servers/hadoop1030000000001(3) 观察Idea控制台变化
[hadoop102, hadoop103](4) 执行删除操作
[zk:localhost:2181(CONNECTED)8]delete /servers/hadoop1020000000000(5) 观察Idea控制台变化
[hadoop103]4.2 在Idea上操作增加减少服务器
(1) 启动DistributeClient客户端(如果已经启动过,不需要重启)
(2) 启动DistributeServer服务
- 点击Edit Configurations…
- 在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
- 回到DistributeServer的main方法,右键,在弹出的窗口中点击Run
观察DistributeServer控制台,提示hadoop102 is working
观察DistributeClient控制台,提示hadoop102已经上线
总结
| 角色 | 核心功能 | 关键代码 |
|---|---|---|
| DistributeServer | 向Zookeeper注册临时节点 | zk.create(..., CreateMode.EPHEMERAL_SEQUENTIAL) |
| DistributeClient | 监听/servers子节点变化,获取服务器列表 | zk.getChildren(parentNode, true) |
| 临时节点特性 | 服务器断开连接后节点自动删除,客户端收到通知 | EPHEMERAL_SEQUENTIAL |
| Watch机制 | 节点变化触发回调,重新获取列表 | process()中调用getServerList() |
