什么是Flink富函数(算子生命周期)?
cookqq ›博客列表 ›flink

什么是Flink富函数(算子生命周期)?

2024-04-02 20:43:43.0|分类: flink|浏览量: 666

摘要: 富函数(Rich Functions) 是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于, 可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

算子的每一个并行子任务都有自己的生命周期

·open方法:在算子的计算逻辑执行前执行一次,适合做一些初始化的工作(打开一个文件,打开一个网络连接,

        打开一个数据库的连接)。生命周期的开始。

·close方法:在算子的计算逻辑执行完毕之后执行一次,适合做一些清理工作。(关闭一个文件,关闭网络连接,

        关闭数据库连接)。生命周期的结束。·

·getRuntimeContext()方法:用来获取算子的并行子任务的一些上下文信息。比如当前算子的并行子任务的索引等等。


举一些例子

MapFunction >RichMapFunction

FilterFunction > RichFilterFunction。

FlatMapFunetion > RichFlatMapFunction。

ReduceFunetion > RichReduceFunction

SourceFunction > RichSourceFunction

SinkFunetion > RichSinkFunetion


RichMapFunctione测试用例:

public class TestRichMapFunction {
	public static void main(String[] args) throws Exception {
		
		// 创建执行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.
				getExecutionEnvironment();

		env.fromElements(1,2,3,4,5,6,7,8)
		.setParallelism(1)
		.map(new RichMapFunction<Integer, String>() {
			@Override
			public void open(Configuration parameters) throws Exception {
				System.out.println("map的并行子任务的索引是:"+ 
						getRuntimeContext().getIndexOfThisSubtask()+
						",生命周期开始。");
			}

			@Override
			public void close() throws Exception {
				System.out.println("map的并行子任务的索引是:"+ 
						getRuntimeContext().getIndexOfThisSubtask()+
						",生命周期结束。");
			}

			@Override
			public String map(Integer value) throws Exception {
				System.out.println("map的并行子任务的索引是:"+ 
						getRuntimeContext().getIndexOfThisSubtask()+",处理数据是:"+
						value);
				return value+"";
			}
		}).setParallelism(2)
		.print("生命周期测试")
		.setParallelism(2);
		env.execute();
	}
}

输出结果:

map的并行子任务的索引是:0,生命周期开始。

map的并行子任务的索引是:1,生命周期开始。

map的并行子任务的索引是:1,处理数据是:1

map的并行子任务的索引是:0,处理数据是:2

生命周期测试:2> 1

生命周期测试:1> 2

map的并行子任务的索引是:1,处理数据是:3

生命周期测试:2> 3

map的并行子任务的索引是:1,处理数据是:5

map的并行子任务的索引是:0,处理数据是:4

生命周期测试:2> 5

生命周期测试:1> 4

map的并行子任务的索引是:1,处理数据是:7

map的并行子任务的索引是:0,处理数据是:6

生命周期测试:2> 7

生命周期测试:1> 6

map的并行子任务的索引是:0,处理数据是:8

生命周期测试:1> 8

map的并行子任务的索引是:1,生命周期结束。

map的并行子任务的索引是:0,生命周期结束。


一键分享文章

分类列表

  • • struts源码分析
  • • flink
  • • struts
  • • redis
  • • kafka
  • • ubuntu
  • • zookeeper
  • • hadoop
  • • activiti
  • • linux
  • • 成长
  • • NIO
  • • 关键词提取
  • • mysql
  • • android studio
  • • zabbix
  • • 云计算
  • • mahout
  • • jmeter
  • • hive
  • • ActiveMQ
  • • lucene
  • • MongoDB
  • • netty
  • • flume
  • • 我遇到的问题
  • • GRUB
  • • nginx
  • • 大家好的文章
  • • android
  • • tomcat
  • • Python
  • • luke
  • • android源码编译
  • • 安全
  • • MPAndroidChart
  • • swing
  • • POI
  • • powerdesigner
  • • jquery
  • • html
  • • java
  • • eclipse
  • • shell
  • • jvm
  • • highcharts
  • • 设计模式
  • • 列式数据库
  • • spring cloud
  • • docker+node.js+zookeeper构建微服务
版权所有 cookqq 感谢访问 支持开源 京ICP备15030920号
CopyRight 2015-2018 cookqq.com All Right Reserved.