<!--flink core  1.14.3 -->
<!-- 如果将程序作为 Maven 项目开发,则必须添加 flink-clients 模块的依赖 必须要有 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.12</artifactId>
  <version>${flink-version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink-version}</version>
</dependency>

<!--flink stream java-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.12</artifactId>
  <version>${flink-version}</version>
</dependency>
<!--kafka-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.12</artifactId>
  <version>${flink-version}</version>
</dependency>

<!-- 写入数据到clickhouse -->
<dependency>
  <groupId>ru.yandex.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <version>0.1.54</version>
        </dependency>
@SneakyThrows
public static void main(String[] args) {
    
    // StreamExecutionEnvironment用于设置你的执行环境。任务执行环境用于定义任务的属性,创建数据源以及最终启动任务的执行。
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    
    // 配置kafka信息
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.203.128:9092");
    properties.setProperty("group.id", "test");
    
    
    // 得到 kafka 实例
    FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("111", new SimpleStringSchema(), properties);
    // 尽可能从最早的记录开始
    //        myConsumer.setStartFromEarliest();
    // 从最新的记录开始
    myConsumer.setStartFromLatest();
    // 从指定的时间开始(毫秒)
    // myConsumer.setStartFromTimestamp();
    // myConsumer.setStartFromGroupOffsets(); // 默认的方法
    
    // 添加数据源
    DataStream<String> stream = env.addSource(myConsumer).setParallelism(1);
    
    SingleOutputStreamOperator<User> dataStream = stream.map(new MapFunction<String, User>() {
        @Override
        public User map(String data) throws Exception {
            String[] split = data.split(",");
            return User.of((split[0]), split[1], (split[2]));
        }
    });
    // sink
    J_MyClickHouseUtil jdbcSink = new J_MyClickHouseUtil("INSERT INTO default.my3  VALUES (?,?,?)");
    dataStream.addSink(jdbcSink);
    dataStream.print();
    env.execute("clickhouse sink test");
    env.execute("print-kafka-info");
    
}

public class J_MyClickHouseUtil extends RichSinkFunction<User> {
    Connection connection = null;
    
    String sql;
    
    public J_MyClickHouseUtil(String sql) {
        this.sql = sql;
    }
    
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = ClickHouseUtil.getConn("192.168.203.128", 8123, "default");
    }
    
    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
    }
    
    @Override
    public void invoke(User user, Context context) throws Exception {
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setString(1, user.id);
        preparedStatement.setString(2, user.name);
        preparedStatement.setString(3, user.age);
        preparedStatement.addBatch();
        
        long startTime = System.currentTimeMillis();
        int[] ints = preparedStatement.executeBatch();
        connection.commit();
        long endTime = System.currentTimeMillis();
        System.out.println("批量插入完毕用时:" + (endTime - startTime) + " -- 插入数据 = " + ints.length);
    }
}

public class ClickHouseUtil {
    private static Connection connection;
    
    public static Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String  address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
        connection = DriverManager.getConnection(address);
        return connection;
    }
    
    public static Connection getConn(String host, int port) throws SQLException, ClassNotFoundException {
        return getConn(host,port,"default");
    }
    public static Connection getConn() throws SQLException, ClassNotFoundException {
        return getConn("192.168.203.128",8123);
    }
    public void close() throws SQLException {
        connection.close();
    }
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    public String id;
    public String name;
    public String age;
    public static User of(String id, String name, String age) {
        return new User(id, name, age);
    }
}

topic=111&message=1%2C2222%2C1

create table my3( id String, name String, age Int )ENGINE = MergeTree PARTITION BY id PRIMARY KEY id