前言
因公司业务场景,原本使用phoenix数据库(hbase的一个sql层)的查询服务需要改为直连hbase,因为phoenix建表
映射到hbase这个过程需要的时间太长了,大数据组的同事是这么说的,也许是我们服务器资源不够吧。原本写的很
nice的sql要全部重构了!
java连接hbase注意事项
1、对hbase不熟悉的建议先大致了解一下它的基本原理及重要组成部分,它是一个非关系型的分布式存储系统 2、选择正确版本的jar,根据hbase的版本选择,实在不知道的话可以去hbase的安装目录里找hbase-client.jar 3、注意数据入库时的编码与查询的时候解码的格式是否一致,否则会出现乱码现象 4、查询时注意检查和确定几个关键:表名、列族、列名(区分大小写),列名的大小写弄错将导致过滤器失效! 5、本文使用的是原生的java的api,除此之外可以使用与spring整合的hbaseTemplate操作hbase,有过一层小小的封装
一、导入相应的jar包
<dependency>
<groupId>org
.apache
.hbase
</groupId
>
<artifactId>hbase
-client
</artifactId
>
<version>1.2.0</version
>
<exclusions>
<exclusion>
<groupId>org
.slf4j
</groupId
>
<artifactId>slf4j
-log4j12
</artifactId
>
</exclusion
>
<exclusion>
<groupId>log4j
</groupId
>
<artifactId>log4j
</artifactId
>
</exclusion
>
<exclusion>
<groupId>javax
.servlet
</groupId
>
<artifactId>servlet
-api
</artifactId
>
</exclusion
>
</exclusions
>
</dependency
>
注:因为我项目中有引用phoenix-core.jar,里面包含了hbase相关的包所以不需要引用这个包了
二、hbase相关配置文件搞起来(application.properties)
hbase
.zookeeper
.quorum
=hadoop
-02,hadoop
-03,hadoop
-04
hbase
.zookeeper
.property
.clientPort
=2181
hbase
.rootdir
=/hbase
三、添加读取配置文件的类
@Component
public class HbaseConfig {
public static String quorum
;
public static String zkPort
;
public static String rootdir
;
public static final String COLUMN_FAMILY
= "info";
public static final String TABLE_NAME_RESUME_LIST
= "ODS_RESUME_LIST_PHOENIX";
public static final String TABLE_NAME_CV_RESUME_INFO
= "ODS_RESUME_LIST_PHOENIX";
@Value("${hbase.zookeeper.quorum}")
public void setQuorum(String quorum
) {
this.quorum
= quorum
;
}
@Value("${hbase.zookeeper.property.clientPort}")
public void setZkPort(String zkPort
) {
this.zkPort
= zkPort
;
}
@Value("${hbase.rootdir}")
public void setRootdir(String rootdir
) {
this.rootdir
= rootdir
;
}
}
四、编写连接hbase工具类
@Component
public class HbaseUtils {
private Logger LOGGER
= LoggerFactory
.getLogger(this.getClass());
private static Configuration conf
= HBaseConfiguration
.create();
private static ExecutorService pool
= Executors
.newScheduledThreadPool(20);
private static Connection connection
= null
;
private static HbaseUtils instance
= null
;
private static Admin admin
= null
;
private HbaseUtils() {
if (connection
== null
) {
try {
conf
.set("hbase.zookeeper.quorum", HbaseConfig
.quorum
);
conf
.set("hbase.zookeeper.property.clientPort", HbaseConfig
.zkPort
);
conf
.set("hbase.rootdir", HbaseConfig
.rootdir
);
connection
= ConnectionFactory
.createConnection(conf
, pool
);
admin
= connection
.getAdmin();
} catch (IOException e
) {
LOGGER
.error("HbaseUtils实例初始化失败!错误信息为:" + e
.getMessage(), e
);
}
}
}
public static synchronized HbaseUtils
getInstance() {
if (instance
== null
) {
instance
= new HbaseUtils();
}
return instance
;
}
public static Connection
getConn() {
return connection
;
}
public int countByFilter(String tableName
, FilterList filterList
) {
if (StringUtils
.isBlank(tableName
)) {
return 0;
}
int count
= 0;
try {
Table table
= connection
.getTable(TableName
.valueOf(tableName
));
Scan scan
= new Scan();
scan
.setFilter(filterList
);
ResultScanner scanner
= table
.getScanner(scan
);
for (Result result
: scanner
) {
if (result
.size() > 0) {
count
++;
}
}
return count
;
} catch (IOException e
) {
LOGGER
.error("query hbase error.tableName={}", tableName
, e
);
}
return count
;
}
public List
<Map
<String, String>> listByFilter(String tableName
, FilterList filterList
) {
if (StringUtils
.isBlank(tableName
)) {
return Collections
.emptyList();
}
try {
Table table
= connection
.getTable(TableName
.valueOf(tableName
));
Scan scan
= new Scan();
scan
.setFilter(filterList
);
ResultScanner scanner
= table
.getScanner(scan
);
List
<Map
<String, String>> list
= new ArrayList<>();
for (Result result
: scanner
) {
Map
<String, String> obj
= new HashMap<>();
for (Cell cell
: result
.listCells()) {
obj
.put(Bytes
.toString(CellUtil
.cloneQualifier(cell
)),
Bytes
.toString(CellUtil
.cloneValue(cell
)).trim());
}
list
.add(obj
);
}
return list
;
} catch (IOException e
) {
LOGGER
.error("query hbase error.tableName={}", tableName
, e
);
}
return Collections
.emptyList();
}
public static SingleColumnValueFilter
builEqualValueFilter(String column
, String value
) {
SingleColumnValueFilter singleColumnValueFilter
= new SingleColumnValueFilter(
Bytes
.toBytes(HbaseConfig
.COLUMN_FAMILY
), Bytes
.toBytes(column
), CompareFilter
.CompareOp
.EQUAL
,
Bytes
.toBytes(value
));
singleColumnValueFilter
.setFilterIfMissing(true);
return singleColumnValueFilter
;
}
}
五、单元测试
@Test
public void testHBase1() {
try {
Admin admin
= HbaseUtils
.getInstance().getConn().getAdmin();
TableName
[] tableNames
= admin
.listTableNames();
for (TableName tableName
: tableNames
) {
System
.out
.println(tableName
.getNameAsString());
}
} catch (Exception e
) {
e
.printStackTrace();
}
}
@Test
public void testHBase5() throws IOException
{
Admin admin
= HbaseUtils
.getInstance().getConn().getAdmin();
HTableDescriptor hTableDescriptor
= new
HTableDescriptor(TableName
.valueOf("T_XP_TEST"));
hTableDescriptor
.addFamily(new HColumnDescriptor(Bytes
.toBytes("base")));
admin
.createTable(hTableDescriptor
);
System
.out
.println("表是否建成功?=" +
admin
.tableExists(TableName
.valueOf("T_XP_TEST")));
}
@Test
public void testHBaseAdd() throws IOException
{
Table table
=
HbaseUtils
.getInstance().getConn().getTable(TableName
.valueOf("T_XP_TEST"));
Put put
= new Put(Bytes
.toBytes("r1"));
put
.addColumn(Bytes
.toBytes("base"), Bytes
.toBytes("name"), Bytes
.toBytes("xp"));
put
.addColumn(Bytes
.toBytes("base"), Bytes
.toBytes("age"), Bytes
.toBytes("18"));
put
.addColumn(Bytes
.toBytes("base"), Bytes
.toBytes("gender"), Bytes
.toBytes("1"));
put
.addColumn(Bytes
.toBytes("base"), Bytes
.toBytes("phone"),
Bytes
.toBytes("13262831974"));
put
.addColumn(Bytes
.toBytes("base"), Bytes
.toBytes("team"), Bytes
.toBytes("湖人"));
table
.put(put
);
}
@Test
public void testHBaseAll() throws IOException
{
Table table
=
HbaseUtils
.getInstance().getConn().getTable(TableName
.valueOf("T_XP_TEST"));
ResultScanner scanner
= table
.getScanner(new Scan());
List
<Map
<String, String>> list
= new ArrayList<>();
for (Result result
: scanner
) {
Map
<String, String> obj
= new HashMap<>();
for (Cell cell
: result
.listCells()) {
obj
.put(Bytes
.toString(CellUtil
.cloneQualifier(cell
)),
Bytes
.toString(CellUtil
.cloneValue(cell
)));
}
list
.add(obj
);
}
System
.out
.println(JSON
.toJSONString(list
));
}
@Test
public void testHBaseFilter() throws IOException
{
Table table
=
HbaseUtils
.getInstance().getConn().getTable(TableName
.valueOf("T_XP_TEST"));
Scan scan
= new Scan();
scan
.setFilter(new SingleColumnValueFilter(Bytes
.toBytes("base"),
Bytes
.toBytes("team"),
CompareFilter
.CompareOp
.EQUAL
, Bytes
.toBytes("湖人")));
ResultScanner scanner
= table
.getScanner(scan
);
List
<Map
<String, String>> list
= new ArrayList<>();
for (Result result
: scanner
) {
Map
<String, String> obj
= new HashMap<>();
for (Cell cell
: result
.listCells()) {
obj
.put(Bytes
.toString(CellUtil
.cloneQualifier(cell
)),
Bytes
.toString(CellUtil
.cloneValue(cell
)));
}
list
.add(obj
);
}
System
.out
.println(JSON
.toJSONString(list
));
}