分布式——基于zookeeper实现的分布式锁

1. 概述

之前在学校一直都没怎么关注分布式的的应用,在实际的开发场景中,因为都是一些小的项目,也没类似的需求。来到公司后才知道,几乎所有的大的应用都是在分布式环境下运行,就是每个业务都是独立开始部署的,同时每个应用都部署了N台机器。分布式环境一个非常大的区别就是应用并不是运行在一个 JVM 上的,那么所有以本地内存执行为前提的相关技术都将不适用,都需要重写。
所以我一直打算从零开始整理所有在分布式环境下所需要应用或改造的技术,于是就有了这个系列——分布式应用开发与学习。

关于分布式,同步与锁就是其中一例(当然还有非常非常多其它的技术),今天就整理了一下基于 ZooKeeper 的分布式锁的实现原理与方式。

2. ZooKeeper

ZooKeeper 是大数据处理框架 Hadoop 下的一个子项目,是一种用于分布式应用程序的高性能协调服务框架。ZooKeeper 维护大型集群环境中所需的公共对象。这些对象的示例包括配置信息,分层命名空间等。应用程序利用这些服务来协调大型集群的分布式处理。

它是如何工作的?

想象一下,Hadoop集群跨越500个或更多服务器。那么就需要有一个集中管理整个集群的方案,包括在名称、组和同步服务,配置管理等。Hadoop集群的其他开源项目也需要跨集群服务,ZooKeeper 就是为这个场景而生的,嵌入ZooKeeper可以很方便地构建同步服务。

对于应用程序,ZooKeeper提供了跨节点同步的基础结构。它通过在ZooKeeper服务器上的内存中维护状态类型信息来实现此目的。ZooKeeper服务器保留整个系统状态的副本,并将此信息保存在本地日志文件中。多个ZooKeeper服务器支持大型Hadoop集群(主服务器同步顶级服务器)。

在ZooKeeper中,应用程序可以创建所谓的znode(一个在ZooKeeper服务器上保留在内存中的文件)。znode可以由群集中的任何节点更新,群集中的任何节点都可以注册以通知该znode的更改(在ZooKeeper的说法中,服务器可以设置为“监视”特定的znode)。

使用这个znode基础结构,应用程序可以通过在ZooKeeper znode中更新它们的状态来同步它们在分布式集群中的任务,通知群集的其余部分特定节点的状态更改。此群集范围的状态集中服务对于跨大型分布式服务器集的管理和序列化任务至关重要。

更多关于 ZooKeeper 的信息可以去官网http://zookeeper.apache.org/或者查看我之前关于 ZooKeeper 的文章分布式——基于zookeeper构建分布式应用,这并不是我今天要整理的重点,但确实是前提,你必须了解znode的几种类型和各自的特点才能更好地理解我们怎么利用它的这些特性来实现分布式锁。

3. 锁服务

分布式锁是一种在进程集合之间提供互斥的机制。在任何时候,只有一个进程可以保持锁定。分布式锁可以用于大型分布式系统中的领导者选举,其中领导者是在任何时间点持有锁的进程。

不要将ZooKeeper自己的领导者选举与一般领导者选举服务混淆,后者可以使用ZooKeeper原语构建。Zoo-Keeper自己的领导人选举没有公开曝光,不像我们在这里描述的一般领导人选举服务的类型,

要使用ZooKeeper实现分布式锁,我们使用临时顺序节点。这个想法很简单:首先指定一个锁定节点,通常描述被锁定的实体,比如 /leader,那么想要获取锁的客户端创建临时顺序的节点作为锁节点的子节点,同时所有的客户端都会监听/leader的孩子节点(当发生变化时确认是否是自己获得了锁)。

在任何时间点,具有最低序列号的客户端都会持有锁。例如,如果两个客户端大约在同一时间创建子节点,/leader/lock-1/leader/lock-2,则创建/leader/lock-1的客户端持有锁,因为它的节点具有最低的序列号。

这样只需删除/leader/lock-1即可释放锁定,这个过程可以是主动的,也可能是被动的(比如客户端进程死亡,它将被删除,因为它是一个短暂的znode)。创建/leader/lock-2的客户端监听到这个事件后,在比较了所有的序号后,发现自己变成最小的序号了,于是将获得锁。

