重慶分公司,新征程啟航
為企業提供網站建設、域名注冊、服務器等服務
為企業提供網站建設、域名注冊、服務器等服務
這篇文章主要講解了“Flink Connectors怎么連接MySQL”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Flink Connectors怎么連接MySQL”吧!
成都創新互聯公司主要從事成都網站設計、成都網站制作、網頁設計、企業做網站、公司建網站等業務。立足成都服務開福,十年網站建設經驗,價格優惠、服務專業,歡迎來電咨詢建站服務:13518219792
通過使用Flink DataStream Connectors 數據流連接器連接到MySQL數據源,并基于JDBC提供數據流輸入與輸出操作
示例環境
java.version: 1.8.x, flink.version: 1.11.1, mysql: 5.7.x
數據流輸入
DataStreamSource.java
package com.flink.examples.mysql; import com.flink.examples.TUser; import com.google.gson.Gson; import org.apache.flink.api.java.io.jdbc.JDBCOptions; import org.apache.flink.api.java.io.jdbc.JDBCTableSource; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; /** * @Description 將mysql表中數(shù)據(jù)查詢輸出到DataStream流中 */ public class DataStreamSource { /** * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //查詢sql String sql = "SELECT id,name,age,sex,address,createTimeSeries FROM t_user"; //設置表視圖字段與類型 TableSchema tableSchema = TableSchema.builder() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) .field("age", DataTypes.INT()) .field("sex", DataTypes.INT()) .field("address", DataTypes.STRING()) //.field("createTime", DataTypes.TIMESTAMP()) .field("createTimeSeries", DataTypes.BIGINT()) .build(); //配置jdbc數(shù)據(jù)源選項 JDBCOptions jdbcOptions = JDBCOptions.builder() .setDriverName(MysqlConfig.DRIVER_CLASS) .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL) .setUsername(MysqlConfig.SOURCE_USER) .setPassword(MysqlConfig.SOURCE_PASSWORD) .setTableName("t_user") .build(); JDBCTableSource jdbcTableSource = JDBCTableSource.builder().setOptions(jdbcOptions).setSchema(tableSchema).build(); //將數(shù)據(jù)源注冊到tableEnv視圖student中 tEnv.registerTableSource("t_user", jdbcTableSource); Table table = tEnv.sqlQuery(sql); DataStreamsourceStream = tEnv.toAppendStream(table, TUser.class); sourceStream.map((t)->new Gson().toJson(t)).print(); 
env.execute("flink mysql source"); } }
數據流輸出
DataStreamSink.java
package com.flink.examples.mysql; import com.flink.examples.TUser; import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.sinks.TableSink; import static org.apache.flink.table.api.Expressions.$; /** * @Description 將DataStream數(shù)據(jù)流插入到mysql表中 */ public class DataStreamSink { /** * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(2000); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //查詢sql String sql = "insert into t_user (id,name,age,sex,address,createTimeSeries) values (?,?,?,?,?,?)"; //封裝數(shù)據(jù) TUser user = new TUser(); user.setId(0); user.setName("zhao1"); user.setAge(22); user.setSex(1); user.setAddress("CN"); user.setCreateTimeSeries(System.currentTimeMillis()); DataStreamsourceStream = env.fromElements(user); //從DataStream獲取數(shù)據(jù) // Expression id = ExpressionParser.parseExpression("id"); // Expression name = ExpressionParser.parseExpression("name"); // Expression age = ExpressionParser.parseExpression("age"); // Expression sex = ExpressionParser.parseExpression("sex"); // Expression address = ExpressionParser.parseExpression("address"); // Expression createTimeSeries = ExpressionParser.parseExpression("createTimeSeries"); // Table table = tEnv.fromDataStream(sourceStream, id, name, age, sex, address, createTimeSeries ); Table table = 
tEnv.fromDataStream(sourceStream,$("id"),$("name"),$("age"),$("sex"),$("address"),$("createTimeSeries")); //輸出到mysql //設置表視圖字段與類型 TableSchema tableSchema = TableSchema.builder() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) .field("age", DataTypes.INT()) .field("sex", DataTypes.INT()) .field("address", DataTypes.STRING()) //.field("createTime", DataTypes.TIMESTAMP()) .field("createTimeSeries", DataTypes.BIGINT()) .build(); //設置sink輸出jdbc TableSink tableSink = JDBCAppendTableSink.builder() .setDrivername(MysqlConfig.DRIVER_CLASS) .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL) .setUsername(MysqlConfig.SOURCE_USER) .setPassword(MysqlConfig.SOURCE_PASSWORD) .setQuery(sql) .setParameterTypes(tableSchema.getFieldTypes()) .setBatchSize(100) .build(); //將數(shù)據(jù)源注冊到tableEnv視圖result中 tEnv.registerTableSink("result", tableSchema.getFieldNames(), tableSchema.getFieldTypes(), tableSink); //在指定的路徑下注冊,然后執(zhí)行插入操作 table.executeInsert("result"); } }
數據源配置類
MysqlConfig.java
package com.flink.examples.mysql;

/**
 * MySQL database connection settings shared by the example jobs.
 */
public class MysqlConfig {

    /** Fully qualified class name of the JDBC driver. */
    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";

    /** JDBC URL of the database, including its connection parameters. */
    public static final String SOURCE_DRIVER_URL =
            "jdbc:mysql://127.0.0.1:3306/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false";

    /** User name used to open the connection. */
    public static final String SOURCE_USER = "root";

    /** Password used to open the connection. */
    public static final String SOURCE_PASSWORD = "root";
}
數據展示
感謝各位的閱讀,以上就是“Flink Connectors怎么連接MySQL”的內容了,經過本文的學習后,相信大家對Flink Connectors怎么連接MySQL這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創新互聯,小編將為大家推送更多相關知識點的文章,歡迎關注!