一 背景

随着大数据的数量猛增,以及处理需求越来越多样化,大家对实时数据的统计需求要求越来越高。数据库界的大牛、图灵奖获得者Michael Stonebraker 在2009年曾经发表过一篇文章“MapReduce: A major step backwards”,他在文章中旗帜分明地说:MapReduce忽略了数据库领域积累了近40年的技术经验。虽然大数据技术的出现,降低了用户处理海量数据的门槛;但是,与数据库里面精湛的技术相比,大数据技术的抽象和实现还是太原始和初级。因此,在后面的十几年中,大数据技术一直以数据库为目标,将更多的成熟的数据库技术融入到大数据中。
1.1 数据仓库平台建设痛点

痛点一

几百TB级的T+1离线任务延迟导致重要报表数据产出时间不稳定

  • 凌晨NameNode压力大,call请求延迟不稳定。
  • 离线任务的ETL效率相对较低,一次ETL需要2个小时左右。
  • 一旦遇到硬盘坏掉或者机器宕机,Spark/Hive任务重试会延迟2小时。
为什么会出现这样的情况呢?

  • 任务本身请求的数据量非常大。通常一些重要的hive表的数据量可能要几十TB,或者更大。几百个分区,甚至上千个分区,几万+的文件数。如果是全量读取数据的话,几百个分区就会向NameNode发送几百次请求。在凌晨处理ETL任务的时候,NameNode的运行压力非常大。所以会出现NameNode响应慢,任务请求时初始化时间长。
  • 任务的ETL效率不高,这并不是说Spark的引擎效率不高,主要是我们对存储在HDFS上的这些文件的支持不是很好。例如:我们在查询一个分区的时候,需要将所有文件都扫描一遍才能进行分析;而实际上,我们只对一部分文件感兴趣。因此,这个方案的效率不高。
  • 大数据量的离线任务一旦遇到硬盘坏掉或者机器宕机,就需要重试,重试一次需要比较长的时间,大概几十分钟。重试次数越多,延迟越大。
痛点二





  • 不可靠的更新操作。我们通常会在ETL的过程中执行一些Insert OverWrite之类的操作,这类操作会先把相应的分区数据删除,再把生成的文件加载到分区中去。当我们在移除文件的时候,很多正在读取这些文件的任务就会出现异常,造成了不可靠的更新操作。
  • Hive表的schema变更低效。我们对Hive表增加一些字段、更改分区的操作其实是非常低效的操作。需要把所有的原始数据读出来,然后重新写回去。这样就会非常耗时,并且低效。
  • 数据可靠性缺乏保障。主要是针对Hive表的分区进行操作,数据分区通常把信息存放在两个地方,HDFS和Metastore,分别存储一份。这样在进行更新操作时,就会出现一个更新成功而另外一个更新失败,导致数据不可靠。
痛点三

基于Lambda架构构建实时数据仓库时存在很多实际问题

  • Kafka无法支持海量数据存储,无法支持高效的OLAP查询
  • Lambda架构维护成本很高



基于Lambda架构建设的实时数据仓库存在较多的问题。例如上面的架构图,一条链路是基于Kafka中转的实时链路(延迟要求小于2分钟),另外一条链路是离线链路(延迟大于1小时),甚至有些公司还会有第三条准实时链路(延迟要求在2分钟--1小时之间),或者更复杂的场景。

  • 两条链路对应两份数据。很多时候实时链路的处理结果和离线链路的处理结果不一致。
  • Kafka无法存储海量数据。无法基于当前的OLAP分析引擎高效查询Kafka中的数据。
  • Lambda维护成本高。代码、数据血缘、Schema等都需要两套。运维、监控成本都非常高。
痛点四

基于Lambda架构构建的支持更新数据业务还需要引入额外的存储技术。
不能很友好的支持高效的数据更新。大数据更新通常有两种场景,一种是CDC(Change Data Capture)的更新,将Binlog中的更新删除同步到HDFS上。另外一种是延迟数据聚合后的结果更新。HDFS只支持追加写,不支持更新。也有很多公司引入了Kudu。但是Kudu也有自身的不足,比如计算存储没有做到分离。整个数仓平台中引入了HDFS、Kafka和Kudu,运维成本很大。
1.2 大数据痛点破局

