使用redis实现分布式锁基本就2步:第一多线程请求处理获取锁时,使用setnx指令,只有当key不存在时才设值,注意设置超时时间。
set key value [expiration EX seconds|PX milliseconds] [NX|XX]
第二相关业务处理完成之后,谁加的锁应该由谁去及时释放,即删除指定的key,注意删除时要匹配加锁线程的value,避免因超时误删其它线程的锁,这里使用Lua脚本执行原子操作,也可以先判断key value再del。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
----
下面使用单节点redis,redis版本5.0.8,版本客户端使用jedis3.2.0,完成相关加锁,释放锁的操作。
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
注意使用jedisPool时默认最大连接数是8,使用完记得关闭连接,否则会造成线程等待,set成功时返回"OK",lua脚本删除成功时,返回"1"。
public boolean lock(String key, String value, long waitTimeOut, SetParams params) {
Jedis jedis = null;
try {
jedis = JedisPool.getResource();
long
for (; ; ) {
String lock = jedis.set(key, value, params);
if ("OK".equals(lock)) {
return true;
}
long wait = System.currentTimeMillis() - start;
if (wait >= waitTimeOut) {
return false;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
log.error("");
return false;
}
}
} catch (Exception e) {
e.printStackTrace();
return false;
} finally {
jedis.close();
}
}
public boolean unlock(String key, String value) {
Jedis jedis = null;
try {
jedis = JedisPool.getResource();
String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +
"then return redis.call('del',KEYS[1]) " +
"else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(key),
Collections.singletonList(value));
if ("1".equals(result.toString())) {
return true;
}
return false;
} catch (Exception e) {
e.printStackTrace();
return false;
} finally {
jedis.close();
}
}
----
最后,在controller里简单测试一下锁的效果:
@GetMapping("hello")
public String hello() {
String value = System.nanoTime() + "." + UUID.randomUUID();
CountDownLatch latch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
redisLock.lock(KEY,value,1000,SetParams.setParams().nx().px(500000));
amount--;
System.out.println(Thread.currentThread() + ":" + amount);
redisLock.unlock(KEY,value);
latch.countDown();
}).start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return "amount:" + amount;
}
不加锁时多次执行会出现数据错误,添加锁后数据正常,可以使用专业的工具进行测试验证,这里为了等待所有线程执行完成使用了countDownLatch。
----
这里的分布式锁是基于redis单点完成的,使用单机可能会出现单节点故障。且锁是不可重入的。通常redis都是集群部署的,可以使用redission来完成集群redis的分布式锁操作,实现了redlock算法,且value关联线程,实现了锁的可重入。