Contents

Accessing Kerberos-enabled Kudu
1. Kudu API (Java)
2. Kudu Impala JDBC (Java)
3. Kudu Spark (Scala)

Kudu API (Java)

The Kerberos authentication code is as follows:
import org.apache.hadoop.security.UserGroupInformation;

public class KuduKerberosAuth {

    /**
     * Initialize the Kerberos environment
     * @param debug whether to enable Kerberos debug mode
     */
    public static void initKerberosENV(Boolean debug) {
        try {
            System.setProperty("java.security.krb5.conf", "D:\\cdh\\kudu\\src\\main\\kerberos\\krb5.conf");
            // System.setProperty("java.security.krb5.conf", "/lvm/data3/zhc/krb5.conf");
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            if (debug) {
                System.setProperty("sun.security.krb5.debug", "true");
            }
            UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "D:\\cdh\\kudu\\src\\main\\kerberos\\gree1.keytab");
            // UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "/lvm/data3/zhc/gree1.keytab");
            System.out.println(UserGroupInformation.getCurrentUser());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Put the file below into the resources folder; it must sit at the root of resources (a quick check that the file is picked up is sketched after the list):
1.core-site.xml
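The core-site.xml on the classpath is what tells the Hadoop security layer, and therefore UserGroupInformation, that the cluster uses Kerberos. The following is a minimal sketch for verifying that the file is actually loaded from the resources root; the class name is hypothetical and not part of the original project, while hadoop.security.authentication is the standard Hadoop property.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosConfigCheck {
    public static void main(String[] args) {
        // new Configuration() reads core-site.xml from the classpath (src/main/resources)
        Configuration conf = new Configuration();
        // Should print "kerberos" when core-site.xml is found at the resources root
        System.out.println(conf.get("hadoop.security.authentication"));
        // Optionally hand the configuration to UGI explicitly before loginUserFromKeytab()
        UserGroupInformation.setConfiguration(conf);
    }
}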
Getting the Kudu client; the code is as follows:
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.client.KuduClient;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

public class GetKuduClient {

    private static final String KUDU_MASTERS =
            System.getProperty("kuduMasters", "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051");

    public static KuduClient getKuduClient() {
        KuduClient client = null;
        try {
            // Build the client inside doAs() so it picks up the Kerberos credentials of the logged-in user
            client = UserGroupInformation.getLoginUser().doAs(
                    new PrivilegedExceptionAction<KuduClient>() {
                        @Override
                        public KuduClient run() throws Exception {
                            return new KuduClient.KuduClientBuilder(KUDU_MASTERS).build();
                        }
                    }
            );
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return client;
    }
}
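The builder above uses the client's default timeouts. If operations time out on a slow or kerberized cluster, the same doAs pattern can pass explicit timeouts to the KuduClientBuilder. This is a hedged variant, not part of the original project; the class name and timeout values are assumptions.

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.client.KuduClient;

public class GetKuduClientWithTimeouts {
    private static final String KUDU_MASTERS =
            System.getProperty("kuduMasters", "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051");

    public static KuduClient get() throws Exception {
        // Same doAs pattern as above, with explicit operation timeouts (values are arbitrary)
        return UserGroupInformation.getLoginUser().doAs(
                (PrivilegedExceptionAction<KuduClient>) () ->
                        new KuduClient.KuduClientBuilder(KUDU_MASTERS)
                                .defaultOperationTimeoutMs(60000)       // read/write operation timeout
                                .defaultAdminOperationTimeoutMs(60000)  // DDL (create/delete table) timeout
                                .build());
    }
}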
The main function:

import kudutest.KuduApiTest;
import kudutest.client.GetKuduClient;
import org.apache.kudu.client.KuduClient;
import kudujavautil.KuduKerberosAuth;

public class KuduApiMain {

    public static void main(String[] args) {
        // Authenticate with Kerberos
        KuduKerberosAuth.initKerberosENV(false);

        // Get the Kudu client
        KuduClient client = GetKuduClient.getKuduClient();

        // Read a column from a table
        KuduApiTest.getTableData(client, "kudutest", "zhckudutest1", "id");

        // Create a table
        // KuduApiTest.createTableData(client, "zhckudutest1");

        // List all tables in Kudu
        // KuduApiTest.tableListShow(client);

        // Upsert rows into the given Kudu table
        // KuduApiTest.upsertTableData(client, "zhckudutest1", 10);

        // Drop a Kudu table
        // KuduApiTest.dropTableData(client, "zhckudutest");
    }
}
Operating on Kudu tables; the code is as follows:

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;

import java.util.ArrayList;
import java.util.List;

public class KuduApiTest {

    /**
     * Read data from a Kudu table
     * (the database argument is unused here: tables created directly through the Kudu API are opened by their bare name)
     */
    public static void getTableData(KuduClient client, String database, String table, String columns) {
        try {
            KuduTable kudutable = client.openTable(table);
            KuduScanner kuduScanner = client.newScannerBuilder(kudutable).build();
            while (kuduScanner.hasMoreRows()) {
                RowResultIterator rowResultIterator = kuduScanner.nextRows();
                while (rowResultIterator.hasNext()) {
                    RowResult rowResult = rowResultIterator.next();
                    System.out.println(rowResult.getString(columns));
                }
            }
            try {
                client.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Insert data into a Kudu table
     */
    public static void upsertTableData(KuduClient client, String tableName, int numRows) {
        try {
            KuduTable kuduTable = client.openTable(tableName);
            KuduSession kuduSession = client.newSession();
            // Set the session flush mode; manual flush here, the default is automatic
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            for (int i = 0; i < numRows; i++) {
                String userInfo_str = "abcdef,ghigk";
                Insert upsert = kuduTable.newInsert();
                PartialRow row = upsert.getRow();
                String[] userInfo = userInfo_str.split(",");
                if (userInfo.length == 2) {
                    row.addString("id", userInfo[0]);
                    row.addString("name", userInfo[1]);
                }
                kuduSession.apply(upsert);
            }
            kuduSession.flush();
            kuduSession.close();
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Create a Kudu table
     */
    public static void createTableData(KuduClient client, String tableName) {
        List<ColumnSchema> columns = new ArrayList<>();
        // A compression algorithm can be specified for each column when it is added
        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        Schema schema = new Schema(columns);
        CreateTableOptions createTableOptions = new CreateTableOptions();
        List<String> hashKeys = new ArrayList<>();
        hashKeys.add("id");
        int numBuckets = 8;
        createTableOptions.addHashPartitions(hashKeys, numBuckets);
        try {
            if (!client.tableExists(tableName)) {
                client.createTable(tableName, schema, createTableOptions);
                System.out.println("Created Kudu table: " + tableName);
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * List all tables in Kudu
     * @param client
     */
    public static void tableListShow(KuduClient client) {
        try {
            ListTablesResponse listTablesResponse = client.getTablesList();
            List<String> tblist = listTablesResponse.getTablesList();
            for (String tableName : tblist) {
                System.out.println(tableName);
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Drop a table
     */
    public static void dropTableData(KuduClient client, String tableName) {
        try {
            client.deleteTable(tableName);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
}

A second, self-contained test class that builds its own client under the logged-in user and scans an Impala-created Kudu table:

package kudutest;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.client.*;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

public class KuduClientTest {

    private static final String KUDU_MASTERS =
            System.getProperty("kuduMasters", "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051");
    // private static final String KUDU_MASTERS = System.getProperty("kuduMasters", "sns-cdh-namenode2:7051,sns-cdh-namenode1:7051,sns-cdh-datanode1:7051");

    /**
     * Read data from a Kudu table
     */
    static void getTableData() {
        System.out.println("-----------------------------------------------");
        System.out.println("Will try to connect to Kudu master(s) at " + KUDU_MASTERS);
        System.out.println("-----------------------------------------------");
        try {
            KuduClient client = UserGroupInformation.getLoginUser().doAs(
                    new PrivilegedExceptionAction<KuduClient>() {
                        @Override
                        public KuduClient run() throws Exception {
                            return new KuduClient.KuduClientBuilder(KUDU_MASTERS).build();
                        }
                    }
            );
            KuduTable table = client.openTable("impala::kudutest.kudu_table");
            // KuduTable table = client.openTable("impala::test.test");
            KuduScanner kuduScanner = client.newScannerBuilder(table).build();
            while (kuduScanner.hasMoreRows()) {
                RowResultIterator rowResultIterator = kuduScanner.nextRows();
                while (rowResultIterator.hasNext()) {
                    RowResult rowResult = rowResultIterator.next();
                    System.out.println(rowResult.getString("name"));
                    // System.out.println(rowResult.getString("t1"));
                }
            }
            try {
                client.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
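The scanners above read every column of the table. When only some columns or rows are needed, the scanner builder can project columns and push predicates down to the Kudu tablet servers. The following is a minimal sketch reusing the table name and sample value from the examples above; the class name is hypothetical and not part of the original project.

import java.util.Collections;
import org.apache.kudu.client.*;

public class KuduScanExample {
    // Scan only the "id" column of "zhckudutest1" where id = "abcdef"
    public static void scan(KuduClient client) throws KuduException {
        KuduTable table = client.openTable("zhckudutest1");
        KuduPredicate idEquals = KuduPredicate.newComparisonPredicate(
                table.getSchema().getColumn("id"), KuduPredicate.ComparisonOp.EQUAL, "abcdef");
        KuduScanner scanner = client.newScannerBuilder(table)
                .setProjectedColumnNames(Collections.singletonList("id")) // fetch only the needed column
                .addPredicate(idEquals)                                   // push the filter down to Kudu
                .build();
        while (scanner.hasMoreRows()) {
            RowResultIterator rows = scanner.nextRows();
            while (rows.hasNext()) {
                System.out.println(rows.next().getString("id"));
            }
        }
        scanner.close();
    }
}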
Kudu Impala JDBC (Java)

The LDAP authentication code is as follows:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class GetImpalaClient {

    // JDBC driver
    private static String driver = "com.cloudera.impala.jdbc41.Driver";

    // LDAP authentication
    private static String ldap_URL = "jdbc:impala://cdh-master03:25004/default;AuthMech=3;SSL=1;SSLTrustStore=/lvm/data3/zhc/cm-auto-global_truststore.jks";
    private static String user = "gree1";
    private static String password = "000000";

    // Kerberos authentication
    private static String kerberos_URL = "jdbc:impala://cdh-master03:25004/default;AuthMech=1;KrbRealm=GREE.IO;KrbHostFQDN=cdh-master03;KrbServiceName=impala;SSL=1;SSLTrustStore=D:/cdh/kudu/src/main/ssl/cm-auto-global_truststore.jks";

    // LDAP authentication
    public static Connection getKuduClientLDAP() throws ClassNotFoundException, SQLException {
        Class.forName(driver);
        Connection connection = DriverManager.getConnection(ldap_URL, user, password);
        System.out.println("This is LDAP authentication");
        return connection;
    }
}
The Kerberos authentication code is as follows; this method uses the driver and kerberos_URL fields of the GetImpalaClient class above, with the corresponding UserGroupInformation, PrivilegedExceptionAction, and KuduKerberosAuth imports added to that class:

// Kerberos authentication
public static Connection getKuduClientKerberos() throws IOException {
    // Authenticate with Kerberos first
    KuduKerberosAuth.initKerberosENV(false);
    Connection client = null;
    try {
        client = (Connection) UserGroupInformation.getLoginUser().doAs(
                new PrivilegedExceptionAction<Object>() {
                    @Override
                    public Object run() throws Exception {
                        Class.forName(driver);
                        return DriverManager.getConnection(kerberos_URL);
                    }
                }
        );
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("This is Kerberos authentication");
    return client;
}

Put the file below into the resources folder; it must sit at the root of resources (a usage sketch for both connections follows the list):
1.core-site.xml
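Either connection can then be used like any other JDBC connection. Here is a hedged usage sketch; the class name is hypothetical, and the database/table names are borrowed from the earlier examples, so the table must actually be visible to Impala.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class ImpalaQueryExample {
    public static void main(String[] args) throws Exception {
        // Works the same with getKuduClientKerberos()
        Connection conn = GetImpalaClient.getKuduClientLDAP();
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, name FROM kudutest.zhckudutest1 LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getString("id") + "," + rs.getString("name"));
            }
        } finally {
            conn.close();
        }
    }
}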
Kudu Spark (Scala)

Put the file below into the resources folder; it must sit at the root of resources:
1.core-site.xml
The access code is as follows:
import org.apache.spark.sql.SparkSession
import kuduscalautil.{GetKuduConnect, KerberosAuth}

object KuduSparkTest {
  // main
  def main(args: Array[String]): Unit = {
    new KerberosAuth().kerberosAuth(false)
    val spark = SparkSession.builder.appName("zhc_SparkTest").master("local[*]").getOrCreate()
    val kuduContext = new GetKuduConnect().getKuduContext(spark.sqlContext.sparkContext)
    // Create the table
    new KuduSparkFunction().createTable(kuduContext, spark, "impala_kudu.zhcTestKudu", false)
  }
}

import java.io.IOException
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkContext

class GetKuduConnect {
  val kuduMaster: String = System.getProperty("kuduMasters", "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051")

  def getKuduContext(sparkcontext: SparkContext): KuduContext = {
    var kuduContext: KuduContext = null
    // Build the KuduContext under the Kerberos login user
    try kuduContext = UserGroupInformation.getLoginUser.doAs(new PrivilegedExceptionAction[KuduContext]() {
      @throws[Exception]
      override def run: KuduContext = new KuduContext(kuduMaster, sparkcontext)
    })
    catch {
      case e: IOException => e.printStackTrace()
      case e: InterruptedException => e.printStackTrace()
    }
    kuduContext
  }
}

import org.apache.kudu.client
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._

class KuduSparkFunction {
  val tableNumReplicas: Int = Integer.getInteger("tableNumReplicas", 1)
  val nameCol = "name"
  val idCol = "id"
  val logger = LoggerFactory.getLogger(KuduSparkTest.getClass)

  /*
   * Create a table
   */
  def createTable(kuduContext: KuduContext, spark: SparkSession, tableName: String, deleteTable: Boolean): Unit = {
    val schema = StructType(
      List(
        StructField(idCol, StringType, false),
        StructField(nameCol, StringType, false)
      )
    )
    // Whether to delete the table again after creating it
    var tableIsDelete = deleteTable
    try {
      if (kuduContext.tableExists(tableName)) {
        throw new RuntimeException(tableName + ": table already exists")
      }
      println(s"Creating table $tableName")
      kuduContext.createTable(tableName, schema, Seq(idCol),
        new client.CreateTableOptions().addHashPartitions(List(idCol).asJava, 3).setNumReplicas(tableNumReplicas))
      println("Table created")
    } catch {
      case unknown: Throwable => logger.error(s"got an exception: $unknown")
    } finally {
      if (tableIsDelete) {
        logger.info(s"deleting table '$tableName'")
        kuduContext.deleteTable(tableName)
      }
      logger.info(s"closing down the session")
      spark.close()
    }
  }
}

Reposted from: https://www.cnblogs.com/HarSenZhao/p/11510112.html
