数据仓库分层
1、介绍
数据仓库更多代表的是一种对数据的管理和使用的方式,它是一整套包括了etl、调度、建模在内的完整的理论体系。现在所谓的大数据更多的是一种数据量级的增大和工具的上的更新。 两者并无冲突,相反,而是一种更好的结合。数据仓库在构建过程中通常都需要进行分层处理。业务不同,分层的技术处理手段也不同。
2、为什么要分层
分层的主要原因是在管理数据的时候,能对数据有一个更加清晰的掌控,详细来讲,主要有下面几个原因:
清晰数据结构
每一个数据分层都有它的作用域,这样我们在使用表的时候能更方便地定位和理解。
数据血缘追踪
简单来说,我们最终给业务呈现的是一个能直接使用业务表,但是它的来源有很多,如果有一张来源表出问题了,我们希望能够快速准确地定位到问题,并清楚它的危害范围。
减少重复开发
规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算。
把复杂问题简单化
将一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解。而且便于维护数据的准确性,当数据出现问题之后,可以不用修复所有的数据,只需要从有问题的步骤开始修复。
屏蔽原始数据的异常
屏蔽业务的影响,不必改一次业务就需要重新接入数据
3、怎样分层
理论上数据分为三个层,数据运营层、数据仓库层和数据产品层。
ODS
Operate data store,操作数据存储,是最接近数据源中数据的一层,数据源中的数据,经过抽取、洗净、传输,也就说传说中的ETL之后,装入本层。本层的数据,总体上大多是按照源头业务系统的分类方式而分类的。
例如这一层可能包含的数据表为:人口表(包含每个人的身份证号、姓名、住址等)、机场登机记录(包含乘机人身份证号、航班号、乘机日期、起飞城市等)、银联的刷卡信息表(包含银行卡号、刷卡地点、刷卡时间、刷卡金额等)、银行账户表(包含银行卡号、持卡人身份证号等)等等一系列原始的业务数据。这里我们可以看到,这一层面的数据还具有鲜明的业务数据库的特征,甚至还具有一定的关系数据库中的数据范式的组织形式。
但是,这一层面的数据却不等同于原始数据。在源数据装入这一层时,要进行诸如去噪(例如去掉明显偏离正常水平的银行刷卡信息)、去重(例如银行账户信息、公安局人口信息中均含有人的姓名,但是只保留一份即可)、提脏(例如有的人的银行卡被盗刷,在十分钟内同时有两笔分别在中国和日本的刷卡信息,这便是脏数据)、业务提取、单位统一、砍字段(例如用于支撑前端系统工作,但是在数据挖掘中不需要的字段)、业务判别等多项工作。
ODS层数据的来源方式:
业务库
经常会使用sqoop来抽取,比如我们每天定时抽取一次。在实时方面,可以考虑用canal监听mysql的binlog,实时接入即可。
埋点日志
线上系统会打入各种日志,这些日志一般以文件的形式保存,我们可以选择用flume定时抽取,也可以用用spark streaming或者storm来实时接入,当然,kafka也会是一个关键的角色。
其它数据源
这和具体的业务相关。
DW
Data warehouse,数据仓库层。在这里,从ODS层中获得的数据按照主题建立各种数据模型。例如以研究人的旅游消费为主题的数据集中,便可以结合航空公司的登机出行信息,以及银联系统的刷卡记录,进行结合分析,产生数据集。在这里,我们需要了解四个概念:维(dimension)、事实(Fact)、指标(Index)和粒度( Granularity)。
App
该层主要是提供数据产品和数据分析使用的数据,一般会存放在es、mysql等系统中供线上系统使用,也可能会存在Hive或者Druid中供数据分析和数据挖掘使用。 比如我们经常说的报表数据,或者说那种大宽表,一般就放在这里。
4、Sqoop数据导入
4.1 常规导入
以追加方式导入数据,指定map并行度是1,字段分隔符为'001'。
sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath --num-mappers 1 #使用1个mapper --table $oralceTableName --columns $columns --fields-terminated-by '\001'
4.2 并行导入
如果指定--m为1的话,使用的是1个map进行导入,不能发挥集群的并行计算能力。可以通过增加数量达到同时启动多个mapper个数实现并行导入的目的。
sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath --num-mappers 4 #使用4个mapper --table $oralceTableName --columns $columns --fields-terminated-by '\001' --where "data_desc='2011-02-26'"
注意,并行导入时sqoop会先在主键字段上执行max和min的操作,将所有记录切割成指定的份数,然后再按照每个切片的数据范围进行同时导入。过程大致如下:
--查询主键的极值select max(id) as max, select min(id) as min from table [where 如果指定了where子句];--按照极值确定各自的处理范围select * from table where 0 <= id < 250;select * from table where 250 <= id < 500;select * from table where 500 <= id < 750;select * from table where 750 <= id < 1000;
如果数据库没有主键,则需要指定一个能够切割的字段进行处理,并且该字段值具有比较明显的切割线,即极值不能相等,否则无法进行切割。如下所示:
sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath --num-mappers 4 #使用4个mapper --table $oralceTableName --columns $columns --fields-terminated-by '\001' --split-by clientip --where "data_desc='2011-02-26'"
4.3 增量导入
增量导入有两种方式,按照指定的值和时间戳。本质上就是按照字段类型是整型还是时间戳类型进行导入。只导入比指定的值还大的记录数。增量导入使用于字段值是自增类型或是时间戳自增类型。
sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath --num-mappers 4 #使用4个mapper --table $oralceTableName --columns $columns --fields-terminated-by '\001' --incremental append #增量数据 --check-column num_iid #检查列 --last-value 0 #最后的值
按时间戳导入:
sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath --num-mappers 4 #使用4个mapper --table $oralceTableName --columns $columns --fields-terminated-by '\001' --incremental lastmodified # --check-column created # --last-value '2012-02-01 11:0:00'