如何快速、一致、原子性地在大数据平台上构建起Data PipeLine,成为了迫切解决的问题。为此,Uber开源了Apache Kudi,Databricks提出了Delta Lake,而Netflix发起了Apache Iceberg项目,一时间这种具备ACID能力的表达式中间件成了大数据、数据湖领域炙手可热的方向。
在互联网巨头中,腾讯和阿里在2019年开始投入研发Apache Iceberg。为什么巨头选择了Iceberg呢?Iceberg有什么独到之处呢?
计算引擎之下、存储之上的技术
目前,大数据分析技术已经相当成熟,如何能够借鉴更多的数据库技术和理念来提升大数据的能力呢?Apache Iceberg、Hudi和Delta Lake这种技术从数据库中汲取了灵感,将事务的能力带到了大数据领域,并抽象成统一的中间格式供不同的引擎适配对接。
这类技术是介于上层计算引擎和底层存储格式之间的一个中间层,我们可以理解为“数据组织格式”,Iceberg将其称之为“表达式”也是这个意思。它与底层的存储格式(ORC、Parquet等列式存储格式)最大的区别是,它并不定义数据存储格式,而是定义了数据、元数据的组织方式,向上提供了统一的“表”的语义。它构建在数据存储格式之上,其底层数据存储仍然使用ORC、Parquet等进行存储。



Apache Iceberg、Hudi和Delta Lake诞生于不同的公司,需要解决的问题不一样,三种产品的差异大致如下:

  • Iceberg的设计初衷是定义于一个标准、开发且通用的数据组织格式,同时屏蔽底层数据存储格式上的差异,向上提供统一的操作API,使得不同的引擎可以通过其提供的API接入。
  • Hudi的设计初衷是解决流式数据的快速落地,并能够通过upsert语义进行延迟数据修正。
  • Delta Lake作为Databricks开源的项目,更侧重于在Spark层面解决Parquet、ORC等存储格式的固有问题,并带来更多的能力提升。
这三个项目都提供了ACID的能力,都基于乐观锁实现了冲突解决和提供线性一致性,同时也提供了time travel的功能。
正是由于设计初衷不同,三种技术擅长之处各不相同,Iceberg在其格式定义和核心能力上最为完善,但是上游引擎的适配稍微弱一些。Hudi是基于Spark打造了完整的流式数据落地方案,但是其核心抽象能力弱,对Spark耦合非常紧。Delta Lake同样高度依赖于Spark生态圈,与其他引擎的适配尚需时日。
二 为什么选择Iceberg




上图是由Flink团体针对数据湖方案的调优对比,总体来看这些方案的基础功能相对都还是比较完善的。基础功能总体来说主要包括:

  • 高效Table Schema的变更,比如针对增减分区,增减字段等功能。
  • ACID语义保证。
  • 同时支持流批读写,不会出现脏读等现象。
  • 支持OSS这类廉价存储。
Iceberg主要特点


  • 自带ACID能力:保障每次写入后的数据都是一个完整的快照,落地任务把数据直接写入Iceberg表中,不需要任务再做额外的success状态维护。Iceberg会根据分区字段自动处理延时到来的数据,把延时的数据及时地写入到正确的分区,因为有ACID的保障,延时数据写入过程中Iceberg表依然提供可靠的读取能力。




  • 分区管理灵活:Iceberg表提供partition的evolution功能,可以根据数据量的变化灵活调整分区策略而不需要修改落地任务。分区在Iceberg中是自动转换的,读取数据的时候不需要在SQL中指定分区,数据的存储和读取的灵活性都很高。
  • 结果数据可以更改:目前社区版本的Iceberg已经能够支持row level的update功能,这使得上面案例中的样本表可以及时地修正样本数据,在只有一份数据的情况下可以保障模型训练任务不把模型跑偏。
  • 支持增量读取:Iceberg还支持增量读取功能,可以根据snapshot来增量读取每一次修改的数据,这也大大增加了数据读取的灵活性。
  • 数据质量更直观:之前的数据写入完整性检测依赖落地任务的健康度,可能会出现数据落地成功而没有生成success文件或者生成了success文件而数据不完整的情况,而使用Iceberg之后可以根据snapshot的生成时间和数据状态来更加合理直观的判断数据是否正常。
2.1 其他不同点

Hudi的特性主要是支持快速的更新删除和增量拉取。
Iceberg的特性主要是代码抽象程度高,不绑定任何执行引擎。它暴露出来的核心接口,可以非常方便的和Spark/Flink对接。然而Delta和Hudi基本上和Spark的耦合性很重,如果想接入Flink,相对比较难。
2.2 选择Iceberg的原因

