Spark Streaming Job Latency Monitoring and Alerting

mac 2025-07-19

Overview

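StreamingListener is Spark Streaming's event-listener mechanism. It receives a callback at each stage of a streaming application, for example when a receiver starts, or when a batch is submitted, starts processing, or completes.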

The StreamingListener Interface

// To listen for events at a given stage of Spark Streaming, override only the corresponding method of this trait
// (org.apache.spark.streaming.scheduler.StreamingListener). Each method already has its own doc comment.
trait StreamingListener {
  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = { }
  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = { }
  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = { }
  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = { }
  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = { }
  /** Called when processing of a batch of jobs has started. */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = { }
  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { }
  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = { }
  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = { }
}
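The delay values that onBatchCompleted exposes through batchCompleted.batchInfo are derived from three timestamps. In Spark's BatchInfo, schedulingDelay is processingStartTime minus submissionTime, processingDelay is processingEndTime minus processingStartTime, and totalDelay is the sum of the two. The plain-Scala sketch below mirrors that arithmetic so it can be run without Spark. BatchTimes is a hypothetical stand-in, not Spark's class.

```scala
// Hypothetical stand-in for the timestamps carried by Spark's BatchInfo (milliseconds since epoch).
// The Option fields stay None until the batch has started or finished processing.
final case class BatchTimes(
    submissionTime: Long,
    processingStartTime: Option[Long],
    processingEndTime: Option[Long]) {

  // Time the batch waited in the queue before processing started
  def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)

  // Time spent actually processing the batch
  def processingDelay: Option[Long] =
    for (start <- processingStartTime; end <- processingEndTime) yield end - start

  // End-to-end delay: scheduling delay plus processing delay
  def totalDelay: Option[Long] =
    for (s <- schedulingDelay; p <- processingDelay) yield s + p
}
```

Consider a batch submitted at t = 0 that starts processing at 5 s and finishes at 30 s. Its schedulingDelay is 5 s, its processingDelay is 25 s, and its totalDelay is 30 s. Because totalDelay includes queueing time, it keeps growing when batches back up.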

Custom StreamingListener

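Purpose: monitor the delay of each batch (totalDelay, i.e. scheduling delay plus processing time). When that delay reaches a given multiple of the batch interval, send an alert, with at least 2 minutes between consecutive alerts.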

import java.net.URLEncoder

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import org.slf4j.LoggerFactory

// RedisClusterClient and HttpClient are the author's own utility objects (not shown in this post)
// duration: batch interval in seconds; times: alert when totalDelay >= times * batch interval
class SparkStreamingDelayListener(private val appName: String,
                                  private val duration: Int,
                                  private val times: Int) extends StreamingListener {
  private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")

  // Runs each time a batch completes
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val numRecords = batchInfo.numRecords
    val processingDelay = batchInfo.processingDelay.getOrElse(0L)
    val totalDelay = batchInfo.totalDelay.getOrElse(0L)

    // Check the delay first, so that batches without delay do not reset the alert interval
    if (totalDelay >= times.toLong * duration * 1000) {
      // The last alert time is stored in Redis, so that alerts are more than 2 minutes apart
      val jedis = RedisClusterClient.getJedisClusterClient()
      val currentTime = System.currentTimeMillis / 1000
      val lastAlertTime = jedis.get(appName)
      if (lastAlertTime == null || currentTime - lastAlertTime.toLong > 120) {
        jedis.set(appName, currentTime.toString)
        val monitorContent = s"$appName: numRecords -> $numRecords, " +
          s"processingDelay -> ${processingDelay / 1000}s, totalDelay -> ${totalDelay / 1000}s"
        logger.warn(monitorContent)
        val msg = s"Streaming_${appName}_DelayTime:${totalDelay / 1000}S"
        // Percent-encode the message so that it is a valid query-string value
        val getURL = "http://node1:8002/message/weixin?msg=" + URLEncoder.encode(msg, "UTF-8")
        HttpClient.doGet(getURL)
      }
    }
  }
}
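The alerting decision in the listener can be separated from Spark and Redis and tested on its own. The sketch below is a hypothetical refactoring, not the author's code. AlertThrottle keeps the last alert time in an in-memory map, whereas the listener above keeps it in Redis, outside the driver process. DelayAlert checks the delay threshold before consulting the throttle, so batches that are not delayed never use up the 2-minute window.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the Redis key used by the listener
final class AlertThrottle(minIntervalSeconds: Long) {
  private val lastAlert = mutable.Map.empty[String, Long]

  // Returns true (and records nowSeconds) only if the last alert for `app`
  // was more than minIntervalSeconds ago, or if there has been no alert yet
  def tryAcquire(app: String, nowSeconds: Long): Boolean =
    lastAlert.get(app) match {
      case Some(last) if nowSeconds - last <= minIntervalSeconds => false
      case _ =>
        lastAlert.update(app, nowSeconds)
        true
    }
}

object DelayAlert {
  // Threshold in milliseconds: `times` batch intervals of `batchSeconds` each
  def thresholdMs(batchSeconds: Int, times: Int): Long = times.toLong * batchSeconds * 1000L

  // && short-circuits, so the throttle is only consulted (and updated) for delayed batches
  def shouldAlert(app: String, totalDelayMs: Long, batchSeconds: Int, times: Int,
                  nowSeconds: Long, throttle: AlertThrottle): Boolean =
    totalDelayMs >= thresholdMs(batchSeconds, times) && throttle.tryAcquire(app, nowSeconds)
}
```

Take a 20-second batch interval and a multiplier of 3 (an illustrative value). The threshold is 60,000 ms. After one alert, further delayed batches are suppressed until more than 120 seconds have passed.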

Usage

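// A StreamingListener does not need to be set in the configuration; it can be added directly to the StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object My {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val duration = 20 // batch interval in seconds, also passed to the listener
    val times = 3     // hypothetical example value: alert when totalDelay >= 3 batch intervals
    val ssc = new StreamingContext(sparkConf, Seconds(duration))
    ssc.addStreamingListener(new SparkStreamingDelayListener("Userid2Redis", duration, times))
    ....
  }
}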
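For the Userid2Redis application registered above, a delayed batch produces a GET request to the alert endpoint. The sketch below reproduces the listener's message format and percent-encodes the message with java.net.URLEncoder. AlertUrl is a hypothetical helper, and the sketch assumes that the node1:8002 endpoint decodes standard URL encoding on its side.

```scala
import java.net.URLEncoder

// Hypothetical helper that mirrors how the listener builds its alert request
object AlertUrl {
  // Endpoint taken from the listener above
  private val Endpoint = "http://node1:8002/message/weixin?msg="

  // Formats the message like the listener does, then percent-encodes it for the query string
  def build(appName: String, totalDelayMs: Long): String = {
    val msg = s"Streaming_${appName}_DelayTime:${totalDelayMs / 1000}S"
    Endpoint + URLEncoder.encode(msg, "UTF-8")
  }
}
```

A 75-second total delay, for example, yields http://node1:8002/message/weixin?msg=Streaming_Userid2Redis_DelayTime%3A75S, with the colon encoded as %3A. Encoding matters more once the app name or message contains spaces, `&`, or non-ASCII text.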
