离线技术处理总结

前言

可怜数目处理技术下:

  • [x] 电信运营商
  • 数营销:房地产营销、运营商时代(汇聚用户作为)
  • [x] 互联网用户作为分析
  • 数量令运营:漏斗模型、反作弊
  • [x] P2P风控系统
  • 私征信(人民银行)、各大银行贷款记录

我们在举行的:

离线:

image

实时:

image

一样、离线数据处理简介

(1)分布式计算(mapreduce)

数据仓库工具hive(基于磁盘存储)构建以mapreduce之上,支持条件SQL的款式查询存储于HDFS上之数,原理就是是解析HiveQL,经过编译后(SQL转换成为MapReduce任务)生成执行计划(多只Stage依赖)

image

(2)分布式存储(hdfs)

  1. 凡一律种允许文件
    通过网在多贵主机(廉价商用机)上享用的文件系统,可让多机器及的大都用户享受文件以及存储空间。
  2. 通透性。让实际是经过网来聘文件之动作,由程序和用户看来,就如是造访当地的磁盘一般。
  3. 容错。即使系统面临起某些节点脱机,整体来说系统依然可持续运行而非见面来数量损失。
  4. 适用于 一差写副、 多次查询的情状,不支持连发写情况,小文件未适宜
![](https://github.com/guofei1219/RiskControl/blob/master/src/main/resources/jpg/hadoop_hdfs.png?raw=true)
image

(3)分布式资源调度(yarm)

通用资源管理网,可为上层应用提供统一之资源管理和调度,它的引入为集群在利用率、资源统一保管及多少共享等地方带来了宏伟利益

image

亚、数据导入(sqoop)

(1)数据导入方式对待

  1. mysql表导出也文件形式,再以hive load加载命令将数据导入Hadoop平台
    瑜:方式简单
    症结:数据可靠性得不至保障,扩展性低
  2. 开源工具Sqoop,基于MapReuce
    瑜:扩展性高

(2)介绍

Sqoop是一个为此来将Hadoop和涉项目数据库被的数量交互转换的家伙,可以拿一个关联项目数据库(例如
: MySQL ,Oracle
,Postgres等)中的数额导进到Hadoop的HDFS中,也足以拿HDFS的多寡导进到关系项目数据库被。
对此某些NoSQL数据库它也提供了连接器。Sqoop,类似于外ETL工具,使用初数据模型来判定数据类型并在数量从数据源转移至Hadoop时保证项目安全的数目处理。Sqoop专为甚数量批量传设计,能够分割数据集并创建Hadoop任务来拍卖每个区块。

(3)个贷数据迁入Hadoop平台

  • [x] 基为只贷表元数据变化查询SQL,替换Hive敏感字符并统一数据类型
  • 换行符、制表符、#等等统一替换成空格符
  • 强时日格式(timestamp、date、datetime)转换为Hive支持的合并时间格式
  • [x] 开发迁移脚论
  • 统一时间库
  • 多少校验
  • 动态链接个贷表
转变查询SQL脚本
SELECT 
    CASE 
        WHEN data_type  IN('varchar','text','char') then
            CONCAT('REPLACE(REPLACE(REPLACE(REPLACE(', column_name,',CHAR(13),'' ''),CHAR(10),'' ''),CHAR(9),'' ''),CHAR(44),'' ''),')
        when data_type in('timestamp','date','datetime') then
            CONCAT('DATE_FORMAT(', column_name, ',''%Y-%m-%d %h:%i:%s''),')
        else
            CONCAT(column_name,',') 
    END
FROM COLUMNS WHERE table_name='crm_repay_dk'
查询SQL样例
select
id,
contract_id,
REPLACE(REPLACE(REPLACE(REPLACE(account_number,CHAR(13),' '),CHAR(10),' '),CHAR(9),' '),CHAR(44),' '),
repay_method,
year,
DATE_FORMAT(first_repay,'%Y-%m-%d %h:%i:%s'),
status,
repay_status,
REPLACE(REPLACE(REPLACE(REPLACE(accntnm,CHAR(13),' '),CHAR(10),' '),CHAR(9),' '),CHAR(44),' '),
REPLACE(REPLACE(REPLACE(REPLACE(accntno,CHAR(13),' '),CHAR(10),' '),CHAR(9),' '),CHAR(44),' '),
REPLACE(REPLACE(REPLACE(REPLACE(bank,CHAR(13),' '),CHAR(10),' '),CHAR(9),' '),CHAR(44),' '),
REPLACE(REPLACE(REPLACE(REPLACE(branchnm,CHAR(13),' '),CHAR(10),' '),CHAR(9),' '),CHAR(44),' '),
loan_type,
from #tablename  where $CONDITIONS
mysql导入hdfs脚本
sqoop import \
--connect jdbc:mysql://$server:$port/$mysql_database?tinyInt1isBit=false \
--username $username \
--password $password  \
--split-by id \
--query "$excute_sql" \
--target-dir $tmp_partition  

echo "load data inpath '$tmp_partition/$table_suff/*' into table $hdb.pl_$table partition(dt='"$partition"')"
#exit
if [ $? -eq 0 ];then
    echo "数据mysql导入hive分区$year$month成功"
    echo "$tmp_partition/$table/*"
    hive -hiveconf hive.exec.parallel=true -e "use $hdb;alter table pl_$table add if not EXISTS partition(dt='"$partition"');
        load data inpath '$tmp_partition/*' into table $hdb.pl_$table partition(dt='"$partition"');"
else
    echo "$tmp_partition 分区下数据为空"
fi

(4)碰到的问题

  • mysql数据导入hive有些许种办法,第一栽是冲全表,第二种植是根据查询条件

//基于全表导入
/*
优点:方法简单
缺点:1.导入过程中不能做额外逻辑操作
      2.导入到hdfs中目录结构为 /table/文件,导致hive外部表指定的location失效,解决方法可以使用中间临时存储区过度
      3.关系型数据控中如果有特殊字符可能会导致hive表错列,因为hive建表时会执行行分隔符和列分隔符,当关系型数据库表字段内容含有已指定分隔符就会导致hive表错列
*/
sqoop import \
--connect jdbc:mysql://$server:$port/$mysql_database?tinyInt1isBit=false \
--username sqoop \
--password sqoop 
--table ${table_suff} \
--warehouse-dir $tmp_partition

//基于查询SQL导入

优点:可以进行数据过滤转换等操作
缺点:暂时没发现啥缺点

sqoop import \
--connect jdbc:mysql://$server:$port/$mysql_database?tinyInt1isBit=false \
--username $username \
--password $password  \
--split-by id \
--query "$excute_sql" \
--target-dir $tmp_partition
  • 开场对数码探测不成功,导致对只贷部分表的有的字段内容涵盖的特殊字符了解未到位,比如换行符/制表符/#/逗号,导致我们导入到Hive中会错列
  • mysql 数据类型tinyint转换至hive中也null
    解决智:tinyInt1isBit=false
  • 原生sqoop只支持数据导入单分区,要支持多分区可以先行将数量拉至hdfs多分区目录下(/database/table/year=2016/month=10/day=27/),再采取load命令加载数据

老三、数据处理(hive/impala)

(1)数据仓库工具Hive

Hive建表

--放款合同信息表
CREATE EXTERNAL TABLE `pl_crm_account_mes_dk` (
  `id` int,
  `intopieces_id` int,
  `by_name` string,
  `is_by_name` string,
  `by_id_card` string,
  `addressee` string,
  `mail_province` int,
  `mail_city` int,
  `mail_address` string,
  `account_time` int,
  `account_staff_id` int,)
PARTITIONED BY ( 
  `dt` string COMMENT 'by date')
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://hadb/rmdb/pl_crm_account_mes_dk';

数据加载

加overwrite 数据覆盖对应分区,不加overwrite 会追加数据到对应分区
--load data [local] inpath 'hdfs路径/本地文件路径' [overwrite] into 库名.表名 partition(分区名='对应分区')
注:这个操作是移动数据,而不是复制数据
小心问题:
  1. 数量存放的门道层次要与发明的分区一致;
  2. 倘若分区表没有新增分区,即使目标路径下自家经发数据了,但仍查不顶数。
隔符问题:
  1. 隔符默认只来单个字符。如果产生差不多只字符,默认取第一独字符作为分隔符。
数据类型对承诺问题:
  1. Load数据数据,字段类型不可知互相转化时,查询结果返回NULL。而其实的数还是在。
  2. Select查询插入,字段类型不克互相转化时,插入数据也NULL。而其实的多寡吧也NULL。
其他:
  1. Select查询插入数据,字段值顺序要同发明中字段顺序一致,名称可不一样。
  2. Hive在数量加载时无开检查,查询时检查。
  3. 表面分区表要加上分区才能见到数据。

hive 中 Order by, Sort by ,Dristribute by,Cluster By 的意和对比

order by

本某些字段排序
样例

select col1,other...
from table
where conditio
order by col1,col2 [asc|desc]

注意
order by后面可以出差不多列进行排序,默认按字典排序
order by也大局排序
order
by需要reduce操作,且只出一个reduce,与部署无关。数据量很可怜时,慎用。

Sort排序

sort by col – 按照col列管数据排序

select col1,col2 from M
distribute by col1
sort by col1 asc,col2 desc

两头结合出现,确保每个reduce的输出都是板上钉钉的。

distribute by与group by对比

还是比照key值划分数据
都使用reduce操作
唯不同之是distribute by只是特的发散数据,而group
by把同key的数目聚集到同,后续要是集结操作。

order by与sort by 对比

order by是大局排序
sort
by只是承保每个reduce上面输出的数量有序。如果单发一个reduce时,和order
by作用一样。

cluster by

拿来相同值的数额聚集到联合,并排序。
效果相当价于distribute by col sort by col
cluster by col <==> distribute by col sort by col

hive自定义函数

自打定义函数介绍

当hive提供的函数不可知满足我们的需要时,需要开发者自己付出自定义函数,自定义函数分为以下三像样:

  • [x] UDF(user defined function)
  • 用户从定义函数针对单条记录,解决输入一行输出一行需求。
  • 案例:根据用户访问日志获得客户端设备类

public class UADeviceType {
    public Text evaluate(Text url_text) {
        if(null==url_text){
            return new Text();
        }
        DeviceType dt = null;
        try {
            UserAgent userAgent = UserAgent.parseUserAgentString(url_text.toString());
            dt = userAgent.getOperatingSystem().getDeviceType();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new Text(dt.getName());
    }
    public static void main(String[] args) {
        UserAgent userAgent = UserAgent.parseUserAgentString("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1");
        System.out.println(userAgent.getOperatingSystem().getDeviceType().getName());
    }
}
  • [x] UDTF(User-Defined Table-Generating Functions)
  • 因而来解决 输入一行输出多执行(On-to-many maping) 的要求。
  • 案例:从日志表抽取优惠券券码生成优惠券明细表

hive --hiveconf mapreduce.job.queuename=root.offline.hdp_daojia.normal -v -e "use hdp_daojia_defaultdb;
   insert overwrite table dj58_f_coupon  partition(dt=$1)
    select coupon_code,cookieid,urlfields['hmsr'],order_id,operation_status,mobile,urlfields['znsr'] 
    from dj58_o_web lateral view explode(split(urlfields['code'],',')) adtable as coupon_code 
    where dt=$1;"
  • [x] UDAF(user defined aggregation function)
  • 用户从定义聚合函数对记录集合,输入多实行输出一行的求
自从定义函数开发流程(UDTF和UDAF)
  1. 从今定义一个Java类
  2. 继承UDF类
  3. 重写evaluate方法
  4. 打成jar包
  5. 在hive执行add jar方法
  6. 每当hive执行创建模板函数
  7. hql中使用
    案例:

hive --hiveconf mapreduce.job.queuename=root.offline.hdp_daojia.normal -hiveconf hive.exec.parallel=true -v -e "
add jar hdfs://hdp-58-cluster/home/hdp_58dp/udf/udf-utils.jar;
create temporary function result_update as 'com.dj58.data.hive.udf.ResultsUpdate';
create temporary function spend_time as 'com.dj58.data.hive.udaf.SpendTime';
use hdp_daojia_defaultdb;
--$1 优惠券活动下单明细统计
select result_update('app_coupon/a_app_coupon_order.properties',
    $1,hmsr,znsr,cateid,order_users,orders,finished_order_users,finished_orders,finished_order_price_sum,finished_corder_actual_price_sum)
from(
    select (case when t2.hmsr!='' then t2.hmsr else 'none' end) hmsr,--渠道
            (case when t2.znsr!='' then t2.znsr else 'none' end) znsr,--站内渠道
            t1.cateid,--业务类型
            count(distinct(case when t1.createtime='$2' and t2.operation_status = 'ordersuccess' then t1.uid end)) order_users,--渠道下单用户数
            count(distinct(case when t1.createtime='$2' and t2.operation_status = 'ordersuccess' then t2.order_id end)) orders,--渠道下单数
            count(distinct(case when t1.servicetime='$2' and t1.bistate in(3,8) then t1.uid end)) finished_order_users,--渠道完成订单用户数
            count(distinct(case when t1.servicetime='$2' and t1.bistate in(3,8) then t1.orderid end)) finished_orders,--渠道完成订单数
            sum((case when t1.bistate in(3,8) and t1.price is not null then t1.price else 0 end)) finished_order_price_sum, --渠道带来的完成订单的实收金额总和
            sum((case when t1.bistate in(3,8) and t1.actual_price is not null then t1.actual_price else 0 end))finished_corder_actual_price_sum--渠道带来的完成订单实际金额加和    
    from(
        select uid,orderid,bistate,price,actual_price,to_date(createtime) createtime,to_date(servicetime) servicetime,cateid
        from hdp_daojia_defaultdb.dj58_f_mysql_order
        where uid!='' and orderid!='' and to_date(createtime)='$2' or to_date(servicetime)='$2'
    )t1
    left outer join(
        select hmsr,znsr,order_id,operation_status,mobile
        from hdp_daojia_defaultdb.dj58_f_web_orderchannel
        where order_id !=''
    )t2
    on t1.orderid = t2.order_id
    group by t2.hmsr,t2.znsr,t1.cateid
    order by order_users desc
)t5;
"

(2)开源的Apache Hadoop UI系统Hue

HUE的使 B/S 代替C/S架构,不用数登陆 hive/hbase等客户端
Hue是一个可是快速开与调试Hadoop生态系统各种应用之一个基于浏览器的图形化用户接口。官网被起底表征,通过翻译原文简单询问一下Hue所支持之意义特色集合:

  1. 默认基于轻量级sqlite数据库管理会话数据,用户征与授权,可以从定义为MySQL、Postgresql,以及Oracle
  2. 基于文件浏览器(File Browser)访问HDFS
  3. 因Hive编辑器来出及运作Hive查询
  4. 支持因Solr进行检索的运用,并提供可视化的多寡视图,以及仪表板(Dashboard)
  5. 支持因Impala的以进行交互式查询

image

image

老三、数据导出(sqoop)

hive导入mysql脚本

#下载HDFS文件到本地目录
hadoop fs -get $basedir/$hive_table/dt=$year-$month-$day/ $resultDir
#进入本地文件目录
cd $resultDir/dt=$year-$month-$day
pwd
#合并文件
cat ./* > data.txt
#exit
#加载数据至mysql数据库
mysql -u root -p123456 -h$server -e "use '$mysql_database';
delete from $mysql_table where data_dt='$year-$month-$day';
load data local infile '$resultDir/dt=$year-$month-$day/data.txt' into table $mysql_table FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n';"

四、商业化hadoop平台CDH应用

hadoop是一个开源项目,所以众多商家以是基础进行商业化,Cloudera对hadoop做了对应的改观

image

五、作业调度体系

因 Linux 系统级别的 Crontab。

于 Job 数量庞大之景况下,Crontab
脚本的编辑,变得大复杂。其调度的过程为未可知透明化,让管理变得艰难。

Java 应用级别之 Quartz。

Quartz 虽然不要编写脚本,实现对应的调度 API
即可,然其调度过程不透明,不涵盖 Job 运行详情。需自行开发其功能。

其三正值调度平台对比

Oozie

Oozie时凡是托管在Apache基金会的,开源。配置的长河略发繁琐和复杂,配置相关的调度任务比较麻烦,然其可视化界面也非是那么的直观,另外,对UI界面要求于高的校友,此调度体系估计会于你失望。

Zeus

她是一个阿里巴巴开源Hadoop的功课平台,从Hadoop任务之调节运行及生育任务之周期调度,它支持任务之周生命周期。从该效果来拘禁,它支持以下任务:

  1. Hadoop的MapReduce任务调度运行
  2. Hive任务的调度运行
  3. Shell任务之运行
  4. Hive元数据的可视化展示查询和数量预览
  5. Hadoop任务的全自动调度

Azkaban

Azkaban提供了一个便于使的用户界面来保安和跟踪而的劳作流程。另外,Github上献的Azkaban调度体系的源码量不深,做二次开发难度不殊。其功效点涉及以下内容:

  1. 易用的Web UI
  2. 简简单单的Web和Http工作流的达标传
  3. 色工作区
  4. 工作流调度
  5. 模块化和插件化
  6. 证明和授权
  7. 用户作为跟
  8. 邮件告警失败与成功
  9. SLA告警
  10. 重启失败的Jobs
Azkaban使用
  • [x] 主页面

https://localhost:8443
留神是https,采用的凡jetty
ssl链接。输入账号密码azkaban/azkanban(如果你之前并未更改的言语)

image

首页有四独菜单

  • projects:最要害的有的,创建一个工,所有flows将当工程中运作。
  • scheduling:显示定时任务
  • executing:显示当前运作的任务
  • history:显示历史运行任务

要害介绍projects部分
率先创建一个工程,填写名称以及描述,比如o2olog。

image

Flows:工作流程,有差不多只job组成
Permissions:权限管理
Project Logs:工程日志

  1. 创造工程:

创立之前我们事先了解下次的涉及,一个工程分包一个或者多单flows,一个flow包含多只job。job是您想当azkaban中运作的一个历程,可以是概括的linux命令,可是java程序,也足以是扑朔迷离的shell脚本,当然,如果您安装相关插件,也堪运作插件。一个job可以因让外一个job,这种多个job和它的乘组成的图纸叫做flow。
job创建

创造job很简短,只要创造一个以.job结尾的文本文件就实施了,例如我们创建一个做事,用来用日志数据导入hive中(关于那个数据方面的东西,不在再次,可以解吧,将日志所待数导入的mysql中),我们创建o2o_2_hive.job

type=command
command=echo "data 2 hive"

一个简易的job就创造好了,解释下,type的command,告诉azkaban用unix原生命让去运作,比如原生命令或者shell脚本,当然为有外类别,后面说。

一个工不容许一味生一个job,我们本创造多独依赖job,这吗是使用azkaban的显要目的。

2、Flows创建

咱说了多个jobs和它的因组成flow。怎么开创依赖,只要指定dependencies参数就实施了。比如导入hive前,需要开展数据清洗,数据清洗前待上污染,上传之前要由ftp获取日志。
定义5个job:

  • o2o_2_hive.job:将保洁了的数目入hive库
  • o2o_clean_data.job:调用mr清洗hdfs数据
  • o2o_up_2_hdfs.job:将文件上传至hdfs
  • o2o_get_file_ftp1.job:从ftp1获取日志
  • o2o_get_file_fip2.job:从ftp2获取日志
    靠关系:
    3乘4暨5,2依赖3,1依赖2,4及5从未有过因关系。

o2o_2_hive.job

type=command
# 执行sh脚本,建议这样做,后期只需维护脚本就行了,azkaban定义工作流程
command=sh /job/o2o_2_hive.sh
dependencies=o2o_clean_data

o2o_clean_data.job

type=command
# 执行sh脚本,建议这样做,后期只需维护脚本就行了,azkaban定义工作流程
command=sh /job/o2o_clean_data.sh
dependencies=o2o_up_2_hdfs

o2o_up_2_hdfs.job

type=command
#需要配置好hadoop命令,建议编写到shell中,可以后期维护
command=hadoop fs -put /data/*
#多个依赖用逗号隔开
dependencies=o2o_get_file_ftp1,o2o_get_file_ftp2

o2o_get_file_ftp1.job

type=command
command=wget "ftp://file1" -O /data/file1

o2o_get_file_ftp2.job

type=command
command=wget "ftp:file2" -O /data/file2
  1. 将上述job打成zip包。上传:

image

image

点击o2o_2_hive进入流程,azkaban流程名称为最终一个从来不因之job定义的。

image

右边上是布置执行时流程或执行定时流程。

image

Flow view:流程视图。可以经用,启用某些job
Notification:定义任务成功还是失败是否发送邮件
Failure Options:定义一个job失败,剩下的job怎么执行
Concurrent:并行任务执行设置
Flow Parametters:参数设置。
1.执行同样不好
安装好上述参数,点击execute。

image

绿色代表成功,蓝色是运行,红色是没戏。可以查job运行时刻,依赖与日志,点击details可以查阅各个job运行情况。

image

2.定时实行

image

五、报表平台

image

image

image

参考文章:

  1. HDFS深入浅析:http://www.cnblogs.com/linuxprobe/p/5594431.html

  2. Hive
    SQL编译过程:http://tech.meituan.com/hive-sql-to-mapreduce.html

  3. Hadoop 新 MapReduce 框架 Yarn
    详解:http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-yarn/

  4. Sqoop安装配置以及示范:http://www.micmiu.com/bigdata/sqoop/sqoop-setup-and-demo/

  5. Hue安装配置实践:http://shiyanjun.cn/archives/1002.html

  6. Hadoop Hive
    sql语法详解:http://blog.csdn.net/hguisu/article/details/7256833

  7. Azkaban-开源任务调度程序(安装篇):http://www.jianshu.com/p/cc680380ca34

  8. Azkaban-开源任务调度程序(使用篇):http://www.jianshu.com/p/484564beda1d

  9. Hive高级查询(group by、 order by、 join等)
    :http://blog.csdn.net/scgaliguodong123\_/article/details/46944519

网站地图xml地图