醋醋百科网

Good Luck To You!

大数据_Flink_Java版_数据处理_Watermark(7)在代码中的设置



然后我们来看代码,可以看到,首先,我们说这个dataStream,这里,数据流,我们可以直接



.assignTimestampsAndWatermarks() 这里设置waterMark



首先我们去看可以看到参数中,这里是
AssignerWithPeriodicWatermarks这个参数.



我们可以去看一下这个参数的源码



我们去看一下这个
AssignerWithPeriodicWatermarks可以看到这个是,继承至TimestampAssigner



而这个TimestampAssigner是个,可以看到意思是个时间提取器,意思就是,提取了时间戳以后,然后


把时间戳封装成一个waterMark对吧.



然后我们这里传入一个,实现了上面的TimestampAssigner这个接口的类就可以了,可以看到上面是,new
BoundedOutOfOrdernessTimestampExtractor 这个可以看到字面上是,有界的乱序时间戳提取器对吧,Orderness有序,OutOfOrderness是无序.



然后这里我们做什么?我们只需要把数据流中的时间戳,提取出来就可以了对吧?


只是提取出来就可以了吗?



注意这里的方法我们可以去看一下
BoundedOutOfOrdernessTimestampExtractor的底层



我们去看看extractTimestamp,这个提取时间戳方法,我们继续看底层



去看实现的类,
AssignerWithPeriodicWatermarks



然后再看这个接口,继承的TimestampAssigner



可以看到这个继承的最终的,TimestampAssigner中的extractTimestamp这个方法



上面有个注释说,这个参数需要是用毫秒的单位才行.



可以看到这个值,如果他提取的时候,提取不到时间值的话,他会默认给出一个最小值.


这个最小值可以看到就是Long型的最小值对吧.



然后我们去看我们的数据,我们的数据是秒为单位的



所以这里,我们element.getTimestamp() * 1000L;这样就是毫秒了.



然后我们再看,还在报错对吧.是
BoundedOutOfOrdernessTimestampExtractor这个构造方法,缺少参数



我们去看这个参数是maxOutOfOrderness对吧


这个表示乱序程度



还记得我们之前说吗?


这里要使用乱序程度最大的,才能把尽可能多的迟到数据包含进去.


比如上面5-2=3 5-3=2 所以这里就要选择3作为延迟时间对吧.



然后这里我们,就暂时随便写一个比如:


Time.seconds(2)


我们写成2秒,把延迟两秒的数据,纳入进来对吧



所以解决乱序问题实际上就是,1.首先


指定
env.setStreamTimeCharacteristic这里指定,数据的时间按照



TimeCharacteristic.EventTime,也就是数据的生成时间.



然后再就是直接
.dataStream.assignTimestampsAndWatermarks()


这个方法,中,去new
BoundedOutOfOrdernessTimestampExtractor,然后重写


extractTimestamp方法,来提取数据中的,时间戳


然后,再
BoundedOutOfOrdernessTimestampExtractor给这个构造方法,指定数据的延迟时间


就可以了.



.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>(){


}) 然后我们再看,除了用上面的
BoundedOutOfOrdernessTimestampExtractor,这个以外,


我们还可以使用,new
AscendingTimestampExtractor这个类对吧,这个是干嘛的呢?


我们可以看到asc是升序对吧.



我们去看,假如数据,真的是不乱序的,上面的那样,1,2,3,4,5...这样流过来的,这个时候


我们就可以直接用这个
AscendingTimestampExtractor这个了,可以看到



他的这个构造方法中并没有乱序程度那个参数对吧,因为他不是用来处理乱序数据的


他是用来处理正常顺序数据的.



然后我们再来看,这里我写上注释


可以看到给升序数据设置waterMark就按照上面就行,就是正常顺序数据的处理.



然后我们继续看,


给乱序数据设置waterMark就是对乱序数据的处理.



然后我们再看他底层是怎么弄的呢?


我们看dataStream的
assignTimestampsAndWatermarks这个方法,里面



AssignerWithPeriodicWatermarks这个参数



可以从名字上看的出来:



AssignerWithPeriodicWatermarks这个是周期性的水位线.


就是说,每隔一定的周期就设置一个waterMark对吧,并不是,每个数据来了以后


都设置一个waterMark对吧.



然后我们再看如果我们想每来一个数据,就设置一个waterMark呢?


这样我们怎么做?


可以看到有个,
AssignerWithPunctuatedWatermarks,这个类


可以看到这个是,Punctuated断点似的添加waterMark




可以看到他也是同样继承TimestampAssigner的,里面有提取extractTimestamp时间戳的方法对吧



然后我们再看
AssignerWithPunctuatedWatermarks里面还包含一个


checkAndGetNextWatermark对吧,这个方法


这个方法有lastElement这个是流过来的上一个数据,和当前的extractedTimestamp时间戳


得到一个waterMark对吧,也就是一个数据插入一个waterMark了



然后看到这个TimestampAssigner,里面有extractTimestamp方法用来提取时间戳


