浅谈Java(SpringBoot)基于zookeeper的分布式锁实现
通过zookeeper实现分布式锁
1、创建zookeeper的client
首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFrameworkclient
publicclassCuratorFactoryBeanimplementsFactoryBean,InitializingBean,DisposableBean{ privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(ContractFileInfoController.class); privateStringconnectionString; privateintsessionTimeoutMs; privateintconnectionTimeoutMs; privateRetryPolicyretryPolicy; privateCuratorFrameworkclient; publicCuratorFactoryBean(StringconnectionString){ this(connectionString,500,500); } publicCuratorFactoryBean(StringconnectionString,intsessionTimeoutMs,intconnectionTimeoutMs){ this.connectionString=connectionString; this.sessionTimeoutMs=sessionTimeoutMs; this.connectionTimeoutMs=connectionTimeoutMs; } @Override publicvoiddestroy()throwsException{ LOGGER.info("Closingcuratorframework..."); this.client.close(); LOGGER.info("Closedcuratorframework."); } @Override publicCuratorFrameworkgetObject()throwsException{ returnthis.client; } @Override publicClass>getObjectType(){ returnthis.client!=null?this.client.getClass():CuratorFramework.class; } @Override publicbooleanisSingleton(){ returntrue; } @Override publicvoidafterPropertiesSet()throwsException{ if(StringUtils.isEmpty(this.connectionString)){ thrownewIllegalStateException("connectionStringcannotbeempty."); }else{ if(this.retryPolicy==null){ this.retryPolicy=newExponentialBackoffRetry(1000,2147483647,180000); } this.client=CuratorFrameworkFactory.newClient(this.connectionString,this.sessionTimeoutMs,this.connectionTimeoutMs,this.retryPolicy); this.client.start(); this.client.blockUntilConnected(30,TimeUnit.MILLISECONDS); } } publicvoidsetConnectionString(StringconnectionString){ this.connectionString=connectionString; } publicvoidsetSessionTimeoutMs(intsessionTimeoutMs){ this.sessionTimeoutMs=sessionTimeoutMs; } publicvoidsetConnectionTimeoutMs(intconnectionTimeoutMs){ this.connectionTimeoutMs=connectionTimeoutMs; } publicvoidsetRetryPolicy(RetryPolicyretryPolicy){ this.retryPolicy=retryPolicy; } publicvoidsetClient(CuratorFrameworkclient){ this.client=client; } }
2、封装分布式锁
根据CuratorFramework创建InterProcessMutex(分布式可重入排它锁)对一行数据进行上锁
publicInterProcessMutex(CuratorFrameworkclient,Stringpath){ this(client,path,newStandardLockInternalsDriver()); }
使用acquire方法
1、acquire():入参为空,调用该方法后,会一直堵塞,直到抢夺到锁资源,或者zookeeper连接中断后,上抛异常。
2、acquire(longtime,TimeUnitunit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false。
publicvoidacquire()throwsException{ if(!this.internalLock(-1L,(TimeUnit)null)){ thrownewIOException("Lostconnectionwhiletryingtoacquirelock:"+this.basePath); } } publicbooleanacquire(longtime,TimeUnitunit)throwsException{ returnthis.internalLock(time,unit); }
释放锁mutex.release();
publicvoidrelease()throwsException{ ThreadcurrentThread=Thread.currentThread(); InterProcessMutex.LockDatalockData=(InterProcessMutex.LockData)this.threadData.get(currentThread); if(lockData==null){ thrownewIllegalMonitorStateException("Youdonotownthelock:"+this.basePath); }else{ intnewLockCount=lockData.lockCount.decrementAndGet(); if(newLockCount<=0){ if(newLockCount<0){ thrownewIllegalMonitorStateException("Lockcounthasgonenegativeforlock:"+this.basePath); }else{ try{ this.internals.releaseLock(lockData.lockPath); }finally{ this.threadData.remove(currentThread); } } } } }
封装后的DLock代码
1、调用InterProcessMutexprocessMutex=dLock.mutex(path);
2、手动释放锁processMutex.release();
3、需要手动删除路径dLock.del(path);
推荐使用:
都是函数式编程
在业务代码执行完毕后会释放锁和删除path
1、这个有返回结果
publicTmutex(Stringpath,ZkLockCallbackzkLockCallback,longtime,TimeUnittimeUnit)
2、这个无返回结果
publicvoidmutex(Stringpath,ZkVoidCallBackzkLockCallback,longtime,TimeUnittimeUnit)
publicclassDLock{ privatefinalLoggerlogger; privatestaticfinallongTIMEOUT_D=100L; privatestaticfinalStringROOT_PATH_D="/dLock"; privateStringlockRootPath; privateCuratorFrameworkclient; publicDLock(CuratorFrameworkclient){ this("/dLock",client); } publicDLock(StringlockRootPath,CuratorFrameworkclient){ this.logger=LoggerFactory.getLogger(DLock.class); this.lockRootPath=lockRootPath; this.client=client; } publicInterProcessMutexmutex(Stringpath){ if(!StringUtils.startsWith(path,"/")){ path=Constant.keyBuilder(newObject[]{"/",path}); } returnnewInterProcessMutex(this.client,Constant.keyBuilder(newObject[]{this.lockRootPath,"",path})); } publicTmutex(Stringpath,ZkLockCallback zkLockCallback)throwsZkLockException{ returnthis.mutex(path,zkLockCallback,100L,TimeUnit.MILLISECONDS); } public Tmutex(Stringpath,ZkLockCallback zkLockCallback,longtime,TimeUnittimeUnit)throwsZkLockException{ StringfinalPath=this.getLockPath(path); InterProcessMutexmutex=newInterProcessMutex(this.client,finalPath); try{ if(!mutex.acquire(time,timeUnit)){ thrownewZkLockException("acquirezklockreturnfalse"); } }catch(Exceptionvar13){ thrownewZkLockException("acquirezklockfailed.",var13); } Tvar8; try{ var8=zkLockCallback.doInLock(); }finally{ this.releaseLock(finalPath,mutex); } returnvar8; } privatevoidreleaseLock(StringfinalPath,InterProcessMutexmutex){ try{ mutex.release(); this.logger.info("deletezknodepath:{}",finalPath); this.deleteInternal(finalPath); }catch(Exceptionvar4){ this.logger.error("dlock","releaselockfailed,path:{}",finalPath,var4); //LogUtil.error(this.logger,"dlock","releaselockfailed,path:{}",newObject[]{finalPath,var4}); } } publicvoidmutex(Stringpath,ZkVoidCallBackzkLockCallback,longtime,TimeUnittimeUnit)throwsZkLockException{ StringfinalPath=this.getLockPath(path); InterProcessMutexmutex=newInterProcessMutex(this.client,finalPath); try{ if(!mutex.acquire(time,timeUnit)){ thrownewZkLockException("acquirezklockreturnfalse"); } }catch(Exceptionvar13){ thrownewZkLockException("acquirezklockfailed.",var13); } try{ zkLockCallback.response(); }finally{ this.releaseLock(finalPath,mutex); } } publicStringgetLockPath(StringcustomPath){ if(!StringUtils.startsWith(customPath,"/")){ customPath=Constant.keyBuilder(newObject[]{"/",customPath}); } StringfinalPath=Constant.keyBuilder(newObject[]{this.lockRootPath,"",customPath}); returnfinalPath; } privatevoiddeleteInternal(StringfinalPath){ try{ ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(finalPath); }catch(Exceptionvar3){ this.logger.info("deletezknodepath:{}failed",finalPath); } } publicvoiddel(StringcustomPath){ StringlockPath=""; try{ lockPath=this.getLockPath(customPath); ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(lockPath); }catch(Exceptionvar4){ this.logger.info("deletezknodepath:{}failed",lockPath); } } }
@FunctionalInterface publicinterfaceZkLockCallback{ TdoInLock(); } @FunctionalInterface publicinterfaceZkVoidCallBack{ voidresponse(); } publicclassZkLockExceptionextendsException{ publicZkLockException(){ } publicZkLockException(Stringmessage){ super(message); } publicZkLockException(Stringmessage,Throwablecause){ super(message,cause); } }
配置CuratorConfig
@Configuration publicclassCuratorConfig{ @Value("${zk.connectionString}") privateStringconnectionString; @Value("${zk.sessionTimeoutMs:500}") privateintsessionTimeoutMs; @Value("${zk.connectionTimeoutMs:500}") privateintconnectionTimeoutMs; @Value("${zk.dLockRoot:/dLock}") privateStringdLockRoot; @Bean publicCuratorFactoryBeancuratorFactoryBean(){ returnnewCuratorFactoryBean(connectionString,sessionTimeoutMs,connectionTimeoutMs); } @Bean @Autowired publicDLockdLock(CuratorFrameworkclient){ returnnewDLock(dLockRoot,client); } }
测试代码
@RestController @RequestMapping("/dLock") publicclassLockController{ @Autowired privateDLockdLock; @RequestMapping("/lock") publicMaptestDLock(Stringno){ finalStringpath=Constant.keyBuilder("/test/no/",no); Longmutex=0l; try{ System.out.println("在拿锁:"+path+System.currentTimeMillis()); mutex=dLock.mutex(path,()->{ try{ System.out.println("拿到锁了"+System.currentTimeMillis()); Thread.sleep(10000); System.out.println("操作完成了"+System.currentTimeMillis()); }finally{ returnSystem.currentTimeMillis(); } },1000,TimeUnit.MILLISECONDS); }catch(ZkLockExceptione){ System.out.println("拿不到锁呀"+System.currentTimeMillis()); } returnCollections.singletonMap("ret",mutex); } @RequestMapping("/dlock") publicMaptestDLock1(Stringno){ finalStringpath=Constant.keyBuilder("/test/no/",no); Longmutex=0l; try{ System.out.println("在拿锁:"+path+System.currentTimeMillis()); InterProcessMutexprocessMutex=dLock.mutex(path); processMutex.acquire(); System.out.println("拿到锁了"+System.currentTimeMillis()); Thread.sleep(10000); processMutex.release(); System.out.println("操作完成了"+System.currentTimeMillis()); }catch(ZkLockExceptione){ System.out.println("拿不到锁呀"+System.currentTimeMillis()); e.printStackTrace(); }catch(Exceptione){ e.printStackTrace(); } returnCollections.singletonMap("ret",mutex); } @RequestMapping("/del") publicMapdelDLock(Stringno){ finalStringpath=Constant.keyBuilder("/test/no/",no); dLock.del(path); returnCollections.singletonMap("ret",1); } }
以上所述是小编给大家介绍的Java(SpringBoot)基于zookeeper的分布式锁实现详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对毛票票网站的支持!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。