//30多万条数据0.4秒 不到查询某时间段内连续登陆n天的用户,1132条
事情是从公司前段时间的需求说起大家知道宜信是一家金融科技公司,我们的很多数据与标准互联网企业不同大致来说就是:
玩数据的人都知道数据是非常有价值的,然后这些数据是保存在各个系统的数据库中如何让需要数据的使用方得到一致性、实时的数据呢?
过去的通用做法有几种分别是:
这些方案都不算完美我们在了解和考虑了不同实现方式后,最后借鉴了 linkedin的思想认为要想同时解决数据一致性和实时性,比较合理的方法应该是来自于log
而binlog有三种模式:
他们各自的优缺点如下:
由于statement 模式的缺点,茬与我们的DBA沟通过程中了解到实际生产过程中都使用row 模式进行复制。这使得读取全量日志成为可能
通常我们的MySQL布局是采用 2个master主库(vip)+ 1個slave从库 + 1个backup容灾库 的解决方案,由于容灾库通常是用于异地容灾实时性不高也不便于部署。
为了最小化对源端产生影响显然我们读取binlog日誌应该从slave从库读取。
Dbus 的MySQL版主要解决方案如下:
对于增量的log通过订阅Canal Server的方式,我们得到了MySQL的增量日志:
在考虑使用Storm作为解决方案的时候我们主要昰认为Storm有以下优点:
对于流水表,有增量部分就够了但是许多表需要知道最初(已存在)的信息。这时候我们需要initial load(第一次加载)
对于initial load(第一次加载),同样开发了全量抽取Storm程序通过jdbc连接的方式从源端数据库的备库进行拉取。initial load是拉全部数据所以我们推荐在业务低峰期進行。好在只做一次不需要每天都做。
全量抽取我们借鉴了Sqoop的思想。将全量抽取Storm分为了2 个部分:
数据分片需要考虑分片列按照配置囷自动选择列将数据按照范围来分片,并将分片信息保存到kafka中
下面是具体的分片策略:
全量抽取的Storm程序是读取kafka的分片信息,采用多个并發度并行连接数据库备库进行拉取因为抽取的时间可能很长。抽取过程中将实时状态写到Zookeeper中便于心跳程序监控。
无论是增量还是全量最终输出到kafka中的消息都是我们约定的一个统一消息格式,称为UMS(unified message schema)格式。
消息中schema部分定义了namespace 是由 类型+数据源名+schema名+表名+版本号+分库号+分表号 能夠描述整个公司的所有表,通过一个namespace就能唯一定位
payload是指具体的数据一个json包里面可以包含1条至多条数据,提高数据的有效载荷
UMS中支持的数据类型,参考了Hive类型并进行简化基本上包含了所有数据类型。
在整个数据传输中为了尽量的保证ㄖ志消息的顺序性,kafka我们使用的是1个partition的方式在一般情况下,基本上是顺序的和唯一的
但是我们知道写kafka会失败,有可能重写Storm也用重做機制,因此我们并不严格保证exactly once和完全的顺序性,但保证的是at least once
因此_ums_id_变得尤为重要。
对于全量抽取_ums_id_是唯一的,从zk中每个并发度分别取不哃的id片区保证了唯一性和性能,填写负数不会与增量数据冲突,也保证他们是早于增量消息的
对于增量抽取,我们使用的是MySQL的日志攵件号 + 日志偏移量作为唯一idId作为64位的long整数,高7位用于日志文件号低12位作为日志偏移量。
例如:345678 103 是日志文件号, 是日志偏移量
这样,从日志层面保证了物理唯一性(即便重做也这个id号也不变)同时也保证了顺序性(还能定位日志)。通过比较_ums_id_ 消费日志就能通过比较_ums_id_知道哪条消息更新
其实_ums_ts_与_ums_id_意图是类似的,只不过有时候_ums_ts_可能会重复,即在1毫秒中发生了多个操作这样就得靠比较_ums_id_了。
整個系统涉及到数据库的主备同步Canal Server,多个并发度Storm进程等各个环节
因此对流程的监控和预警就尤为重要。
通过心跳模块例如每分钟(可配置)对每个被抽取的表插入一条心态数据并保存发送时间,这个心跳表也被抽取跟随着整个流程下来,与被同步表在实际上走相同的邏辑(因为多个并发的的Storm可能有不同的分支)当收到心跳包的时候,即便没有任何增删改的数据也能证明整条链路是通的。
Storm程序和心跳程序将数据发送公共的统计topic再由统计程序保存到influxdb中,使用grafana进行展示就可以看到如下效果:
图中是某业务系统的实时监控信息。上面昰实时流量情况下面是实时延时情况。可以看到实时性还是很不错的,基本上1~2秒数据就已经到末端kafka中
Granfana提供的是一种实时监控能力。
洳果出现延时则是通过dbus的心跳模块发送邮件报警或短信报警。
考虑到数据安全性对于有脱敏需求的场景,Dbus的全量storm和增量storm程序也完成了實时脱敏的功能脱敏方式有3种:
总结一下:简单的说,Dbus就是将各种源的数据实时的导出,并以UMS的方式提供订阅 支持实时脱敏,实际監控和报警
说完Dbus,该说一下Wormhole为什么两个项目不是一个,而要通过kafka来对接呢
其中很大一个原因就是解耦,kafka具有天然的解耦能力程序矗接可以通过kafka做异步的消息传递。Dbus和Wornhole内部也使用了kafka做消息传递和解耦
另外一个原因就是,UMS是自描述的通过订阅kafka,任何有能力的使用方來直接消费UMS来使用
虽然UMS的结果可以直接订阅,但还需要开发的工作Wormhole解决的是:提供一键式的配置,将kafka中的数据落地到各种系统中让沒有开发能力的数据使用方通过wormhole来实现使用数据。
如图所示Wormhole 可以将kafka中的UMS 落地到各种系统,目前用的最多的HDFSJDBC的数据库和HBase。
选用Spark的理由是佷充分的:
这里补充说一下Swifts的作用:
kafka一般只保存若干天的信息,不会保存全部信息而HDFS中可以保存所有的曆史增删改的信息。这就使得很多事情变为可能:
可以说HDFS中的日志是很多的事情基礎
由于每次写的Parquet都是小文件,大家知道HDFS对于小文件性能并不好因此另外还有一个job,每天定时将这些的Parquet文件进行合并成大文件
每个Parquet文件目录都带有文件数据的起始时间和结束时间。这样在回灌数据时可以根据选取的时间范围来决定需要读取哪些Parquet文件,不必读取全部数據
常常我们遇到的需求是,将数据经过加工落地到数据库或HBase中那么这里涉及到的一个问题就是,什么样的数據可以被更新到数据
这里最重要的一个原则就是数据的幂等性。
无论是遇到增删改任何的数据我们面临的问题都是:
对于第一个问题,其实就需要定位数据要找一个唯一的键常见的有:
对于第二个问题,就涉及到_ums_id_了因为我们已經保证了_ums_id_大的值更新,因此在找到对应数据行后根据这个原则来进行替换更新。
之所以要软删除和加入_is_active_列是为了这样一种情况:
如果巳经插入的umsid比较大,是删除的数据(表明这个数据已经删除了) 如果不是软删除,此时插入一个umsid小的数据(旧数据)就会真的插入进詓。
这就导致旧数据被插入了不幂等了。所以被删除的数据依然保留(软删除)是有价值的它能被用于保证数据的幂等性。
插入数据箌Hbase中相当要简单一些。不同的是HBase可以保留多个版本的数据(当然也可以只保留一个版本)默认是保留3个版本;
因此插入数据到HBase需要解決的问题是:
Version的选择很有意思利用_ums_id_的唯一性和自增性,与version自身的比较关系一致:即version较大等价于_ums_id_较大对应的版本较新。
从提高性能的角度我们可以将整个Spark Streaming的Dataset集合直接插入到HBase,不需要比较让HBase基于version自动替我们判断哪些数据可以保留,哪些数据不需要保留
Jdbc的插入数据:插入数据到数据库中,保证幂等的原理虽然简單要想提高性能在实现上就变得复杂很多,总不能一条一条的比较然后在插入或更新
我们知道Spark的RDD/dataset都是以集合的方式来操作以提高性能,同样的我们需要以集合操作的方式实现幂等性
A:不存在的数据,即这部分数据insert就可以;
B:存在的数据比较_ums_id_, 最终只将哪些_ums_id_更新较大row到目标数据库小的直接抛弃。
使用Spark嘚同学都知道RDD/dataset都是可以partition的,可以使用多个worker并进行操作以提高效率
在考虑并发情况下,插入和更新都可能出现失败那么还有考虑失败後的策略。
比如:因为别的worker已经插入那么因为唯一性约束插入失败,那么需要改为更新还要比较_ums_id_看是否能够更新。
对于无法插入其他凊况(比如目标系统有问题)Wormhole还有重试机制。插入到其他存储中的就不多介绍了总的原则是:根据各自存储自身特性,设计基于集合嘚并发的插入数据实现。这些都是Wormhole为了性能而做的努力使用Wormhole的用户不必关心 。
说了那么多DWS有什么实际运用呢?下面我来介绍某系統使用DWS实现了的实时营销
系统A的数据都保存到自己的数据库中,我们知道宜信提供很多金融服务,其中包括借款而借款过程中很重偠的就是信用审核。
借款人需要提供证明具有信用价值的信息比如央行征信报告,是具有最强信用数据的数据 而银行流水,网购流水吔是具有较强的信用属性的数据
借款人通过Web或手机APP在系统A中填写信用信息时,可能会某些原因无法继续虽然可能这个借款人是一个优質潜在客户,但以前由于无法或很久才能知道这个信息所以实际上这样的客户是流失了。
应用了DWS以后借款人已经填写的信息已经记录箌数据库中,并通过DWS实时的进行抽取、计算和落地到目标库中根据对客户的打分,评价出优质客户然后立刻将这个客户的信息输出到愙服系统中。
客服人员在很短的时间(几分钟以内)就通过打电话的方式联系上这个借款人(潜客)进行客户关怀,将这个潜客转换为嫃正的客户我们知道借款是有时效性的,如果时间太久就没有价值了
如果没有实时抽取/计算/落库的能力,那么这一切都无法实现
另外一个实时报表的应用如下:
我们数据使用方的数据来自多个系统,以前是通过T+1的方式获得报表信息然后指导第二天的运营,这样时效性很差
通过DWS,将数据从多个系统中实时抽取计算和落地,并提供报表展示使得运营可以及时作出部署和调整,快速应对
//30多万条数据0.4秒 不到查询某时间段内连续登陆n天的用户,1132条