可以看到waterMark中最主要的其实就是有timestamp这个时间戳对吧



然后我们再看



AssignerWithPeriodicWatermarks 这个类可以看到


里面有的方法是getCurrentWatermark对吧,这个方法可以看到,就没有参数了


因为他周期性的去插入waterMark,不需要根据上一个数据和已经提取的时间戳对吧.



然后我们可以去看看这个
AssignerWithPeriodicWatermarks周期性提取waterMark



去看一下他这个方法,底层怎么实现的



首先这个类
BoundedOutOfOrdernessTimestampExtractor,是继承至



AssignerWithPeriodicWatermarks类的,在这个类中


重要的一个变量:currentMaxTimestamp,当前最大的时间戳


然后lastEmittedWatermark 这个是上一次发出的这个waterMark对吧.


这个默认值是long型的最小值.可以看到这里他并没有用waterMark作为自己的类型,而是


直接使用的long对吧.



然后我们看这里还要式maxOutOfOrderness这个就是最大乱序程度,也就是延迟时间对吧.



然后我们来看这部分的源码,首先看构造方法,可以看到这里,


首先得到这个乱序程度maxOutOfOrderness,拿到这个乱序程度以后,然后


先记录下来.


然后再去:currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;


可以看到当前最大的waterMark,这里currentMaxTimestamp 为什么开始的时候,要去


用最小的long值取加上maxOutOfOrderness.


我们知道根据之前我们的分析,在处理乱序的时候,我们需要用当前的waterMark 减去 -


最大乱序程度对吧,但是上面没有减去,而是加上了,为什么?


可以看到他是在构造方法中做的,就是为了避免,如果一开始的时候,就需要生产waterMark,


但这个时候,我们知道本来应该是,lastEmittedWatermark - maxOutOfOrderness对吧,但是


刚开始的时候,这个还没有waterMark的时候,这个lastEmittedWatermark 是long型的最小值,这个


最小值如果再去,减去maxOutOfOrderness 减去一个值的话,就会导致溢出,变成一个很大的值


了,这样就会报错.所以他这里初始化的时候,在构造方法中没有减去,而是加,为了避免报错.



然后我们还可以看到,
BoundedOutOfOrdernessTimestampExtractor中的,继承过来的这个


extractTimestamp这个是抽象的对吧,这个是需要我们去实现,然后把时间戳拿出来返回的



然后我们主要看这个,getCurrentWatermark


可以看到他的代码,跟我们之前的分析一样,就是用当前最大的waterMark,减去乱序程度


对吧,然后再判断这个减去以后得到的值,和上一个waterMark比较,是否比原来的大,如果


比原来的大,那么就把上一个waterMark更新成这个大的最新的值,如果不是,那么就


还是保持原来的waterMark不变对吧.


这个就是这个
BoundedOutOfOrdernessTimestampExtractor的waterMark生成的底层原理.


用来处理乱序数据.



然后我们再去看
AscendingTimestampExtractor,看这个


可以看到他也是实现了
AssignerWithPeriodicWatermarks周期性的waterMark对吧



可以看到



AscendingTimestampExtractor里面的getCurrentWatermark


这个升序处理的waterMark,其实就是每个数据来了后面都跟一个waterMark对吧



看看他的这个getCurrentWatermark怎么实现的,可以看到,判断


currentTimestamp看看他和long的最小值对比,如果是最小值就返回最小值,初始化的时候,用对吧


否则可以看到,就返回currentTimestamp-1对吧.


这里减1实际上就是减去1毫秒对吧.



然后我们再看一下如何我们自己去定义一个waterMark,可以看到首先


我们自己定义了一个周期性的waterMark对吧,MyPeriodicAssigner对吧



可以看到我这里定义了延迟是延迟60*1000L毫秒,也就是延迟1分钟.


然后这里getCurrentWatermark,我们返回了一个,waterMark(maxTs - bound);


其实就是当前的值,减去我们设置的周期对吧.


然后我们看一下我们实现的extractTimestamp,这里可以看到就是,从数据中获取的时间戳和


maxTs做对比,从中获取大的那个,然后赋值给,maxTs,这个最大的时间戳,然后再返回


数据的时间戳就可以了,这里maxTs拿到最新的最大值,然后减去延迟的1分钟,然后


就可以返回waterMark了.



然后我们再看一个,我们自己定义的断点式的,Assigner,可以看到首先我们还是,声明了一个延迟


时间是60 * 1000L毫秒.


,然后我们实现checkAndGetNextWaterMark,这里面可以看到,判断,上一个数据,可以看到


上一个数据的id,如果是传感器1的话,sensor_1的话,我们再返回waterMark,而且返回的时候


也是用,提取的时间戳,减去延迟的1分钟时间,如果不是sensor_1这个传感器的话,就


不添加waterMark,也就是返回null



然后我们再实现提取extractTimestamp,提取时间戳的处理,这里


直接返回数据的getTimestamp就可以了.


这样我们也实现了自定义的断点式的处理乱序数据了.

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言