当前国内实时数仓建设围绕Flink的情况会多一点,我们选择Iceberg的原因,主要是为了基于Flink进行生态扩展。
国内有很多基于Iceberg的重要开发力量,比如腾讯、网易、Flink官方团队,他们的数据湖选型也是Iceberg。目前他们在社区主导update以及Flink的生态对接。
降低数据修正的成本。传统Hive/Spark在修正数据时需要将数据读取出来,修改后再写入,有极大的修正成本。Iceberg所具有的修改、删除能力能够有效地降低开销,提升效率。
三 Iceberg原理




左侧图是一个抽象的数据处理系统,分别有SQL引擎、table format、文件集合以及分布式文件系统构成。右侧是对应的现实中的组件,SQL引擎比如HiveServer、Impala、Spark等等,table format比如Metastore或者Iceberg,文件集合主要有Parquet/ORC/Avro等,而分布式文件系统就是HDFS。
对于table format,我认为主要包含4个层面的含义,分别是表schema定义(是否支持复杂数据类型),表中文件的组织形式,表相关统计信息、表索引信息以及表的读写API信息。

  • 表schema定义了一个表支持字段类型,比如int、string、long以及复杂数据类型等。
  • 表中文件组织形式最典型的是Partition模式,是Range Partition还是Hash Partition。
  • Metadata数据统计信息。
  • 封装了表的读写API。上层引擎通过对应的API读取或者写入表中的数据。
和Iceberg差不多相当的一个组件是Metastore。不过Metastore是一个服务,而Iceberg就是一个jar包。这里就Metastore和Iceberg在表格式的4个方面分别进行一下对比介绍。
3.1 在schema层面上没有任何区别

都支持int、string、bigint、timestamp、array<string>等类型
3.2 partition实现完全不同

Metastore中partition字段不能是表字段,因为partition字段本质上是一个目录结构,不是用户表中的一列数据。基于Metastore,用户想定位到一个partition下的所有数据,首先需要在Metastore中定位出该partition对应的所在目录位置信息,然后再到HDFS上执行list命令获取到这个分区下的所有文件,对这些文件进行扫描得到这个partition下的所有数据。
Iceberg中partition字段就是表中的一个字段。Iceberg中每一张表都有一个对应的文件元数据表,文件元数据表中每条记录表示一个文件的相关信息,这些信息中有一个字段是partition字段,表示这个文件所在的partition。
很明显,Iceberg表根据partition定位文件相比Metastore少一个步骤,就是根据目录信息去HDFS上执行list命令获取分区下的文件。
例如,对于一个二级分区的大表来说,一级分区是小时分区,二级分区是一个枚举字段分区,如果每个一级分区下有30个二级分区,那么这个表每天就有24*30=720个分区。基于Metastore的partition方案,如果一个SQL想基于这个表扫描昨天一天的数据的话,就需要向NameNode下发720次list请求,如果扫描一周数据或者一个月数据,请求数据就更是相当夸张。这样,一方面会导致NameNode压力很大,一方面也会导致SQL请求响应延迟很大。而基于Iceberg的partition方案,就完全没有这个问题。
Iceberg的分区查找优化

Iceberg数据表每一次修改后的状态都会生成一个snapshot(s0,s1)文件,snapshot文件中包含了一个manifest文件的list,list中存储了当前的snapshot状态是由哪些manifest文件组成的。每一个manifest的文件中会指向到真是数据的存储文件data file(一般是parquet格式)。在这种结构中,每一个快照读取所需要的数据文件都已经清晰的定义在了manifest list和manifest的文件中,并且manifest中还存储了相关的partition信息,那么在读取数据的时候需要筛选partition,通过manifest中存储的信息以K->V映射方式在O1复杂度的计算中就能定位到需要读取的partition目录。当前常用的数据读取引擎,例如Hive需要遍历整个数据目录下的文件索引来寻找必要的partition,是一个O(n)的复杂度查找过程。在大数据常见的海量分区下,采用partition映射的模式来选取目录的优化效果是非常明显的,可以在Ryan Blue的讲座中看到在NetFlix的应用场景中2600个分区只需要10S就列出来了,而使用Hive大概10分钟还没完成。
3.3 表统计信息实现粒度不同

Metastore中一张表的统计信息是表/分区级别粒度的统计信息,比如记录一张表中某一列的记录数量、平均长度、为null的记录数量、最大值最小值等。
Iceberg中统计信息精确到文件粒度,即每个数据文件都会记录所有列的记录数量、平均长度、最大值最小值等。
很明显,文件粒度的统计信息对于查询中谓词(即where条件)的过滤会更有效果。
Iceberg谓词下推的三层过滤

