zookeeper的断线重连实现

zookeeper简介

ZooKeeper是一个开放源码的分布式应用程序协调服务,是Google Chubby的一个开源实现,也是Hadoop和HBase的重要组件。
我们可以使用zookeeper做程序的健康监测(EPHEMERAL 临时节点)、公共配置文件、集群管理(leader 选举)等等。
但是ZooKeeper在会话过期(session expired)之后不会自动恢复会话,原有的临时节点也会随之丢失,必须由我们手动处理。这里使用Curator来实现ZooKeeper的断线重连,并在重连成功后重新创建临时节点,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;


/**
 * Wraps a Curator {@link CuratorFramework} client and keeps an ephemeral node
 * registered across connection loss.
 *
 * <p>The client is built and started in the constructor. {@link #createNode} creates an
 * {@code EPHEMERAL_SEQUENTIAL} node, and {@link #addListener} registers a
 * {@link ConnectionStateListener} that creates the node again once the connection
 * comes back after a {@link ConnectionState#LOST} event.
 */
public class ZookeeperExcutor {

    private static final Logger LOG = Logger.getLogger(ZookeeperExcutor.class.getName());

    /** Assigned and started in the constructor; never reassigned afterwards. */
    private final CuratorFramework client;

    /**
     * Builds a Curator client for the given ensemble and starts it.
     *
     * @param zklist         ZooKeeper connect string, e.g. {@code "127.0.0.1:2181"}
     * @param sessionTimeout value passed to {@code sessionTimeoutMs}
     * @param connectTimeout value passed to {@code connectionTimeoutMs}
     */
    public ZookeeperExcutor(String zklist, int sessionTimeout, int connectTimeout) {
        client = CuratorFrameworkFactory.builder()
                .connectString(zklist)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectTimeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
    }

    /**
     * Returns the underlying Curator client.
     *
     * @return the client started by the constructor
     */
    public CuratorFramework getClient() {
        return client;
    }

    /**
     * Creates an {@code EPHEMERAL_SEQUENTIAL} node, creating missing parent nodes as needed.
     *
     * @param nodePath path prefix of the node to create
     * @param nodeData node payload; stored as its UTF-8 bytes
     * @return the path returned by Curator for the created node, or {@code null} if
     *         creation failed for any reason (the failure is logged)
     */
    public String createNode(String nodePath, String nodeData) {
        try {
            return client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(nodePath, nodeData.getBytes(StandardCharsets.UTF_8));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can react to it.
            Thread.currentThread().interrupt();
            LOG.log(Level.WARNING, "Interrupted while creating node " + nodePath, e);
            return null;
        } catch (Exception e) {
            // Callers rely on the null return to detect failure, so do not rethrow.
            LOG.log(Level.WARNING, "Failed to create node " + nodePath, e);
            return null;
        }
    }

    /**
     * Builds a connection-state listener that re-creates the given node after the
     * connection has been lost and re-established.
     *
     * <p>The returned listener is not registered by this method; see {@link #addListener}.
     * Once registered it stays registered, so it handles every later {@code LOST} event too.
     *
     * @param nodePath path prefix of the node to re-create
     * @param nodeData payload of the node to re-create
     * @return a new listener bound to {@code nodePath} and {@code nodeData}
     */
    public ConnectionStateListener getListener(final String nodePath, final String nodeData) {
        return new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                if (connectionState != ConnectionState.LOST) {
                    // RECONNECTED and SUSPENDED require no action in this listener.
                    return;
                }
                recreateNodeWhenConnected(curatorFramework, nodePath, nodeData);
            }
        };
    }

    /**
     * Blocks until the client is connected again, then creates the node once.
     * Keeps waiting while the connection attempt times out; gives up only when the
     * calling thread is interrupted.
     */
    private void recreateNodeWhenConnected(CuratorFramework curatorFramework,
                                           String nodePath, String nodeData) {
        while (true) {
            try {
                if (curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
                    // NOTE(review): if the old session is still alive on the server, the previous
                    // ephemeral node may still exist next to the new one — confirm this is acceptable.
                    createNode(nodePath, nodeData);
                    return;
                }
            } catch (InterruptedException e) {
                // Stop retrying on interrupt instead of spinning forever.
                Thread.currentThread().interrupt();
                LOG.log(Level.WARNING, "Interrupted while waiting to reconnect; node "
                        + nodePath + " was not re-created", e);
                return;
            }
        }
    }

    /**
     * Removes every connection-state listener registered on the client, including
     * listeners that were not added through this class.
     *
     * <p>Relies on the client's listenable being a {@link ListenerContainer}; the cast
     * fails with {@link ClassCastException} otherwise.
     */
    public void clearListener() {
        ListenerContainer<ConnectionStateListener> list =
                (ListenerContainer<ConnectionStateListener>) client.getConnectionStateListenable();
        list.clear();
    }

    /**
     * Registers a reconnect listener for the given node on the client.
     *
     * @param nodePath path prefix of the node to re-create after a lost connection
     * @param nodeData payload of the node to re-create
     */
    public void addListener(String nodePath, String nodeData) {
        client.getConnectionStateListenable().addListener(getListener(nodePath, nodeData));
    }

    /**
     * Demo entry point: connects to a local ZooKeeper, creates {@code /Test} and, if that
     * succeeded, registers the reconnect listener for it.
     */
    public static void main(String[] args) {
        ZookeeperExcutor zke = new ZookeeperExcutor("127.0.0.1:2181", 10000, 10000);
        String nodeName = zke.createNode("/Test", "test");
        if (null != nodeName) {
            zke.addListener("/Test", "test");
        }
    }
}

关于curator

Curator是Netflix公司开源的一个ZooKeeper客户端。与ZooKeeper提供的原生客户端相比,Curator的抽象层次更高,减少了ZooKeeper客户端的开发工作量;直接使用原生客户端实现同样的功能会稍微麻烦一点。
下面是Curator的maven配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<!-- Curator dependencies for the sample class; all three artifacts are pinned to the same version (2.4.2). -->
<!-- curator-framework: the sample imports org.apache.curator.framework.* classes. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.4.2</version>
</dependency>
<!-- curator-client: declared explicitly at the same version as curator-framework. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.4.2</version>
</dependency>
<!-- NOTE(review): the sample class shows no org.apache.curator.framework.recipes import, -->
<!-- so curator-recipes may not be needed for it; confirm before removing. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
</dependency>