当前位置:AIGC资讯 > 数据采集 > 正文

数据采集实操细节(Flume+Kafka+Flume形式)【重要】

使用Flume采集用户行为数据,并实现自定义Flume拦截器,将采集的数据发送到Kafka不同topic中,然后再通过Flume消费Kafka数据发送到HDFS上:

数据采集实操细节:

Flume方面:

选型:数据采集的选型是Flume+Kafka+Flume,因为Flume的put和take事务机制,所以它的数据采集准确性比较好,非常擅长日志格式数据的采集,同时接入Kafka是为了进行削峰和缓冲。

采集端配置:对于日志文件,我们保存30天。Flume的Source选择的是TailDir Source, 因为它支持断点续传的功能和读取多目录文件的功能。Channel采用的是memory channel,它虽然可能会丢失一些数据,但是速度快,在大量的日志数据情况下,是可以容忍小量数据丢失。Sink 配置的是Kafka,Kafka Topic设置3个,启动日志主题,错误日志主题,事件日志主题。

(因为是3台机器所以Kafka配置3个分区)

自定义Flume拦截器:另外我们还实现了两个FLume拦截器,一个是实现简单的日志过滤,比如时间戳不合法的,json数据不完整的,就把它舍弃;另外一个实现的是根据不同的日志类型发送到不同的kafka Topic中,它的实现是首先获取Flume接收消息头header,然后从json数据中的eventType取出相应的日志类型,将日志类型存储到header中,这样就实现不同事件发送到不同topic的功能。

用监控器监控Flume性能:为了监控flume的性能还用了监控器,可以查看put和take机制尝试次数和成功次数,如果put的尝试次数低于成功次数,说明flume集群性能有问题,那我们可以考虑优化flume,可以修改配置文件flume-env文件,把内存调大。(从默认1G调到4G)

Flume消费Kafka数据存储到HDFS:配置source为kafka,sink为hdfs

Kafka方面:

kafka的数据保存7天,副本数为2。Kafka前后以Flume分别作为生产者和消费者。

针对生产者,ack可以设置为0,虽然这样速度很快,但很容易丢失数据;设置为-1的话也可以,这样leader和follower都收到数据后才返回消息,数据最可靠,但效率太慢了。综合权重,我们的ack设置为1,leader收到了,就回应生产者offset; 针对消费者,我们使用了range的分区分配策略,也可以选择roundRobin,只不过roundroubin对消费者要求线程数一样,并且一个消费者组只消费同一个主题,而Range没有这些限制,分配比较均匀。

(Kafka还有个问题就是相关ISR的副本同步队列问题,因为leader负责消费者消费的内容。所以leader挂了谁上。就有副本同步队列。它会根据leader和follower之间的延迟条数和延迟时间来判断,后面延迟条数被抛弃了。)

更新时间 2023-11-08