首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Flink与java

  • 25-02-20 12:01
  • 3862
  • 9356
blog.csdn.net

使用 Java 编写 Flink 程序通常涉及以下几个步骤:

  1. 设置环境:首先需要创建一个执行环境(ExecutionEnvironment 或 StreamExecutionEnvironment),这是 Flink 程序的基础。
  2. 定义数据源:指定数据的来源,可以是文件、Kafka、Socket 等。
  3. 转换操作:对数据流进行各种转换操作,如 map、flatMap、filter 等。
  4. 定义数据汇:指定数据的输出位置,可以是文件、数据库、Kafka 等。
  5. 执行程序:调用执行环境的 execute 方法,启动 Flink 作业。

DataStream API 用于处理无界数据流(streaming data)的编程接口

核心概念:

  • DataStream:代表数据流,是 DataStream API 的基本数据类型。
  • Transformation:对数据流进行操作的各种方法,如 map, flatMap, filter, keyBy, window, reduce 等。
  • Sink:数据流的终点,处理后的数据可以输出到各种存储系统或外部服务。
  • Source:数据流的起点,从外部系统(如消息队列或文件系统)读取数据。

关键操作:

  • map / flatMap:对每个元素进行转换,flatMap 可能产生零个或多个结果。
  • filter:过滤数据流,保留满足条件的元素。
  • keyBy:基于键进行分区。
  • window:定义窗口操作,结合 keyBy 使用以实现窗口聚合。
  • reduce / aggregate:在窗口或整个流上进行聚合计算。
  • connect / union:将两个流连接起来或者合并流。

结合编写步骤,简单编写一个示例

  1. import org.apache.flink.api.common.functions.MapFunction;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. public class FlinkStreamJob {
  5. public static void main(String[] args) throws Exception {
  6. // 设置执行环境
  7. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8. // 创建一个数据流 source,这里使用集合作为示例数据源
  9. DataStream dataStream = env.fromElements(1L, 2L, 3L, 4L, 5L);
  10. // 对数据流进行转换操作,例如映射每个元素到它的两倍
  11. DataStream multiplied = dataStream.map(new MapFunction() {
  12. @Override
  13. public Long map(Long value) throws Exception {
  14. return value * 2L;
  15. }
  16. });
  17. // 打印结果到标准输出(为了简单起见)
  18. multiplied.print();
  19. // 执行程序
  20. env.execute("Flink Job Demo");
  21. }
  22. }

运行结果

5> 10
4> 8
3> 6
1> 2
2> 4

DataSet API 用于处理有界数据集(batch data)的编程接口

核心概念:

  • DataSet:代表一个数据集,是 DataSet API 的基本数据类型。
  • Transformation:对数据集进行各种操作的函数,如 map, flatMap, reduce, groupBy, join 等。
  • Execution Environment:批处理程序的入口,通过 ExecutionEnvironment 对象来配置和执行作业。

关键操作:

  • map / flatMap:对每个元素进行转换,flatMap 可以产生零个或多个结果。
  • filter:过滤数据集,保留满足条件的元素。
  • groupBy:根据键进行分组,以便后续的聚合操作。
  • reduce:对分组后的数据执行归约操作。
  • join:连接两个数据集。
  • aggregate:在数据集或分组上执行聚合操作。

结合编写步骤,简单编写一个示例 

  1. import org.apache.flink.api.common.typeinfo.Types;
  2. import org.apache.flink.api.java.ExecutionEnvironment;
  3. import org.apache.flink.api.java.operators.AggregateOperator;
  4. import org.apache.flink.api.java.operators.DataSource;
  5. import org.apache.flink.api.java.operators.FlatMapOperator;
  6. import org.apache.flink.api.java.operators.UnsortedGrouping;
  7. import org.apache.flink.api.java.tuple.Tuple2;
  8. import org.apache.flink.util.Collector;
  9. public class BatchWordCount {
  10. public static void main(String[] args) throws Exception {
  11. // 1、创建执行环境
  12. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  13. // 2、从环境中读取数据
  14. DataSource lineDataSource = env.readTextFile("input/words.txt");
  15. // 3、将每行数据进行分词,转化成二元组类型 扁平映射
  16. FlatMapOperator> wordAndOneTuple = lineDataSource.flatMap((String line, Collector> out) -> {
  17. // 将每行文本进行拆分
  18. String[] words = line.split(" ");
  19. // 将每个单词转化成二元组
  20. for (String word : words) {
  21. out.collect(Tuple2.of(word, 1L));
  22. }
  23. }).returns(Types.TUPLE(Types.STRING, Types.LONG));
  24. // 4、按照word进行分组
  25. UnsortedGrouping> wordAndOneGroup = wordAndOneTuple.groupBy(0);
  26. // 5、分组内进行聚合统计
  27. AggregateOperator> sum = wordAndOneGroup.sum(1);
  28. // 6、打印结果
  29. sum.print();
  30. }
  31. }

运行结果(取决于你在words.txt中输入了哪些内容)

  1. (nanjing,1)
  2. (wuhu,1)
  3. (china,1)
  4. (hello,4)
  5. (word,1)

PS:以上内容如有雷同,纯属巧合

注:本文转载自blog.csdn.net的抬杠超人的文章"https://blog.csdn.net/jaylentao/article/details/145516301"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

111
大数据
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top