Flink接收RabbitMQ数据写入到Oracle

mac2025-02-09  10

文件内容

FlinkMain.java

import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; public class FlinkMain { public static void main(String[] args) throws Exception { // 1,执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2,RabbitMQ配置 RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("192.168.1.3") .setPort(5673) .setUserName("panfeng") .setPassword("panfeng") .setVirtualHost("/panfeng") .build(); // 3,添加资源 DataStreamSource<String> dataStreamSource = env.addSource(new RMQSource<String>( connectionConfig, "flink", true, new SimpleStringSchema())); // 4,添加到流,去执行接收到的数据进行入库 dataStreamSource.addSink(new SinkOracle()); // 5,执行工作,定义一个工作名称 env.execute("rabbitmq flink oracle"); } }

SinkOracle.java

import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class SinkOracle extends RichSinkFunction<String> { private Connection connection; private PreparedStatement statement; // 1,初始化 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName("oracle.jdbc.OracleDriver"); connection = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:orcl", "scott", "123456"); statement = connection.prepareStatement("INSERT INTO FLINK VALUES (SEQ_FLINK.NEXTVAL,?)"); } // 2,执行 @Override public void invoke(String value, Context context) throws Exception { System.out.println("value.toString()-------" + value.toString()); statement.setString(1, value); statement.execute(); } // 3,关闭 @Override public void close() throws Exception { super.close(); if (statement != null) statement.close(); if (connection != null) connection.close(); } }

pom.xml

<dependencies>
    <!-- Flink core Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.9.0</version>
    </dependency>

    <!-- Flink streaming API (Scala 2.11 build) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>

    <!-- Flink RabbitMQ connector (Scala 2.11 build) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-rabbitmq_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>

    <!-- Oracle JDBC driver -->
    <dependency>
        <groupId>com.oracle</groupId>
        <artifactId>ojdbc8</artifactId>
        <version>12.2.0.1.0</version>
    </dependency>
</dependencies>

测试步骤

执行 FlinkMain.java 中的主方法,往对应队列中传入数据,接收到的数据会输出到控制台并写入 Oracle

如果想把配置信息写文件application.properties的话配置文件内容 db.driver=oracle.jdbc.OracleDriver db.url=jdbc:oracle:thin:@10.18.20.180:1521:MUDATA db.username=MD_REF db.password=MD_REF_2018 rmq.host=10.18.20.13 rmq.port=5672 rmq.username=camel rmq.password=camel123 rmq.vhost=reference rmq.exchanges=ref.muservice.input rmq.queue.airport=two.airport.muservice.input rmq.queue.city=two.city.muservice.input rmq.queue.country=two.country.muservice.input 读取RabbitMQ import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; public class CountryFlinkMain { public static void main(String[] args) throws Exception { // 1,执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2,读取 country.properties 配置 ParameterTool pt=ParameterTool.fromPropertiesFile("flink-crew-assign/src/main/resources/country.properties"); // 3,RabbitMQ配置 RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost(pt.get("rmq.host")) .setPort(Integer.parseInt(pt.get("rmq.port"))) .setUserName(pt.get("rmq.username")) .setPassword(pt.get("rmq.password")) .setVirtualHost(pt.get("rmq.vhost")) .build(); // 4,添加资源,RMQSource(OUT) DataStreamSource dataStreamSource = env.addSource(new RMQSource( connectionConfig, pt.get("rmq.queue.country"),// 国家 true, new SimpleStringSchema())); // 5,添加到流,去执行接收到的数据进行入库,addSink(IN) dataStreamSource.addSink(new CountrySinkOracle()); // 6,执行工作,定义一个工作名称 env.execute("rabbitmq flink oracle"); } } 读取数据库 public class CountrySinkOracle extends RichSinkFunction { private Connection conn; private PreparedStatement statement; // 1,初始化 @Override public void open(Configuration parameters) throws Exception 
{ super.open(parameters); ParameterTool pt = ParameterTool.fromPropertiesFile("flink-crew-assign/src/main/resources/country.properties"); Class.forName(pt.get("db.driver")); conn = DriverManager.getConnection(pt.get("db.url"), pt.get("db.username"), pt.get("db.password")); }
最新回复(0)