文档
doc
docker 安装
docker pull flink
docker-compose.yml
# Minimal Flink session cluster: one JobManager and one TaskManager,
# both started from the stock `flink` image.
version: "2.1"
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      # Published to the host on the same port number.
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      # Must match the JobManager service name so the TaskManager can reach it.
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
docker-compose build
docker-compose up -d --force-recreate
访问 Flink Web UI：http://192.168.203.128:8081 （请将 IP 替换为运行 Docker 的宿主机地址）
java demo
<!-- Flink 1.13.6 dependencies for the demos below; keep all three on the same version. -->
<!-- The _2.12 suffix on an artifactId is the Scala binary version the artifact was built against. -->
<!-- flink-clients: needed to run jobs locally (e.g. from the IDE). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<!-- flink-java: the DataSet (batch) API, i.e. the org.apache.flink.api.java.* classes. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.6</version>
</dependency>
<!-- flink-streaming-java: the DataStream API, i.e. the org.apache.flink.streaming.api.* classes. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.6</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
批处理
package com.zongkx.batch;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Arrays;
/**
 * Batch word count built on the Flink DataSet API.
 *
 * <p>Reads {@code src/main/resources/hello.txt}, splits every line on single
 * spaces, and prints the number of occurrences of each token.
 */
@Slf4j
public class Batch {

    /**
     * Builds and runs the batch word-count job.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> textLines = env.readTextFile("src/main/resources/hello.txt");

        // Emit one (token, 1) pair per space-separated token of the line.
        FlatMapFunction<String, Tuple2<String, Integer>> tokenizer = (line, collector) -> {
            for (String token : line.split(" ")) {
                collector.collect(Tuple2.of(token, 1));
            }
        };

        // The lambda's generic output type is erased at compile time, so it is
        // declared explicitly through returns(...).
        FlatMapOperator<String, Tuple2<String, Integer>> pairs =
                textLines.flatMap(tokenizer).returns(Types.TUPLE(Types.STRING, Types.INT));

        // Group on tuple field 0 (the token) and add up field 1 (the count).
        AggregateOperator<Tuple2<String, Integer>> counts = pairs.groupBy(0).sum(1);

        // No explicit env.execute() here: in the DataSet API print() triggers execution.
        counts.print();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
有界流
package com.zongkx.stream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
 * Word count over a bounded stream (a text file) using the Flink DataStream API.
 *
 * <p>Reads {@code src/main/resources/hello.txt}, splits every line on single
 * spaces, and prints a running count per token with the sink prefix
 * {@code test}.
 */
public class BoundedStream {

    /**
     * Builds and runs the streaming word-count job.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = env.readTextFile("src/main/resources/hello.txt");

        // Emit one (token, 1) pair per space-separated token. The lambda's generic
        // output type is erased, so it is declared explicitly through returns(...).
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = streamSource
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (line, out) -> {
                    for (String token : line.split(" ")) {
                        out.collect(Tuple2.of(token, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by the token through a KeySelector: the index-based keyBy(int...)
        // overload is deprecated in the DataStream API.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum =
                flatMap.keyBy(pair -> pair.f0).sum(1);

        sum.print("test");

        // DataStream jobs are lazy; nothing runs until execute() is called.
        env.execute();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
无界流
package com.zongkx.stream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
 * Word count over an unbounded stream using the Flink DataStream API.
 *
 * <p>Reads text lines from a socket at {@code localhost:9000}, splits every
 * line on single spaces, and prints a running count per token with the sink
 * prefix {@code test}.
 */
public class UnboundedStream {

    /**
     * Builds and runs the streaming word-count job.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9000);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source
                // Emit one (token, 1) pair per space-separated token of the line.
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (line, out) -> {
                    for (String token : line.split(" ")) {
                        out.collect(Tuple2.of(token, 1));
                    }
                })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // Key by the token through a KeySelector: the index-based
                // keyBy(int...) overload is deprecated in the DataStream API.
                .keyBy(pair -> pair.f0)
                .sum(1);

        sum.print("test");

        // DataStream jobs are lazy; nothing runs until execute() is called.
        env.execute();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38