(1)分区过滤:Iceberg支持查询中的谓词下推,前面已经说了Iceberg是支持隐式分区的,就是说在读取数据的时候不需要在SQL中指定分区。Iceberg会接收上层计算引擎下推过来的谓词表达式,跟进谓词表达式中column分区的信息进行分区转换的计算。例如,一个Iceberg表有一列time,用户设定了在time列上按照小时分区,当查询条件为time>=2020-01-01 10:00 AND 2020-01-01 13:00的时候Iceberg会根据下推过来的谓词表达式和Schema中定义的分区转换表达式进行计算。直接算出数据分区是在10点11点12点三个分区中,然后依据manifest中的分区字段直接定位到分区目录。
(2)文件过滤:Iceberg会把谓词继续下推到更细的筛选粒度,跟进谓词表达式和manifest中column的min/max值Iceberg可以有效的过滤查询数据所覆盖的具体data file,对扫描集成做进一步的筛选,如果筛选column是有序的,那么下推效果将更加明显。
(3)RowGroup过滤:经过分区过滤和文件过滤之后Iceberg还会继续把谓词表达式下推到data file文件内部的RowGroup级别,跟进parquet文件的metadata信息对RowGroup做进一步的筛选。经过以上三层的筛选,Iceberg最终把数据的扫描集缩小到必须读取的RowGroup级别,然后把需要读取的RowGroup数据读入到内存之中。(同样在Ryan Blue的讲座中我们可以看到,通过层层筛选(命中min/max)之后,Iceberg使得数据计算任务从61小时降低到了22分钟)。
3.4 读取API实现不同




Metastore模式下上层引擎写好一批文件,调用Metastore的add partition接口将这些文件添加到某个分区下。



Iceberg模式下上层业务写好一批文件,调动iceberg的commit接口提交本次写入形成一个新的snapshot快照。这种提交方式保证了表的ACID语义。同时基于snapshot快照提交可以实现增量拉取数据。
Iceberg写入流程及文件结构

Iceberg在数据写入的时候,按照下面的步骤进行。
(1)先把数据写入到data file文件中
(2)当一组data files文件写完之后,会根据data file文件中column的一些统计信息(如:每个column的min/max值),生成一个对应的manifest文件
(3)然后Iceberg把一次写入后涉及到的manifest文件组成一个manifest list(manifests),manifest list文件中也会存入一些相关manifest的统计信息(如:分区信息,manifest有效性)等
(4)然后按照整个manifest list生成一个对应的snapshot文件
(5)生成完snapshot文件之后,Iceberg会把当前snapshot的ID及存储路径等信息写入到metadata文件中
(6)当一切准备完毕之后,会以原子操作的方式commit这个metadata文件,这样一次iceberg的数据写入就完成了。随着每次的写入iceberg就生成了上图的文件组织模式
3.5 Iceberg的向量化读取和数据的zero copy

在低版本的Spark中,由于Spark DataSourceV2的API不支持批量读取,因此Iceberg通过for循环把筛选后的数据一行一行的返回给Spark去处理这个过程中既需要数据不断的在内存中互相拷贝,也无法发挥列式数据在现代CPU架构中的向量化处理能力。为了进一步提升读取速度,Iceberg在Spark2.4.5版本中,利用Spark BatchColumn的读取特性引入了向量化读取能力。
(1)经过谓词下推后,Iceberg把需要的RowGroup数据读入到了内存中。RowGroup是列式组织的,具有可向量化处理的优势。
(2)Iceberg会根据SQL语句的project来删减需要读取的column trunk。
(3)然后Iceberg借助Arrow插件作为共享内存,以page+Batch size为单位一次性的把一个批次大小的数据存入到共享内存中。
(4)当数据存储完之后把共享内存地址返回给spark,spark拿到共享内存地址之后,可以不再进行数据拷贝直接通过偏移量来访问Arrow获取数据。
3.6 Iceberg相对于Metastore的优势


  • 新partition模式:避免了查询时n次调用NameNode的list方法,降低NameNode压力,提升查询性能。
  • 新metadata模式:文件级别列统计信息可以用来根据where字段进行文件过滤,很多场景下可以大大减少扫描文件数,提升查询性能。
  • 新API模式:存储流批一体化。
流式写入-增量拉取
基于Iceberg统一存储模式可以同时满足业务批量读取以及增量订阅需求
支持流批同时读写同一张表
统一表schema,任务执行过程中不会出现FileNotFoundException。
四 Iceberg的优势




五 数据湖Iceberg社区现状




