伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

Flink异步之矛-锋利的Async I/O

来源:本站原创 浏览:132次 时间:2022-03-06
Flink异步之矛-锋利的Async I/O

大数据技术与架构 大数据技术与架构

维表JOIN-绕不过去的业务场景

在Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字段。
例如:在电商场景中,需要一个商品的skuid去关联商品的一些属性,例如商品所属行业、商品的生产厂家、生产厂家的一些情况;在物流场景中,知道包裹id,需要去关联包裹的行业属性、发货信息、收货信息等等。
默认情况下,在Flink的MapFunction中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO阻塞,等待请求返回,然后继续发送下一个请求。这种同步交互的方式往往在网络等待上就耗费了大量时间。为了提高处理效率,可以增加MapFunction的并行度,但增加并行度就意味着更多的资源,并不是一种非常好的解决方式。

Async I/O异步非阻塞请求

Flink 在1.2中引入了Async I/O,在异步模式下,将IO操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。
Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,解决与外部系统交互时网络延迟成为了系统瓶颈的问题。

图中棕色的长条表示等待时间,可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复。也就是说,你可以连续地向数据库发送用户a、b、c等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,如上图右边所示。这也正是 Async I/O 的实现原理。
详细的原理可以参考文末给出的第一个链接,来自阿里巴巴云邪的分享。
一个简单的例子如下:

public classAsyncIOFunctionTest{    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        env.setParallelism(1);        Properties p = new Properties();        p.setProperty("bootstrap.servers", "localhost:9092");        DataStreamSource<String> ds = env.addSource(new FlinkKafkaConsumer010<String>("order", new SimpleStringSchema(), p));        ds.print();        SingleOutputStreamOperator<Order> order = ds                .map(new MapFunction<String, Order>() {                    @Override                    public Order map(String value) throws Exception {                        return new Gson().fromJson(value, Order.class);                    }                })                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {                    @Override                    public long extractAscendingTimestamp(Order element) {                        try {                            return element.getOrderTime();                        } catch (Exception e) {                            e.printStackTrace();                        }                        return 0;                    }                })                .keyBy(new KeySelector<Order, String>() {                    @Override                    public String getKey(Order value) throws Exception {                        return value.getUserId();                    }                })                .window(TumblingEventTimeWindows.of(Time.minutes(10)))                .maxBy("orderTime");        SingleOutputStreamOperator<Tuple7<String, String, Integer, String, String, String, Long>> operator = AsyncDataStream                .unorderedWait(order, new RichAsyncFunction<Order, Tuple7<String, String, Integer, String, String, String, Long>>() {                    private Connection connection;                    @Override                    public void open(Configuration parameters) throws Exception {                        super.open(parameters);                        Class.forName("com.mysql.jdbc.Driver");                        connection = DriverManager.getConnection("url", "user", "pwd");                        connection.setAutoCommit(false);                    }                    @Override                    public void asyncInvoke(Order input, ResultFuture<Tuple7<String, String, Integer, String, String, String, Long>> resultFuture) throws Exception {                        List<Tuple7<String, String, Integer, String, String, String, Long>> list = new ArrayList<>();                        // 在 asyncInvoke 方法中异步查询数据库                        String userId = input.getUserId();                        Statement statement = connection.createStatement();                        ResultSet resultSet = statement.executeQuery("select name,age,sex from user where userid=" + userId);                        if (resultSet != null && resultSet.next()) {                            String name = resultSet.getString("name");                            int age = resultSet.getInt("age");                            Stri�����ظ�,�

  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net