然后我们来看代码,可以看到,首先,我们说这个dataStream,这里,数据流,我们可以直接
.assignTimestampsAndWatermarks() 这里设置waterMark
首先我们去看可以看到参数中,这里是
AssignerWithPeriodicWatermarks这个参数.
我们可以去看一下这个参数的源码
我们去看一下这个
AssignerWithPeriodicWatermarks可以看到这个是,继承至TimestampAssigner
而这个TimestampAssigner是个,可以看到意思是个时间提取器,意思就是,提取了时间戳以后,然后
把时间戳封装成一个waterMark对吧.
然后我们这里传入一个,实现了上面的TimestampAssigner这个接口的类就可以了,可以看到上面是,new
BoundedOutOfOrdernessTimestampExtractor 这个可以看到字面上是,有界的乱序时间戳提取器对吧,Orderness有序,OutOfOrderness是无序.
然后这里我们做什么?我们只需要把数据流中的时间戳,提取出来就可以了对吧?
只是提取出来就可以了吗?
注意这里的方法我们可以去看一下
BoundedOutOfOrdernessTimestampExtractor的底层
我们去看看extractTimestamp,这个提取时间戳方法,我们继续看底层
去看实现的类,
AssignerWithPeriodicWatermarks
然后再看这个接口,继承的TimestampAssigner
可以看到这个继承的最终的,TimestampAssigner中的extractTimestamp这个方法
上面有个注释说,这个参数需要是用毫秒的单位才行.
可以看到这个值,如果他提取的时候,提取不到时间值的话,他会默认给出一个最小值.
这个最小值可以看到就是Long型的最小值对吧.
然后我们去看我们的数据,我们的数据是秒为单位的
所以这里,我们element.getTimestamp() * 1000L;这样就是毫秒了.
然后我们再看,还在报错对吧.是
BoundedOutOfOrdernessTimestampExtractor这个构造方法,缺少参数
我们去看这个参数是maxOutOfOrderness对吧
这个表示乱序程度
还记得我们之前说吗?
这里要使用乱序程度最大的,才能把尽可能多的迟到数据包含进去.
比如上面5-2=3 5-3=2 所以这里就要选择3作为延迟时间对吧.
然后这里我们,就暂时随便写一个比如:
Time.seconds(2)
我们写成2秒,把延迟两秒的数据,纳入进来对吧
所以解决乱序问题实际上就是,1.首先
指定
env.setStreamTimeCharacteristic这里指定,数据的时间按照
TimeCharacteristic.EventTime,也就是数据的生成时间.
然后再就是直接
.dataStream.assignTimestampsAndWatermarks()
这个方法,中,去new
BoundedOutOfOrdernessTimestampExtractor,然后重写
extractTimestamp方法,来提取数据中的,时间戳
然后,再
BoundedOutOfOrdernessTimestampExtractor给这个构造方法,指定数据的延迟时间
就可以了.
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>(){
}) 然后我们再看,除了用上面的
BoundedOutOfOrdernessTimestampExtractor,这个以外,
我们还可以使用,new
AscendingTimestampExtractor这个类对吧,这个是干嘛的呢?
我们可以看到asc是升序对吧.
我们去看,假如数据,真的是不乱序的,上面的那样,1,2,3,4,5...这样流过来的,这个时候
我们就可以直接用这个
AscendingTimestampExtractor这个了,可以看到
他的这个构造方法中并没有乱序程度那个参数对吧,因为他不是用来处理乱序数据的
他是用来处理正常顺序数据的.
然后我们再来看,这里我写上注释
可以看到给升序数据设置waterMark就按照上面就行,就是正常顺序数据的处理.
然后我们继续看,
给乱序数据设置waterMark就是对乱序数据的处理.
然后我们再看他底层是怎么弄的呢?
我们看dataStream的
assignTimestampsAndWatermarks这个方法,里面
AssignerWithPeriodicWatermarks这个参数
可以从名字上看的出来:
AssignerWithPeriodicWatermarks这个是周期性的水位线.
就是说,每隔一定的周期就设置一个waterMark对吧,并不是,每个数据来了以后
都设置一个waterMark对吧.
然后我们再看如果我们想每来一个数据,就设置一个waterMark呢?
这样我们怎么做?
可以看到有个,
AssignerWithPunctuatedWatermarks,这个类
可以看到这个是,Punctuated断点似的添加waterMark
可以看到他也是同样继承TimestampAssigner的,里面有提取extractTimestamp时间戳的方法对吧
然后我们再看
AssignerWithPunctuatedWatermarks里面还包含一个
checkAndGetNextWatermark对吧,这个方法
这个方法有lastElement这个是流过来的上一个数据,和当前的extractedTimestamp时间戳
得到一个waterMark对吧,也就是一个数据插入一个waterMark了
然后看到这个TimestampAssigner,里面有extractTimestamp方法用来提取时间戳
可以看到waterMark中最主要的其实就是有timestamp这个时间戳对吧
然后我们再看
AssignerWithPeriodicWatermarks 这个类可以看到
里面有的方法是getCurrentWatermark对吧,这个方法可以看到,就没有参数了
因为他周期性的去插入waterMark,不需要根据上一个数据和已经提取的时间戳对吧.
然后我们可以去看看这个
AssignerWithPeriodicWatermarks周期性提取waterMark
去看一下他这个方法,底层怎么实现的
首先这个类
BoundedOutOfOrdernessTimestampExtractor,是继承至
AssignerWithPeriodicWatermarks类的,在这个类中
重要的一个变量:currentMaxTimestamp,当前最大的时间戳
然后lastEmittedWatermark 这个是上一次发出的这个waterMark对吧.
这个默认值是long型的最小值.可以看到这里他并没有用waterMark作为自己的类型,而是
直接使用的long对吧.
然后我们看这里还要式maxOutOfOrderness这个就是最大乱序程度,也就是延迟时间对吧.
然后我们来看这部分的源码,首先看构造方法,可以看到这里,
首先得到这个乱序程度maxOutOfOrderness,拿到这个乱序程度以后,然后
先记录下来.
然后再去:currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
可以看到当前最大的waterMark,这里currentMaxTimestamp 为什么开始的时候,要去
用最小的long值取加上maxOutOfOrderness.
我们知道根据之前我们的分析,在处理乱序的时候,我们需要用当前的waterMark 减去 -
最大乱序程度对吧,但是上面没有减去,而是加上了,为什么?
可以看到他是在构造方法中做的,就是为了避免,如果一开始的时候,就需要生产waterMark,
但这个时候,我们知道本来应该是,lastEmittedWatermark - maxOutOfOrderness对吧,但是
刚开始的时候,这个还没有waterMark的时候,这个lastEmittedWatermark 是long型的最小值,这个
最小值如果再去,减去maxOutOfOrderness 减去一个值的话,就会导致溢出,变成一个很大的值
了,这样就会报错.所以他这里初始化的时候,在构造方法中没有减去,而是加,为了避免报错.
然后我们还可以看到,
BoundedOutOfOrdernessTimestampExtractor中的,继承过来的这个
extractTimestamp这个是抽象的对吧,这个是需要我们去实现,然后把时间戳拿出来返回的
然后我们主要看这个,getCurrentWatermark
可以看到他的代码,跟我们之前的分析一样,就是用当前最大的waterMark,减去乱序程度
对吧,然后再判断这个减去以后得到的值,和上一个waterMark比较,是否比原来的大,如果
比原来的大,那么就把上一个waterMark更新成这个大的最新的值,如果不是,那么就
还是保持原来的waterMark不变对吧.
这个就是这个
BoundedOutOfOrdernessTimestampExtractor的waterMark生成的底层原理.
用来处理乱序数据.
然后我们再去看
AscendingTimestampExtractor,看这个
可以看到他也是实现了
AssignerWithPeriodicWatermarks周期性的waterMark对吧
可以看到
AscendingTimestampExtractor里面的getCurrentWatermark
这个升序处理的waterMark,其实就是每个数据来了后面都跟一个waterMark对吧
看看他的这个getCurrentWatermark怎么实现的,可以看到,判断
currentTimestamp看看他和long的最小值对比,如果是最小值就返回最小值,初始化的时候,用对吧
否则可以看到,就返回currentTimestamp-1对吧.
这里减1实际上就是减去1毫秒对吧.
然后我们再看一下如何我们自己去定义一个waterMark,可以看到首先
我们自己定义了一个周期性的waterMark对吧,MyPeriodicAssigner对吧
可以看到我这里定义了延迟是延迟60*1000L毫秒,也就是延迟1分钟.
然后这里getCurrentWatermark,我们返回了一个,waterMark(maxTs - bound);
其实就是当前的值,减去我们设置的周期对吧.
然后我们看一下我们实现的extractTimestamp,这里可以看到就是,从数据中获取的时间戳和
maxTs做对比,从中获取大的那个,然后赋值给,maxTs,这个最大的时间戳,然后再返回
数据的时间戳就可以了,这里maxTs拿到最新的最大值,然后减去延迟的1分钟,然后
就可以返回waterMark了.
然后我们再看一个,我们自己定义的断点式的,Assigner,可以看到首先我们还是,声明了一个延迟
时间是60 * 1000L毫秒.
,然后我们实现checkAndGetNextWaterMark,这里面可以看到,判断,上一个数据,可以看到
上一个数据的id,如果是传感器1的话,sensor_1的话,我们再返回waterMark,而且返回的时候
也是用,提取的时间戳,减去延迟的1分钟时间,如果不是sensor_1这个传感器的话,就
不添加waterMark,也就是返回null
然后我们再实现提取extractTimestamp,提取时间戳的处理,这里
直接返回数据的getTimestamp就可以了.
这样我们也实现了自定义的断点式的处理乱序数据了.