Flink实现值累积,每次输出结果
cookqq ›博客列表 ›flink

Flink实现值累积,每次输出结果

2024-04-11 21:27:40.0|分类: flink|浏览量: 460

摘要: 数值随机产生,用flink进行数值累积进行打印,每次输出结果,输出结果日志会很频繁

   

package com.conca.flink.source;

import java.util.Random;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class IntSource implements SourceFunction<Integer>{
	
	private boolean running = true;
	private Random random =new Random();
	
	@Override
	public void cancel(){
		running = false;
	}

	@Override
	public void run(SourceContext<Integer> ctx) throws Exception {
		while(running){
			ctx.collect(random.nextInt(1000));
			Thread.sleep( 1000L);
		}
	}
}

package com.conca.flink.vedio.keyedProcessFunction;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import com.conca.flink.source.IntSource;
import com.conca.flink.vedio.keyBy.StatisticsBean;

/**
 * 针对keyBy之后的键控流(KeyedStream),可以使用KeyedProcessFunetion
 * Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
由于KeyedProcessFunction实现了RichFunction接口,因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
需要注意的是,只有在KeyedStream里才能够访问state和定时器,通俗点来说就是这个函数要用在keyBy这个函数的后面
 * @author conca
 */
public class StatisticsMain {
	public static void main(String[] args) throws Exception {
		// 创建执行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment
		.getExecutionEnvironment();
		env.setParallelism(1);

		env.addSource(new IntSource())
		.keyBy(r->"count")
		.process(new StatisticsProcess())
		.print();
		env.execute();
	}
	
	
	public static class StatisticsProcess extends
	 KeyedProcessFunction<String, Integer, StatisticsBean>{

		private static final long serialVersionUID = 1L;
		private ValueState<StatisticsBean> accumulator;
		
		@Override
		public void open(Configuration parameters) throws Exception {
			accumulator = getRuntimeContext()
					.getState(
				new ValueStateDescriptor<StatisticsBean>
				("xx", org.apache.flink.api.common.typeinfo.Types
				.POJO(StatisticsBean.class)));
		}

		@Override
		public void processElement(Integer in, Context context,
				Collector<StatisticsBean> out) throws Exception {
			
			if(accumulator.value() == null) {
				accumulator.update(new StatisticsBean(in, in, 1, in, in));
			}else {
				StatisticsBean old = 
						accumulator.value();
				
				StatisticsBean newBean = new StatisticsBean
						(Math.max(in, old.max),
						 Math.min(in, old.min), 
						 1+old.count, 
						 in+old.sum, 
						 (in+old.sum)/(1+old.count));
				
				accumulator.update(newBean);
			}
			out.collect(accumulator.value());
		}
	}
}


一键分享文章

分类列表

  • • struts源码分析
  • • flink
  • • struts
  • • redis
  • • kafka
  • • ubuntu
  • • zookeeper
  • • hadoop
  • • activiti
  • • linux
  • • 成长
  • • NIO
  • • 关键词提取
  • • mysql
  • • android studio
  • • zabbix
  • • 云计算
  • • mahout
  • • jmeter
  • • hive
  • • ActiveMQ
  • • lucene
  • • MongoDB
  • • netty
  • • flume
  • • 我遇到的问题
  • • GRUB
  • • nginx
  • • 大家好的文章
  • • android
  • • tomcat
  • • Python
  • • luke
  • • android源码编译
  • • 安全
  • • MPAndroidChart
  • • swing
  • • POI
  • • powerdesigner
  • • jquery
  • • html
  • • java
  • • eclipse
  • • shell
  • • jvm
  • • highcharts
  • • 设计模式
  • • 列式数据库
  • • spring cloud
  • • docker+node.js+zookeeper构建微服务
版权所有 cookqq 感谢访问 支持开源 京ICP备15030920号
CopyRight 2015-2018 cookqq.com All Right Reserved.