type
status
date
slug
summary
tags
category
icon
password

1、概述

Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。
Curator由一系列的模块构成,
  • curator-framework:对zookeeper的底层api的一些封装
  • curator-client:提供一些客户端的操作,例如重试策略等
  • curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
引入依赖

2、Curator的基本Api

2.1 创建会话

2.1.1 使用静态工程方法创建客户端

一个例子如下:
newClient静态工厂方法包含四个主要参数:
参数名
说明
connectionString
服务器列表,格式host1:port1,host2:port2,...
retryPolicy
重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口
sessionTimeoutMs
会话超时时间,单位毫秒,默认60000ms
connectionTimeoutMs
连接创建超时时间,单位毫秒,默认60000ms

2.1.2 使用Fluent风格的Api创建会话

核心参数变为流式设置,一个列子如下:

2.1.3 创建包含隔离命名空间的会话

为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径(官方术语:为Zookeeper添加“Chroot”特性)。例如(下面的例子)当客户端指定了独立命名空间为“/base”,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的。通过设置Chroot可以将客户端应用与Zookeeper服务端的一课子树相对应,在多个应用共用一个Zookeeper集群的场景下,这对于实现不同应用之间的相互隔离十分有意义。

2.2 启动客户端

当创建会话成功,得到client的实例然后可以直接调用其start( )方法:

2.3 数据节点操作

2.3.1 创建数据节点

Zookeeper的节点创建模式:
  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化并且带序列号
  • EPHEMERAL:临时
  • EPHEMERAL_SEQUENTIAL:临时并且带序列号
创建一个节点,初始内容为空
注意:如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空
创建一个节点,附带初始化内容
创建一个节点,指定创建模式(临时节点),内容为空
创建一个节点,指定创建模式(临时节点),附带初始化内容
创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点
这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点。

2.3.2 删除数据节点

删除一个节点
注意,此方法只能删除叶子节点,否则会抛出异常。
删除一个节点,并且递归删除其所有的子节点
删除一个节点,强制指定版本进行删除
删除一个节点,强制保证删除
guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。
注意:上面的多个流式接口是可以自由组合的,例如:

2.3.3 读取数据节点数据

读取一个节点的数据内容
注意,此方法返的返回值是byte[ ];
读取一个节点的数据内容,同时获取到该节点的stat

2.3.4 更新数据节点数据

更新一个节点的数据内容
注意:该接口会返回一个Stat实例
更新一个节点的数据内容,强制指定版本进行更新

2.3.5 检查节点是否存在

注意:该方法返回一个Stat实例,用于检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath( )指定要操作的ZNode

2.3.6 获取某个节点的所有子节点路径

注意:该方法的返回值为List<String>,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode
 

2.4 事务

CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交。一个例子如下:

2.5 异步接口

