Oozie的action根据执行方式分为两类:同步和异步。同步是指在Oozie服务器上执行同步动作,并阻塞执行线程,直到它完成(这种模式会影响Oozie Server的性能,不推荐使用),像email、ssh等action通过Oozie Server自身调用相关jar,不在hadoop集群中启动mapper任务的就是同步action;异步是启动一个异步动作,并立即返回结果,而不等待动作结束,实际的执行发生在Hadoop计算节点上,在Oozie Server之外,像mr、shell、java就是异步的action。
自定义一个可以执行mysql的sql并把结果保存在本地的同步Action。
maven依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>behc</groupId> <artifactId>oozie-custom-action</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>attached</goal> </goals> </execution> </executions> <configuration> <descriptors> <descriptor>assembly.xml</descriptor> </descriptors> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> <properties> <oozie.version>4.3.1</oozie.version> <java.version>1.8</java.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <mysql.version>5.1.31</mysql.version> </properties> <dependencies> <dependency> <groupId>org.apache.oozie</groupId> <artifactId>oozie-core</artifactId> <version>${oozie.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> </dependencies> </project> package com.bonc.oozie; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import org.jdom.JDOMException; import org.jdom.Namespace; import java.io.PrintWriter; import java.sql.*; import java.util.HashSet; import java.util.Set; /** * @program: ooziecustomaction * @description: oozie自定义同步action * @author: chendeyong * @create: 2019-10-10 11:48 */ public class MySQLSyncActionExecutor extends ActionExecutor{ private static final String 
SYNC_MYSQL_ACTION_NS="uri:oozie:sync-mysql-action:0.1"; private static final String ACTION_NAME="syncMysql"; /* * ConstructorAction developers need to write a no-arg constructor * that ultimately calls thesuper-class constructor passing the new action name (e.g., super("syncMysql")). * End users will use this name to define the new action type in the workflow XML. * */ protected MySQLSyncActionExecutor(){ super(ACTION_NAME); } /* * start(ActionExecutor.Context context, Action action) * Oozie invokes this method when it needs to execute the action. * Oozie passes two parameters to this method. * The first parameter context provides the APIs to access all workflow configurations variables for this action, * set the action status,and return any data to be used in the execution path. * The second parameterSupporting Custom Action Types info * includes the action’s definition from the workflow XML. * All synchronousactions must override this method because this method performs the actual execution. * At the end,the method needs to call context.setExecutionData(externalStatus, actionData) * to pass back the action status * */ @Override public void start(Context context, WorkflowAction workflowAction) throws ActionExecutorException { context.setStartData("-","-","-"); try{ Element actionXml = XmlUtils.parseXml(workflowAction.getConf()); Namespace ns = actionXml.getNamespace(SYNC_MYSQL_ACTION_NS); String jdbcUrl = actionXml.getChildTextTrim("jdbcUrl",ns); String sql = actionXml.getChildTextTrim("sql",ns); String sqlOutPutFilePath = actionXml.getChildTextTrim("sql_output_file_path",ns); runMysql(jdbcUrl,sql,sqlOutPutFilePath); context.setExecutionData("OK",null); }catch (JDOMException e){ throw convertException(e); } } /* * end(ActionExecutor.Context context, Action action) * Oozie invokes this method when the execution is finished. * In this method, the action executor should perform any cleanup required after completion. 
* The implementation usually calls context.setEndData(status, signalValue). * The status and signal value determine the next course of action. * */ @Override public void end(Context context, WorkflowAction workflowAction) throws ActionExecutorException { if(workflowAction.getExternalStatus().equals("OK")){ context.setEndData(WorkflowAction.Status.OK,WorkflowAction.Status.OK.toString()); }else { context.setEndData(WorkflowAction.Status.ERROR,WorkflowAction.Status.ERROR.toString()); } } /*check(ActionExecutor.Context context, Action action) * Oozie calls this method to check the action status. * For synchronous actions,Oozie does not need or call this method. * Therefore, for this example, it’s recom‐mended this method just throw an UnsupportedOperationException. * */ @Override public void check(Context context, WorkflowAction workflowAction) throws ActionExecutorException { throw new UnsupportedOperationException(); } /* * kill(ActionExecutor.Context context, Action action) * Oozie executes this method when it needs to kill the action for any reason. * Typical implementation of this method calls * context.setEndData(status,signalValue), passing Action.Status.KILLED as the status and ERROR as the sig‐nalValue * */ @Override public void kill(Context context, WorkflowAction workflowAction) throws ActionExecutorException { context.setEndData(WorkflowAction.Status.KILLED,"ERROR"); } private static Set<String> COMPLETED_STATUS = new HashSet<String>(); static { COMPLETED_STATUS.add("SUCCEEDED"); COMPLETED_STATUS.add("KILLED"); COMPLETED_STATUS.add("FAILED"); COMPLETED_STATUS.add("FAILED_KILLED"); } /* * isCompleted(externStatus) * This utility method is used to determine if an action status is in a terminal state. 
* */ @Override public boolean isCompleted(String s) { return COMPLETED_STATUS.contains(s); } private void runMysql(String jdbcUrl,String sql,String sqlOutPutFilePath) throws ActionExecutorException{ Connection connect = null; Statement statement = null; ResultSet resultSet = null; try { Class.forName("com.mysql.jdbc.Driver"); connect = DriverManager.getConnection(jdbcUrl); statement = connect.createStatement(); resultSet = statement.executeQuery(sql); writeResultSet(resultSet,sqlOutPutFilePath); }catch (Exception e){ throw convertException(e); }finally { try { if (resultSet != null) resultSet.close(); if (statement != null) statement.close(); if (connect != null) connect.close(); }catch (Exception e){ throw convertException(e); } } } private void writeResultSet(ResultSet resultSet,String sqlOutPutFilePath) throws Exception{ PrintWriter out; if(sqlOutPutFilePath != null && sqlOutPutFilePath.length()>0){ out = new PrintWriter(sqlOutPutFilePath); }else{ out = new PrintWriter(System.out); } ResultSetMetaData metaData = resultSet.getMetaData(); while (resultSet.next()){ for(int i=1;i<=metaData.getColumnCount();i++){ out.println(metaData.getCatalogName(i)+"="+resultSet.getNString(i)); } } out.close(); } }在工程目录\src\main\resources\sync_mysql-0.1.xsd文件下编写
<?xml version="1.0" encoding="UTF-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"表示数据类型等定义来自w3 xmlns:sync-mysql="uri:oozie:sync-mysql-action:0.1" 表示文档中要定义的元素来自什么命名空间elementFormDefault="qualified" targetNamespace="uri:oozie:sync-mysql-action:0.1"> <xs:element name="syncMysql" type="sync-mysql:SYNC_MYSQL_TYPE"/>此处表示要定义一个元素 <xs:complexType name="SYNC_MYSQL_TYPE"> <xs:sequence> <xs:element name="jdbcUrl" type="xs:string" minOccurs="1" maxOccurs="1"/> <xs:element name="sql" type="xs:string" minOccurs="1" maxOccurs="1"/> <xs:element name="sql_output_file_path" type="xs:string" minOccurs="0" maxOccurs="1"/> </xs:sequence> </xs:complexType> </xs:schema>XSD–>xml Schema-Definition
XSD用于:定义一个xml文档有什么元素;定义一个xml文档都会有什么属性;定义某个节点有什么样的子节点、可以有多少个子节点以及子节点出现的顺序;定义元素和属性的数据类型;定义元素和属性的默认值或固定值。上传编译好的jar包,注意sync_mysql-0.1.xsd文件在jar的根目录下;
直接上传jar包(不包含依赖)到server的$OOZIE_HOME/libext目录下
停止服务执行(需要删除PID文件rm $OOZIE_HOME/oozie-server/temp/oozie.pid):
[oozie@hadoop01 bin]$ ./oozie-setup.sh prepare-war ... INFO: Adding extension: /opt/beh/core/oozie/libext/zookeeper-3.4.6.jar INFO: Adding extension: /opt/beh/core/oozie/libext/zookeeper-3.4.6-tests.jar New Oozie WAR file with added 'ExtJS library, JARs' at /opt/beh/core/oozie/oozie-server/webapps/oozie.war INFO: Oozie is ready to be started修改oozie-site.xml
<property> <name>oozie.service.ActionService.executor.ext.classes</name> <value>com.bonc.oozie.MySQLSyncActionExecutor</value> </property> <property> <name>oozie.service.SchemaService.wf.ext.schemas</name> <value>sync_mysql-0.1.xsd</value> </property>启动oozie服务
报错: INFO: validateJarFile(/opt/beh/core/oozie/oozie-server/webapps/oozie/WEB-INF/lib/oozie-custom-action-1.0-SNAPSHOT-jar-with-dependencies.jar) - jar not loaded. See Servlet Spec 2.3, section 9.7.2. Offending class: javax/servlet/Servlet.class Oct 10, 2019 7:15:29 PM org.apache.catalina.loader.WebappClassLoader validateJarFile INFO: validateJarFile(/opt/beh/core/oozie/oozie-server/webapps/oozie/WEB-INF/lib/servlet-api-2.5.jar) - jar not loaded. See Servlet Spec 2.3, section 9.7.2. Offending class: javax/servlet/Servlet.class #上传的jar包中包含的依赖发生冲突,去除依赖后重新生成war包。结果符合预期:
[oozie@hadoop01 sync-mysql]$ more /tmp/my_sync_sqloutput.txt id=1 name=zhangsan id=2 name=wang id=2 name=wang id=3 name=lisi 异常: Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-261222913-172.16.13.12-1564812628651:blk_1073773469_32902 file=/jobs/sync-mysql/workflow.xml 可能原因: /oozie/WEB-INF/lib下hdfs的client jar包(hadoop-hdfs-2.8.5.jar等)和外面集群不一致导致的,全部替换即可大概4-5个jar包 对于异步action,Oozie需要向hadoop提交一个启动器map任务,以及启动action所需的所有jar和配置。特别地,在Oozie中执行任何异步action时涉及到两个重要的类,一是派生自ActionExecutor的类将启动mapper作业提交给Hadoop集群以及包括实际action所需的jar和配置。ActionExecutor使用Hadoop的hdfs将jar和配置传递给正确的计算节点。启动作业最终会启动一个由Oozie实现的单件任务来执行任何动作类型,这个映射器被广泛称为LauncherMapper。
map任务(LauncherMapper)调用在计算节点上运行的action execution class的main()方法。不同的action类型扩展这个类来创建它们自己的主类来执行特定于action的代码。
实现MySQLActionExecutor继承JavaActionExecutor(这个类需要从Oozie源码中拷贝到当前包下,因为有些方法需要重写,但其访问修饰符导致在其他包中无权限重写)
package com.bonc.oozie; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.action.hadoop.LauncherMain; import org.apache.oozie.action.hadoop.MapReduceMain; import org.jdom.Element; import org.jdom.Namespace; import java.util.List; /** * @program: ooziecustomaction * @description: 自定义异步action * @author: chendeyong * @create: 2019-10-12 10:02 */ public class MySQLActionExecutor extends JavaActionExecutor { public static final String MYSQL_MAIN_CLASS_NAME = "com.bonc.oozie.MysqlMain"; public static final String JDBC_URL = "oozie.mysql.jdbc.url"; public static final String SQL_COMMAND="oozie.mysql.sql.command"; public static final String SQL_OUTPUT_PATH="oozie.mysql.sql.output.path"; public MySQLActionExecutor(){ super("mysql"); } /* * The method getLauncherClasses returns the list of classes * required to be executed by the launcher mapper. * Ooize server makes these classes available * to the launcher mapper through the distributed cache. * */ @Override public List<Class> getLauncherClasses() { List<Class> classes = super.getLauncherClasses(); classes.add(LauncherMain.class); classes.add(MapReduceMain.class); try { classes.add(Class.forName(MYSQL_MAIN_CLASS_NAME)); }catch (ClassNotFoundException e){ throw new RuntimeException("Class not found",e); } return classes; } /* * The method getLauncherMain returns the ActionMainclass(org.apache.oozie.action.hadoop.MySqlMain). * The launcher map code calls the main() method of this class. * */ @Override protected String getLauncherMain(Configuration launcherConf, Element actionXml) { return launcherConf.get("oozie.launcher.action.main.class", MYSQL_MAIN_CLASS_NAME); } /* * The setupActionConf method adds the configurationthat * is passed to the ActionMain class through a configuration file. 
* */ @Override Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) throws ActionExecutorException { super.setupActionConf(actionConf, context, actionXml, appPath); Namespace ns = actionXml.getNamespace(); String sql = actionXml.getChild("sql",ns).getTextTrim(); String jdbcUrl = actionXml.getChild("jdbcUrl",ns).getTextTrim(); String sqlOutPath = actionXml.getChild("sql_output_file_path",ns).getTextTrim(); actionConf.set(JDBC_URL,jdbcUrl); actionConf.set(SQL_COMMAND,sql); actionConf.set(SQL_OUTPUT_PATH,sqlOutPath); return actionConf; } /* * getDefaultShareLibName returns the name of the subdirectory under the systemsharelib directory. * This subdirectory hosts most of the JARs required to execute thisaction. * In this example, the mysql-connector-java-*.jar file * needs to be copied to themysql/ subdirectory under the sharelib directory. * */ @Override protected String getDefaultShareLibName(Element actionXml) { return "mysql"; } } 实现MysqlMain继承LauncherMain
package com.bonc.oozie;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.action.hadoop.LauncherMain;

