尚硅谷大数据实战_电商用户行为分析(项目开发实战)学习,原始项目使用scala,自己尝试用java重写。也会结合官方文档,介绍一些api的用处。
项目整体介绍
项目主要模块
基于对电商用户行为数据的基本分类,我们可以发现主要有以下三个分析方向:
- 热门统计
利用用户的点击浏览行为,进行流量统计、近期热门商品统计等。
- 偏好统计
利用用户的偏好行为,比如收藏、喜欢、评分等,进行用户画像分析,给出个性化的商品推荐列表。
- 风险控制
利用用户的常规业务行为,比如登录、下单、支付等,分析数据,对异常情况进行报警提示。
本项目限于数据,我们只实现热门统计和风险控制中的部分内容,将包括以下五大模块:实时热门商品统计、实时流量统计、市场营销商业指标统计、恶意登录监控和订单支付失效监控,其中细分为以下9个具体指标:
[
由于对实时性要求较高,我们会用flink作为数据处理的框架。在项目中,我们将综合运用flink的各种API,基于EventTime去处理基本的业务需求,并且灵活地使用底层的processFunction,基于状态编程和CEP去处理更加复杂的情形。
数据源解析
行为数据UserBehavior
字段名 | 数据类型 | 说明 |
---|---|---|
userId | Long | 加密后的用户ID |
itemId | Long | 加密后的商品ID |
categoryId | Int | 加密后的商品所属类别ID |
behavior | String | 用户行为类型,包括(‘pv’, ‘’buy, ‘cart’, ‘fav’) |
timestamp | Long | 行为发生的时间戳,单位秒 |
web日志数据
字段名 | 数据类型 | 说明 |
---|---|---|
ip | String | 访问的 IP |
userId | Long | 访问的 user ID |
eventTime | Long | 访问时间 |
method | String | 访问方法 GET/POST/PUT/DELETE |
url | String | 访问的 url |
热门时事商品统计
基本需求
- 统计近一小时热门商品,每五秒钟更新一次
- 热门数用浏览度pv来衡量
解决思路
- 过滤出用户行为中的pv
- 构建滑动窗口
按照商品id进行分区
.keyBy(“itemid”)
设置时间窗口
.timeWindow(Time.minutes(60),Time.minutes(5)) 滑动窗口
时间窗口左闭右开,同一份数据可以发送给满足条件的多份窗口
窗口聚合
.aggregate(new CountAgg(),new WindowResultFunction())
new CountAgg():定义聚合规则
new WindowResultFunction():定义输出的数据结构
实时流量统计–热门页面
基本需求
- 从web服务器日志中,统计实时热门访问页面
- 统计每分钟ip访问量,取出访问量最大的五个地址,每五秒更新一次
解决思路
- 将日志中的时间转换为时间戳
- 构建滑动窗口
市场营销分析–APP市场推广统计
基本需求
- 统计APP市场推广的数据指标
- 按照不同的推广渠道,分别统计数据
解决思路
- 通过滤过,按照不同渠道进行统计
- 自定义processFunction
市场营销分析–页面广告统计
基本需求
- 按照不同省份,统计每小时页面访问量,五秒钟统计一次
- 对于频繁的点击行为进行过滤,放入黑名单
解决思路
- 滑动窗口
- 利用processFunction进行黑名单过滤
其实需求的具体细节还有很多,代码实现中再展开。
项目编写
热门商品统计
数据分析
1 | // userID,itemId,categoryId,mode,timeStamp |
定义数据输入输出的结构
1 | // input structure |
Watermarks
为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用 TimestampAssigner
API 从元素中的某个字段去访问/提取时间戳。
时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 WatermarkGenerator
来配置 watermark 的生成方式。
使用 Flink API 时需要设置一个同时包含 TimestampAssigner
和 WatermarkGenerator
的 WatermarkStrategy
。WatermarkStrategy
工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。
1 | // 设置水印,处理乱序数据 |
AggregateFunction自定义聚合规则
AggregateFunction比ReduceFunction更加通用,它有三个参数:输入类型(IN),累加器类型(ACC)和输出类型(OUT)。
1 | class CountAgg implements AggregateFunction<UserBehavior,Long,Long> { |
WindowFunction自定义窗口处理元素的规则
1 |
|
处理函数(ProcessFunctions)
ProcessFunction
将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction
十分相似, 但是增加了 Timer。
这里展示了其中一种ProcessFunction。
1 | public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction { |
1 | class TopNHotItems extends KeyedProcessFunction<Long,ItemViewCount,String> { |
完整代码
注释还是写得很详细的,层层递进
1 | import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
第一个模块的代码,写得比较详细,之后的模块在文中就只写核心部分了。
数据源改为kafka
1 | Properties properties = new Properties(); |
自定义kafka生产者
可以从文件读取信息并不断发送,便于测试
1 | public class myKafkaProducer { |
实时流量统计
页面浏览量统计
每隔5秒,输出最近10分钟内访问量最多的前N个URL。
套路其实是一样的,就当作复习重新写一遍就可以了,这里就不贴代码了。
网站浏览总量(PV)
统计每小时pv
其实就是一个word count,代码也不贴了,都在仓库里。
独立访客数统计(UV)
这里涉及到一个去重的操作,flink本身没有distinct算子,这比较出乎意料,当前场景,有如下几种去重的方式。
在展示去重方法之前,需要先指出一个api的要点,就是WindowedStream/AllWindowedStream,这两者最后输出的都是DataStream,可以apply在有分区和没分区的窗口中,效果虽然没有process这么强力,但还是不错的。
方法一
利用set进行去重
1 | public class UniqueVisitor { |
方法二
利用mapState,思路和set差不多
可以看到这个processFunction并没有ontimer方法,因为keyedProcessFunction是ProcessFunction的扩展,可以在onTimer获取timer的key (通过context.getCurrentKey方法),而这个方法并不是。
1 | public class UniqueVisitor { |
方法三 布隆过滤
上两种方法都需要用到内存在存储元素,要是数据量很大,会遇到资源不够的情况,这里采用布隆过滤器。
这里有用到trigger触发器这个api,需要简单了解的,可以去这篇文章,链接
1 | public class UvWithBloomFilter { |
市场营销商业指标
APP市场推广统计
主要有两个知识点,一个是自定义数据源,这对测试来说,是一个很好的方式
1 | class SimulateEventSource extends RichParallelSourceFunction<MarketingUserBehavior> { |
另一个,在java中,keyby设计到的返回值要是过于复杂,如果不想定义pojo的话,还是要使用keyselector,否则可能会遇到错误。
完整代码如下
1 | // 定义一个输入数据的样例类 保存电商用户行为的样例类 |
页面广告分析
按照省份划分点击量,比较常规的做法,具体代码详见代码仓库。
过滤黑名单
相比上一个功能多了一个过滤动作,具体过滤规则由需求决定。
这里使用了旁路输出getSideOutput,这次接触到一个实际案例,收获还是很大的。
1 | //定义侧输出流报警信息样例类 |
恶意登录监控
方法一
最朴素的方法
1 | // 输入的登录事件样例类 |
这有两个很大的问题,计算的是最近两秒内的情况,是写死的,不能改,同时没有考虑到乱序的信息流,这个时候我们就需要使用到flink的cep了。
FlinkCEP - Complex event processing for Flink
FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink.
It allows you to detect event patterns in an endless stream of events, giving you the opportunity to get hold of what’s important in your data.
一个或多个由简单事件构成的事件流通过简单的规则匹配,然后输出用户想得到的数据–满足规则的复杂事件。
具体情况查看另一篇文章,链接
方法二 cep
1 | public class LoginFailWithCep { |
这算一个很简单的cep,做了个简单的入门,其实需要注意的点还很多。
订单支付实时监控
在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。在接下来的内容中,我们将实现这两个需求。
在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如15分钟),如果下单后一段时间仍未支付,订单就会被取消。
方法一 cep
这里的重点我们是要学一下cep中select函数的方法,源码如下
1 | /** |
可以发现select函数还可以返回超时的时间流,比我们想象的强大得多。
1 | // 定义输入的订单事件样例类 |
方法二 直接使用状态编程
遇到这种问题,直接理清思路就好,就先不写了。
来自两条流的订单交易匹配
对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理。这里我们利用connect将两条流进行连接,然后用自定义的CoProcessFunction进行处理。
方法一:connect + CoProcess
这里使用了流之间的connect
1 | // 输入输出的样例类 |
方法二:join
1 | public class OrderPayTxMatchWithJoin { |
本文链接: http://woaixiaoyuyu.github.io/2021/08/02/%E7%94%B5%E5%95%86%E7%94%A8%E6%88%B7%E8%A1%8C%E4%B8%BA%E5%88%86%E6%9E%90/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!