目前Iceberg主要支持的计算引擎包括Spark2.4.5、Spark3.x、Flink1.11以及Presto。同时,一些运维工作比如snapshot过期、小文件合并、增量订阅消费等功能都可以实现。
对于Apache Flink来说,Apache Iceberg是delta、iceberg、hudi三个开源项目中最先完成Flink接入的开源项目。通过Flink来完成实现导入数据到Iceberg的数据湖、通过Flink batch作业来读取Iceberg数据,这两个核心功能在Apache Iceberg0.10.0版本发布。实时写入和读取CDC数据的功能,将在Iceberg的0.11.0版本发布。
六 数据湖Iceberg实践之路




Iceberg针对目前大数据量的情况下,可以大大提升ETL任务的执行效率,这主要得益于新Partition模式下不再需要请求NameNode分区信息,同时得益于文件级别统计信息模式下可以过滤很多不满足条件的数据文件。



我们目前使用的是Spark2.4.5,在这个基础之上做了更多计算引擎的适配工作。主要包括:

  • 集成Hive。可以通过Hive创建和删除Iceberg表,通过HiveSQL查询Iceberg表中的数据。
  • 集成Impala。用户可以通过Impala新建Iceberg内(外)表,并通过Impala查询Iceberg表中的数据。
  • 集成Flink。已经实现了Flink到Iceberg的sink实现,业务可以通过消费kafka中的数据将结果写入到Iceberg中。同时基于Flink引擎实现了小文件异步合并的功能,这样可以实现Flink一边写数据文件,一边执行小文件的合并。基于Iceberg的小文件合并通过commit的方式提交,不需要删除合并前的小文件,也就不会引起读取任务的任何异常。
温馨提示:
1、在论坛里发表的文章仅代表作者本人的观点,与本网站立场无关。
2、论坛的所有内容都不保证其准确性,有效性,时间性。阅读本站内容因误导等因素而造成的损失本站不承担连带责任。
3、当政府机关依照法定程序要求披露信息时,论坛均得免责。
4、若因线路及非本站所能控制范围的故障导致暂停服务期间造成的一切不便与损失,论坛不负任何责任。
5、注册会员通过任何手段和方法针对论坛进行破坏,我们有权对其行为作出处理。并保留进一步追究其责任的权利。
回复

使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    563

    主题

    1496

    帖子

    3577

    积分

    论坛元老

    Rank: 8Rank: 8

    积分
    3577
    发表于 2021-7-16 21:06:16 | 显示全部楼层
    沙发
    原创不易,欢迎大家点赞+关注,您的关注是我继续写下去的动力!
    回复

    使用道具 举报

    546

    主题

    1412

    帖子

    3372

    积分

    论坛元老

    Rank: 8Rank: 8

    积分
    3372
    发表于 2021-7-16 21:06:22 | 显示全部楼层
    板凳
    转发了
    回复

    使用道具 举报

    606

    主题

    1487

    帖子

    3596

    积分

    论坛元老

    Rank: 8Rank: 8

    积分
    3596
    发表于 2021-7-16 21:06:46 | 显示全部楼层
    地板
    转发了
    回复

    使用道具 举报

    512

    主题

    1438

    帖子

    3406

    积分

    论坛元老

    Rank: 8Rank: 8

    积分
    3406
    发表于 2021-7-16 21:07:24 | 显示全部楼层
    5#
    转发了
    回复

    使用道具 举报

    544

    主题

    1401

    帖子

    3386

    积分

    论坛元老

    Rank: 8Rank: 8

    积分
    3386
    发表于 2021-7-16 21:07:43 | 显示全部楼层
    6#
    转发了
    回复

    使用道具 举报

    572

    主题

    1418

    帖子

    3422

    积分

    论坛元老

    Rank: 8Rank: 8

    积分
    3422
    发表于 2021-7-16 21:08:29 | 显示全部楼层
    7#
    转发了
    回复

    使用道具 举报

    546

    主题

    1412

    帖子

    3372

    积分

    论坛元老

    Rank: 8Rank: 8

    积分
    3372
    发表于 2021-7-16 21:09:14 | 显示全部楼层
    8#
    转发了
    回复

    使用道具 举报

    571

    主题

    1462

    帖子

    3523

    积分

    论坛元老

    Rank: 8Rank: 8

    积分
    3523
    发表于 2021-7-16 21:09:32 | 显示全部楼层
    9#
    转发了
    回复

    使用道具 举报

    • 售后服务
    • 关注我们
    • 社区新手

    QQ|手机版|小黑屋|数据通

    Powered by datatong.net X3.4  © 2008-2020 数据通