2024-03-27 22:41:07.0|分类: flink|浏览量: 717
|
shuffle():随机向下游的并行子任务发送数据。,这里的shume和之前keyBy的shumle不是一回事儿! rebalance():将数据轮询发送到下游的所有并行子任务中。round-robin。 rescale():将数据轮询发送到下游的部分并行子任务中。用在下游算子的并行度是上游算子的并行度的整数倍的情况。 round-robin。 broadcast():将数据广播到下游的所有并行子任务中。 global():将所有数据发送到下游的第一个(索引为0)并行子任务中。 custom():自定义分区。可以自定义将某个key的数据发送到下游的哪一个并行子任务中去。 Flink系统参数演示例子: package com.conca.flink.vedio.shujuchongfenbu;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MainSys {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.
getExecutionEnvironment();
// env.fromElements(1,2,3,4,5,6,7,8)
// .setParallelism(1)
// .shuffle()//shuffle():随机向下游的并行子任务发送数据。,
// .print("随机")
// .setParallelism(4);
// env.fromElements(1,2,3,4,5,6,7,8)
// .setParallelism(1)
// .rebalance() //rebalance():将数据轮询发送到下游的所有并行子任务中。round-robin。
// .print("轮询")
// .setParallelism(4);
// env.fromElements(1,2,3,4,5,6,7,8)
// .setParallelism(1)
// .broadcast() //broadcast():将数据广播到下游的所有并行子任务中
// .print("广播")
// .setParallelism(4);
// env.fromElements(1,2,3,4,5,6,7,8)
// .setParallelism(1)
// .global() //global():将所有数据发送到下游的第一个(索引为0)并行子任务中
// .print("下游的第一个")
// .setParallelism(4);
env.fromElements(1,2,3,4,5,6,7,8)
.setParallelism(1)
.rebalance()
.map(r->r)
.setParallelism(2)
.rescale() //rescale():将数据轮询发送到下游的部分并行子任务中。
用在下游算子的并行度是上游算子的并行度的整数倍的情况。round-robin。
.print("轮询发送到下游的部分")
.setParallelism(4);
env.execute();
}
}自定义例子: package com.conca.flink.vedio.shujuchongfenbu;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MainCustom {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.
getExecutionEnvironment();
env.fromElements(1,2,3,4,5,6,7,8)
.setParallelism(1)
.partitionCustom(new Partitioner<Integer>() {
// key是Integer类型
//指定某个key的数据要发送到哪一个索引的并行子任务
@Override
public int partition(Integer key, int numPartitions) {
if(key ==0 || key==1) {
return 0;
}else {
return 1;
}
}
}, new KeySelector<Integer, Integer>() {
@Override
public Integer getKey(Integer value) throws Exception {
return value%3;
}
})
.print("自定义")
.setParallelism(4);
env.execute();
}
} |
