2024-04-11 21:23:58.0|分类: flink|浏览量: 759
|
package com.conca.flink.bean;
/**
* 传感器实体类
* @author conca
*/
public class SensorTemperaturer {
public String sensor;//传感器id
public Double temperaturer;//温度
public SensorTemperaturer() {
super();
}
public SensorTemperaturer(String sensor, Double temperaturer) {
super();
this.sensor = sensor;
this.temperaturer = temperaturer;
}
}
package com.conca.flink.source;
import java.util.Random;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.conca.flink.bean.SensorTemperaturer;
/*
* 传感器温度发生器,产生测试数据源
*/
public class SensorTemperaturerSource
implements SourceFunction<SensorTemperaturer>{
private boolean running = true;
private Random random =new Random();
@Override
public void cancel(){
running = false;
}
@Override
public void run(SourceContext<SensorTemperaturer> ctx) throws Exception {
while(running){
for (int i = 1; i < 4; i++) {
ctx.collect(new SensorTemperaturer("snsor"+i,
random.nextGaussian()));
}
Thread.sleep( 300L);
}
}
}
package com.conca.flink.vedio.keyedProcessFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import com.conca.flink.bean.SensorTemperaturer;
import com.conca.flink.source.SensorTemperaturerSource;
/**
* 针对keyBy之后的键控流(KeyedStream),可以使用KeyedProcessFunetion
* Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
由于KeyedProcessFunction实现了RichFunction接口,
因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
需要注意的是,只有在KeyedStream里才能够访问state和定时器,
通俗点来说就是这个函数要用在keyBy这个函数的后面
* @author conca
*/
public class ContinuousIncreaseMain {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(new SensorTemperaturerSource())
.keyBy(r->r.sensor)
.process(new StatisticsProcess())
.print();
env.execute();
}
public static class StatisticsProcess extends
KeyedProcessFunction<String, SensorTemperaturer, String>{
private static final long serialVersionUID = 1L;
private ValueState<Double> lastTemperaturer;
private ValueState<Long> time;
@Override
public void open(Configuration parameters) throws Exception {
lastTemperaturer = getRuntimeContext()
.getState(
new ValueStateDescriptor<Double>
("xx", Types.DOUBLE));
time = getRuntimeContext()
.getState(
new ValueStateDescriptor<Long>
("yyy", Types.LONG));
}
@Override
public void processElement(SensorTemperaturer in, Context context,
Collector<String> out) throws Exception {
Double prev = lastTemperaturer.value();
lastTemperaturer.update(in.temperaturer);
if(prev != null) {
if(in.temperaturer > prev) {
if(time.value() == null) {
time.update(context.timerService()
.currentProcessingTime()+1000L);
context.timerService()
.registerProcessingTimeTimer(time.value());
}
}else if(in.temperaturer < prev) {
if(time.value() != null) {
context.timerService()
.deleteProcessingTimeTimer(time.value());
time.clear();
}
}
}
}
@Override
public void onTimer(long timestamp,
OnTimerContext ctx,
Collector<String> out) throws Exception {
out.collect(ctx.getCurrentKey()+
"温度连续上涨1s,现在温度是:"+lastTemperaturer.value());
time.clear();
}
}
} |
