2024-03-27 22:41:07.0|分类: flink|浏览量: 302
shuffle():随机向下游的并行子任务发送数据。,这里的shume和之前keyBy的shumle不是一回事儿! rebalance():将数据轮询发送到下游的所有并行子任务中。round-robin。 rescale():将数据轮询发送到下游的部分并行子任务中。用在下游算子的并行度是上游算子的并行度的整数倍的情况。 round-robin。 broadcast():将数据广播到下游的所有并行子任务中。 global():将所有数据发送到下游的第一个(索引为0)并行子任务中。 custom():自定义分区。可以自定义将某个key的数据发送到下游的哪一个并行子任务中去。 Flink系统参数演示例子: package com.conca.flink.vedio.shujuchongfenbu; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class MainSys { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment(); // env.fromElements(1,2,3,4,5,6,7,8) // .setParallelism(1) // .shuffle()//shuffle():随机向下游的并行子任务发送数据。, // .print("随机") // .setParallelism(4); // env.fromElements(1,2,3,4,5,6,7,8) // .setParallelism(1) // .rebalance() //rebalance():将数据轮询发送到下游的所有并行子任务中。round-robin。 // .print("轮询") // .setParallelism(4); // env.fromElements(1,2,3,4,5,6,7,8) // .setParallelism(1) // .broadcast() //broadcast():将数据广播到下游的所有并行子任务中 // .print("广播") // .setParallelism(4); // env.fromElements(1,2,3,4,5,6,7,8) // .setParallelism(1) // .global() //global():将所有数据发送到下游的第一个(索引为0)并行子任务中 // .print("下游的第一个") // .setParallelism(4); env.fromElements(1,2,3,4,5,6,7,8) .setParallelism(1) .rebalance() .map(r->r) .setParallelism(2) .rescale() //rescale():将数据轮询发送到下游的部分并行子任务中。 用在下游算子的并行度是上游算子的并行度的整数倍的情况。round-robin。 .print("轮询发送到下游的部分") .setParallelism(4); env.execute(); } } 自定义例子: package com.conca.flink.vedio.shujuchongfenbu; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class MainCustom { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment(); env.fromElements(1,2,3,4,5,6,7,8) .setParallelism(1) .partitionCustom(new Partitioner<Integer>() { // key是Integer类型 //指定某个key的数据要发送到哪一个索引的并行子任务 @Override public int partition(Integer key, int numPartitions) { if(key ==0 || key==1) { return 0; }else { return 1; } } }, new KeySelector<Integer, Integer>() { @Override public Integer getKey(Integer value) throws Exception { return value%3; } }) .print("自定义") .setParallelism(4); env.execute(); } } |