Flink温度连续上涨1s
cookqq ›博客列表 ›flink

Flink温度连续上涨1s

2024-04-11 21:23:58.0|分类: flink|浏览量: 497

摘要: Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用 通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法 由于KeyedProcessFunction实现了RichFunction接口, 因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放 需要注意的是,只有在KeyedStream里才能够访问state和定时器, 通俗点来说就是这个函数要用在keyBy这个函数的后面

     

package com.conca.flink.bean;

/**
 * 传感器实体类
 * @author conca
 */
public class SensorTemperaturer {

	public String sensor;//传感器id
	public Double temperaturer;//温度
	
	public SensorTemperaturer() {
		super();
	}
	
	public SensorTemperaturer(String sensor, Double temperaturer) {
		super();
		this.sensor = sensor;
		this.temperaturer = temperaturer;
	}
	
	
	
}

package com.conca.flink.source;

import java.util.Random;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.conca.flink.bean.SensorTemperaturer;

/*
 * 传感器温度发生器,产生测试数据源
 */
public class SensorTemperaturerSource 
    implements SourceFunction<SensorTemperaturer>{
	
	private boolean running = true;
	private Random random =new Random();
	
	@Override
	public void cancel(){
		running = false;
	}

	@Override
	public void run(SourceContext<SensorTemperaturer> ctx) throws Exception {
		while(running){
			for (int i = 1; i < 4; i++) {
				ctx.collect(new SensorTemperaturer("snsor"+i, 
				random.nextGaussian()));
			}
			Thread.sleep( 300L);
		}
	}
}


package com.conca.flink.vedio.keyedProcessFunction;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import com.conca.flink.bean.SensorTemperaturer;
import com.conca.flink.source.SensorTemperaturerSource;

/**
 * 针对keyBy之后的键控流(KeyedStream),可以使用KeyedProcessFunetion
 * Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
由于KeyedProcessFunction实现了RichFunction接口,
因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
需要注意的是,只有在KeyedStream里才能够访问state和定时器,
通俗点来说就是这个函数要用在keyBy这个函数的后面
 * @author conca
 */
public class ContinuousIncreaseMain {
	public static void main(String[] args) throws Exception {
		// 创建执行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);

		env.addSource(new SensorTemperaturerSource())
		.keyBy(r->r.sensor)
		.process(new StatisticsProcess())
		.print();
		env.execute();
	}
	
	
	public static class StatisticsProcess extends 
	KeyedProcessFunction<String, SensorTemperaturer, String>{

		private static final long serialVersionUID = 1L;
		
		private ValueState<Double> lastTemperaturer;
		private ValueState<Long> time;
		
		@Override
		public void open(Configuration parameters) throws Exception {
			lastTemperaturer = getRuntimeContext()
					.getState(
							new ValueStateDescriptor<Double>
							("xx", Types.DOUBLE));
			
			time = getRuntimeContext()
					.getState(
							new ValueStateDescriptor<Long>
							("yyy", Types.LONG));
		}

		@Override
		public void processElement(SensorTemperaturer in, Context context,
				Collector<String> out) throws Exception {
			Double prev = lastTemperaturer.value();
			lastTemperaturer.update(in.temperaturer);
			
			if(prev != null) {
				if(in.temperaturer > prev) {
					if(time.value() == null) {
						time.update(context.timerService()
						.currentProcessingTime()+1000L);
						context.timerService()
						.registerProcessingTimeTimer(time.value());
					}
					
				}else if(in.temperaturer < prev) {
					if(time.value() != null) {
						context.timerService()
						.deleteProcessingTimeTimer(time.value());
						time.clear();
					}
				}
			}
		}

		@Override
		public void onTimer(long timestamp, 
				OnTimerContext ctx,
				Collector<String> out) throws Exception {
			out.collect(ctx.getCurrentKey()+
			"温度连续上涨1s,现在温度是:"+lastTemperaturer.value());
			time.clear();
		}
		
		
	}
}


一键分享文章

分类列表

  • • struts源码分析
  • • flink
  • • struts
  • • redis
  • • kafka
  • • ubuntu
  • • zookeeper
  • • hadoop
  • • activiti
  • • linux
  • • 成长
  • • NIO
  • • 关键词提取
  • • mysql
  • • android studio
  • • zabbix
  • • 云计算
  • • mahout
  • • jmeter
  • • hive
  • • ActiveMQ
  • • lucene
  • • MongoDB
  • • netty
  • • flume
  • • 我遇到的问题
  • • GRUB
  • • nginx
  • • 大家好的文章
  • • android
  • • tomcat
  • • Python
  • • luke
  • • android源码编译
  • • 安全
  • • MPAndroidChart
  • • swing
  • • POI
  • • powerdesigner
  • • jquery
  • • html
  • • java
  • • eclipse
  • • shell
  • • jvm
  • • highcharts
  • • 设计模式
  • • 列式数据库
  • • spring cloud
  • • docker+node.js+zookeeper构建微服务
版权所有 cookqq 感谢访问 支持开源 京ICP备15030920号
CopyRight 2015-2018 cookqq.com All Right Reserved.