Hbase--协处理器coprocessors

mac2025-10-11  9

Hbase–协处理器coprocessors

文章目录

Hbase--协处理器coprocessors一:协处理器1.什么是协处理器?2.协处理器的类型3.Observer1)类型2) 适用场景3)执行流程 4.Endpoint1)介绍2)应用场景以及分析 5.总结 二:协处理器的实践应用1.Endpoint的应用准备工作: 具体步骤:第一步:使用Protobuf生成序列化类 2.Observer 的应用创建二级索引具体步骤:1.编写代码:2.打成 jar 包(cppp.jar),上传到 hdfs 中的 hbasecp 目录下3. 建 hbase 表,请按以下顺序操作4.校验

一:协处理器

1.什么是协处理器?

Hbase在0.92之后引入了协处理器(coprocessors),有效的解决了Hbase不能建立二级索引,难以进行聚合操作的问题,coprocessorss可以使用户将部分逻辑在数据存放端即 HBase RegionServer 服务端进行计算,也即允许用户在 RegionServer 运行用户自定义的代码。

2.协处理器的类型

按照使用范围:协处理器可分系统协处理器和表协处理器。

按照功能:协处理器又可分为观察者(Observer) 和 终端 (Endpoint) 两类。

3.Observer

1)类型

目前 HBase 内置实现的 Observer 主要有以下几个:

WALObserver:提供控制 WAL 的钩子函数;MasterObserver:可以被用作管理或 DDL 类型的操作,这些是集群级的事件;RegionObserver:用户可以用这种处理器处理数据修改事件,它们与表的 Region 联系紧密;BulkLoadObserver:进行 BulkLoad 的操作之前或之后会触发这个钩子函数;RegionServerObserver :RegionServer 上发生的一些操作可以触发一些这个钩子函数,这个是 RegionServer 级别的事件;EndpointObserver:每当用户调用 Endpoint 之前或之后会触发这个钩子,主要提供了一些回调方法。
2) 适用场景
权限校验:在执行Get或Put操作之前,可以使用preGet或prePut方法检查权限;完整性约束: HBase 不支持关系型数据库中的外键功能,可以通过触发器在插入或者删除数据的时候,对关联的数据进行检查;二级索引:使用钩子关联行修改操作来维护二级索引。
3)执行流程

以RegionObserver为例:

客户端发出 put 请求该请求被分派给合适的 RegionServer 和 RegionCoprocessorHost 拦截该请求,然后在该表的每个 RegionObserver 上调用 prePut()prePut() 处理后,在 Region 执行 Put 操作Region 产生的结果再次被 CoprocessorHost 拦截,调用 postPut()最终结果被返回给客户端

4.Endpoint

1)介绍

Endpoint 协处理器类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理。

2)应用场景以及分析
minmaxavgsumdistinctgroup by

最常见的用法就是进行聚集操作。如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的操作。这样的方法无法利用底层集群的并发能力,而将所有计算都集中到 Client 端统一执行,势必效率低下。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内执行求最 大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。这样整体 的执行效率就会提高很多

工作原理:

5.总结

Observer 允许集群在正常的客户端操作过程中可以有不同的行为表现

Endpoint 允许扩展集群的能力,对客户端应用开放新的运算命令


observer 类似于 RDBMS 中的触发器,类似于动态代理增强方法,主要在服务端工作endpoint 类似于 RDBMS 中的存储过程,主要在服务端工作
observer 可以实现权限管理、优先级设置、监控、ddl 控制、二级索引等功能endpoint 可以实现 min、max、avg、sum、distinct、group by 等功能

二:协处理器的实践应用

1.Endpoint的应用

Hbase中自带一个count命令用来计数,但是这个命令是单线程的,运行在Client,没有利用到集群的资源,效率低下。现在利用Endpoint自定义一个count计数器,利用服务端集群的资源,运算后得到的结果返回到客户端。终端是动态RPC插件的接口,它的实现代码被安装在服务器端,从而能够通过HBase RPC唤醒。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个终端,它们的实现代码会被目标region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。

准备工作:

