使用 Java 编写 Flink 程序通常涉及以下几个步骤:
- 设置环境:首先需要创建一个执行环境(ExecutionEnvironment 或 StreamExecutionEnvironment),这是 Flink 程序的基础。
- 定义数据源:指定数据的来源,可以是文件、Kafka、Socket 等。
- 转换操作:对数据流进行各种转换操作,如 map、flatMap、filter 等。
- 定义数据汇:指定数据的输出位置,可以是文件、数据库、Kafka 等。
- 执行程序:调用执行环境的 execute 方法,启动 Flink 作业。
DataStream API 用于处理无界数据流(streaming data)的编程接口
核心概念:
- DataStream:代表数据流,是 DataStream API 的基本数据类型。
- Transformation:对数据流进行操作的各种方法,如 map, flatMap, filter, keyBy, window, reduce 等。
- Sink:数据流的终点,处理后的数据可以输出到各种存储系统或外部服务。
- Source:数据流的起点,从外部系统(如消息队列或文件系统)读取数据。
关键操作:
- map / flatMap:对每个元素进行转换,flatMap 可能产生零个或多个结果。
- filter:过滤数据流,保留满足条件的元素。
- keyBy:基于键进行分区。
- window:定义窗口操作,结合 keyBy 使用以实现窗口聚合。
- reduce / aggregate:在窗口或整个流上进行聚合计算。
- connect / union:将两个流连接起来或者合并流。
结合编写步骤,简单编写一个示例
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- public class FlinkStreamJob {
- public static void main(String[] args) throws Exception {
- // 设置执行环境
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // 创建一个数据流 source,这里使用集合作为示例数据源
- DataStream
dataStream = env.fromElements(1L, 2L, 3L, 4L, 5L); -
- // 对数据流进行转换操作,例如映射每个元素到它的两倍
- DataStream
multiplied = dataStream.map(new MapFunction() { - @Override
- public Long map(Long value) throws Exception {
- return value * 2L;
- }
- });
-
- // 打印结果到标准输出(为了简单起见)
- multiplied.print();
-
- // 执行程序
- env.execute("Flink Job Demo");
- }
- }
运行结果
5> 10
4> 8
3> 6
1> 2
2> 4
DataSet API 用于处理有界数据集(batch data)的编程接口
核心概念:
- DataSet:代表一个数据集,是 DataSet API 的基本数据类型。
- Transformation:对数据集进行各种操作的函数,如 map, flatMap, reduce, groupBy, join 等。
- Execution Environment:批处理程序的入口,通过 ExecutionEnvironment 对象来配置和执行作业。
关键操作:
- map / flatMap:对每个元素进行转换,flatMap 可以产生零个或多个结果。
- filter:过滤数据集,保留满足条件的元素。
- groupBy:根据键进行分组,以便后续的聚合操作。
- reduce:对分组后的数据执行归约操作。
- join:连接两个数据集。
- aggregate:在数据集或分组上执行聚合操作。
结合编写步骤,简单编写一个示例
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.AggregateOperator;
- import org.apache.flink.api.java.operators.DataSource;
- import org.apache.flink.api.java.operators.FlatMapOperator;
- import org.apache.flink.api.java.operators.UnsortedGrouping;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.util.Collector;
-
- public class BatchWordCount {
- public static void main(String[] args) throws Exception {
- // 1、创建执行环境
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- // 2、从环境中读取数据
- DataSource
lineDataSource = env.readTextFile("input/words.txt"); - // 3、将每行数据进行分词,转化成二元组类型 扁平映射
- FlatMapOperator
> wordAndOneTuple = lineDataSource.flatMap((String line, Collector> out) -> { - // 将每行文本进行拆分
- String[] words = line.split(" ");
- // 将每个单词转化成二元组
- for (String word : words) {
- out.collect(Tuple2.of(word, 1L));
- }
- }).returns(Types.TUPLE(Types.STRING, Types.LONG));
- // 4、按照word进行分组
- UnsortedGrouping
> wordAndOneGroup = wordAndOneTuple.groupBy(0); - // 5、分组内进行聚合统计
- AggregateOperator
> sum = wordAndOneGroup.sum(1); - // 6、打印结果
- sum.print();
-
- }
- }
运行结果(取决于你在words.txt中输入了哪些内容)
- (nanjing,1)
- (wuhu,1)
- (china,1)
- (hello,4)
- (word,1)
PS:以上内容如有雷同,纯属巧合
评论记录:
回复评论: