老熟女激烈的高潮_日韩一级黄色录像_亚洲1区2区3区视频_精品少妇一区二区三区在线播放_国产欧美日产久久_午夜福利精品导航凹凸

重慶分公司,新征程啟航

為企業(yè)提供網(wǎng)站建設、域名注冊、服務器等服務

FlinkConnectors怎么連接MySql

這篇文章主要講解了“Flink Connectors怎么連接MySQL”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Flink Connectors怎么連接MySql”吧!

成都創(chuàng)新互聯(lián)公司主要從事成都網(wǎng)站設計、成都網(wǎng)站制作、網(wǎng)頁設計、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務。立足成都服務開福,十年網(wǎng)站建設經(jīng)驗,價格優(yōu)惠、服務專業(yè),歡迎來電咨詢建站服務:13518219792

通過使用Flink DataStream Connectors 數(shù)據(jù)流連接器連接到Mysql數(shù)據(jù)源,并基于JDBC提供數(shù)據(jù)流輸入與輸出操作

示例環(huán)境

java.version: 1.8.xflink.version: 1.11.1mysql:5.7.x

數(shù)據(jù)流輸入

DataStreamSource.java

package com.flink.examples.mysql;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Description 將mysql表中數(shù)據(jù)查詢輸出到DataStream流中
 */
public class DataStreamSource {

    /**
     * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html
     */

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //查詢sql
        String sql = "SELECT id,name,age,sex,address,createTimeSeries FROM t_user";

        //設置表視圖字段與類型
        TableSchema tableSchema = TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .field("sex", DataTypes.INT())
                .field("address", DataTypes.STRING())
                //.field("createTime", DataTypes.TIMESTAMP())
                .field("createTimeSeries", DataTypes.BIGINT())
                .build();

        //配置jdbc數(shù)據(jù)源選項
        JDBCOptions jdbcOptions = JDBCOptions.builder()
                .setDriverName(MysqlConfig.DRIVER_CLASS)
                .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL)
                .setUsername(MysqlConfig.SOURCE_USER)
                .setPassword(MysqlConfig.SOURCE_PASSWORD)
                .setTableName("t_user")
                .build();
        JDBCTableSource jdbcTableSource = JDBCTableSource.builder().setOptions(jdbcOptions).setSchema(tableSchema).build();

        //將數(shù)據(jù)源注冊到tableEnv視圖student中
        tEnv.registerTableSource("t_user", jdbcTableSource);
        Table table = tEnv.sqlQuery(sql);

        DataStream sourceStream = tEnv.toAppendStream(table, TUser.class);
        sourceStream.map((t)->new Gson().toJson(t)).print();

        env.execute("flink mysql source");
    }
}

數(shù)據(jù)流輸出

DataStreamSink.java

package com.flink.examples.mysql;

import com.flink.examples.TUser;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Description 將DataStream數(shù)據(jù)流插入到mysql表中
 */
public class DataStreamSink {

    /**
     * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html
     */

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(2000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //查詢sql
        String sql = "insert into t_user (id,name,age,sex,address,createTimeSeries) values (?,?,?,?,?,?)";
        //封裝數(shù)據(jù)
        TUser user = new TUser();
        user.setId(0);
        user.setName("zhao1");
        user.setAge(22);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(System.currentTimeMillis());
        DataStream sourceStream = env.fromElements(user);

        //從DataStream獲取數(shù)據(jù)
//        Expression id = ExpressionParser.parseExpression("id");
//        Expression name = ExpressionParser.parseExpression("name");
//        Expression age = ExpressionParser.parseExpression("age");
//        Expression sex = ExpressionParser.parseExpression("sex");
//        Expression address = ExpressionParser.parseExpression("address");
//        Expression createTimeSeries = ExpressionParser.parseExpression("createTimeSeries");
//        Table table = tEnv.fromDataStream(sourceStream, id, name, age, sex, address, createTimeSeries );
        Table table = tEnv.fromDataStream(sourceStream,$("id"),$("name"),$("age"),$("sex"),$("address"),$("createTimeSeries"));

        //輸出到mysql
        //設置表視圖字段與類型
        TableSchema tableSchema = TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .field("sex", DataTypes.INT())
                .field("address", DataTypes.STRING())
                //.field("createTime", DataTypes.TIMESTAMP())
                .field("createTimeSeries", DataTypes.BIGINT())
                .build();

        //設置sink輸出jdbc
        TableSink tableSink = JDBCAppendTableSink.builder()
                .setDrivername(MysqlConfig.DRIVER_CLASS)
                .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL)
                .setUsername(MysqlConfig.SOURCE_USER)
                .setPassword(MysqlConfig.SOURCE_PASSWORD)
                .setQuery(sql)
                .setParameterTypes(tableSchema.getFieldTypes())
                .setBatchSize(100)
                .build();

        //將數(shù)據(jù)源注冊到tableEnv視圖result中
        tEnv.registerTableSink("result",
                tableSchema.getFieldNames(),
                tableSchema.getFieldTypes(),
                tableSink);
        //在指定的路徑下注冊,然后執(zhí)行插入操作
        table.executeInsert("result");
    }
}

數(shù)據(jù)源配置類

MysqlConfig.java

package com.flink.examples.mysql;

/**
 * @Description Mysql數(shù)據(jù)庫連接配置
 */
public class MysqlConfig {

    public final static String DRIVER_CLASS="com.mysql.jdbc.Driver";

    public final static String SOURCE_DRIVER_URL="jdbc:mysql://127.0.0.1:3306/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false";
    public final static String SOURCE_USER="root";
    public final static String SOURCE_PASSWORD="root";

}

數(shù)據(jù)展示

Flink Connectors怎么連接MySql

感謝各位的閱讀,以上就是“Flink Connectors怎么連接MySql”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對Flink Connectors怎么連接MySql這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關知識點的文章,歡迎關注!


網(wǎng)站欄目:FlinkConnectors怎么連接MySql
分享URL:http://www.xueling.net.cn/article/jgsjph.html

其他資訊

在線咨詢
服務熱線
服務熱線:028-86922220
TOP
主站蜘蛛池模板: 精品久久亚洲精品中文字幕 | 国产成人精品高清在线 | 蜜臀AV色欲A片无码精品一区 | 成年美女黄网站色大片 | 亚洲天天在线日亚洲洲精 | 久久久久久久久久久久久久免费看 | 最色网ww| 国产一区二区三区18 | 日韩日批| 欧美日韩亚洲免费 | 久久综合久色欧美综合狠狠 | 中国大陆一级毛片 | 中文在线一区二区 | 终极斗罗4第三季免费播放 免费无码成人片 | 99手机在线视频 | 久久精品国产精品亚洲综合 | 成人国产第一区在线观看 | 99精品视频在线观看免费 | 视频一区中文字幕精品 | mdmf.tv麻豆 | 97人妻人人澡人人爽国产一 | 国产精品夜夜春夜夜爽久久小 | 2020狠狠操 | 日本公妇被公侵犯中文字幕2 | 孕交VIDEOSGRATIS孕妇性欧美 | 成人国产精品2021 | 亚洲成人久久久久 | 中文字幕人成无码人妻 | 国产三级精品三级在 | 国产成人精品无码片区 | 一级做a爰片久久高潮 | 成人毛片大全 | 亚洲国产综合在线观看 | 久久久久国产一区二区三区 | 巨胸喷奶水视频WWW免费网站 | 久久男人AV资源网站无码软件 | 欧美大片一区二区三区 | 欧洲熟妇性色黄 | 免费观影入口看日本视频 | 无码区a毛片免费视频 | 欧美一区二区三区四区视频 |