伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink join终结者:SQL Join

来源:本站原创 浏览:96次 时间:2023-05-13

SQL是开发人员与数据分析师必备的技能,Flink也提供了Sql方式编写任务,能够很大程度降低开发运维成本,这篇是flink join的终极篇SQL Join, 首先介绍sql join使用方式、然后介绍global join带来的状态存储成本及解决方式、最后从源码角度分析sql join实现。

一、SQL JOIN使用方式

对于sql join可以分为两类:Global Join、Time-windowed Join

  • Global Join
    Global Join表示全局join, 也可以称为无限流join, 由于没有时间限制,任何时候流入的数据都可以被关联上,支持inner join、left join、right join、full join 连接语法。使用语法遵循standard ANSI SQL。使用方式:

   SELECT *   FROM Orders INNER/LEFT/RIGHT/FULL JOIN Product ON Orders.productId = Product.id
  • Time-windowed Join
    基于时间窗口的join, 流表的数据关联必须在一定的时间范围内,同样支持inner join、left join、right join、full join,但是不同的是条件中带有时间属性条件,有以下几种使用方式:

 ltime = rtime ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

ltime、rtime表示流表的时间属性字段。
其实现与interval join 使用了相同的实现方式,不同的是:
a. Time-windowed Join 即可支持Event-Time,也可支持Processing-Time
b. interval join 只支持inner join,Time-windowed Join支持多种类型join
以Flink intervalJoin 使用与原理分析 中订单流与地址流为例,sql实现:

   select o.userId,a.addrId from orders o left join address a on o.addrId=a.addrId        and o.rtt BETWEEN a.rt - INTERVAL '5' SECOND AND a.rt - INTERVAL '1' SECOND
二、Idle State Retention Time 使用

global join 能够join 上任何时刻的数据,是由于状态中保存了两个流表的所有数据,这些数据都保存在状态中,默认情况下是不会被过期,但是两个流表又是持续输入的,待数日或者数月之后,状态数据会无限增大,但是很多时候我们数据关联具有时效性,例如只要求当天数据关联即可,那么这种方式会内存或者磁盘造成不必要浪费。那我们的目标就是能够设置状态ttl,在到达过期时间能够被自动清除,在DataStream API 可以通过StateTtlConfig 来设置状态的ttl, 但是sql方式就无法通过这种方式设置,好在flink 提供了Idle State Retention Time 空闲状态的保留时间,通过配置StreamQueryConfig来设置ttl时间,并且只能按照Processing-time来清理数据,从数据流入系统到当数据未被读写时间达到ttl 就会被自动清除。先看下其使用方式:

 val config=tabEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1),Time.minutes(6))tabEnv.sqlUpdate('"',config)tabEnv.sqlQuery("",config)tab.writeToSink(sink,config)

withIdleStateRetentionTime(minTime: Time, maxTime: Time), minTime/maxTime 分别表示空闲保留最小/最大时间,但是必须满足maxTime-minTime>=5min,接下来看下数据的ttl设置:
初始默认的数据ttl = curProcessTime(数据流入当前系统时间) + maxRetentionTime(maxTime),之后每有相同的数据流入,只要满足curProcessTime + minRetentionTime > oldExpiredTime(上一次设置ttl的时间),就将其ttl设置为curProcessTime + maxRetentionTime。

另外还有两点需注意:

  • Idle State Retention Time 不是全局有效,需要在每一个使用sqlUpdate/sqlQuery中单独设置

  • 数据定时清理同样是依赖flink 定时机制,会将定时数据存储在内存状态中,会对内存造成比较大的压力,可以选择rocksDB 来代替内存作为stateBackend

三、源码分析

Flink SQL 中使用了apache calcite来完成sql解析、验证、逻辑计划/物理计划生成以及优化工作,物理计划都需要实现DataStreamRel接口,其中DataStreamWindowJoin与DataStreamJoin 分别对应Time-window join 与 global window的物理执行计划,由于Time-window join 与 interval-join的实现步骤大体相似,最终还是会调用到IntervalJoinOperator,这里不做分析。主要分析一下,Global window 的执行过程,从DataStreamJoin入手。

  • DataStreamJoin中translateToPlan方法。
    该方法获取左右两个流表对应的DataStream, 根据不同join 类型选择不同的ProcessFunction,例如inner join 选择NonWindowInnerJoin,将leftDataStream 与 rightDataStream 进行connect 得到ConnectedStreams 然后执行对应的ProcessFunction

  • 以 inner join为例分析NonWindowInnerJoin, 继承了NonWindowJoin,而NonWindowJoin又继承了CoProcessFunction,与ProcessFunction针对一个流相反,CoProcessFunction是针对两个流的low level api, 可以访问状态、注册定时器。join 逻辑在其processElement方法中

  1. override def processElement(

  2.      value: CRow,

  3.      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,

  4.      out: Collector[CRow],

  5.      timerState: ValueState[Long],

  6.      currentSideState: MapState[Row, JTuple2[Long, Long]],

  7.      otherSideState: MapState[Row, JTuple2[Long, Long]],

  8.      isLeft: Boolean): Unit = {


  9.    val inputRow = value.row

  10.    updateCurrentSide(value, ctx, timerState, currentSideState)


  11.    cRowWrapper.setCollector(out)

  12.    cRowWrapper.setChange(value.change)

  13.    val otherSideIterator = otherSideState.iterator()

  14.    // join other side data

  15.    while (otherSideIterator.hasNext) {

  16.      val otherSideEntry = otherSideIterator.next()

  17.      val otherSideRow = otherSideEntry.getKey

  18.      val otherSideCntAndExpiredTime = otherSideEntry.getValue

  19.      // join

  20.      cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)

  21.      callJoinFunction(inputRow, isLeft, otherSideRow, cRowWrapper)

  22.      // clear expired data. Note: clear after join to keep closer to the original semantics

  23.      if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {

  24.        otherSideIterator.remove()

  25.      }

  26.    }

  27.  }

两个MapState对应两个流的缓存数据,key表示具体的数据ROW,Value表示数据ROW的数量与过期时间,由于数据流入过程中可能会存在多条相同的记录,以数据ROW作为key这种方式可以减少内存使用.
ValueState 用于存储数据的过期时间,以便任务失败恢复能够继续对数据执行过期操作。


processElement 执行流程:
a. updateCurrentSide 保存数据与更新数据的count与ttl, 同时会注册数据的过期时间,数据的过期时间是根据Idle State Retention Time来设置的,从StreamQueryConfig可以获取到
b. 循环遍历另外一个状态,调用callJoinFunction输出数据,在callJoinFunction里面使用的joinFunction是通过FunctionCodeGenerator动态生成的在,在DataStreamJoin的translateToPlan方法中被调用到,有兴趣可以debug 方式copy下来研读一下。

  • 过期数据的清理定时是在updateCurrentSide注册的,其清理工作是在NonWindowJoin的onTimer方法完成,onTimer方法是从CoProcessFunction中继承过来的。在onTimer主要做过期时间判断并且清理。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net