A sink writes a stream's final output to a file, a database, or some other middleware.
The first case above, writing data to a txt file, was covered in an earlier post; print() simply writes each record to the console. The third case is writing data to middleware such as Kafka or Redis.
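For reference, here is a minimal sketch of those two simple sinks (the output path and job name are illustrative, not from the original demo; the socket source matches the one used later in this post):

package com.caozg.stream.sink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleSinksDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read newline-delimited text from a local socket
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 15000, "\n");

        text.print();                        // sink: print each record to stdout
        text.writeAsText("/tmp/words.txt");  // sink: write records to a text file (path is illustrative)

        env.execute("SimpleSinksDemo");
    }
}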
Data can be written to the message middleware and NoSQL databases listed below; these are the connectors for which Flink provides sink support:

- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache ActiveMQ (source/sink)
- Redis (sink)

To implement a custom sink, implement the SinkFunction interface or extend RichSinkFunction; org.apache.flink.streaming.connectors.redis.RedisSink is a good reference implementation, and a minimal sketch follows.
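The sketch below shows the general shape of such a custom sink (assuming the Flink 1.x DataStream API; the class name and the stdout "storage" are illustrative placeholders for a real external system):

package com.caozg.stream.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Illustrative custom sink: RichSinkFunction adds open()/close() lifecycle
// hooks on top of SinkFunction, which is where connections to the target
// system are normally created and released.
public class MyCustomSink extends RichSinkFunction<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // open a connection to the external system here
    }

    @Override
    public void invoke(String value) throws Exception {
        // called once per record; a real sink would write to the external system
        System.out.println("sink received: " + value);
    }

    @Override
    public void close() throws Exception {
        // release the connection here
        super.close();
    }
}

RedisSink itself follows this same pattern: the connection to Redis is opened in open() and each incoming record is written out in invoke().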
The demo below writes data to Redis, taking its input from a network socket.

First, add the Flink Redis connector dependency:
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

The code is as follows:
package com.caozg.stream.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Receive socket data and save it to Redis as a list:
 *
 *   lpush list_key value
 */
public class StreamingDemoToRedis {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 15000, "\n");

        // lpush l_words word
        // wrap each string into a Tuple2<String, String> of (key, value)
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<String, String>("l_words", value);
            }
        });

        // Redis connection configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("127.0.0.1")
                .setPort(6379)
                .build();

        // create the RedisSink
        RedisSink<Tuple2<String, String>> redisSink =
                new RedisSink<Tuple2<String, String>>(conf, new MyRedisMapper());

        l_wordsData.addSink(redisSink);

        env.execute("StreamingDemoToRedis");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {

        // the Redis key to operate on, taken from the incoming record
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        // the Redis value to write, taken from the incoming record
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}

Start the data source with nc -l 15000 and type some words. Then open a Redis client and run the monitor command to watch the data arrive:
caozg@caozg-PC:/usr/local/redis/bin$ ./redis-cli
127.0.0.1:6379>
127.0.0.1:6379> monitor
OK
1572572362.919833 [0 127.0.0.1:37358] "LPUSH" "l_words" "heloo"
1572572373.034903 [0 127.0.0.1:37366] "LPUSH" "l_words" "kkk"
1572572376.342446 [0 127.0.0.1:37374] "LPUSH" "l_words" "mon"
1572572377.745541 [0 127.0.0.1:37376] "LPUSH" "l_words" "dhshf"
1572572401.888554 [0 127.0.0.1:37388] "auth" "123456"

Finally, the result can also be inspected with a Redis GUI tool.
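Alternatively, the list can be checked straight from redis-cli. Since LPUSH prepends, LRANGE should show the words in reverse insertion order (the output below is what the LPUSH calls above imply, not a captured session):

127.0.0.1:6379> LRANGE l_words 0 -1
1) "dhshf"
2) "mon"
3) "kkk"
4) "heloo"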