import java.io.BufferedWriter;
import java.io.File;
import java.io.OutputStreamWriter;
import java.sql.*;

/**
 * Entry point of the MySQL action: loads the action configuration file, runs the
 * configured query and writes the rows to {@code <output path>/sql.out}.
 *
 * @program: ooziecustomaction
 * @description: main entry point of the MySQL action
 * @author: chendeyong
 * @create: 2019-10-12 16:10
 */
public class MysqlMain extends LauncherMain {

    public static void main(String args[]) throws Exception {
        run(MysqlMain.class, args);
    }

    /**
     * Reads the JDBC URL, SQL statement and output path from the configuration file
     * named by the {@code oozie.action.conf.xml} system property and runs the query.
     *
     * @param strings command-line arguments (not used)
     * @throws RuntimeException if the system property, the file or one of the three
     *                          settings is missing, or if the query fails
     */
    @Override
    protected void run(String[] strings) throws Exception {
        System.out.println();
        System.out.println("Oozie MySql action configuration");
        System.out.println("=============================================");

        // Load the action configuration prepared by Oozie.
        Configuration actionConf = new Configuration(false);
        String actionXml = System.getProperty("oozie.action.conf.xml");
        if (actionXml == null) {
            throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
        }
        if (!new File(actionXml).exists()) {
            throw new RuntimeException("Action Configuration XML file [" + actionXml + "] dose not exist");
        }
        actionConf.addResource(new Path("file:///", actionXml));

        // Each setting is checked against its OWN value; previously all three checks
        // tested jdbcUrl, so a missing SQL statement or output path went undetected.
        String jdbcUrl = requiredProperty(actionConf, MySQLActionExecutor.JDBC_URL);
        String sqlCommand = requiredProperty(actionConf, MySQLActionExecutor.SQL_COMMAND);
        String sqlOutputPath = requiredProperty(actionConf, MySQLActionExecutor.SQL_OUTPUT_PATH);

        System.out.println("Mysql coomands :" + sqlCommand);
        System.out.println("JDBC url :" + jdbcUrl);
        System.out.println("sqlOutputPath " + sqlOutputPath);
        System.out.println("====================================================");
        System.out.println();
        System.out.println(">>> Connecting to MySQL and executing sql now >>>");
        System.out.println();
        System.out.flush();

        runMysql(jdbcUrl, sqlCommand, sqlOutputPath);
    }

