下载 Flink
在 flink 官网上 下载 旧版本:
1
| wget https://archive.apache.org/dist/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.12.tgz
|
注意:
1、最新的 flink 1.12.2 版本不支持windows,下载后在其bin目录下没找到启动的批处理。
2、flink 1.9.3 和 flink 1.10.0经过测试都可以win10下启动(flink 1.10.3也没有windows下启动的批处理文件)
启动服务
解压缩上面下载的包,直接在bin目录下运行 start-cluster.bat 即可:
1 2
| cd flink-1.9.3 bin\start-cluster.bat
|
正常启动后会打开两个命令行窗口,此时在浏览器中访问 http://localhost:8081 即可看到 flink 的 UI 界面。
注意:
1、如果是 flink-1.10.0版本,启动后一开始显示两个命令行窗口,然后很快其中一个会被关掉,在界面上看不到任何 Task Manager 信息。
2、参考 Window10安装Flink1.10.0-大坑 的尝试,不建议大家在win10下使用这个版本。
创建 Flink 项目
1、通过如下 maven 命令创建 flink项目:
1
| mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.0
|
2、启动 Idea 导入 上面创建的 mvn 项目,自动创建的目录结构可能如下:
1 2 3 4 5 6
| --src --main --java --com.ice.stream BatchJob.java StreamingJob.java
|
运行 WordCount 代码(批处理)
1、新增如下 WordCount 代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector;
public class WordCount { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> source = env.fromElements("I love Beijing", "I love China", "Beijing is the capital of China"); source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = value.split(" "); for (String w : words) { out.collect(new Tuple2<String, Integer>(w, 1)); } } }).groupBy(0).sum(1).print(); env.execute("Java WordCount from Example"); } }
|
2、运行:有两种方式,一种是IDE内运行,直接在 WordCount.java 上单击右键运行:
注意:因为 pom.xml 中对 flink 的依赖包设定了 provided scope,如果直接在 IDE 中运行会报找不到 Flink 相关类的错误,所以需要编辑启动配置,选中 ‘Include dependencies with “Provided” scope’ 即可:
1 2 3 4 5 6 7 8 9 10 11 12
| ><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
|
3、另外一种方式是打包后提交到 flink ui 上运行:通过如下命令打包
生成的 flink-1.0-SNAPSHOT.jar 大小约 10k (不带 flink core依赖),在 UI 界面上点击 “submit new job”,点击”add new”,选择上一步打好的包。然后单击 submit 提交,即可在 jobs 页面查看正在运行的 job。
问题:win10下 task manager 的Stdout一直没有输出,但是可以在 task manager的命令行窗口看到输出。原因大致是因为平台差异所致。(参考 windows下flink 8081页面taskmanager无输出 的说法,flink 在 linux 下启动就没有这种问题。
运行 StreamWordCount 代码(流处理)
1、新增如下 StreamWordCount 代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;
public class StreamWordCount { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>"); return; } String hostname = args[0]; Integer port = Integer.parseInt(args[1]); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> stream = env.socketTextStream(hostname, port); SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter()) .keyBy(0) .sum(1); sum.print(); env.execute("Java WordCount from SocketTextStream Example"); } public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) { String[] tokens = s.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { collector.collect(new Tuple2<String, Integer>(token, 1)); } } } } }
|
2、通过 nc 开启 9999 端口:
1
| netcat-win32-1.12\nc -L -p 9999
|
3、重新打包 flink 应用,通过如下命令启动流处理:
1 2
| set FLINK_CONF_DIR=flink-1.9.3\conf flink-1.9.3\bin\flink.bat run -c com.ice.stream.StreamWordCount flink\target\flink-1.0-SNAPSHOT.jar 127.0.0.1 9999
|
4、在 nc 开的窗口中输入随机字符串,即可在 flink 的 task manager的命令行窗口看到流处理统计的结果。例如当输入多行记录a,b,c,a,a,b,显示如下:
其他
1、win10 下安装 nc:直接在如下地址下载
1
| https://eternallybored.org/misc/netcat/
|
2、注意:下载后如果有安装360,会报检测到病毒并直接删除。需要关闭后才能正常下载。