作者:小C菜鸟
0.121 · 2018.04.16 23:44:24 · 字数 420 · 阅读 3,557
大家可能都知道,通过 `bin/flink run` 命令(形如 `./bin/flink run [OPTIONS] <jar-file> <arguments>`)就可以在Flink集群上部署一个Job,执行你想要的功能。那么,这个Job具体是如何被提交到集群的机器上并执行的呢?
如果仔细阅读 `bin/flink` 脚本的代码,就会发现它最终会执行以下命令:
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.CliFrontend "$@"可以看出,是通过CliFrontend类去执行相应的操作,它是所有job的入口类,通过解析用户传递的参数(jar包,mainClass等),读取flink的环境,配置信息等,封装成PackagedProgram,最终通过ClusterClient提交给Flink集群。 首先看下入口:
/** * Submits the job based on the arguments. */ public static void main(final String[] args) { EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args); try { final CliFrontend cli = new CliFrontend(); SecurityUtils.install(new SecurityConfiguration(cli.config)); int retCode = SecurityUtils.getInstalledContext() .runSecured(new Callable<Integer>() { @Override public Integer call() { return cli.parseParameters(args); } }); System.exit(retCode); } catch (Throwable t) { LOG.error("Fatal error while running command line interface.", t); t.printStackTrace(); System.exit(31); } }可以看出这里的逻辑很简单,核心就是调用cli.parseParameters(args);方法,那么我们再来看下这个方法:
/** * Parses the command line arguments and starts the requested action. * * @param args command line arguments of the client. * @return The return code of the program */ public int parseParameters(String[] args) { // check for action if (args.length < 1) { CliFrontendParser.printHelp(); System.out.println("Please specify an action."); return 1; } // get action String action = args[0]; // remove action from parameters final String[] params = Arrays.copyOfRange(args, 1, args.length); // do action switch (action) { case ACTION_RUN: return run(params); case ACTION_LIST: return list(params); case ACTION_INFO: return info(params); case ACTION_CANCEL: return cancel(params); case ACTION_STOP: return stop(params); case ACTION_SAVEPOINT: return savepoint(params); case "-h": case "--help": CliFrontendParser.printHelp(); return 0; case "-v": case "--version": String version = EnvironmentInformation.getVersion(); String commitID = EnvironmentInformation.getRevisionInformation().commitId; System.out.print("Version: " + version); System.out.println(!commitID.equals(EnvironmentInformation.UNKNOWN) ? ", Commit ID: " + commitID : ""); return 0; default: System.out.printf("\"%s\" is not a valid action.\n", action); System.out.println(); System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\"."); System.out.println(); System.out.println("Specify the version option (-v or --version) to print Flink version."); System.out.println(); System.out.println("Specify the help option (-h or --help) to get help on the command."); return 1; } }这里会根据action的不同而调用不同的方法,因为我们此时执行的是run操作,那么我们主要看下run,即执行程序:
/** * Executions the run action. * * @param args Command line arguments for the run action. */ protected int run(String[] args) { LOG.info("Running 'run' command."); RunOptions options; try { //解析运行命令,生成运行时的参数 options = CliFrontendParser.parseRunCommand(args); } catch (CliArgsException e) { return handleArgException(e); } catch (Throwable t) { return handleError(t); } // evaluate help flag if (options.isPrintHelp()) { CliFrontendParser.printHelpForRun(); return 0; } if (options.getJarFilePath() == null) { return handleArgException(new CliArgsException("The program JAR file was not specified.")); } PackagedProgram program; try { LOG.info("Building program from JAR file"); //创建一个封装入口类、jar文件、classpath路径、用户配置参数的实例:PackagedProgram program = buildProgram(options); } catch (FileNotFoundException e) { return handleArgException(e); } catch (Throwable t) { return handleError(t); } ClusterClient client = null; try { //根据集群类型创建不同的ClusterClient,比如StandaloneClusterClient,YarnClusterClient分别对应Standalone集群和Yarn集群 client = createClient(options, program); client.setPrintStatusDuringExecution(options.getStdoutLogging()); client.setDetached(options.getDetachedMode()); LOG.debug("Client slots is set to {}", client.getMaxSlots()); LOG.debug(options.getSavepointRestoreSettings().toString()); int userParallelism = options.getParallelism(); LOG.debug("User parallelism is set to {}", userParallelism); if (client.getMaxSlots() != -1 && userParallelism == -1) { logAndSysout("Using the parallelism provided by the remote cluster (" + client.getMaxSlots() + "). 
" + "To use another parallelism, set it at the ./bin/flink client."); userParallelism = client.getMaxSlots(); } else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) { userParallelism = defaultParallelism; } //最重要的方法,执行程序,通过client将程序提交到JobManager return executeProgram(program, client, userParallelism); } catch (Throwable t) { return handleError(t); } finally { if (client != null) { try { client.shutdown(); } catch (Exception e) { LOG.warn("Could not properly shut down the cluster client.", e); } } if (program != null) { program.deleteExtractedLibraries(); } } }即将进入核心的逻辑,看下executeProgram方法做了什么:
// -------------------------------------------------------------------------------------------- // Interaction with programs and JobManager // -------------------------------------------------------------------------------------------- protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) { logAndSysout("Starting execution of program"); JobSubmissionResult result; try { // CluterClient提交程序 result = client.run(program, parallelism); } catch (ProgramParametrizationException e) { return handleParametrizationException(e); } catch (ProgramMissingJobException e) { return handleMissingJobException(); } catch (ProgramInvocationException e) { return handleError(e); } finally { program.deleteExtractedLibraries(); } if (null == result) { logAndSysout("No JobSubmissionResult returned, please make sure you called " + "ExecutionEnvironment.execute()"); return 1; } if (result.isJobExecutionResult()) { logAndSysout("Program execution finished"); JobExecutionResult execResult = result.getJobExecutionResult(); System.out.println("Job with JobID " + execResult.getJobID() + " has finished."); System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms"); Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults(); if (accumulatorsResult.size() > 0) { System.out.println("Accumulator Results: "); System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult)); } } else { logAndSysout("Job has been submitted with JobID " + result.getJobID()); } return 0; }下面的逻辑跳入到了CluterClient中,通过执行它的run方法提交程序:
/** * General purpose method to run a user jar from the CliFrontend in either blocking or detached mode, depending * on whether {@code setDetached(true)} or {@code setDetached(false)}. * * @param prog the packaged program * @param parallelism the parallelism to execute the contained Flink job * @return The result of the execution * @throws ProgramMissingJobException * @throws ProgramInvocationException */ public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); if (prog.isUsingProgramEntryPoint()) { // 如果包含入口类(非交互模式提交Job) // JobWithJars是一个Flink数据流计划,包含了jar中所有的类,以及用于加载用户代码的ClassLoader final JobWithJars jobWithJars; if (hasUserJarsInClassPath(prog.getAllLibraries())) { jobWithJars = prog.getPlanWithoutJars(); } else { jobWithJars = prog.getPlanWithJars(); } return run(jobWithJars, parallelism, prog.getSavepointSettings()); } else if (prog.isUsingInteractiveMode()) { // 使用交互模式提交Job log.info("Starting program in interactive mode"); final List<URL> libraries; if (hasUserJarsInClassPath(prog.getAllLibraries())) { libraries = Collections.emptyList(); } else { libraries = prog.getAllLibraries(); } ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries, prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(), prog.getSavepointSettings()); ContextEnvironment.setAsContext(factory); try { // 调用main方法 prog.invokeInteractiveModeForExecution(); if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) { throw new ProgramMissingJobException("The program didn't contain a Flink job."); } if (isDetached()) { // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute(); } else { // in blocking mode, we execute all Flink jobs contained in the user 
code and then return here return this.lastJobExecutionResult; } } finally { ContextEnvironment.unsetContext(); } } else { throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode."); } }这里我们主要关注包含入口类的情况,继续往下看:
// 此时Client已经连接到Flink的集群,该调用将被阻塞直到执行完成 public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings) throws CompilerException, ProgramInvocationException { ClassLoader classLoader = jobWithJars.getUserCodeClassLoader(); if (classLoader == null) { throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader."); } //这里根据流或批,为每一个operator进行优化,例如shuffle的方式,hash join、sort-merge join或者广播等进行优化 OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism); //根据优化后的执行计划,jar文件,classpath,类加载器,保存点设置运行 return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings); } public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException { JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings); // 提交job return submitJob(job, classLoader); } /** * Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform * some custom job submission logic. * * @param jobGraph The JobGraph to be submitted * @return JobSubmissionResult */ protected abstract JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException;可以看出,最终调用链到了submitJob方法,它是一个抽象方法,我们以StandaloneClusterClient实现为例进行讲解,它最终又会根据是否是分离模式,分别调用ClusterClient的runDetached和run方法。
@Override protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { if (isDetached()) { return super.runDetached(jobGraph, classLoader); } else { return super.run(jobGraph, classLoader); } }我们看下run方法:
/** * Submits a JobGraph blocking. * * @param jobGraph The JobGraph * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes). * @return JobExecutionResult * @throws ProgramInvocationException */ public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { waitForClusterToBeReady(); final ActorSystem actorSystem; try { actorSystem = actorSystemLoader.get(); } catch (FlinkException fe) { throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " + "JobManager.", fe); } try { logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion."); this.lastJobExecutionResult = JobClient.submitJobAndWait( actorSystem, flinkConfig, highAvailabilityServices, jobGraph, timeout, printStatusDuringExecution, classLoader); return lastJobExecutionResult; } catch (JobExecutionException e) { throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); } }最终是通过JobClient进行Job的提交。