浅谈HBase在SpringBoot项目里的应用(含HBaseUtil工具类)
背景:
项目这两个月开始使用HBase来读写数据,网上现成的HBase工具类要么版本混杂,要么只是Demo级别的简单实现,各方面都不完善;
而且我发现HBase查询有很多种方式,首先大方向上有Get和Scan两种,其次行键、列族、列名(限定符)、列值(value)、时间戳版本等多种组合条件,还有各种过滤器的选择,协处理器的应用,所以必须根据自己项目需求和HBase行列设计来自定义HBase工具类和实现类!
经过我自己的研究整理,在此分享下初步的实现方案吧~
注:HBase版本:1.3.0-CDH5.13.0、SpringBoot版本:1.5.9
需要注意的是我用的是原生api,没有用和spring或者springboot整合的HbaseTemplate等,因为这方面资料较少而且听说并没有那么好用…
一、pom.xml依赖
<!-- Place these inside <dependencies> of pom.xml -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.0</version>
    <!-- NOTE(review): presumably excluded to avoid logging-binding / servlet-API clashes with Spring Boot; verify -->
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0</version>
</dependency>
二、application.yml项目配置
此处我是自定义HBase配置,后面会有专门的配置类来加载这个配置
hbase:
  conf:
    # Bound to HbaseConfig.confMaps (prefix "hbase.conf"); HBaseUtils copies every entry into the HBase client Configuration.
    confMaps:
      'hbase.zookeeper.quorum': 'cdh1:2181,cdh2:2181,cdh3:2181'