锁获取的伪代码如下:

  1. 在lock节点下创建一个名为lock-临时顺序节点并记住它的实际路径名(create操作的返回值),获取锁定节点的孩子列表并设置一个 Watch
  2. 如果在第1步中创建的节点的路径名具最小子顺序号,则已获取锁。
  3. 等待Watch设置的通知,在事件到达后然后转到步骤2。

4. 羊群效应

虽然这个算法是正确的,但它有一些问题。第一个问题是这种实施受到羊群效应的影响。考虑成百上千的客户端,都试图获得锁。每个客户端都会在锁定znode上放置一个监视,以查看其子集的更改。每次释放锁定时,所有的客户端都被唤醒,并来检查自己是否是最小序号的节点,从而产生“羊群效应”。

但实际上应该只有少数客户端(分布式锁场景应该只有一个客户端)可以进行实际的锁操作,在这种情况下,只有一个客户端将成功获取锁定。羊群效应导致了维护和发送监视事件到所有客户端的过程会导致流量高峰,这会给ZooKeeper服务器带来压力。

为了避免羊群效应,我们需要优化通知条件。实现锁的关键是,应该只有当具有先前序列号的子znode消失时,才需要通知该客户端,而不是在删除(或创建)任何子znode时都通知我。在我们的例子中,如果客户端创建了/leader/lock-1/leader/lock-2/leader/lock-3,那么当/leader/lock-2消失时,只需要通知持有/leader/ lock-3 节点的客户端。当/leader/lock-1消失或添加新的/leader/lock-4时,并不需要通知它。

5. 可恢复的异常

上面描述的锁定算法还有另一个问题是:没有处理由于创建操作失败的情况(比如由于连接丢失而失败)。回想一下,在这种情况下,我们不知道操作是成功还是失败。创建顺序节点是一种非幂等操作,因此我们不能简单地重试,因为如果第一次创建已经成功,我们将会有一个永远不会删除的孤立节点(至少在客户端会话结束之前)。

同时,重新连接后,客户端无法判断它是否创建了任何子znode。通过在znode名称中嵌入标识符,如果它遭受连接丢失,它可以检查锁定节点的任何子节点是否在其名称中具有其标识符。如果子节点包含其标识符,则它知道创建操作成功,它不应该创建另一个子节点。如果没有子标识符在其名称中,则客户端可以安全地创建新的顺序子节点。

客户端的会话标识符是一个长整数,对于ZooKeeper服务是唯一的,因此非常适合在连接丢失事件中标识客户端。可以通过调用ZooKeeper getSessionId() API 方法来获取会话标识符。

使用类似lock- <sessionId>的名称创建临时顺序节点,以便当ZooKeeper附加序列号时,名称将变为lock- <sessionId> - <sequenceNumber>。序列号对于父级是唯一的,而不是子级的名称,因此这种技术允许子节点识别其创建者以及添加创建顺序。

6. 不可恢复的异常

如果客户端的ZooKeeper会话到期,客户端创建的临时节点将被删除,有效地放弃锁定或至少丧失客户端获取锁定的权利。使用锁的应用程序应该意识到它不再持有锁,清理它的状态,然后通过创建一个新的锁对象并尝试获取它来重新开始。请注意,由应用程序控制此进程,而不是锁实现,因为它无法猜测应用程序如何清理其状态。

7. 具体实现

