curator-上手
curator是Netflix公司开源的一个zookeeper客户端,目前由apache进行维护。与原生客户端相比,curator的抽象层更高,功能也更丰富,是目前zookeeper使用范围最广的java客户端
依赖
<!-- 对zookeeper的底层api的一些封装 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
创建客户端
public static CuratorFramework getCurator(){
return CuratorFrameworkFactory.builder()
.connectString("localhost:2181")//服务地址
.sessionTimeoutMs(10000)//会话超时时间
.retryPolicy(new RetryNTimes(3,5000))//重试策略
.namespace("dqn")//命名空间,指定命名空间后clinet的作用操作都会以其开头
.build();
}
使用
curator.start();//开启连接
curator.close();//断开连接
重试策略
重试策略主要分为两大类:
- RetryForever:代表一直重试,直到连接成功
- SleepingRetry:基于一定间隔时间的重试
判断服务状态
CuratorFrameworkState state = curator.getState();
System.out.println(state == CuratorFrameworkState.STARTED);
创建节点
curator.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/data","aaa".getBytes());
CreateMode节点类型
- PERSISTENT:永久节点
- PERSISTENT_SEQUENTIAL:永久有序节点
- EPHEMERAL:临时节点
- EPHEMERAL_SEQUENTIAL:临时有序节点
获取节点信息
Stat stat = new Stat();
byte[] bytes = curator.getData().storingStatIn(stat).forPath("/data");
System.out.println("节点数据:"+new String(bytes));
System.out.println("节点信息:"+stat);
Stat节点信息
- czxid:数据节点创建时的事务id
- ctime:数据节点创建时的时间
- mzxid:数据节点最后一次更新时的事务id
- mtime:数据节点最后一次更新时的时间
- pzxid:数据节点的子节点最后一次被修改的事务id
- cversion:子节点的更改次数
- version:节点数据的更改次数
- aversion:节点的acl的更改次数
- ephemeralOwner:如果节点是临时节点,则表示创建该节点的会话的SessionID;如果节点是持久节点,则属性值为0
- dataLength:数据内容的长度
- numChildren:数据节点当前的子节点个数
获取子节点列表
List<String> list = curator.getChildren().forPath("/");
list.forEach(System.out::println);
更新节点
curator.setData().withVersion(0)//传入版本号,如果版本号错误则拒绝更新
.forPath("/data","bbb".getBytes());
删除节点
curator.delete()
.guaranteed()//如果删除失败,会重新执行,直到成功
.deletingChildrenIfNeeded()//如果有子节点会递归删除
.withVersion(1)//版本号,如果版本号有误则拒绝操作
.forPath("/data");
判断节点是否存在
Stat stat = curator.checkExists().forPath("/data");
System.out.println("节点是否存在:"+(stat!=null));
一次性监听
curator.getData().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
System.out.println("节点:"+event.getPath()+"发生了事件:"+event.getType());
}
}).forPath("/data");
永久性监听
//使用nodeCache包装节点,对其注册的监听作用于节点,是永久性的
NodeCache nodeCache = new NodeCache(curator,"/data");
//设置为true,代表创建nodeCache时,就去获取对应节点的值并缓存
nodeCache.start(true);
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData currentData = nodeCache.getCurrentData();
if (currentData != null){
System.out.println("节点路径:"+currentData.getPath()+
"数据:"+new String(currentData.getData()));
}
}
});
监听子节点
PathChildrenCache pathChildrenCache = new PathChildrenCache(curator,"/data",true);
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
List<ChildData> childDataList = pathChildrenCache.getCurrentData();
System.out.println("子节点列表:");
childDataList.forEach(childData -> System.out.println(childData.getPath()));
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
System.out.println("节点:"+event.getData().getPath()+
"发生事件:"+event.getType());
}
});
PathChildrenCache.StartMode初始化方式
- NORMAL:异步初始化
- BUILD_INITIAL_CACHE:同步初始化
- POST_INITIALIZED_EVENT:异步并通知,会调用INITIALIZED事件
PathChildrenCacheEvent.Type节点状态
- CHILD_ADDED:添加子节点
- CHILD_UPDATED:子节点被修改
- CHILD_REMOVED:子节点被删除
- CONNECTION_SUSPENDED:连接中断
- CONNECTION_RECONNECTED:重新连接
- CONNECTION_LOST:连接丢失
- INITIALIZED:初始化完成
事务
curator.inTransaction()
.create().forPath("/node2","aaa".getBytes())
.and().
create().forPath("/node3","bbb".getBytes())
.and()
.delete().forPath("/node1")
.and()
.commit();
一系列操作要么全部成功,要么全部失败