Accessing Kerberos-enabled Kudu (1.10.0)


Contents

Accessing Kerberos-enabled Kudu
  Kudu API (Java)
  Kudu Impala JDBC (Java)
  Kudu Spark (Scala)

Accessing Kerberos-enabled Kudu

Kudu API (Java)

1. First, authenticate with Kerberos (the corresponding user's keytab file must be available locally).

The code is as follows:

import org.apache.hadoop.security.UserGroupInformation;

public class KuduKerberosAuth {

    /**
     * Initialize the Kerberos environment.
     * @param debug whether to enable Kerberos debug mode
     */
    public static void initKerberosENV(Boolean debug) {
        try {
            System.setProperty("java.security.krb5.conf", "D:\\cdh\\kudu\\src\\main\\kerberos\\krb5.conf");
            // System.setProperty("java.security.krb5.conf", "/lvm/data3/zhc/krb5.conf");
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            if (debug) {
                System.setProperty("sun.security.krb5.debug", "true");
            }
            UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "D:\\cdh\\kudu\\src\\main\\kerberos\\gree1.keytab");
            // UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "/lvm/data3/zhc/gree1.keytab");
            System.out.println(UserGroupInformation.getCurrentUser());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2. Maven dependencies

<properties>
    <kudu-version>1.10.0-cdh6.3.0</kudu-version>
</properties>

<dependencies>
    <!-- Authentication dependencies -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>${kudu-version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
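The cdh-suffixed versions above are published in Cloudera's repository rather than Maven Central, so the pom usually also needs a repository entry along these lines (the URL is Cloudera's public repo; verify it for your environment):

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>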

3. Add the Hadoop configuration file

Place the file in the root of the resources folder (a minimal sketch follows below):

1. core-site.xml
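In practice this file should be the client configuration downloaded from your cluster; the part UserGroupInformation actually needs is the security block. A minimal sketch, for orientation only:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Hypothetical minimal content: switches UserGroupInformation from "simple" to Kerberos auth. -->
    <property>
        <name>hadoop.security.authentication</name>
        <value>kerberos</value>
    </property>
    <property>
        <name>hadoop.security.authorization</name>
        <value>true</value>
    </property>
</configuration>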

4. Access Kudu

The code is as follows.

Get a Kudu client:

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.client.KuduClient;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

public class GetKuduClient {

    private static final String KUDU_MASTERS = System.getProperty("kuduMasters",
            "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051");

    public static KuduClient getKuduClient() {
        KuduClient client = null;
        try {
            // Build the client inside doAs so the connection uses the Kerberos login user.
            client = UserGroupInformation.getLoginUser().doAs(
                    new PrivilegedExceptionAction<KuduClient>() {
                        @Override
                        public KuduClient run() throws Exception {
                            return new KuduClient.KuduClientBuilder(KUDU_MASTERS).build();
                        }
                    }
            );
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        return client;
    }
}

The main method:

import kudutest.KuduApiTest;
import kudutest.client.GetKuduClient;
import kudujavautil.KuduKerberosAuth;
import org.apache.kudu.client.KuduClient;

public class KuduApiMain {

    public static void main(String[] args) {
        // Authenticate via Kerberos.
        KuduKerberosAuth.initKerberosENV(false);

        // Get a Kudu client.
        KuduClient client = GetKuduClient.getKuduClient();

        // Read a column from a table.
        KuduApiTest.getTableData(client, "kudutest", "zhckudutest1", "id");

        // Create a table.
        // KuduApiTest.createTableData(client, "zhckudutest1");

        // List all tables in Kudu.
        // KuduApiTest.tableListShow(client);

        // Upsert rows into the given Kudu table.
        // KuduApiTest.upsertTableData(client, "zhckudutest1", 10);

        // Drop a Kudu table.
        // KuduApiTest.dropTableData(client, "zhckudutest");
    }
}

Operating on Kudu tables:

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;

import java.util.ArrayList;
import java.util.List;

public class KuduApiTest {

    /**
     * Read data from a Kudu table.
     * Note: the database parameter is unused here; tables created through
     * Impala must be opened with the "impala::database.table" name instead.
     */
    public static void getTableData(KuduClient client, String database, String table, String columns) {
        try {
            KuduTable kudutable = client.openTable(table);
            KuduScanner kuduScanner = client.newScannerBuilder(kudutable).build();
            while (kuduScanner.hasMoreRows()) {
                RowResultIterator rowResultIterator = kuduScanner.nextRows();
                while (rowResultIterator.hasNext()) {
                    RowResult rowResult = rowResultIterator.next();
                    System.out.println(rowResult.getString(columns));
                }
            }
            try {
                client.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Upsert rows into a Kudu table.
     */
    public static void upsertTableData(KuduClient client, String tableName, int numRows) {
        try {
            KuduTable kuduTable = client.openTable(tableName);
            KuduSession kuduSession = client.newSession();
            // Set the flush mode; here we flush manually instead of the default auto-flush.
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            for (int i = 0; i < numRows; i++) {
                String userInfo_str = "abcdef,ghigk";
                // Use an upsert so duplicate keys overwrite instead of failing with "key already present".
                Upsert upsert = kuduTable.newUpsert();
                PartialRow row = upsert.getRow();
                String[] userInfo = userInfo_str.split(",");
                if (userInfo.length == 2) {
                    row.addString("id", userInfo[0]);
                    row.addString("name", userInfo[1]);
                }
                kuduSession.apply(upsert);
            }
            kuduSession.flush();
            kuduSession.close();
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Create a Kudu table.
     */
    public static void createTableData(KuduClient client, String tableName) {
        List<ColumnSchema> columns = new ArrayList<>();
        // A compression algorithm can be specified per column.
        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        Schema schema = new Schema(columns);
        CreateTableOptions createTableOptions = new CreateTableOptions();
        List<String> hashKeys = new ArrayList<>();
        hashKeys.add("id");
        int numBuckets = 8;
        createTableOptions.addHashPartitions(hashKeys, numBuckets);
        try {
            if (!client.tableExists(tableName)) {
                client.createTable(tableName, schema, createTableOptions);
                System.out.println("Created Kudu table: " + tableName);
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * List all tables in Kudu.
     */
    public static void tableListShow(KuduClient client) {
        try {
            ListTablesResponse listTablesResponse = client.getTablesList();
            List<String> tblist = listTablesResponse.getTablesList();
            for (String tableName : tblist) {
                System.out.println(tableName);
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Drop a table.
     */
    public static void dropTableData(KuduClient client, String tableName) {
        try {
            client.deleteTable(tableName);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
}

A standalone test class that combines login, client creation, and a scan (tables created through Impala carry the "impala::" prefix):

package kudutest;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.client.*;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

public class KuduClientTest {

    private static final String KUDU_MASTERS = System.getProperty("kuduMasters",
            "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051");
    // private static final String KUDU_MASTERS = System.getProperty("kuduMasters", "sns-cdh-namenode2:7051,sns-cdh-namenode1:7051,sns-cdh-datanode1:7051");

    /**
     * Read data from a Kudu table.
     */
    static void getTableData() {
        System.out.println("-----------------------------------------------");
        System.out.println("Will try to connect to Kudu master(s) at " + KUDU_MASTERS);
        System.out.println("-----------------------------------------------");
        try {
            KuduClient client = UserGroupInformation.getLoginUser().doAs(
                    new PrivilegedExceptionAction<KuduClient>() {
                        @Override
                        public KuduClient run() throws Exception {
                            return new KuduClient.KuduClientBuilder(KUDU_MASTERS).build();
                        }
                    }
            );
            KuduTable table = client.openTable("impala::kudutest.kudu_table");
            // KuduTable table = client.openTable("impala::test.test");
            KuduScanner kuduScanner = client.newScannerBuilder(table).build();
            while (kuduScanner.hasMoreRows()) {
                RowResultIterator rowResultIterator = kuduScanner.nextRows();
                while (rowResultIterator.hasNext()) {
                    RowResult rowResult = rowResultIterator.next();
                    System.out.println(rowResult.getString("name"));
                    // System.out.println(rowResult.getString("t1"));
                }
            }
            try {
                client.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
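The scans above fetch every column of every row. As a sketch (the class name is hypothetical; table and column names follow the examples above), the scanner builder can also project columns and push predicates down so Kudu only returns matching rows:

import java.util.Arrays;

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;

public class KuduScanExample {

    /**
     * Print the "name" of rows whose "id" equals idValue.
     * Projection limits the columns fetched; the predicate is evaluated by Kudu itself.
     */
    public static void scanById(KuduClient client, String tableName, String idValue) throws KuduException {
        KuduTable table = client.openTable(tableName);
        // Push an equality predicate on the key column down to the tablet servers.
        KuduPredicate idEquals = KuduPredicate.newComparisonPredicate(
                table.getSchema().getColumn("id"),
                KuduPredicate.ComparisonOp.EQUAL,
                idValue);
        KuduScanner scanner = client.newScannerBuilder(table)
                .setProjectedColumnNames(Arrays.asList("name")) // only fetch the column we print
                .addPredicate(idEquals)
                .build();
        while (scanner.hasMoreRows()) {
            RowResultIterator rows = scanner.nextRows();
            while (rows.hasNext()) {
                RowResult row = rows.next();
                System.out.println(row.getString("name"));
            }
        }
    }
}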

Kudu Impala JDBC (Java)

1. Kerberos or LDAP authentication (the SSL truststore file must be available locally).

The LDAP authentication code is as follows:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class GetImpalaClient {

    // JDBC driver class
    private static String driver = "com.cloudera.impala.jdbc41.Driver";

    // LDAP authentication (AuthMech=3: username/password)
    private static String ldap_URL = "jdbc:impala://cdh-master03:25004/default;AuthMech=3;SSL=1;SSLTrustStore=/lvm/data3/zhc/cm-auto-global_truststore.jks";
    private static String user = "gree1";
    private static String password = "000000";

    // Kerberos authentication (AuthMech=1)
    private static String kerberos_URL = "jdbc:impala://cdh-master03:25004/default;AuthMech=1;KrbRealm=GREE.IO;KrbHostFQDN=cdh-master03;KrbServiceName=impala;SSL=1;SSLTrustStore=D:/cdh/kudu/src/main/ssl/cm-auto-global_truststore.jks";

    // LDAP authentication
    public static Connection getKuduClientLDAP() throws ClassNotFoundException, SQLException {
        Class.forName(driver);
        Connection connection = DriverManager.getConnection(ldap_URL, user, password);
        System.out.println("Authenticated via LDAP");
        return connection;
    }
}

The Kerberos authentication code is as follows:

// Kerberos authentication
public static Connection getKuduClientKerberos() throws IOException {
    // Log in via Kerberos first.
    KuduKerberosAuth.initKerberosENV(false);
    Connection client = null;
    try {
        client = (Connection) UserGroupInformation.getLoginUser().doAs(
                new PrivilegedExceptionAction<Object>() {
                    @Override
                    public Object run() throws Exception {
                        Class.forName(driver);
                        return DriverManager.getConnection(kerberos_URL);
                    }
                }
        );
    } catch (IOException | InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Authenticated via Kerberos");
    return client;
}

2. Maven dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera.impala.jdbc</groupId>
        <artifactId>ImpalaJDBC41</artifactId>
        <version>2.5.43</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera.impala.jdbc</groupId>
        <artifactId>hive_metastore</artifactId>
        <version>2.5.43</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera.impala.jdbc</groupId>
        <artifactId>hive_service</artifactId>
        <version>2.5.43</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera.impala.jdbc</groupId>
        <artifactId>ql</artifactId>
        <version>2.5.43</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera.impala.jdbc</groupId>
        <artifactId>TCLIServiceClient</artifactId>
        <version>2.5.43</version>
    </dependency>
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
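Note that the com.cloudera.impala.jdbc artifacts are not in public Maven repositories; they ship in the Impala JDBC driver zip downloaded from Cloudera. One common approach is to install each jar into the local repository, for example (a sketch; adjust the file path to wherever you unpacked the driver):

mvn install:install-file -Dfile=ImpalaJDBC41.jar \
    -DgroupId=com.cloudera.impala.jdbc -DartifactId=ImpalaJDBC41 \
    -Dversion=2.5.43 -Dpackaging=jar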

3. Hadoop configuration file

As in the Kudu API setup, place the file in the root of the resources folder:

1. core-site.xml

4. Accessing via code

// The main method
public static void main(String[] args) throws SQLException, ClassNotFoundException, IOException {
    // Get an Impala connection (Kerberos authentication).
    // Connection kerberosConn = GetImpalaClient.getKuduClientKerberos();

    // Get an Impala connection (LDAP authentication).
    Connection ldapConn = GetImpalaClient.getKuduClientLDAP();

    // Read Kudu table data through Impala (Kerberos authentication).
    // KuduImpalaTest.getKuduData(kerberosConn, "kudutest", "zhckudutest1");

    // Read Kudu table data through Impala (LDAP authentication).
    KuduImpalaTest.getKuduData(ldapConn, "kudutest", "zhckudutest1");
}

// Read data from a Kudu table through Impala.
public static void getKuduData(Connection connection, String database, String tableName)
        throws SQLException, ClassNotFoundException, IOException {
    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
        ps = connection.prepareStatement("select * from " + database + "." + tableName);
        rs = ps.executeQuery();
        while (rs.next()) {
            System.out.println(rs.getString(1) + " ****** " + rs.getString(2));
        }
        try {
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
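Reads are not the only thing that works over this connection; Impala's UPSERT statement targets Kudu tables directly, inserting a row or overwriting the existing row with the same primary key. A sketch (the class and method names are hypothetical; column names follow the examples above):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class KuduImpalaUpsert {

    /**
     * Upsert a single row into a Kudu-backed Impala table via parameterized SQL.
     */
    public static void upsertRow(Connection connection, String database, String tableName,
                                 String id, String name) throws SQLException {
        String sql = "upsert into " + database + "." + tableName + " (id, name) values (?, ?)";
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setString(1, id);
            ps.setString(2, name);
            ps.executeUpdate();
        }
    }
}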

Kudu Spark (Scala)

1. Kerberos authentication (the corresponding user's keytab file must be available locally).

def kerberosAuth(debug: Boolean): Unit = {
  try {
    System.setProperty("java.security.krb5.conf", "D:\\cdh\\kudu\\src\\main\\kerberos\\krb5.conf")
    // System.setProperty("java.security.krb5.conf", "/lvm/data3/zhc/krb5.conf")
    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false")
    if (debug) System.setProperty("sun.security.krb5.debug", "true")
    UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "D:\\cdh\\kudu\\src\\main\\kerberos\\gree1.keytab")
    // UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "/lvm/data3/zhc/gree1.keytab")
    System.out.println(UserGroupInformation.getCurrentUser)
  } catch {
    case e: Exception => e.printStackTrace()
  }
}

2. Maven dependencies

<properties>
    <kudu-version>1.10.0-cdh6.3.0</kudu-version>
</properties>

<dependencies>
    <!-- Authentication dependencies -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-spark2_2.11</artifactId>
        <version>${kudu-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>

3. Hadoop configuration file

As before, place the file in the root of the resources folder:

1. core-site.xml

4. Accessing via code (Scala)

The access code is as follows:

import org.apache.spark.sql.SparkSession
import kuduscalautil.{GetKuduConnect, KerberosAuth}

object KuduSparkTest {
  def main(args: Array[String]): Unit = {
    // Authenticate via Kerberos.
    new KerberosAuth().kerberosAuth(false)
    val spark = SparkSession.builder.appName("zhc_SparkTest").master("local[*]").getOrCreate()
    val kuduContext = new GetKuduConnect().getKuduContext(spark.sqlContext.sparkContext)
    // Create a table.
    new KuduSparkFunction().createTable(kuduContext, spark, "impala_kudu.zhcTestKudu", false)
  }
}

Getting the KuduContext:

import java.io.IOException
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkContext

class GetKuduConnect {

  val kuduMaster: String = System.getProperty("kuduMasters",
    "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051")

  def getKuduContext(sparkContext: SparkContext): KuduContext = {
    var kuduContext: KuduContext = null
    try {
      // Build the context as the Kerberos login user.
      kuduContext = UserGroupInformation.getLoginUser.doAs(new PrivilegedExceptionAction[KuduContext]() {
        @throws[Exception]
        override def run: KuduContext = new KuduContext(kuduMaster, sparkContext)
      })
    } catch {
      case e: IOException => e.printStackTrace()
      case e: InterruptedException => e.printStackTrace()
    }
    kuduContext
  }
}

Table operations:

import org.apache.kudu.client
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

class KuduSparkFunction {

  val tableNumReplicas: Int = Integer.getInteger("tableNumReplicas", 1)
  val nameCol = "name"
  val idCol = "id"
  val logger = LoggerFactory.getLogger(KuduSparkTest.getClass)

  /**
   * Create a table.
   */
  def createTable(kuduContext: KuduContext, spark: SparkSession, tableName: String, deleteTable: Boolean): Unit = {
    val schema = StructType(
      List(
        StructField(idCol, StringType, false),
        StructField(nameCol, StringType, false)
      )
    )
    // Whether to drop the table again after creating it.
    val tableIsDelete = deleteTable
    try {
      if (kuduContext.tableExists(tableName)) {
        throw new RuntimeException(tableName + ": table already exists")
      }
      println(s"Creating table $tableName")
      kuduContext.createTable(tableName, schema, Seq(idCol),
        new client.CreateTableOptions().addHashPartitions(List(idCol).asJava, 3).setNumReplicas(tableNumReplicas))
      println("Table created successfully")
    } catch {
      case unknown: Throwable => logger.error("got an exception: " + unknown)
    } finally {
      if (tableIsDelete) {
        logger.info(s"deleting table '$tableName'")
        kuduContext.deleteTable(tableName)
      }
      logger.info("closing down the session")
      spark.close()
    }
  }
}
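The example above only creates a table. As a further sketch (the object name is hypothetical; it assumes the impala_kudu.zhcTestKudu table created above and the helper classes from this section), rows can be upserted from a DataFrame and read back through the same KuduContext:

import org.apache.spark.sql.SparkSession
import kuduscalautil.{GetKuduConnect, KerberosAuth}

object KuduSparkReadWrite {
  def main(args: Array[String]): Unit = {
    new KerberosAuth().kerberosAuth(false)
    val spark = SparkSession.builder.appName("kuduReadWrite").master("local[*]").getOrCreate()
    val kuduContext = new GetKuduConnect().getKuduContext(spark.sparkContext)
    val tableName = "impala_kudu.zhcTestKudu"

    // Upsert a few rows; the DataFrame schema must match the Kudu table schema.
    import spark.implicits._
    val df = Seq(("1", "zhangsan"), ("2", "lisi")).toDF("id", "name")
    kuduContext.upsertRows(df, tableName)

    // Read the rows back through the same context, projecting both columns.
    kuduContext.kuduRDD(spark.sparkContext, tableName, Seq("id", "name"))
      .collect()
      .foreach(println)

    spark.stop()
  }
}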

Reposted from: https://www.cnblogs.com/HarSenZhao/p/11510112.html