知道了上述的原理以及各种异常情况后,要具体实现就比较简单了,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
public class DistributedLock {

private static final byte[] data = { 0x12, 0x34 };
private ZooKeeperx zookeeper = ZooKeeperClient.getInstance();
private final String root; //根节点路径
private String id;
private LockNode idName;
private String ownerId;
private String lastChildId;
private Throwable other = null;
private KeeperException exception = null;
private InterruptedException interrupt = null;

public DistributedLock(String root) {
this.root = root;
ensureExists(root);
}

/**
* 尝试获取锁操作,阻塞式可被中断
*/
public void lock() throws InterruptedException, KeeperException {
// 可能初始化的时候就失败了
if (exception != null) {
throw exception;
}

if (interrupt != null) {
throw interrupt;
}

if (other != null) {
throw new NestableRuntimeException(other);
}

if (isOwner()) {//锁重入
return;
}

BooleanMutex mutex = new BooleanMutex();
acquireLock(mutex);
// 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
try {
mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
// mutex.get();
} catch (TimeoutException e) {
if (!mutex.state()) {
lock();
}
}

if (exception != null) {
throw exception;
}

if (interrupt != null) {
throw interrupt;
}

if (other != null) {
throw new NestableRuntimeException(other);
}
}

/**
* 尝试获取锁对象, 不会阻塞
*
* @throws InterruptedException
* @throws KeeperException
*/
public boolean tryLock() throws KeeperException {
// 可能初始化的时候就失败了
if (exception != null) {
throw exception;
}

if (isOwner()) {//锁重入
return true;
}

acquireLock(null);

if (exception != null) {
throw exception;
}

if (interrupt != null) {
Thread.currentThread().interrupt();
}

if (other != null) {
throw new NestableRuntimeException(other);
}

return isOwner();
}

/**
* 释放锁对象
*/
public void unlock() throws KeeperException {
if (id != null) {
try {
zookeeper.delete(root + "/" + id, -1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (KeeperException.NoNodeException e) {
// do nothing
} finally {
id = null;
}
} else {
//do nothing
}
}

private void ensureExists(final String path) {
try {
Stat stat = zookeeper.exists(path, false);
if (stat != null) {
return;
}

zookeeper.create(path, data, CreateMode.PERSISTENT);
} catch (KeeperException e) {
exception = e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
interrupt = e;
}
}

/**
* 返回锁对象对应的path
*/
public String getRoot() {
return root;
}

/**
* 判断当前是不是锁的owner
*/
public boolean isOwner() {
return id != null && ownerId != null && id.equals(ownerId);
}

/**
* 返回当前的节点id
*/
public String getId() {
return this.id;
}

// ===================== helper method =============================

/**
* 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
*/
private Boolean acquireLock(final BooleanMutex mutex) {
try {
do {
if (id == null) {//构建当前lock的唯一标识
long sessionId = zookeeper.getDelegate().getSessionId();
String prefix = "x-" + sessionId + "-";
//如果第一次,则创建一个节点
String path = zookeeper.create(root + "/" + prefix, data,
CreateMode.EPHEMERAL_SEQUENTIAL);
int index = path.lastIndexOf("/");
id = StringUtils.substring(path, index + 1);
idName = new LockNode(id);
}

if (id != null) {
List<String> names = zookeeper.getChildren(root, false);
if (names.isEmpty()) {
id = null;//异常情况,重新创建一个
} else {
//对节点进行排序
SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
for (String name : names) {
sortedNames.add(new LockNode(name));
}

if (sortedNames.contains(idName) == false) {
id = null;//清空为null,重新创建一个
continue;
}

//将第一个节点做为ownerId
ownerId = sortedNames.first().getName();
if (mutex != null && isOwner()) {
mutex.set(true);//直接更新状态,返回
return true;
} else if (mutex == null) {
return isOwner();
}

SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
if (!lessThanMe.isEmpty()) {
//关注一下排队在自己之前的最近的一个节点
LockNode lastChildName = lessThanMe.last();
lastChildId = lastChildName.getName();
//异步watcher处理
zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {

public void asyncProcess(WatchedEvent event) {
acquireLock(mutex);
}

});

if (stat == null) {
acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
}
} else {
if (isOwner()) {
mutex.set(true);
} else {
id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
}
}
}
}
} while (id == null);
} catch (KeeperException e) {
exception = e;
if (mutex != null) {
mutex.set(true);
}
} catch (InterruptedException e) {
interrupt = e;
if (mutex != null) {
mutex.set(true);
}
} catch (Throwable e) {
other = e;
if (mutex != null) {
mutex.set(true);
}
}

if (isOwner() && mutex != null) {
mutex.set(true);
}
return Boolean.FALSE;
}
}

8. 第三方实现

由于这个场景太常见了,所以目前也现出了许多基于 ZooKeeper 的分布式的最佳实践,其中Apache Curator Reentrant Lock就是一个非常好的官方推荐的实现。它的使用方式非常的简单,大家可以去官网或者 Google 了解一下,有了这个,我们基本上不再需要自行去实现分布式锁服务了。大概的使用模式如下代码所示:

1
2
3
4
5
6
7
8
9
InterProcessMutex mutex = new InterProcessMutex(client, lockPath);
// 获取锁,这里是阻塞的,会一直等待直到获取锁
mutex.acquire();
try {
// 执行业务
} finally {
// 注意,一定要 finally 里面释放锁
mutex.release();
}