1、开发endpoint需要用到google protobuf,protobuf用于生成RPC框架代码,protpbuf版本需要和hbase对应,版本不一致可能会存在问题,具体版本可查看hbase安装目录下的lib中protobuf的版本。我这里的protobuf是2.5.0,从网上下载protoc-2.5.0-win32.zip,解压后可得到protoc.exe,将protoc.exe配置到环境变量中备用。 protpbuf下载链接 /protpbuf下载链接

创建一个maven工程在pom.xml中添加如下依赖

<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <jdk.version>1.7</jdk.version> <hbase.version>1.2.5</hbase.version> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> </dependency> </dependencies>

创建一个Endpoint的基本流程可以归纳为:

创建一个通信协议:准备一个proto文件,然后使用protoc工具来生成协议类文件。这个文件需要在服务端及客户端存 在。创建一个Service类,实现具体的业务逻辑创建表时指定使用这个EndPoint,或者是全局配置。创建一个Client类,调用这个RPC方法。

具体步骤:

第一步:使用Protobuf生成序列化类

与 Observer 类型不同的是,Endpoint 协处理器需要与服务区直接通信,服务端是对于 Protobuf Service 的实现,所以两者之间会有一个基于 protocol 的 RPC 接口,客户端和服务端都需要进行基于接口的代码逻辑实现。

1)先准备一个 proto 文件 Count.proto,使用 ProtoBuf 的 message 做为消息传递的格式,使用 Rpc 做为传输协议,一般会定义三个 ProtoBuf 域,用于请求、响应、和业务实现:

syntax = "proto2"; option java_package = "me.w1992wishes.hbase.inaction.coprocessors"; option java_outer_classname = "CountCoprocessor"; option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; /*具体的消息 *每个message域对应一个内部类,该内部类还包含一个Builder内部类 *域内字段会生成对应的 setter和getter方法 *使用 Builder 内部类来对字段赋值 **/ message CountRequest { required string startKey = 1; required string endKey = 2; } message CountResponse { required int64 count = 1 [default = 0]; } /*提供服务的类 *该类没有Builder内部类 */ service CountService { rpc count(CountRequest) returns (CountResponse); }

2)在命令行执行命令生成 Java 类:

C:\Users\11295>protoc --java_out=D:\Count.proto

3)Endpoint Coprocessor服务端实现:

/** * 说明:hbase 协处理器 Endpooint 的服务端代码 * 功能:继承通过 protocol buffer 生成的 rpc 接口,在服务端获取指定列的数据后进行求和操作,最后将结果返回客户端 * * @author w1992wishes 2019/8/1 16:58 */ public class RelationCountEndpoint extends CountCoprocessor.CountService implements RegionCoprocessor { private static final Logger LOG = LoggerFactory.getLogger(RelationCountEndpoint.class); private RegionCoprocessorEnvironment env; @Override public Iterable<Service> getServices() { return Collections.singleton(this); } @Override public void start(CoprocessorEnvironment env) throws IOException { if (env instanceof RegionCoprocessorEnvironment) { this.env = (RegionCoprocessorEnvironment) env; LOG.info("****** {} start. ******", this.getClass().getName()); } else { LOG.warn("****** Must be loaded on a table region .******"); throw new CoprocessorException("Must be loaded on a table region!"); } } @Override public void stop(CoprocessorEnvironment env) throws IOException { LOG.info("****** {} stop. ******", this.getClass().getName()); } @Override public void followedByCount(RpcController controller, CountCoprocessor.CountRequest request, RpcCallback<CountCoprocessor.CountResponse> done) { Scan scan = new Scan(); byte[] startKey = Bytes.toBytes(request.getStartKey()); LOG.info("****** startKey {}. ******", request.getStartKey()); scan.withStartRow(startKey); scan.setFilter(new PrefixFilter(startKey)); scan.addColumn(RELATION_FAM, FROM); scan.readVersions(1); CountCoprocessor.CountResponse response = null; try (InternalScanner scanner = env.getRegion().getScanner(scan)) { List<Cell> results = new ArrayList<>(); boolean hasMore; long sum = 0L; do { // count 个数 hasMore = scanner.next(results); sum += results.size(); // 两次循环之间清空本地结果缓存 results.clear(); // 累加 /*hasMore = scanner.next(results); for (Cell cell : results) { sum = sum + Bytes.toLong(CellUtil.cloneValue(cell)); } results.clear();*/ } while (hasMore); // 设置返回结果 response = CountCoprocessor.CountResponse.newBuilder().setCount(sum).build(); } catch (IOException ioe) { ResponseConverter.setControllerException(controller, ioe); } // 将rpc结果返回给客户端 done.run(response); } }

4)Endpoint Coprocessor 客户端实现:

public long followedByCount(final String userId) throws Throwable { Table followed = conn.getTable(TableName.valueOf(FOLLOWED_TABLE_NAME)); final byte[] startKey = Md5Utils.md5sum(userId); final byte[] endKey = Arrays.copyOf(startKey, startKey.length); endKey[endKey.length - 1]++; final CountCoprocessor.CountRequest request = CountCoprocessor.CountRequest.newBuilder() .setStartKey(Bytes.toString(startKey)) .setEndKey(Bytes.toString(endKey)) .build(); Batch.Call<CountCoprocessor.CountService, Long> callable = countService -> { ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback<CountCoprocessor.CountResponse> rpcCallback = new BlockingRpcCallback<>(); countService.followedByCount(controller, request, rpcCallback); CountCoprocessor.CountResponse response = rpcCallback.get(); if (controller.failedOnException()) { throw controller.getFailedOn(); } return (response != null && response.getCount() != 0) ? response.getCount() : 0; }; Map<byte[], Long> results = followed.coprocessorService( CountCoprocessor.CountService.class, startKey, endKey, callable); long sum = 0; for (Map.Entry<byte[], Long> e : results.entrySet()) { sum += e.getValue(); } return sum; }

2.Observer 的应用

创建二级索引

row key 在 HBase 中是以 B+ tree 结构化有序存储的,所以 scan 起来会比较效率。单表以 row key 存储索引,column value 存储 id 值或其他数据 ,这就是 Hbase 索引表的结构。 由于 HBase 本身没有二级索引(Secondary Index)机制,基于索引检索数据只能单纯地依靠 RowKey,为了能支持多条件查询,开发者需要将所有可能作为查询条件的字段一一拼接到 RowKey 中,这是 HBase 开发中极为常见的做法 在社交类应用中,经常需要快速检索各用户的关注列表 guanzhu,同时,又需要反向检索各 种户的粉丝列表 fensi,为了实现这个需求,最佳实践是建立两张互为反向的表:

插入一条关注信息时,为了减轻应用端维护反向索引表的负担,可用 Observer 协处理器实现:

具体步骤:

1.编写代码:
package com.ghgj.hbase; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; public class TestCoprocessor extends BaseRegionObserver { static Configuration config = HBaseConfiguration.create(); static HTable table = null; static{ config.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181,hadoop05:2181"); try { table = new HTable(config, "guanzhu"); } catch (Exception e) { e.printStackTrace(); } } @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { // super.prePut(e, put, edit, durability); byte[] row = put.getRow(); Cell cell = put.get("f1".getBytes(), "from".getBytes()).get(0); Put putIndex = new Put(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()); putIndex.addColumn("f1".getBytes(), "from".getBytes(), row); table.put(putIndex); table.close(); } }
2.打成 jar 包(cppp.jar),上传到 hdfs 中的 hbasecp 目录下
[root@hadoop01 soft]# hadoop fs -put cppp.jar /hbasecp
3. 建 hbase 表,请按以下顺序操作
hbase(main):036:0> create 'guanzhu','f1' hbase(main):036:0> create 'fensi','f1' hbase(main):036:0> disable 'fensi' hbase(main):036:0> alter 'fensi',METHOD => 'table_att','coprocessor' => 'hdfs://myha01/hbasecp/cpp.jar|com.ghgj.hbase.TestCoprocessor|1001|'

# 理解 coprocessor 的四个参数,分别用’|'隔开的

1、 你的协处理器 jar 包所在 hdfs 上的路径

2、 协处理器类全限定名

3、 协处理器加载顺序

4、 传参

hbase(main):036:0> enable 'fensi'
4.校验
testPut("fensi","c","f1","from","b");
最新回复(0)