    /** Returns the value of a mandatory configuration key, or fails naming the key. */
    private static String requiredProperty(Configuration actionConf, String key) {
        String value = actionConf.get(key);
        if (value == null) {
            // A space now separates the key from "property" so the message names the real key.
            throw new RuntimeException("Action Configuration does not have " + key + " property");
        }
        return value;
    }

    /**
     * Executes {@code sql} over a fresh JDBC connection and writes the result set.
     *
     * @throws RuntimeException wrapping the original failure as its cause
     */
    private void runMysql(String jdbcUrl, String sql, String sqlOutPutFilePath) throws RuntimeException {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            // try-with-resources closes all three resources and cannot mask the
            // primary exception the way a throwing finally block did.
            try (Connection connect = DriverManager.getConnection(jdbcUrl);
                 Statement statement = connect.createStatement();
                 ResultSet resultSet = statement.executeQuery(sql)) {
                writeResultSet(resultSet, sqlOutPutFilePath);
            }
        } catch (Exception e) {
            // Keep the cause so the launcher log shows why the action failed.
            throw new RuntimeException("sql execution fail", e);
        }
    }

    /**
     * Recreates the output directory and writes one line per row to {@code sql.out}
     * in UTF-8, with tab-separated {@code columnName=value} pairs.
     *
     * @param resultSet         rows to dump
     * @param sqlOutPutFilePath directory to (re)create; any existing content is deleted
     * @throws Exception on SQL or file-system errors; these are no longer swallowed,
     *                   so a failed write fails the action
     */
    private void writeResultSet(ResultSet resultSet, String sqlOutPutFilePath) throws Exception {
        Configuration configuration = new Configuration();
        Path outPath = new Path(sqlOutPutFilePath);
        // Path.getFileSystem may return a cached instance shared with the rest of the
        // launcher JVM, so it is deliberately NOT closed here.
        FileSystem fs = outPath.getFileSystem(configuration);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        fs.mkdirs(outPath);
        Path outFile = new Path(outPath, "sql.out");
        try (BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(outFile), "UTF-8"))) {
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            while (resultSet.next()) {
                for (int i = 1; i <= columnCount; i++) {
                    if (i > 1) {
                        // Separator between columns; without it pairs ran together.
                        out.write("\t");
                    }
                    out.write(metaData.getColumnName(i) + "=" + resultSet.getString(i));
                }
                out.write("\n");
            }
        }
    }
}