Spring Boot 整合 HBase

mac2026-04-18  8

前言

因公司业务场景，原本使用 Phoenix 数据库（HBase 之上的一个 SQL 层）的查询服务需要改为直连 HBase，因为 Phoenix 建表并映射到 HBase 的过程耗时太长了——大数据组的同事是这么说的，也许是我们服务器资源不够吧。原本写得很 nice 的 SQL 要全部重构了！

java连接hbase注意事项

1. 对 HBase 不熟悉的，建议先大致了解一下它的基本原理及重要组成部分：它是一个非关系型的分布式存储系统。
2. 选择正确版本的 jar：根据 HBase 的版本来选择，实在不知道的话，可以去 HBase 安装目录（lib 目录）里找 hbase-client 的 jar 包查看版本号。
3. 注意数据入库时的编码与查询时解码的格式是否一致，否则会出现乱码。
4. 查询时注意检查并确认几个关键项：表名、列族、列名（区分大小写）。列名的大小写写错将导致过滤器失效！
5. 本文使用的是原生的 Java API；除此之外，也可以使用与 Spring 整合的 HbaseTemplate 操作 HBase，它在原生 API 之上做了一层简单的封装。

一、导入相应的jar包

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <!-- Pick the version that matches the HBase installation you connect to. -->
    <version>1.2.0</version>
    <exclusions>
        <!-- Logging artifacts excluded; presumably to avoid clashing with the application's own
             logging setup (e.g. Spring Boot's default binding) - confirm for your project. -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <!-- Old servlet API excluded; presumably it conflicts with the embedded web container's
             servlet API - confirm. -->
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>

注：因为我的项目中已经引用了 phoenix-core.jar，其中包含了 HBase 相关的包，所以不需要再单独引入这个依赖。

二、添加 HBase 相关配置（application.properties）

# ZooKeeper quorum the HBase client connects to (comma-separated host names).
hbase.zookeeper.quorum=hadoop-02,hadoop-03,hadoop-04
# ZooKeeper client port.
hbase.zookeeper.property.clientPort=2181
# Required: HbaseConfig injects this key via @Value, so startup fails if it is missing.
hbase.rootdir=/hbase

三、添加读取配置文件的类

/**
 * Exposes the HBase connection settings from application.properties as static fields, so that
 * code which is not wired by Spring (for example {@code HbaseUtils}) can read them.
 *
 * <p>{@code @Value} cannot populate static fields directly. Each field is therefore filled through
 * an instance setter that Spring calls when it creates this bean. Until this bean has been created,
 * the static fields remain {@code null}.
 */
@Component
public class HbaseConfig {

    /** Value of {@code hbase.zookeeper.quorum}. */
    public static String quorum;

    /** Value of {@code hbase.zookeeper.property.clientPort}. */
    public static String zkPort;

    /** Value of {@code hbase.rootdir}. */
    public static String rootdir;

    /** Column family used by the filter helpers in {@code HbaseUtils}. */
    public static final String COLUMN_FAMILY = "info";

    public static final String TABLE_NAME_RESUME_LIST = "ODS_RESUME_LIST_PHOENIX";

    // NOTE(review): same value as TABLE_NAME_RESUME_LIST; looks like a copy-paste slip.
    // Confirm the real table name for CV resume info.
    public static final String TABLE_NAME_CV_RESUME_INFO = "ODS_RESUME_LIST_PHOENIX";

    @Value("${hbase.zookeeper.quorum}")
    public void setQuorum(String quorum) {
        // Assign through the class name: the target is static, not per-instance state.
        HbaseConfig.quorum = quorum;
    }

    @Value("${hbase.zookeeper.property.clientPort}")
    public void setZkPort(String zkPort) {
        HbaseConfig.zkPort = zkPort;
    }

    @Value("${hbase.rootdir}")
    public void setRootdir(String rootdir) {
        HbaseConfig.rootdir = rootdir;
    }
}

四、编写连接 HBase 的工具类

@Component public class HbaseUtils { private Logger LOGGER = LoggerFactory.getLogger(this.getClass()); private static Configuration conf = HBaseConfiguration.create(); private static ExecutorService pool = Executors.newScheduledThreadPool(20); // 设置连接池 private static Connection connection = null; private static HbaseUtils instance = null; private static Admin admin = null; private HbaseUtils() { if (connection == null) { try { conf.set("hbase.zookeeper.quorum", HbaseConfig.quorum); conf.set("hbase.zookeeper.property.clientPort", HbaseConfig.zkPort); conf.set("hbase.rootdir", HbaseConfig.rootdir); connection = ConnectionFactory.createConnection(conf, pool); admin = connection.getAdmin(); } catch (IOException e) { LOGGER.error("HbaseUtils实例初始化失败!错误信息为:" + e.getMessage(), e); } } } // 简单单例方法,如果autowired自动注入就不需要此方法 public static synchronized HbaseUtils getInstance() { if (instance == null) { instance = new HbaseUtils(); } return instance; } public static Connection getConn() { return connection; } public int countByFilter(String tableName, FilterList filterList) { if (StringUtils.isBlank(tableName)) { return 0; } int count = 0; try { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); scan.setFilter(filterList); ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { if (result.size() > 0) { count++; } } return count; } catch (IOException e) { LOGGER.error("query hbase error.tableName={}", tableName, e); } return count; } public List<Map<String, String>> listByFilter(String tableName, FilterList filterList) { if (StringUtils.isBlank(tableName)) { return Collections.emptyList(); } try { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); scan.setFilter(filterList); ResultScanner scanner = table.getScanner(scan); List<Map<String, String>> list = new ArrayList<>(); for (Result result : scanner) { Map<String, String> obj = new HashMap<>(); for (Cell cell : result.listCells()) 
{ obj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)).trim()); } list.add(obj); } return list; } catch (IOException e) { LOGGER.error("query hbase error.tableName={}", tableName, e); } return Collections.emptyList(); } public static SingleColumnValueFilter builEqualValueFilter(String column, String value) { SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter( Bytes.toBytes(HbaseConfig.COLUMN_FAMILY), Bytes.toBytes(column), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(value)); singleColumnValueFilter.setFilterIfMissing(true); return singleColumnValueFilter; } }