上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
CuratorEventType
事件类型
对应CuratorFramework实例的方法
CREATE
#create()
DELETE
#delete()
EXISTS
#checkExists()
GET_DATA
#getData()
SET_DATA
#setData()
CHILDREN
#getChildren()
SYNC
#sync(String,Object)
GET_ACL
#getACL()
SET_ACL
#setACL()
WATCHED
#Watcher(Watcher)
CLOSING
#close()
响应码(#getResultCode())
响应码
意义
0
OK,即调用成功
-4
ConnectionLoss,即客户端与服务端断开连接
-110
NodeExists,即节点已经存在
-112
SessionExpired,即会话过期
一个异步创建节点的例子如下:
注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。

3、Curator recipes(高级特性)

curator-recipes 提供了一些zk的典型使用场景的参考。下面主要介绍一下开发中常用的组件。

3.1 事件监听

zookeeper原生支持通过注册watcher来进行事件监听,但是其使用不是特别方便,需要开发人员自己反复注册watcher,比较繁琐。
Curator引入Cache来实现对zookeeper服务端事务的监听。Cache是Curator中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程Zookeeper视图的对比过程。同时,Curator能够自动为开发人员处理反复注册监听,从而大大简化原生api开发的繁琐过程。

3.1.1 Node Cache

NodeCache可以监听指定的节点,注册监听器后,节点的变化会通知相应的监听器

3.1.2 Path Cache

Path Cache 用来监听ZNode的子节点事件,包括added、updateed、removed,Path Cache会同步子节点的状态,产生的事件会传递给注册的PathChildrenCacheListener。

3.1.3 Tree Cache

Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。

3.2 选举

curator 提供了两种方式,分别是 Leader Latch 和 Leader Election 。

3.2.1 Leader Latch

随机从候选着中选出一台作为leader,选中之后除非调用close()释放leadship,否则其他的后选择无法成为leader

3.2.2 Leader Election

通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch则一直持有leadership, 除非调用close方法,否则它不会释放领导权。

3.3 分布式锁

3.3.1 可重入锁Shared Reentrant Lock

Shared意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。 它是由类InterProcessMutex来实现。 它的构造函数为:
通过acquire获得锁,并提供超时机制:
通过release()方法释放锁。 InterProcessMutex 实例可以重用。 Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。 为了撤销mutex, 调用下面的方法:

3.3.2 不可重入锁Shared Lock

使用InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入

3.3.3 可重入读写锁Shared Reentrant Read Write Lock

类似JDK的ReentrantReadWriteLock. 一个读写锁管理一对相关的锁。 一个负责读操作,另外一个负责写操作。 读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。 此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。 这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。 从读锁升级成写锁是不成的。 主要由两个类实现:

3.3.4 信号量Shared Semaphore

一个计数的信号量类似JDK的Semaphore。 JDK中Semaphore维护的一组许可(permits),而Cubator中称之为租约(Lease)。注意,所有的实例必须使用相同的numberOfLeases值。 调用acquire会返回一个租约对象。 客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。 但是, 但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。 租约还可以通过下面的方式返还:
注意一次你可以请求多个租约,如果Semaphore当前的租约不够,则请求线程会被阻塞。 同时还提供了超时的重载方法:
主要类有:

3.3.5 多锁对象Multi Shared Lock

Multi Shared Lock是一个锁的容器。 当调用acquire, 所有的锁都会被acquire,如果请求失败,所有的锁都会被release。 同样调用release时所有的锁都被release(失败被忽略)。 基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。 主要涉及两个类:
它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。

3.4 栅栏barrier

1)DistributedBarrier构造函数中barrierPath参数用来确定一个栅栏,只要barrierPath参数相同(路径相同)就是同一个栅栏。通常情况下栅栏的使用如下:
1.主导client设置一个栅栏
2.其他客户端就会调用waitOnBarrier()等待栅栏移除,程序处理线程阻塞
3.主导client移除栅栏,其他客户端的处理程序就会同时继续运行。
DistributedBarrier类的主要方法如下:
setBarrier() - 设置栅栏
waitOnBarrier() - 等待栅栏移除
removeBarrier() - 移除栅栏
2)双栅栏Double Barrier
双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算,当计算完成时,离开栅栏。双栅栏类是DistributedDoubleBarrier DistributedDoubleBarrier类实现了双栅栏的功能。它的构造函数如下:
memberQty是成员数量,当enter方法被调用时,成员被阻塞,直到所有的成员都调用了enter。当leave方法被调用时,它也阻塞调用线程,直到所有的成员都调用了leave。
注意:参数memberQty的值只是一个阈值,而不是一个限制值。当等待栅栏的数量大于或等于这个值栅栏就会打开!
与栅栏(DistributedBarrier)一样,双栅栏的barrierPath参数也是用来确定是否是同一个栅栏的,双栅栏的使用情况如下:
1.从多个客户端在同一个路径上创建双栅栏(DistributedDoubleBarrier),然后调用enter()方法,等待栅栏数量达到memberQty时就可以进入栅栏。
2.栅栏数量达到memberQty,多个客户端同时停止阻塞继续运行,直到执行leave()方法,等待memberQty个数量的栅栏同时阻塞到leave()方法中。
3.memberQty个数量的栅栏同时阻塞到leave()方法中,多个客户端的leave()方法停止阻塞,继续运行。
DistributedDoubleBarrier类的主要方法如下: enter()、enter(long maxWait, TimeUnit unit) - 等待同时进入栅栏
leave()、leave(long maxWait, TimeUnit unit) - 等待同时离开栅栏
异常处理:DistributedDoubleBarrier会监控连接状态,当连接断掉时enter()和leave方法会抛出异常。

3.5 计数器Counters

利用ZooKeeper可以实现一个集群共享的计数器。 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数,一个用long来计数。

3.5.1 SharedCount

这个类使用int类型来计数。 主要涉及三个类。
SharedCount代表计数器, 可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值, 包括字面值和带版本信息的值VersionedValue。

3.5.2 DistributedAtomicLong

除了计数的范围比SharedCount大了之外, 它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex方式来更新计数值。 此计数器有一系列的操作:
  • get(): 获取当前值
  • increment(): 加一
  • decrement(): 减一
  • add(): 增加特定的值
  • subtract(): 减去特定的值
  • trySet(): 尝试设置计数值
  • forceSet(): 强制设置计数值
你必须检查返回结果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。
Relate Posts
Kafka系列:Kafka入门Zookeeper笔记
mcbilla
mcbilla
一个普通的干饭人🍚
Announcement
type
status
date
slug
summary
tags
category
icon
password
🎉欢迎来到飙戈的博客🎉
-- 感谢您的支持 ---
👏欢迎学习交流👏