2024-04-02 20:43:43.0|分类: flink|浏览量: 303
算子的每一个并行子任务都有自己的生命周期 ·open方法:在算子的计算逻辑执行前执行一次,适合做一些初始化的工作(打开一个文件,打开一个网络连接, 打开一个数据库的连接)。生命周期的开始。 ·close方法:在算子的计算逻辑执行完毕之后执行一次,适合做一些清理工作。(关闭一个文件,关闭网络连接, 关闭数据库连接)。生命周期的结束。· ·getRuntimeContext()方法:用来获取算子的并行子任务的一些上下文信息。比如当前算子的并行子任务的索引等等。 举一些例子 MapFunction >RichMapFunction FilterFunction > RichFilterFunction。 FlatMapFunetion > RichFlatMapFunction。 ReduceFunetion > RichReduceFunction SourceFunction > RichSourceFunction SinkFunetion > RichSinkFunetion RichMapFunctione测试用例: public class TestRichMapFunction { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment(); env.fromElements(1,2,3,4,5,6,7,8) .setParallelism(1) .map(new RichMapFunction<Integer, String>() { @Override public void open(Configuration parameters) throws Exception { System.out.println("map的并行子任务的索引是:"+ getRuntimeContext().getIndexOfThisSubtask()+ ",生命周期开始。"); } @Override public void close() throws Exception { System.out.println("map的并行子任务的索引是:"+ getRuntimeContext().getIndexOfThisSubtask()+ ",生命周期结束。"); } @Override public String map(Integer value) throws Exception { System.out.println("map的并行子任务的索引是:"+ getRuntimeContext().getIndexOfThisSubtask()+",处理数据是:"+ value); return value+""; } }).setParallelism(2) .print("生命周期测试") .setParallelism(2); env.execute(); } } 输出结果: map的并行子任务的索引是:0,生命周期开始。 map的并行子任务的索引是:1,生命周期开始。 map的并行子任务的索引是:1,处理数据是:1 map的并行子任务的索引是:0,处理数据是:2 生命周期测试:2> 1 生命周期测试:1> 2 map的并行子任务的索引是:1,处理数据是:3 生命周期测试:2> 3 map的并行子任务的索引是:1,处理数据是:5 map的并行子任务的索引是:0,处理数据是:4 生命周期测试:2> 5 生命周期测试:1> 4 map的并行子任务的索引是:1,处理数据是:7 map的并行子任务的索引是:0,处理数据是:6 生命周期测试:2> 7 生命周期测试:1> 6 map的并行子任务的索引是:0,处理数据是:8 生命周期测试:1> 8 map的并行子任务的索引是:1,生命周期结束。 map的并行子任务的索引是:0,生命周期结束。 |