实战大数据-01-构建日志采集和分析平台
一、构建Flume集群
需求描述:每天有海量的用户访问新闻网站,那么新闻网站需要多台Web服务器分摊用户的访问压力,而且用户访问新闻网站产生的日志数据写入web服务器落盘。为了分析新闻网站的用户行为,需要通过Flume 将用户日志数据采集到大数据平台。如果每台Flume采集服务直接将数据写入大数据平台,会造成很大的 I/O 压力,所以需要增加Flume聚合层对来自采集节点的数据进行聚合,它能减少对大数据平台的压力。Flume的采集层和聚合层共同形成了Flume集群。
本项目有 3 个实验集群节点:centos01、centos02和centos03, centos01作为 Flume采集节点,采集Web服务器日志,选择centos02和centos03作为Flume聚合节点,聚合来自 centos01 节点采集的数据,要求 centos01 采集节点的数据负载均衡到centos02 和 centos03节点。
1、配置Flume采集服务
由于web服务器有多台,这里以 centos01 作为采集节点配置即可。
在 centos01 节点上,进入 Flume 安装目录下的 conf目录,新建一个配置文件 taildir-file-selector-avro.properties
,具体配置如下:
$ cd /opt/modules/apache-flume-1.8.0-bin/conf
$ vi taildir-file-selector-avro.properties
文件内容:
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# 定义Source、Channel、Sink的名称
agent1.sources = taildirSource
agent1.channels = fileChannel
agent1.sinkgroups = g1
agent1.sinks = k1 k2
# Define and configure an exec source
agent1.sources.taildirSource.type = TAILDIR
agent1.sources.taildirSource.positionFile = /home/hadoop/data/flume/taildir_position.json
agent1.sources.taildirSource.filegroups = f1
agent1.sources.taildirSource.filegroups.f1 = /home/hadoop/data/flume/logs/sogou.log
agent1.sources.taildirSource.channels = fileChannel
# 定义和配置一个fileChannel
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /home/hadoop/data/flume/checkpointDir
agent1.channels.fileChannel.dataDirs = /home/hadoop/data/flume/dataDirs
# 定义和配置一个sink组
agent1.sinkgroups.g1.sinks = k1 k2
# 为sink组定义一个处理器,load_balance 负载均衡 failover故障切换
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
# round_robin 轮询 random随机
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000
# 定义一个sink将数据发送给centos02节点
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = fileChannel
agent1.sinks.k1.batchSize = 1
agent1.sinks.k1.hostname = centos02
agent1.sinks.k1.port = 1234
# 定义一个sink将数据发送给centos03节点
agent1.sinks.k2.type = avro
agent1.sinks.k2.channel = fileChannel
agent1.sinks.k2.batchSize = 1
agent1.sinks.k2.hostname = centos03
agent1.sinks.k2.port = 1234
Flume采集配置中,Source类型选择 TAILDIR
类型实时采集文件的新增内容,Channel类型选择file类型将采集的数据持久化到本地磁盘,Sink 选择 avro 类型将数据再发送给聚合节点的Flume服务。
2、配置Flume聚合服务
Flume采集 centos01 节点的数据,发送到两个 Flume聚合节点 centos02 和 centos03,那么就需要配置聚合节点的Flume服务来接收数据,而且两个聚合节点Flume配置是一致的。
复制centos01 的Flume安装节点到 centos02、centos03
$ scp -r /opt/modules/apache-flume-1.8.0-bin/ hadoop@centos02:/opt/modules/
$ scp -r /opt/modules/apache-flume-1.8.0-bin/ hadoop@centos03:/opt/modules/
说明:hadoop用户密码为:hadoop@123
在centos02、centos03 Flume安装目录下的conf目录,新建一个配置文件 avro-file-selector-logger.properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# 定义Source、Channel、Sink的名称
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
# Define and configure an avro
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234
# Configure channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /home/hadoop/data/flume/checkpointDir
agent1.channels.c1.dataDirs = /home/hadoop/data/flume/dataDirs
# Define and configure a sink
agent1.sinks.k1.type = logger
agent1.sinks.k1.channel = c1
3、Flume集群测试
(1)、启动Flume聚合服务
进入 /opt/modules/apache-flume-1.8.0-bin/conf
目录,在 centos02 和 centos03 节点上分别启动 Flume 进程,启动命令如下:
> cd /opt/modules/apache-flume-1.8.0-bin
> bin/flume-ng agent -n agent1 -c conf -f conf/avro-file-selector-logger.properties -Dflume.root.logger=INFO,console
(2)、启动Flume采集服务
> cd /opt/modules/apache-flume-1.8.0-bin
> bin/flume-ng agent -n agent1 -c conf -f conf/taildir-file-selector-avro.properties -Dflume.root.logger=INFO,console
(3)、准备测试数据
进入 centos01 节点所在的 /home/hadoop/data/flume/logs/
目录,向监控日志 sogou.log
文件添加新数据。
> echo '00:00:00 00717725924582846 [闪字吧] 1 2 www.shanziba.com/' >> sogou.log
> echo '00:00:00 41416219018952116 [霍震霆与朱玲玲照片] 2 6 bbs.gouzai.cn/thread-698736.html' >> sogou.log
> echo '00:00:00 5228056822071097 [75810部队] 14 5 www.greatoo.com/greatoo_cn/' >> sogou.log
通过命令不断向 sogou.log 文件新增数据,如能能在centos02 和 centos03节点的Flume服务控制台,查看到均匀打印的用户日志,说明Flume集群构建成功。
二、 采集用户行为数据
利用Flume 构建的日志采集集群将用户行为数据分别采集到Kafka集群和HBase集群,为后续用户行为分析做准备。
2.1 Flume与Kafka集成
通过Flume集群将用户行为数据采集到Kafka集群,这里只需要修改 Flume 聚合节点的配置,因为前面已经构建了Flume集群。
配置Flume聚合服务
分别在 centos02 和 centos03 节点,进入 Flume安装目录下的conf目录,新建一个 avro-file-selector-kafka.properties
。
$ cd /opt/modules/apache-flume-1.8.0-bin/conf
$ vi avro-file-selector-kafka.properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# 定义 Source、Channel、Sink的名称
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
# 定义和配置一个 avro Source
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234
# 定义和配置一个file Channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /home/hadoop/data/flume/checkpointDir
agent1.channels.c1.dataDirs = /home/hadoop/data/flume/dataDirs
# 定义和配置一个Kafka Sink
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = sogoulogs
agent1.sinks.k1.brokerList = centos01:9092,centos02:9092,centos03:9092
agent1.sinks.k1.producer.acks = 1
agent1.sinks.k1.channel = c1
Flume聚合配置中,Source选择 avro 类型接收发送过来的数据,Channel选择file类型将接收的数据持久化到本地磁盘,Sink选择KafkaSink 类型将数据写入Kafka集群。
Flume与Kafka集成测试
(0)、准备工作:
启动Zookeeper集群#
分别在三个节点执行以下命令,启动Zookeeper集群(需要进入Zookeeper安装目录)
cd /opt/modules/zookeeper-3.5.9/bin
[root@centos01 bin]# ./zkServer.sh start
启动Kafka集群#
分别在三个节点上执行以下命令,启动Kafka集群(需要进入Kafka安装目录)
cd /opt/modules/kafka_2.12-2.5.0
bin/kafka-server-start.sh -daemon config/server.properties
(1)、启动Flume聚合服务
分别在 centos02 和 centos03 节点启动Flume服务
> cd /opt/modules/apache-flume-1.8.0-bin
> bin/flume-ng agent -n agent1 -c conf -f conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console
centos02 和 centos03 节点启动:
[root@centos02 apache-flume-1.8.0-bin]# bin/flume-ng agent -n agent1 -c conf -f conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console
[root@centos03 apache-flume-1.8.0-bin]# bin/flume-ng agent -n agent1 -c conf -f conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console
(2)、启动Flume采集服务
在 centos01 节点上启动Flume服务,启动命令如下:
> cd /opt/modules/apache-flume-1.8.0-bin
> bin/flume-ng agent -n agent1 -c conf -f conf/taildir-file-selector-avro.properties -Dflume.root.logger=INFO,console
(3)、启动Kafka消费者服务
创建topic,通过kafka脚本提前创建 sogoulogs 主题:
> cd /opt/modules/kafka_2.12-2.5.0
> bin/kafka-topics.sh \
--create \
--zookeeper centos01:2181, centos02:2181, centos03:2181 \
--replication-factor 2 \
--partitions 2 \
--topic sogoulogs
在 centos01 节点上启动消费者:
> bin/kafka-console-consumer.sh \
--bootstrap-server centos01:9092,centos02:9092,centos03:9092 \
--topic sogoulogs
(4)、准备测试数据
进入 centos01 节点所在的 /home/hadoop/data/flume/logs/
目录,向监控日志 sogou.log
文件添加新数据。
> echo '00:00:00 00717725924582846 [wenying] 1 2 www.shanziba.com/' >> sogou.log
> echo '00:00:00 41416219018952116 [霍震霆与朱玲玲照片] 2 6 bbs.gouzai.cn/thread-698736.html' >> sogou.log
> echo '00:00:00 5228056822071097 [8341部队] 14 5 www.greatoo.com/greatoo_cn/' >> sogou.log
可以看到在centos01节点上启动的 Kafka消费者能打印出采集的数据,说明 Flume 成功采集用户行为数据并写入了 Kafka 集群。
2.2 Flume与Hbase集成
Flume直接使用默认的HBaseSink 将采集的数据写入 HBase集群,无法满足后续数据的查询与分析,所以需要对HBaseSink 进行二次开发。
HBaseSink的二次开发
用户行为日志入库 HBase 之前,需要提前设计 HBase 数据库。因为在 Flume 的HBaseSink中,Hbase表中的 RowKey 以及列都是系统默认值,无法满足业务查询的个性化需求,所以,需要对HBaseSink 插件进行二次开发。
(1)下载Flume源码
(2) 导入Flume源码
(3)HbaseSink源码开发
(4) HBaseSink项目打包部署
mvn clean package
打包成功后,将jar包分别分别上传至 centos 02, centos03 节点的Flume安装目录下的 lib 目录下,覆盖 Flume 默认的 HBaseSink 项目包。
`avro-file-selector-hbase.properties` 文件
```
定义 Source、Channel、Sink的名称
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
Define and configure an avro
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234
Configure channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /home/hadoop/data/flume/checkpointDir
agent1.channels.c1.dataDirs = /home/hadoop/data/flume/dataDirs
定义和配置一个HBase Sink
agent1.sinks.k1.type = asynchbase
agent1.sinks.k1.channel = c1
agent1.sinks.k1.table = weblogs
agent1.sinks.k1.serializer = org.apache.flume.sink.hbase.DjtAsyncHbaseEventSerializer
agent1.sinks.k1.zookeeperQuorum = centos01:2181,centos02:2181,centos03:2181
agent1.sinks.k1.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
agent1.sinks.k1.znodeParent = /hbase
agent1.sinks.k1.columnFamily = info
#### 2.2.1 创建HBase数据库表
启动HBase集群之前,需要先启动Hadoop集群。由于HBase不依赖Hadoop YARN,因此只启动Hadoop HDFS 即可。
在 centos01节点执行以下命令,启动HDFS:
[root@centos01 ~]# su hadoop
[hadoop@centos01 root]$ cd /opt/modules/hadoop-3.1.3
[hadoop@centos01 hadoop-3.1.3]$ sbin/start-dfs.sh
在centos01节点执行以下命令,启动HBase集群。启动HBase集群的同时,会将ZooKeeper集群也同时启动。
$ cd /opt/modules/hbase-2.2.7
$ su root # 需要切换到root 账号:123456
$ bin/start-hbase.sh
在启动HBase 之后,我们可以通过执行以下命令启动HBase Shell:
$ [root@centos01 hbase-2.2.7]# bin/hbase shell
hbase(main):001:0>
创建HBase业务表:
hbase(main):001:0> create 'sogoulogs','info'
Created table sogoulogs
Took 5.1336 seconds
=> Hbase::Table - sogoulogs
sougoulogs 为表的名称,info为列簇名称。
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)