五、单元测试

@Test public void testHBase1() { // System.out.println(JSON.toJSONString(hbaseResumeListService.list("华为", ""))); // 获取所有表名 try { Admin admin = HbaseUtils.getInstance().getConn().getAdmin(); TableName[] tableNames = admin.listTableNames(); for (TableName tableName : tableNames) { System.out.println(tableName.getNameAsString()); } } catch (Exception e) { e.printStackTrace(); } } @Test public void testHBase5() throws IOException { //建表 Admin admin = HbaseUtils.getInstance().getConn().getAdmin(); HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("T_XP_TEST")); //设置列族名 hTableDescriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("base"))); admin.createTable(hTableDescriptor); System.out.println("表是否建成功?=" + admin.tableExists(TableName.valueOf("T_XP_TEST"))); } @Test public void testHBaseAdd() throws IOException { //插入数据 Table table = HbaseUtils.getInstance().getConn().getTable(TableName.valueOf("T_XP_TEST")); Put put = new Put(Bytes.toBytes("r1")); put.addColumn(Bytes.toBytes("base"), Bytes.toBytes("name"), Bytes.toBytes("xp")); put.addColumn(Bytes.toBytes("base"), Bytes.toBytes("age"), Bytes.toBytes("18")); put.addColumn(Bytes.toBytes("base"), Bytes.toBytes("gender"), Bytes.toBytes("1")); put.addColumn(Bytes.toBytes("base"), Bytes.toBytes("phone"), Bytes.toBytes("13262831974")); put.addColumn(Bytes.toBytes("base"), Bytes.toBytes("team"), Bytes.toBytes("湖人")); table.put(put); } @Test public void testHBaseAll() throws IOException { //扫描全表数据 Table table = HbaseUtils.getInstance().getConn().getTable(TableName.valueOf("T_XP_TEST")); ResultScanner scanner = table.getScanner(new Scan()); List<Map<String, String>> list = new ArrayList<>(); for (Result result : scanner) { Map<String, String> obj = new HashMap<>(); for (Cell cell : result.listCells()) { obj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } list.add(obj); } System.out.println(JSON.toJSONString(list)); } @Test public void testHBaseFilter() 
throws IOException { Table table = HbaseUtils.getInstance().getConn().getTable(TableName.valueOf("T_XP_TEST")); Scan scan = new Scan(); // 设置过滤器,设置多个时用scan.setFilter(filterList); filterList.add.... scan.setFilter(new SingleColumnValueFilter(Bytes.toBytes("base"), Bytes.toBytes("team"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("湖人"))); ResultScanner scanner = table.getScanner(scan); List<Map<String, String>> list = new ArrayList<>(); for (Result result : scanner) { Map<String, String> obj = new HashMap<>(); for (Cell cell : result.listCells()) { obj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } list.add(obj); } System.out.println(JSON.toJSONString(list)); }
最新回复(0)