浅谈Java(SpringBoot)基于zookeeper的分布式锁实现
通过zookeeper实现分布式锁
1、创建zookeeper的client
首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFrameworkclient
publicclassCuratorFactoryBeanimplementsFactoryBean,InitializingBean,DisposableBean{ privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(ContractFileInfoController.class); privateStringconnectionString; privateintsessionTimeoutMs; privateintconnectionTimeoutMs; privateRetryPolicyretryPolicy; privateCuratorFrameworkclient; publicCuratorFactoryBean(StringconnectionString){ this(connectionString,500,500); } publicCuratorFactoryBean(StringconnectionString,intsessionTimeoutMs,intconnectionTimeoutMs){ this.connectionString=connectionString; this.sessionTimeoutMs=sessionTimeoutMs; this.connectionTimeoutMs=connectionTimeoutMs; } @Override publicvoiddestroy()throwsException{ LOGGER.info("Closingcuratorframework..."); this.client.close(); LOGGER.info("Closedcuratorframework."); } @Override publicCuratorFrameworkgetObject()throwsException{ returnthis.client; } @Override publicClass>getObjectType(){ returnthis.client!=null?this.client.getClass():CuratorFramework.class; } @Override publicbooleanisSingleton(){ returntrue; } @Override publicvoidafterPropertiesSet()throwsException{ if(StringUtils.isEmpty(this.connectionString)){ thrownewIllegalStateException("connectionStringcannotbeempty."); }else{ if(this.retryPolicy==null){ this.retryPolicy=newExponentialBackoffRetry(1000,2147483647,180000); } this.client=CuratorFrameworkFactory.newClient(this.connectionString,this.sessionTimeoutMs,this.connectionTimeoutMs,this.retryPolicy); this.client.start(); this.client.blockUntilConnected(30,TimeUnit.MILLISECONDS); } } publicvoidsetConnectionString(StringconnectionString){ this.connectionString=connectionString; } publicvoidsetSessionTimeoutMs(intsessionTimeoutMs){ this.sessionTimeoutMs=sessionTimeoutMs; } publicvoidsetConnectionTimeoutMs(intconnectionTimeoutMs){ this.connectionTimeoutMs=connectionTimeoutMs; } publicvoidsetRetryPolicy(RetryPolicyretryPolicy){ this.retryPolicy=retryPolicy; } publicvoidsetClient(CuratorFrameworkclient){ this.client=client; } }
2、封装分布式锁
根据CuratorFramework创建InterProcessMutex(分布式可重入排它锁)对一行数据进行上锁
publicInterProcessMutex(CuratorFrameworkclient,Stringpath){
this(client,path,newStandardLockInternalsDriver());
}
使用acquire方法
1、acquire():入参为空,调用该方法后,会一直堵塞,直到抢夺到锁资源,或者zookeeper连接中断后,上抛异常。
2、acquire(longtime,TimeUnitunit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false。
publicvoidacquire()throwsException{
if(!this.internalLock(-1L,(TimeUnit)null)){
thrownewIOException("Lostconnectionwhiletryingtoacquirelock:"+this.basePath);
}
}
publicbooleanacquire(longtime,TimeUnitunit)throwsException{
returnthis.internalLock(time,unit);
}
释放锁mutex.release();
publicvoidrelease()throwsException{
ThreadcurrentThread=Thread.currentThread();
InterProcessMutex.LockDatalockData=(InterProcessMutex.LockData)this.threadData.get(currentThread);
if(lockData==null){
thrownewIllegalMonitorStateException("Youdonotownthelock:"+this.basePath);
}else{
intnewLockCount=lockData.lockCount.decrementAndGet();
if(newLockCount<=0){
if(newLockCount<0){
thrownewIllegalMonitorStateException("Lockcounthasgonenegativeforlock:"+this.basePath);
}else{
try{
this.internals.releaseLock(lockData.lockPath);
}finally{
this.threadData.remove(currentThread);
}
}
}
}
}
封装后的DLock代码
1、调用InterProcessMutexprocessMutex=dLock.mutex(path);
2、手动释放锁processMutex.release();
3、需要手动删除路径dLock.del(path);
推荐使用:
都是函数式编程
在业务代码执行完毕后会释放锁和删除path
1、这个有返回结果
publicTmutex(Stringpath,ZkLockCallbackzkLockCallback,longtime,TimeUnittimeUnit)
2、这个无返回结果
publicvoidmutex(Stringpath,ZkVoidCallBackzkLockCallback,longtime,TimeUnittimeUnit)
publicclassDLock{
privatefinalLoggerlogger;
privatestaticfinallongTIMEOUT_D=100L;
privatestaticfinalStringROOT_PATH_D="/dLock";
privateStringlockRootPath;
privateCuratorFrameworkclient;
publicDLock(CuratorFrameworkclient){
this("/dLock",client);
}
publicDLock(StringlockRootPath,CuratorFrameworkclient){
this.logger=LoggerFactory.getLogger(DLock.class);
this.lockRootPath=lockRootPath;
this.client=client;
}
publicInterProcessMutexmutex(Stringpath){
if(!StringUtils.startsWith(path,"/")){
path=Constant.keyBuilder(newObject[]{"/",path});
}
returnnewInterProcessMutex(this.client,Constant.keyBuilder(newObject[]{this.lockRootPath,"",path}));
}
publicTmutex(Stringpath,ZkLockCallbackzkLockCallback)throwsZkLockException{
returnthis.mutex(path,zkLockCallback,100L,TimeUnit.MILLISECONDS);
}
publicTmutex(Stringpath,ZkLockCallbackzkLockCallback,longtime,TimeUnittimeUnit)throwsZkLockException{
StringfinalPath=this.getLockPath(path);
InterProcessMutexmutex=newInterProcessMutex(this.client,finalPath);
try{
if(!mutex.acquire(time,timeUnit)){
thrownewZkLockException("acquirezklockreturnfalse");
}
}catch(Exceptionvar13){
thrownewZkLockException("acquirezklockfailed.",var13);
}
Tvar8;
try{
var8=zkLockCallback.doInLock();
}finally{
this.releaseLock(finalPath,mutex);
}
returnvar8;
}
privatevoidreleaseLock(StringfinalPath,InterProcessMutexmutex){
try{
mutex.release();
this.logger.info("deletezknodepath:{}",finalPath);
this.deleteInternal(finalPath);
}catch(Exceptionvar4){
this.logger.error("dlock","releaselockfailed,path:{}",finalPath,var4);
//LogUtil.error(this.logger,"dlock","releaselockfailed,path:{}",newObject[]{finalPath,var4});
}
}
publicvoidmutex(Stringpath,ZkVoidCallBackzkLockCallback,longtime,TimeUnittimeUnit)throwsZkLockException{
StringfinalPath=this.getLockPath(path);
InterProcessMutexmutex=newInterProcessMutex(this.client,finalPath);
try{
if(!mutex.acquire(time,timeUnit)){
thrownewZkLockException("acquirezklockreturnfalse");
}
}catch(Exceptionvar13){
thrownewZkLockException("acquirezklockfailed.",var13);
}
try{
zkLockCallback.response();
}finally{
this.releaseLock(finalPath,mutex);
}
}
publicStringgetLockPath(StringcustomPath){
if(!StringUtils.startsWith(customPath,"/")){
customPath=Constant.keyBuilder(newObject[]{"/",customPath});
}
StringfinalPath=Constant.keyBuilder(newObject[]{this.lockRootPath,"",customPath});
returnfinalPath;
}
privatevoiddeleteInternal(StringfinalPath){
try{
((ErrorListenerPathable)this.client.delete().inBackground()).forPath(finalPath);
}catch(Exceptionvar3){
this.logger.info("deletezknodepath:{}failed",finalPath);
}
}
publicvoiddel(StringcustomPath){
StringlockPath="";
try{
lockPath=this.getLockPath(customPath);
((ErrorListenerPathable)this.client.delete().inBackground()).forPath(lockPath);
}catch(Exceptionvar4){
this.logger.info("deletezknodepath:{}failed",lockPath);
}
}
}
@FunctionalInterface publicinterfaceZkLockCallback{ TdoInLock(); } @FunctionalInterface publicinterfaceZkVoidCallBack{ voidresponse(); } publicclassZkLockExceptionextendsException{ publicZkLockException(){ } publicZkLockException(Stringmessage){ super(message); } publicZkLockException(Stringmessage,Throwablecause){ super(message,cause); } }
配置CuratorConfig
@Configuration
publicclassCuratorConfig{
@Value("${zk.connectionString}")
privateStringconnectionString;
@Value("${zk.sessionTimeoutMs:500}")
privateintsessionTimeoutMs;
@Value("${zk.connectionTimeoutMs:500}")
privateintconnectionTimeoutMs;
@Value("${zk.dLockRoot:/dLock}")
privateStringdLockRoot;
@Bean
publicCuratorFactoryBeancuratorFactoryBean(){
returnnewCuratorFactoryBean(connectionString,sessionTimeoutMs,connectionTimeoutMs);
}
@Bean
@Autowired
publicDLockdLock(CuratorFrameworkclient){
returnnewDLock(dLockRoot,client);
}
}
测试代码
@RestController
@RequestMapping("/dLock")
publicclassLockController{
@Autowired
privateDLockdLock;
@RequestMapping("/lock")
publicMaptestDLock(Stringno){
finalStringpath=Constant.keyBuilder("/test/no/",no);
Longmutex=0l;
try{
System.out.println("在拿锁:"+path+System.currentTimeMillis());
mutex=dLock.mutex(path,()->{
try{
System.out.println("拿到锁了"+System.currentTimeMillis());
Thread.sleep(10000);
System.out.println("操作完成了"+System.currentTimeMillis());
}finally{
returnSystem.currentTimeMillis();
}
},1000,TimeUnit.MILLISECONDS);
}catch(ZkLockExceptione){
System.out.println("拿不到锁呀"+System.currentTimeMillis());
}
returnCollections.singletonMap("ret",mutex);
}
@RequestMapping("/dlock")
publicMaptestDLock1(Stringno){
finalStringpath=Constant.keyBuilder("/test/no/",no);
Longmutex=0l;
try{
System.out.println("在拿锁:"+path+System.currentTimeMillis());
InterProcessMutexprocessMutex=dLock.mutex(path);
processMutex.acquire();
System.out.println("拿到锁了"+System.currentTimeMillis());
Thread.sleep(10000);
processMutex.release();
System.out.println("操作完成了"+System.currentTimeMillis());
}catch(ZkLockExceptione){
System.out.println("拿不到锁呀"+System.currentTimeMillis());
e.printStackTrace();
}catch(Exceptione){
e.printStackTrace();
}
returnCollections.singletonMap("ret",mutex);
}
@RequestMapping("/del")
publicMapdelDLock(Stringno){
finalStringpath=Constant.keyBuilder("/test/no/",no);
dLock.del(path);
returnCollections.singletonMap("ret",1);
}
}
以上所述是小编给大家介绍的Java(SpringBoot)基于zookeeper的分布式锁实现详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对毛票票网站的支持!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。