Flink数据重分布
cookqq ›博客列表 ›flink

Flink数据重分布

2024-03-27 22:41:07.0|分类: flink|浏览量: 537

摘要: 将数据发送到下游算子的不同的并行子任务

shuffle():随机向下游的并行子任务发送数据。,这里的shume和之前keyBy的shumle不是一回事儿!

rebalance():将数据轮询发送到下游的所有并行子任务中。round-robin。

rescale():将数据轮询发送到下游的部分并行子任务中。用在下游算子的并行度是上游算子的并行度的整数倍的情况。    round-robin。

broadcast():将数据广播到下游的所有并行子任务中。

global():将所有数据发送到下游的第一个(索引为0)并行子任务中。

custom():自定义分区。可以自定义将某个key的数据发送到下游的哪一个并行子任务中去。


Flink系统参数演示例子:

package com.conca.flink.vedio.shujuchongfenbu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MainSys {

	public static void main(String[] args) throws Exception {
			
			// 创建执行环境
			StreamExecutionEnvironment env = StreamExecutionEnvironment.
			getExecutionEnvironment();
	
//			env.fromElements(1,2,3,4,5,6,7,8)
//			.setParallelism(1)
//			.shuffle()//shuffle():随机向下游的并行子任务发送数据。,
//			.print("随机")
//			.setParallelism(4);
			
//			env.fromElements(1,2,3,4,5,6,7,8)
//			.setParallelism(1)
//			.rebalance() //rebalance():将数据轮询发送到下游的所有并行子任务中。round-robin。
//			.print("轮询")
//			.setParallelism(4);
			
//			env.fromElements(1,2,3,4,5,6,7,8)
//			.setParallelism(1)
//			.broadcast() //broadcast():将数据广播到下游的所有并行子任务中
//			.print("广播")
//			.setParallelism(4);
			
//			env.fromElements(1,2,3,4,5,6,7,8)
//			.setParallelism(1)
//			.global() //global():将所有数据发送到下游的第一个(索引为0)并行子任务中
//			.print("下游的第一个")
//			.setParallelism(4);
			
			env.fromElements(1,2,3,4,5,6,7,8)
			.setParallelism(1)
			.rebalance()
			.map(r->r)
			.setParallelism(2)
			.rescale() //rescale():将数据轮询发送到下游的部分并行子任务中。
			用在下游算子的并行度是上游算子的并行度的整数倍的情况。round-robin。
			.print("轮询发送到下游的部分")
			.setParallelism(4);
			
			env.execute();
			
			
			
	}
}


自定义例子:


package com.conca.flink.vedio.shujuchongfenbu;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MainCustom {

	public static void main(String[] args) throws Exception {
			
			// 创建执行环境
			StreamExecutionEnvironment env = StreamExecutionEnvironment.
			getExecutionEnvironment();
	
			
			env.fromElements(1,2,3,4,5,6,7,8)
			.setParallelism(1)
			.partitionCustom(new Partitioner<Integer>() {
				// key是Integer类型
				//指定某个key的数据要发送到哪一个索引的并行子任务
				@Override
				public int partition(Integer key, int numPartitions) {
					if(key ==0 || key==1) {
						return 0;
					}else {
						return 1;
					}
				}
				
			}, new KeySelector<Integer, Integer>() {
				@Override
				public Integer getKey(Integer value) throws Exception {
					return value%3;
				}
			})
			.print("自定义")
			.setParallelism(4);
			
			env.execute();
			
			
			
	}
}



一键分享文章

分类列表

  • • struts源码分析
  • • flink
  • • struts
  • • redis
  • • kafka
  • • ubuntu
  • • zookeeper
  • • hadoop
  • • activiti
  • • linux
  • • 成长
  • • NIO
  • • 关键词提取
  • • mysql
  • • android studio
  • • zabbix
  • • 云计算
  • • mahout
  • • jmeter
  • • hive
  • • ActiveMQ
  • • lucene
  • • MongoDB
  • • netty
  • • flume
  • • 我遇到的问题
  • • GRUB
  • • nginx
  • • 大家好的文章
  • • android
  • • tomcat
  • • Python
  • • luke
  • • android源码编译
  • • 安全
  • • MPAndroidChart
  • • swing
  • • POI
  • • powerdesigner
  • • jquery
  • • html
  • • java
  • • eclipse
  • • shell
  • • jvm
  • • highcharts
  • • 设计模式
  • • 列式数据库
  • • spring cloud
  • • docker+node.js+zookeeper构建微服务
版权所有 cookqq 感谢访问 支持开源 京ICP备15030920号
CopyRight 2015-2018 cookqq.com All Right Reserved.