大数据技术与架构 大数据技术与架构
维表JOIN-绕不过去的业务场景在Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字段。
例如:在电商场景中,需要一个商品的skuid去关联商品的一些属性,例如商品所属行业、商品的生产厂家、生产厂家的一些情况;在物流场景中,知道包裹id,需要去关联包裹的行业属性、发货信息、收货信息等等。
默认情况下,在Flink的MapFunction中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO阻塞,等待请求返回,然后继续发送下一个请求。这种同步交互的方式往往在网络等待上就耗费了大量时间。为了提高处理效率,可以增加MapFunction的并行度,但增加并行度就意味着更多的资源,并不是一种非常好的解决方式。
Flink 在1.2中引入了Async I/O,在异步模式下,将IO操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。
Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,解决与外部系统交互时网络延迟成为了系统瓶颈的问题。
图中棕色的长条表示等待时间,可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复。也就是说,你可以连续地向数据库发送用户a、b、c等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,如上图右边所示。这也正是 Async I/O 的实现原理。
详细的原理可以参考文末给出的第一个链接,来自阿里巴巴云邪的分享。
一个简单的例子如下:
public classAsyncIOFunctionTest{ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); Properties p = new Properties(); p.setProperty("bootstrap.servers", "localhost:9092"); DataStreamSource<String> ds = env.addSource(new FlinkKafkaConsumer010<String>("order", new SimpleStringSchema(), p)); ds.print(); SingleOutputStreamOperator<Order> order = ds .map(new MapFunction<String, Order>() { @Override public Order map(String value) throws Exception { return new Gson().fromJson(value, Order.class); } }) .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() { @Override public long extractAscendingTimestamp(Order element) { try { return element.getOrderTime(); } catch (Exception e) { e.printStackTrace(); } return 0; } }) .keyBy(new KeySelector<Order, String>() { @Override public String getKey(Order value) throws Exception { return value.getUserId(); } }) .window(TumblingEventTimeWindows.of(Time.minutes(10))) .maxBy("orderTime"); SingleOutputStreamOperator<Tuple7<String, String, Integer, String, String, String, Long>> operator = AsyncDataStream .unorderedWait(order, new RichAsyncFunction<Order, Tuple7<String, String, Integer, String, String, String, Long>>() { private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName("com.mysql.jdbc.Driver"); connection = DriverManager.getConnection("url", "user", "pwd"); connection.setAutoCommit(false); } @Override public void asyncInvoke(Order input, ResultFuture<Tuple7<String, String, Integer, String, String, String, Long>> resultFuture) throws Exception { List<Tuple7<String, String, Integer, String, String, String, Long>> list = new ArrayList<>(); // 在 asyncInvoke 方法中异步查询数据库 String userId = input.getUserId(); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery("select name,age,sex from user where userid=" + userId); if (resultSet != null && resultSet.next()) { String name = resultSet.getString("name"); int age = resultSet.getInt("age"); Stri�����ظ�,