Win10下Flink探索(1)

文章目录
  1. 下载 Flink
  2. 启动服务
  3. 创建 Flink 项目
  4. 运行 WordCount 代码(批处理)
  5. 运行 StreamWordCount 代码(流处理)
  6. 其他

下载 Flink

在 flink 官网上 下载 旧版本:

1
wget https://archive.apache.org/dist/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.12.tgz

注意:
1、最新的 flink 1.12.2 版本不支持windows,下载后在其bin目录下没找到启动的批处理。
2、flink 1.9.3 和 flink 1.10.0经过测试都可以win10下启动(flink 1.10.3也没有windows下启动的批处理文件)

启动服务

解压缩上面下载的包,直接在bin目录下运行 start-cluster.bat 即可:

1
2
cd flink-1.9.3
bin\start-cluster.bat

正常启动后会打开两个命令行窗口,此时在浏览器中访问 http://localhost:8081 即可看到 flink 的 UI 界面。

注意:
1、如果是 flink-1.10.0版本,启动后一开始显示两个命令行窗口,然后很快其中一个会被关掉,在界面上看不到任何 Task Manager 信息。
2、参考 Window10安装Flink1.10.0-大坑 的尝试,不建议大家在win10下使用这个版本。

创建 Flink 项目

1、通过如下 maven 命令创建 flink项目:

1
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.0

2、启动 Idea 导入 上面创建的 mvn 项目,自动创建的目录结构可能如下:

1
2
3
4
5
6
--src
--main
--java
--com.ice.stream # 上一步创建时自定义的 groupId
BatchJob.java
StreamingJob.java

运行 WordCount 代码(批处理)

1、新增如下 WordCount 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> source = env.fromElements("I love Beijing", "I love China", "Beijing is the capital of China");
source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String w : words) {
out.collect(new Tuple2<String, Integer>(w, 1));
}
}
}).groupBy(0).sum(1).print();
env.execute("Java WordCount from Example");
}
}

2、运行:有两种方式,一种是IDE内运行,直接在 WordCount.java 上单击右键运行:

注意:因为 pom.xml 中对 flink 的依赖包设定了 provided scope,如果直接在 IDE 中运行会报找不到 Flink 相关类的错误,所以需要编辑启动配置,选中 ‘Include dependencies with “Provided” scope’ 即可:

1
2
3
4
5
6
7
8
9
10
11
12
><dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

3、另外一种方式是打包后提交到 flink ui 上运行:通过如下命令打包

1
mvn clean package

生成的 flink-1.0-SNAPSHOT.jar 大小约 10k (不带 flink core依赖),在 UI 界面上点击 “submit new job”,点击”add new”,选择上一步打好的包。然后单击 submit 提交,即可在 jobs 页面查看正在运行的 job。

问题:win10下 task manager 的Stdout一直没有输出,但是可以在 task manager的命令行窗口看到输出。原因大致是因为平台差异所致。(参考 windows下flink 8081页面taskmanager无输出 的说法,flink 在 linux 下启动就没有这种问题。

运行 StreamWordCount 代码(流处理)

1、新增如下 StreamWordCount 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
public static void main(String[] args) throws Exception {
//参数检查
if (args.length != 2) {
System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
return;
}
String hostname = args[0];
Integer port = Integer.parseInt(args[1]);
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据
DataStreamSource<String> stream = env.socketTextStream(hostname, port);
//计数
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
.keyBy(0)
.sum(1);
sum.print();
env.execute("Java WordCount from SocketTextStream Example");
}
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
String[] tokens = s.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}

2、通过 nc 开启 9999 端口:

1
netcat-win32-1.12\nc -L -p 9999

3、重新打包 flink 应用,通过如下命令启动流处理:

1
2
set FLINK_CONF_DIR=flink-1.9.3\conf
flink-1.9.3\bin\flink.bat run -c com.ice.stream.StreamWordCount flink\target\flink-1.0-SNAPSHOT.jar 127.0.0.1 9999

4、在 nc 开的窗口中输入随机字符串,即可在 flink 的 task manager的命令行窗口看到流处理统计的结果。例如当输入多行记录a,b,c,a,a,b,显示如下:

1
2
3
(a,3)
(b,2)
(c,1)

其他

1、win10 下安装 nc:直接在如下地址下载

1
https://eternallybored.org/misc/netcat/

2、注意:下载后如果有安装360,会报检测到病毒并直接删除。需要关闭后才能正常下载。