Error Recover and Version Check

最近经常在工作中碰到需要访问分布式共享的资源。一般这种资源都是互斥访问。但免不了A最开始在修改,过了一段时间会话过期了,同一资源又被B修改,之后A回过神来又想修改该资源,一般这时候修改会失败。这时候A需要重新申请对该资源的互斥访问。假设A、B都可以执行对资源X的原子++操作,通常来说只希望A或者B其中一个对资源X进行++操作(负责资源X的原子++操作),但有可能出现以上情况,流程如下:

  1. A获取资源X的互斥锁;
  2. A检查资源X的值等于1,准备修改为2;
  3. 但这时候A因为线程调度或者FullGC其他什么原因Hang住了,导致互斥锁会话过期,且资源X被分配给B负责;
  4. 这时候B又尝试获取X的互斥锁,资源X的值等于1,并将资源X的值修改为2,释放互斥锁;
  5. 这时候A回过神来,资源X又被分配给A负责
  6. A准备修改资源X为2,但因为互斥锁过期了,所以需要重新获取;

但是如果A重新获取互斥锁,然后直接修改资源X为2,那么就不满足+1的操作了。其实也就是B对资源X的修改被覆盖掉了。

这时候我们有两种选择:

  1. 直接让A进行的操作失败,让上游重试或者错误向上传递;
  2. 让A重新进行所有的流程,获取资源X的互斥锁,得到资源X的值,然后修改资源X的值+1,释放互斥锁;

如果采用第一种方式的话,我们原子++的操作的伪码如下。

1
2
3
4
acquire remote resource X exclude lock
var x = value(X)
set X = x + 1 // if error then throw exception
release remote resource X exclude lock

细心的小老弟可以观察到每次操作都要获取远程的互斥锁,难免操作成本有点高。可以想到的优化方式,可以像数据库连接池一样复用这个互斥锁。优化代码如下,不仅仅没有重复获取和释放互斥锁,也减少了每次去获取资源X的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
synchronized atomicIncrement() {
if don't acquire remote resource X exclude lock {
recover();
}
try {
set X = ++x; // if error then throw exception
} catch(Throwable t) {
recover();
throw t;
}
}

recover() {
acquire remote resource X exclude lock
x = value(X)
}

之前的伪码可以抽象成通用的分布式资源独占访问模型如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
synchronized operateRemoteResource() {
if don't acquire remote resource X exclude lock {
recover();
}
try {
check local x's info and
modify X
} catch(Throwable t) {
recover();
throw t;
}
}

synchronized recover() {
acquire remote resource X exclude lock
x = get X's info
}

一切看似很完美,我们还能怎么优化呢。假设check local x’s info是个耗时的操作,那将其从锁中移除,可以减少锁持有的时间。如果是直接移到锁外面,读到的local x’s info可能是stale的,因为从check local x’s info到operateRemoteResource可能因为修改异常已经重新recover过了,这时候local x’s info可能已经发生了变化,直接modify X其实是有问题的。

为了解决这个问题,我们可以通过类似MVCC的版本号,在check local x’s info 获取当前的version,然后在operateRemoteResource里面对比版本还是否一致,如果不一致,则说明已经重新recover过了,前面的检查是stale的local x’s info,这时候就需要放弃这次操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
opVersion = version
check local x's info

synchronized operateRemoteResource(opVersion) {
if (opVersion < version) {
throw ex;
}
try {
modify X
} catch(Throwable t) {
recover();
throw t;
}
}

synchronized recover() {
acquire remote resource X exclude lock
x = get X's info
version++;
}

最初的recover操作放到流程外

如果这个分布式资源在多线程的情况下并不需要通过锁来进行互斥访问,这时候一般不会依赖X的信息,而只是访问修改X而已,那么代码又可以变成如下。

1
2
3
4
5
6
7
8
9
10
11
12
operateRemoteResource() {
try {
modify X
} catch(Throwable t) {
recover();
throw t;
}
}

synchronized recover() {
acquire remote resource X exclude lock
}

但是这种又会带来另一个问题,当多线程同时出现修改X异常,则会出现多次recover的场景,这个是我们不希望看到的。我们希望的是针对通过一次recover就能恢复的异常只recover一次。代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
operateRemoteResource() {
opVersion = version
if (!recovering) {
throw ex;
}
try {
modify X
} catch(Throwable t) {
recover(opVersion);
throw t;
}
}

void recover(opVersion) {
if (opVersion < version || !recovering) {
return;
}
synchronized {
if (opVersion < version || !recovering) {
return;
}
recovering = false;
}
synchronized {
acquire remote resource X exclude lock
version++;
recovering = true;
}
}