三、HbaseConfig自定义配置类
HbaseConfig.java:
importorg.springframework.boot.context.properties.ConfigurationProperties; importorg.springframework.context.annotation.Configuration; importjava.util.Map; /** *Hbase-Conf配置 * *@Author:yuanj *@Date:2018/10/1210:49 */ @Configuration @ConfigurationProperties(prefix=HbaseConfig.CONF_PREFIX) publicclassHbaseConfig{ publicstaticfinalStringCONF_PREFIX="hbase.conf"; privateMapconfMaps; publicMap getconfMaps(){ returnconfMaps; } publicvoidsetconfMaps(Map confMaps){ this.confMaps=confMaps; } }
不了解@ConfigurationProperties这个注解的同学可以查阅Spring Boot官方文档:它会把application.yml中以指定前缀(这里是hbase.conf)开头的配置项,通过setter方法绑定到该类的成员变量上!
也就是说springboot项目启动完成后confMaps变量里已经存在一个key为hbase.zookeeper.quorum,value为cdh1:2181,cdh2:2181,cdh3:2181的entry了!
四、HBaseUtils工具类
首先添加SpringContextHolder工具类,下面会用到:
packagecom.moerlong.credit.core; importorg.springframework.beans.BeansException; importorg.springframework.context.ApplicationContext; importorg.springframework.context.ApplicationContextAware; importorg.springframework.stereotype.Component; /** *Spring的ApplicationContext的持有者,可以用静态方法的方式获取spring容器中的bean */ @Component publicclassSpringContextHolderimplementsApplicationContextAware{ privatestaticApplicationContextapplicationContext; @Override publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{ SpringContextHolder.applicationContext=applicationContext; } publicstaticApplicationContextgetApplicationContext(){ assertApplicationContext(); returnapplicationContext; } @SuppressWarnings("unchecked") publicstaticTgetBean(StringbeanName){ assertApplicationContext(); return(T)applicationContext.getBean(beanName); } publicstatic TgetBean(Class requiredType){ assertApplicationContext(); returnapplicationContext.getBean(requiredType); } privatestaticvoidassertApplicationContext(){ if(SpringContextHolder.applicationContext==null){ thrownewRuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!"); } } }
HBaseUtils.java:
/* NOTE(review): This listing lost its whitespace and its generic type arguments during extraction (e.g. "publicclassHBaseUtils", "MapconfMap", "ArrayList<>list"), so it does not compile as shown. Text is also missing between "for(inti=0;i" in insertRecords and ">>map=rs.getMap();" (presumably everything from a "<" to a ">" was stripped as markup): the rest of insertRecords and the head of the following method(s), up to one that formats the cells of a Result "rs" into "record", are gone - recover them from the original source. Issues visible in the remaining code: (1) createTable disables and deletes an already-existing table (its data is lost), logs an error, and does not recreate it; (2) Table objects returned by connection.getTable(...) are never closed in selectValue, scanAllRecord and the scanReportData* methods; (3) if ConnectionFactory.createConnection fails in the constructor the IOException is only logged, leaving connection and admin null for later calls; (4) the "pool" executor is never shut down in the visible code; (5) selectValue encodes the row key with rowKey.getBytes() (platform default charset) while insertRecords uses Bytes.toBytes (UTF-8); (6) results are accumulated with "record+=str" inside loops; (7) the scanReportData* methods set no start/stop row, so the RowFilter is evaluated against every row, and they currently add a null placeholder per row; (8) countRowsWithCoprocessor takes the table offline (disable/modify/enable) when the coprocessor is missing and prints the timing with System.out instead of the logger. */
importcom.moerlong.credit.config.HbaseConfig; importcom.moerlong.credit.core.SpringContextHolder; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.hbase.*; importorg.apache.hadoop.hbase.client.*; importorg.apache.hadoop.hbase.client.coprocessor.AggregationClient; importorg.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter; importorg.apache.hadoop.hbase.filter.*; importorg.apache.hadoop.hbase.util.Bytes; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.context.annotation.DependsOn; importorg.springframework.stereotype.Component; importorg.springframework.util.StopWatch; importjava.io.IOException; importjava.util.ArrayList; importjava.util.List; importjava.util.Map; importjava.util.NavigableMap; importjava.util.concurrent.ExecutorService; importjava.util.concurrent.Executors; @DependsOn("springContextHolder")//Controls bean creation order: springContextHolder must be initialized first because the static hbaseConfig field below is looked up through it @Component publicclassHBaseUtils{ privateLoggerlogger=LoggerFactory.getLogger(this.getClass()); //Fetch the hbaseConfig bean manually through SpringContextHolder privatestaticHbaseConfighbaseConfig=SpringContextHolder.getBean("hbaseConfig"); privatestaticConfigurationconf=HBaseConfiguration.create(); privatestaticExecutorServicepool=Executors.newScheduledThreadPool(20);//Thread pool passed to ConnectionFactory.createConnection(conf,pool) below (the original comment called it a "connection pool") privatestaticConnectionconnection=null; privatestaticHBaseUtilsinstance=null; privatestaticAdminadmin=null; /** * Initializes the static connection and admin if connection is still null. A failure is logged and swallowed, leaving both null. */ privateHBaseUtils(){ if(connection==null){ try{ //Copy every entry of HbaseConfig.confMaps into the HBase Configuration used to create the connection MapconfMap=hbaseConfig.getconfMaps(); for(Map.Entry confEntry:confMap.entrySet()){ conf.set(confEntry.getKey(),confEntry.getValue()); } connection=ConnectionFactory.createConnection(conf,pool); admin=connection.getAdmin(); }catch(IOExceptione){ logger.error("HbaseUtils实例初始化失败!错误信息为:"+e.getMessage(),e); } } } //Simple lazy singleton accessor; not needed when the bean is injected with @Autowired publicstaticsynchronizedHBaseUtilsgetInstance(){ if(instance==null){ instance=newHBaseUtils(); } returninstance; } /** * Creates a table with the given column families. * NOTE(review): if the table already exists it is disabled and deleted (its data is lost) and an error is logged, but it is not recreated. * * @param tableName table name * @param columnFamily column family names (array) */ 
publicvoidcreateTable(StringtableName,String[]columnFamily)throwsIOException{ TableNamename=TableName.valueOf(tableName); //If the table already exists, delete it (it is not recreated) if(admin.tableExists(name)){ admin.disableTable(name); admin.deleteTable(name); logger.error("createhtableerror!thistable{}alreadyexists!",name); }else{ HTableDescriptordesc=newHTableDescriptor(name); for(Stringcf:columnFamily){ desc.addFamily(newHColumnDescriptor(cf)); } admin.createTable(desc); } } /** * Inserts one row: several columns and values within a single column family. * * @param tableName table name * @param row row key * @param columnFamilys column family name * @param columns column qualifiers (array) * @param values values (array); must correspond one-to-one with columns */ publicvoidinsertRecords(StringtableName,Stringrow,StringcolumnFamilys,String[]columns,String[]values)throwsIOException{ TableNamename=TableName.valueOf(tableName); Tabletable=connection.getTable(name); Putput=newPut(Bytes.toBytes(row)); for(inti=0;i >>map=rs.getMap(); for(Cellcell:rs.rawCells()){ StringBufferstringBuffer=newStringBuffer().append(Bytes.toString(cell.getRow())).append("\t") .append(Bytes.toString(cell.getFamily())).append("\t") .append(Bytes.toString(cell.getQualifier())).append("\t") .append(Bytes.toString(cell.getValue())).append("\n"); Stringstr=stringBuffer.toString(); record+=str; } returnrecord; } /** * Reads one cell (single row, column family and column) and returns its value as a String. * * @param tablename table name * @param rowKey row key; encoded with rowKey.getBytes(), i.e. the platform default charset * @param columnFamily column family name * @param column column qualifier * @return Bytes.toString of rs.value() */ publicstaticStringselectValue(Stringtablename,StringrowKey,StringcolumnFamily,Stringcolumn)throwsIOException{ TableNamename=TableName.valueOf(tablename); Tabletable=connection.getTable(name); Getg=newGet(rowKey.getBytes()); g.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column)); Resultrs=table.get(g); returnBytes.toString(rs.value()); } /** * Scans the whole table and returns every cell as a tab-separated "row family qualifier value" line. * * @param tablename table name * @return all cells of the table, one per line */ publicStringscanAllRecord(Stringtablename)throwsIOException{ Stringrecord=""; TableNamename=TableName.valueOf(tablename); Tabletable=connection.getTable(name); Scanscan=newScan(); ResultScannerscanner=table.getScanner(scan); try{ for(Resultresult:scanner){ for(Cellcell:result.rawCells()){ 
StringBufferstringBuffer=newStringBuffer().append(Bytes.toString(cell.getRow())).append("\t") .append(Bytes.toString(cell.getFamily())).append("\t") .append(Bytes.toString(cell.getQualifier())).append("\t") .append(Bytes.toString(cell.getValue())).append("\n"); Stringstr=stringBuffer.toString(); record+=str; } } }finally{ if(scanner!=null){ scanner.close(); } } returnrecord; } /** * Scans for report records whose row key contains the given keyword. * * @param tablename table name * @param rowKeyword substring the row key must contain (SubstringComparator matches case-insensitively) * @return one entry per matching row (currently a null placeholder, see the TODO) */ publicListscanReportDataByRowKeyword(Stringtablename,StringrowKeyword)throwsIOException{ ArrayList<>list=newArrayList<>(); Tabletable=connection.getTable(TableName.valueOf(tablename)); Scanscan=newScan(); //Row-key filter: keep rows whose key contains rowKeyword RowFilterrowFilter=newRowFilter(CompareFilter.CompareOp.EQUAL,newSubstringComparator(rowKeyword)); scan.setFilter(rowFilter); ResultScannerscanner=table.getScanner(scan); try{ for(Resultresult:scanner){ //TODO map each Result to a business object here list.add(null); } }finally{ if(scanner!=null){ scanner.close(); } } returnlist; } /** * Scans for report records whose row key contains the keyword, restricted to cells in a timestamp range. * * @param tablename table name * @param rowKeyword substring the row key must contain (SubstringComparator matches case-insensitively) * @param minStamp lower timestamp bound (inclusive) * @param maxStamp upper timestamp bound (exclusive) * @return one entry per matching row (currently a null placeholder, see the TODO) */ publicListscanReportDataByRowKeywordTimestamp(Stringtablename,StringrowKeyword,LongminStamp,LongmaxStamp)throwsIOException{ ArrayList<>list=newArrayList<>(); Tabletable=connection.getTable(TableName.valueOf(tablename)); Scanscan=newScan(); //Restrict the scan to cell timestamps in [minStamp, maxStamp) scan.setTimeRange(minStamp,maxStamp); RowFilterrowFilter=newRowFilter(CompareFilter.CompareOp.EQUAL,newSubstringComparator(rowKeyword)); scan.setFilter(rowFilter); ResultScannerscanner=table.getScanner(scan); try{ for(Resultresult:scanner){ //TODO map each Result to a business object here list.add(null); } }finally{ if(scanner!=null){ scanner.close(); } } returnlist; } /** * Disables and deletes the table if it exists; does nothing otherwise. * * @param tablename table name */ publicvoiddeleteTable(Stringtablename)throwsIOException{ TableNamename=TableName.valueOf(tablename); if(admin.tableExists(name)){ admin.disableTable(name); admin.deleteTable(name); } } /** * Counts all rows of the table with the AggregateImplementation coprocessor and prints the count and elapsed time. * If the coprocessor is not attached yet, the table is disabled, the coprocessor is added, and the table is re-enabled first. * * @param tablename table name * @return the row count */ publicLongcountRowsWithCoprocessor(Stringtablename)throwsThrowable{ 
TableNamename=TableName.valueOf(tablename); HTableDescriptordescriptor=admin.getTableDescriptor(name); StringcoprocessorClass="org.apache.hadoop.hbase.coprocessor.AggregateImplementation"; if(!descriptor.hasCoprocessor(coprocessorClass)){ admin.disableTable(name); descriptor.addCoprocessor(coprocessorClass); admin.modifyTable(name,descriptor); admin.enableTable(name); } //Time the count StopWatchstopWatch=newStopWatch(); stopWatch.start(); Scanscan=newScan(); AggregationClientaggregationClient=newAggregationClient(conf); Longcount=aggregationClient.rowCount(name,newLongColumnInterpreter(),scan); stopWatch.stop(); System.out.println("RowCount:"+count+",全表count统计耗时:"+stopWatch.getTotalTimeMillis()); returncount; } }
五、使用
接下来只需要在项目业务类里注入hbaseUtils就可以使用了:
@Autowired
privateHBaseUtilshBaseUtils;
补充知识:springboot整合Hbase
springboot项目需要整合SpringCloud
依赖
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-shaded-client</artifactId>
    <version>1.2.6</version>
</dependency>
yml配置:
自定义一个配置项,用于读取zookeeper地址:
hbase:
  zookeeper:
    # Read by @Value("${hbase.zookeeper.quorum}") in UcareHbaseConfiguration
    quorum: hbase126-node2:2181
config配置:
importnet.cc.commons.exception.CCRuntimeException; importorg.apache.hadoop.hbase.HBaseConfiguration; importorg.apache.hadoop.hbase.HConstants; importorg.apache.hadoop.hbase.client.Connection; importorg.apache.hadoop.hbase.client.ConnectionFactory; importorg.springframework.beans.factory.annotation.Value; importorg.springframework.context.annotation.Bean; importorg.springframework.context.annotation.Configuration; importorg.springframework.context.annotation.Scope; importjava.io.IOException; importjava.util.function.Supplier; /** *@Authorwangqiubao *@Date2019/9/2415:28 *@Description **/ @Configuration publicclassUcareHbaseConfiguration{ /** *读取HBase的zookeeper地址 */ @Value("${hbase.zookeeper.quorum}") privateStringquorum; /** *配置HBase连接参数 * *@return */ @Bean publicorg.apache.hadoop.conf.ConfigurationhbaseConfig(){ org.apache.hadoop.conf.Configurationconfig=HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM,quorum); returnconfig; } //每次调用get方法就会创建一个Connection @Bean publicSupplierhbaseConnSupplier(){ return()->{ try{ returnhbaseConnection(); }catch(IOExceptione){ thrownewCCRuntimeException(e); } }; } @Bean //@Scope标明模式,默认单例模式.prototype多例模式 //若是在其他类中直接@Autowired引入的,多例就无效了,因为那个类在初始化的时候,已经创建了创建了这个bean了,之后调用的时候,不会重新创建,若是想要实现多例,就要每次调用的时候,手动获取bean @Scope(value="prototype") publicConnectionhbaseConnection()throwsIOException{ returnConnectionFactory.createConnection(hbaseConfig()); } }
使用
spring管理
/** *内部已实现线程安全的连接池 */ @Autowired privateConnectionhbaseConnection;
插入/更新数据
publicvoidaaaa()throwsIOException{ try(Tabletable=hbaseConnection.getTable(TableName.valueOf("表名"))){//获取表连接 //配置一条数据 //行键 Putput=newPut(Bytes.toBytes("key主键")); put.addColumn(Bytes.toBytes("列族"),Bytes.toBytes("列"),Bytes.toBytes("值")); .....//每个有数据的列都要一个addColumn //put插入数据 table.put(put); } }
查询
根据主键查询内容
try(Tabletable=hbaseConnection.getTable(TableName.valueOf("表名"))){ Resultresult=table.get(newGet(asRowKey(date,acid))); if(result==null)returnnull; //列名为starttime,最后一条就是该航班最新的航迹 CelllatestCell=Iterables.getLast(result.listCells()); returnAdsbTrackProto.AdsbTrack.parseFrom(CellUtil.cloneValue(latestCell)); }
以上这篇浅谈HBase在SpringBoot项目里的应用(含HBaseUtil工具类)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。