Spark Streaming状态管理函数updateStateByKey和mapWithState
一、状态管理函数二、mapWithState2.1关于mapWithState2.2mapWithState示例Scala:2.3mapWithState算子应用示例2.4mapWithState应用示例2.5SparkStreaming之mapWithState
三、updateStateByKey3.1关于updateStateByKey3.2updateStateByKey示例Scala:
四、updateStateByKey和mapWithState的区别4.1适用场景
相关内容原文地址:
SparkStreaming之解析mapWithState Spark Streaming状态管理函数 updateStateByKey和mapWithState SparkStreaming之mapWithState
一、状态管理函数
Spark Streaming中状态管理函数包括updateStateByKey和mapWithState,都是用来统计全局key的状态的变化的。它们以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加,在有新的数据信息进入或更新时。能够让用户保持想要的不论什么状。
二、mapWithState
2.1关于mapWithState
mapWithState也会统计全局的key的状态,但是如果没有数据输入,便不会返回之前的key的状态,类似于增量的感觉。
需要自己写一个匿名函数func来实现自己想要的功能。如果有初始化的值得需要,可以使用initialState(RDD)来初始化key的值。 另外,还可以指定timeout函数,该函数的作用是,如果一个key超过timeout设定的时间没有更新值,那么这个key将会失效。这个控制需要在func中实现,必须使用state.isTimingOut()来判断失效的key值。如果在失效时间之后,这个key又有新的值了,则会重新计算。如果没有使用isTimingOut,则会报错。
2.2mapWithState示例Scala:
package spark2x
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.
{DStream, MapWithStateDStream, ReceiverInputDStream
}
import org.apache.spark.streaming.
{Seconds, State, StateSpec, StreamingContext
}
object MapWithState
{
// 设置本地运行模式
def main
(args: Array
[String
]): Unit
= {
val spark
= SparkSession.builder
()
.master
("local[2]")
.appName
("MapWithState")
.getOrCreate
()
// 创建一个context,批次间隔为2秒钟,
val ssc: StreamingContext
= new StreamingContext
(spark.sparkContext, Seconds
(3
))
// 设置checkpoint目录
ssc.checkpoint
(".")
// 创建一个ReceiverInputDStream,从服务器端的netcat接收数据。
// 服务器主机名SC01(SC01已在Window上的hosts文件中做了映射,没做映射的则写ip就OK了),监听端口为6666
val line: ReceiverInputDStream
[String
] = ssc.socketTextStream
("SC01", 6666
)
// 对接收到的数据进行处理,进行切割,分组形式为
(day, 1
) (word 1
)
val wordsStream: DStream
[(String, Int
)] = line.flatMap
(_.split
(" ")).map
((_, 1))
val wordCount: MapWithStateDStream
[String, Int, Int, Any
] = wordsStream.mapWithState
(StateSpec.function
(func
).timeout
(Seconds
(30
)))
// 打印
wordCount.print
()
// 提交
ssc.start
()
//
ssc.awaitTermination
()
}
/**
* 定义一个函数,该函数有三个类型word: String, option: Option
[Int
], state: State
[Int
]
* 其中word代表统计的单词,option代表的是历史数据,state代表的是返回的状态
*/
val func
= (word: String, option: Option
[Int
], state: State
[Int
]) => {
if
(state.isTimingOut
()){
println
(word +
"is timeout")
}else
{
// 获取历史数据,当前值加上上一个批次的该状态的值
val
sum = option.getOrElse
(0
) + state.getOption
().getOrElse
(0
)
// 单词和该单词出现的频率
val wordFreq
= (word, sum
)
// 更新状态
state.update
(sum
)
wordFreq
}
}
}
mapWithState它会按照时间线在每一个批次间隔返回之前的发生改变的或者新的key的状态,不发生变化的不返回。同时mapWithState可以不用设置checkpoint,返回的数据量少,性能和效率都比mapWithState好。
2.3mapWithState算子应用示例
import org
.apache
.spark
.SparkConf
import org
.apache
.spark
.streaming
.{Seconds
, State
, StateSpec
, StreamingContext
}
object MapWithStateApp
{
def
main(args
: Array
[String
]): Unit
= {
val conf
= new SparkConf().setMaster("local[2]").setAppName("MapWithStateApp")
val ssc
= new StreamingContext(conf
,Seconds(5))
ssc
.checkpoint("hdfs://192.168.137.251:9000/spark/data")
val lines
= ssc
.socketTextStream("hadoop000",9999)
val words
= lines
.flatMap(_
.split(" "))
val pairs
= words
.map(x
=>(x
,1)).reduceByKey(_
+_
)
val mappingFunc
= (word
: String
, one
: Option
[Int
], state
: State
[Int
]) => {
val sum
= one
.getOrElse(0) + state
.getOption
.getOrElse(0)
val output
= (word
, sum
)
state
.update(sum
)
output
}
val wordcounts
= pairs
.mapWithState(StateSpec
.function(mappingFunc
))
wordcounts
.print()
ssc
.start()
ssc
.awaitTermination()
}
}
2.4mapWithState应用示例
package stateParse
import org
.apache
.spark
.streaming
._
import org
.apache
.spark
.{SparkConf
, SparkContext
}
object TestMapWithState
{
def
main(args
: Array
[String
]): Unit
= {
val conf
= new SparkConf()
conf
.setAppName(s
"${this.getClass.getSimpleName}")
conf
.setMaster("local[2]")
val sc
= new SparkContext(conf
)
val ssc
= new StreamingContext(sc
, Seconds(3))
ssc
.checkpoint("/checkpoint/")
val line
= ssc
.socketTextStream("127.0.0.1",9999)
val wordDStream
= line
.flatMap(_
.split(",")).map(x
=>(x
,1))
val mappingFunc
= (userId
:String
,value
:Option
[Int
],state
:State
[Int
])=>{
val sum
= value
.getOrElse(0) + state
.getOption().getOrElse(0)
val output
= (userId
,sum
)
state
.update(sum
)
output
}
val stateDStream
= wordDStream
.mapWithState(StateSpec
.function(mappingFunc
).timeout(Minutes(60))).print()
ssc
.start()
ssc
.awaitTermination()
}
}
mapWithState接收的参数是一个StateSpec对象,在StateSpec中封装了状态管理的函数。我们定义了一个状态更新函数mappingFunc,该函数会更新指定用户的状态,同时会返回更新后的状态,将该函数传给mapWithState,并设置状态超时时间。SparkStreaming通过根据我们定义的更新函数,在每个计算时间间隔内更新内部维护的状态,同时返回经过mappingFunc处理后的结果数据流。
2.5SparkStreaming之mapWithState
与updateStateByKey方法相比,使用mapWithState方法能够得到6倍的低延迟的同时维护的key状态的数量要多10倍,这一性能的提升和扩展性可以从基准测试结果得到验证,所有的结果全部在实践间隔为1秒的batch和相同大小的集群中生成。
下图比较的是mapWithState方法和updateStateByKey方法处理1秒的batch所消耗的平均时间。在本例子中,我们为同样数量的的key(0.25-1百万)保存状态,然后已同意的速率(30个更新/s)对其进行更新。可以看到mapWithState方法比updateStateByKey方法快了8倍,从而允许更低的端到端的延迟。
package com
.llcc
.sparkSql
.MyTimeSort
import org
.apache
.spark
.streaming
.StreamingContext
import org
.apache
.spark
.SparkConf
import org
.apache
.spark
.streaming
.Seconds
import org
.apache
.spark
.streaming
.StateSpec
import org
.apache
.spark
.streaming
.State
object MapWithStateDemo
{
def
main(args
: Array
[String
]): Unit
= {
val conf
=new SparkConf().setMaster("local[2]").setAppName("MapWithStateDemo")
val ssc
=new StreamingContext(conf
,Seconds(2));
ssc
.checkpoint(".")
val fileDS
=ssc
.socketTextStream("hadoop1", 9999)
val wordDstream
=fileDS
.flatMap
{ line
=> line
.split("\t") }
.map
{ word
=> (word
,1) }
val mappingFunc
= (word
: String
, one
: Option
[Int
], state
: State
[Int
]) => {
val sum
= one
.getOrElse(0) + state
.getOption
.getOrElse(0)
val output
= (word
, sum
)
state
.update(sum
)
output
}
val initialRDD
= ssc
.sparkContext
.parallelize(List(("hello", 1), ("world", 1)))
val stateDstream
= wordDstream
.mapWithState(
StateSpec
.function(mappingFunc
).initialState(initialRDD
))
stateDstream
.print();
ssc
.start()
ssc
.awaitTermination()
}
}
三、updateStateByKey
3.1关于updateStateByKey
updateStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。
3.2updateStateByKey示例Scala:
package spark2x
import org
.apache
.spark
.SparkContext
import org
.apache
.spark
.sql
.SparkSession
import org
.apache
.spark
.streaming
.{Seconds
, StreamingContext
}
import org
.apache
.spark
.streaming
.dstream
.{DStream
, ReceiverInputDStream
}
object UpdateStateByKeyDemo
{
def
main(args
: Array
[String
]): Unit
= {
val spark
= SparkSession
.builder()
.master("local[2]")
.appName("UpdateStateByKeyDemo")
.getOrCreate()
val conf
: SparkContext
= spark
.sparkContext
val ssc
: StreamingContext
= new StreamingContext(conf
, Seconds(3))
ssc
.checkpoint("hdfs://SC01:8020/user/tmp/cp-20181201")
val line
: ReceiverInputDStream
[String
] = ssc
.socketTextStream("SC01", 6666)
val words
: DStream
[String
] = line
.flatMap(_
.split(" "))
val pairs
: DStream
[(String
, Int
)] = words
.map((_
, 1))
val wordCount
: DStream
[(String
, Int
)] = pairs
.updateStateByKey((values
: Seq
[Int
], state
: Option
[Int
]) => {
var newValue
= state
.getOrElse(0)
for (value
<- values
) {
newValue
+= value
}
Option(newValue
)
})
wordCount
.print()
ssc
.start()
ssc
.awaitTermination()
}
val func
= (it
: Iterator
[(String
, Seq
[Int
], Option
[Int
])]) => {
it
.map(tup
=> {
(tup
._1
, tup
._2
.sum
+ tup
._3
.getOrElse(0))
})
}
}
四、updateStateByKey和mapWithState的区别
updateStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。
mapWithState只返回变化后的key的值,这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。
4.1适用场景
updateStateByKey可以用来统计历史数据。例如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的访问量等指标
mapWithState可以用于一些实时性较高,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里的余额信息。