大数据之 Hadoop-15-Flume
一、Flume概述
Apache Flume是一个分布式的、可靠和易用的日志收集系统,用于将大量日志数据从许多不同的源进行收集、聚合,最终移动到一个集中的数据中心进行存储。Flume的使用不仅仅限于日志数据聚合,由于数据源是可定制的,Flume可以用于传输大量数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息和几乎所有可能的数据源。
架构原理
单节点架构
Flume中最小的独立运行单位是Agent,Agent是一个JVM进程,运行在日志收集节点(服务器节点),其包含三个组件---Source(源)、Channel(通道〉和 Sink(接收地)。数据可以从外部数据源流入到这些组件,然后再输出到目的地。一个 Flume单节点架构如图所示。
Flume中传输数据的基本单位是event(如果是文本文件,通常是一行记录),event包括event头( headers)和 event体( body) 。event头是一些key-value键值对,存储在Map集合中,就好比HTTP的头信息,用于传递与体不同的额外信息。event 体为一个字节数组,存储实际要传递的数据。event的结构如图所示。
组件介绍
Source用于消费外部数据源中的数据(event,例如Web系统产生的日志),一个外部数据源(如 Web服务器)可以以 Source 识别的格式向Source发送数据。
- Channel用于存储Source传入的数据,当这些数据被Sink消费后则会自动删除。
- Sink 用于消费Channel 中的数据,然后将其存放在外部持久化的文件系统中(例如 HDFS、HBase和 Hive等)。
- Flume可以在一个配置文件中指定一个或者多个Agent,每个Agent都需要指定Source .Channel和Sink三个组件以及它们的绑定关系,从而形成一个完整的数据流。
Source、Channel和 Sink根据功能的不同有不同的类型。Source组件根据数据源的不同,常用类型与描述如表所示。
Channel组件根据存储方式的不同,常用类型与描述如表所示。
Sink 组件根据输出目的地的不同,常用类型与描述如表12-3所示。
多节点架构
如图所示,Flume除了可以单节点直接采集数据外,也提供了多节点共同采集数据的功能,多个Agent位于不同的服务器上,每个 Agent 的Avro Sink将数据输出到另一台服务器上的同一个Avro Source进行汇总,最终将数据输出到 HDFS文件系统中。
例如一个大型网站,为了实现负载均衡功能,往往需要部署在多台服务器上,每台服务器都会产生大量日志数据,如何将每台服务器的日志数据汇总到一台服务器上,然后对其进行分析呢?这个时候可以在每台网站所在的服务器上安装一个Flume,每个 Flume启动一个Agent对本地日志进行收集,然后分别将每个Agent 收集到的日志数据发送到同一台装有Flume的服务器进行汇总,最终将汇总的日志数据写入本地 HDFS文件系统中。
为了能使数据流跨越多个Agent,前一个Agent的Sink和当前Agent 的Source需要同样是Avro类型,并且 Sink需要指定Source的主机名(或者IP地址)和端口号。
一个Source可以对应多个Channel,一个Channel也可以对应多个Source。一个Channel可以对应多个Sink,但一个Sink只能对应一个Channel。
二、安装与简单实用
Flume依赖于Java环境,安装Flume之前需要先安装好JDK,JDK的安装此处不再赘述。下面讲解在集群中的centos01节点上安装Flume的操作步骤,并配置Flume从指定端口采集数据,将数据输出到控制台。
1、下载解压
到清华镜像 mirrors.tuna.tsinghua.edu.cn 找到 flume ,然后找到 HBase-2.1.x 地址下载,由于没有 2.1的版本,找 hbase-2.2.7 版本的下载。
$ cd /opt/softwares
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz --no-check-certificate
$ tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /opt/modules/
$ cd /opt/modules/apache-flume-1.8.0-bin
2、配置环境变量
执行以下命令,修改 /etc/profile
文件:
> sudo vi /etc/profile
在该文件中加入以下代码,使 flume命令可以在任意目录下执行:
export FLUME_HOME=/opt/modules/apache-flume-1.8.0-bin
export PATH=$PATH:$FLUME_HOME/bin
刷新 /etc/profile
文件使修改生效:
source /etc/profile
在任意目录下执行 flume-ng
命令,若能成功输出如下参数信息,说明环境变量配置成功:
[root@centos01 bin]# flume-ng
Error: Unknown or unspecified command ''
Usage: /opt/modules/apache-flume-1.8.0-bin/bin/flume-ng <command> [options]...
commands:
help display this help text
agent run a Flume agent
avro-client run an avro Flume client
version show Flume version info
global options:
--conf,-c <conf> use configs in <conf> directory
--classpath,-C <cp> append to the classpath
--dryrun,-d do not actually start Flume, just print the command
--plugins-path <dirs> colon-separated list of plugins.d directories. See the
plugins.d section in the user guide for more details.
Default: $FLUME_HOME/plugins.d
-Dproperty=value sets a Java system property value
-Xproperty=value sets a Java -X option
agent options:
--name,-n <name> the name of this agent (required)
--conf-file,-f <file> specify a config file (required if -z missing)
--zkConnString,-z <str> specify the ZooKeeper connection to use (required if -f missing)
--zkBasePath,-p <path> specify the base path in ZooKeeper for agent configs
--no-reload-conf do not reload config file if changed
--help,-h display help text
avro-client options:
--rpcProps,-P <file> RPC client properties file with server connection params
--host,-H <host> hostname to which events will be sent
--port,-p <port> port of the avro source
--dirname <dir> directory to stream to avro source
--filename,-F <file> text file to stream to avro source (default: std input)
--headerFile,-R <file> File containing event headers as key/value pairs on each new line
--help,-h display help text
Either --rpcProps or both --host and --port must be specified.
Note that if <conf> directory is specified, then it is always included first
in the classpath.
/etc/profile
完整文件:
[root@centos01 apache-flume-1.8.0-bin]# cat /etc/profile
# /etc/profile
# System wide environment and startup programs, for login setup
# Functions and aliases go in /etc/bashrc
# It's NOT a good idea to change this file unless you know what you
# are doing. It's much better to create a custom.sh shell script in
# /etc/profile.d/ to make custom changes to your environment, as this
# will prevent the need for merging in future updates.
pathmunge () {
case ":${PATH}:" in
*:"$1":*)
;;
*)
if [ "$2" = "after" ] ; then
PATH=$PATH:$1
else
PATH=$1:$PATH
fi
esac
}
if [ -x /usr/bin/id ]; then
if [ -z "$EUID" ]; then
# ksh workaround
EUID=`/usr/bin/id -u`
UID=`/usr/bin/id -ru`
fi
USER="`/usr/bin/id -un`"
LOGNAME=$USER
MAIL="/var/spool/mail/$USER"
fi
# Path manipulation
if [ "$EUID" = "0" ]; then
pathmunge /usr/sbin
pathmunge /usr/local/sbin
else
pathmunge /usr/local/sbin after
pathmunge /usr/sbin after
fi
HOSTNAME=`/usr/bin/hostname 2>/dev/null`
HISTSIZE=1000
if [ "$HISTCONTROL" = "ignorespace" ] ; then
export HISTCONTROL=ignoreboth
else
export HISTCONTROL=ignoredups
fi
export PATH USER LOGNAME MAIL HOSTNAME HISTSIZE HISTCONTROL
# By default, we want umask to get set. This sets it for login shell
# Current threshold for system reserved uid/gids is 200
# You could check uidgid reservation validity in
# /usr/share/doc/setup-*/uidgid file
if [ $UID -gt 199 ] && [ "`/usr/bin/id -gn`" = "`/usr/bin/id -un`" ]; then
umask 002
else
umask 022
fi
for i in /etc/profile.d/*.sh /etc/profile.d/sh.local ; do
if [ -r "$i" ]; then
if [ "${-#*i}" != "$-" ]; then
. "$i"
else
. "$i" >/dev/null
fi
fi
done
unset i
unset -f pathmunge
#JAVA_HOME
export JAVA_HOME=/opt/modules/jdk1.8.0_211
export PATH=$PATH:$JAVA_HOME/bin
#HADOOP_HOME
export HADOOP_HOME=/opt/modules/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
#SQOOP_HOME
export SQOOP_HOME=/opt/modules/sqoop-1.4.7
export PATH=$PATH:$SQOOP_HOME/bin
#HIVE_HOME
export HIVE_HOME=/opt/modules/apache-hive-2.3.9-bin
export PATH=$PATH:$HIVE_HOME/bin
#Flume
export FLUME_HOME=/opt/modules/apache-flume-1.8.0-bin
export PATH=$PATH:$FLUME_HOME/bin
[root@centos01 apache-flume-1.8.0-bin]#
3、配置Flume
Flume配置文件是一个 Java属性文件,里面存放键值对字符串,且Flume对配置文件的名称和路径没有固定的要求,但一般都放在Flume安装目录的conf文件夹中。
在Flume安装目录的conf文件夹中新建配置文件 flume-conf.properties
,并在其中加入以下内容:
[root@centos01 conf]# cp flume-conf.properties.template flume-conf.properties
[root@centos01 conf]# vi flume-conf.properties
flume-conf.properties
文件:
#单节点Flume配置例子
#给Agent 中的三个组件 Source、Sink和channel 各起一个别名,a1代表为Agent起的别名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source属性配置信息
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# sink属性配置信息
a1.sinks.k1.type = logger
# channe1属性配置信息
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 绑定source和sink到 channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
上述配置属性解析如下。
- a1.sources.r1.type: Source的类型。netcat表示打开指定的端口并监听数据,数据的格式必须是换行分割的文本,每行文本会被转换为event发送给Channel。
- a1.sinks.k1.type: Sink 的类型。logger表示在INFO级别上记录日志数据,通常用于测试/调试目的。
- a1.channels.c1.type: Channel的类型。memory表示将event存储在内存队列中。如果对数据流的吞吐量要求比较高,可以采用memory类型;如果不允许数据丢失,不建议采用memory类型。
- a1.channels.c1.capacity:存储在Channel中的最大event数量。
- a1.channels.c1.transactionCapacity:在每次事务中,Channel从Source接收或发送给Sink的最大event数量。
上述配置信息描述了一个单节点的Flume部署,允许用户生成数据并发送到Flume,Flume接收到数据后会输出到控制台。该配置定义了一个名为a1的 Agent。 al的Source组件监听端口44444上的数据源,并将接收到的event 发送给Channel, al的 Channel组件将接收到的event缓冲到内存,a1的 Sink组件最终将event输出到控制台。
Flume的配置文件中可以定义多个Agent,在启动Flume时可以指定使用哪一个Agent。
4、启动Flume
在任意目录执行以下命令,启动Flume:
$ flume-ng agent --conf conf --conf-file $FLUME_HOME/conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
上述代码中各参数含义如下。
- --conf-file:指定配置文件的位置。
- --name:指定要运行的Agent名称。
- -Dflume.root.logger:指定日志输出级别(INFO)和输出位置(控制台)。部分启动日志如下:
2021-10-10 23:32:42,615 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2021-10-10 23:32:42,615 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2021-10-10 23:32:42,963 INFO node.Application: Starting Sink k1
2021-10-10 23:32:42,964 INFO node.Application: Starting Source r1
2021-10-10 23:32:42,966 INFO source.NetcatSource: Source starting
2021-10-10 23:32:43,053 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
启动成功后,新开一个SSH 窗口,执行以下命令,连接本地44444端口:
> telnet localhost 44444
[root@centos01 ~]# telnet localhost 44444
Trying ::1%1...
telnet: connect to address ::1%1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
若提示找不到telnet命令,则执行以下命令安装telnet组件:
> yum install -y telnet
安装成功后,重新连接44444端口,命令及输出信息如下:
此时继续输入任意字符串(此处输入字符串“hello”)后按回车键,向本地Flume发送数据。然后回到启动Flume的SSH窗口可以看到,控制台成功打印出了接收到的数据“hello”,如图所示。
三、日志监控
本例实用Flume实时采集日志文件中的数据并输出到控制台或HDFS系统中。
1、设计思路
Flume的开发主要是编写配置文件,配置文件中三大组件Source、Channel和Sink 的类型与属性需要根据具体业务进行选取。
(1)配置组件名称。
给Agent 中的三个组件Source、Sink和 Channel 各起一个别名,通常为r1、k1和c1,a1表示为Agent起的别名:
a1.sources =r1
a1.sinks = k1
a1 .channels = c1
(2) 配置source组件
在本例中,需要实时采集日志文件中的数据,而我们已经知道,Linux Shell命令tail-F可以实时监控文件内容并输出,因此可以配置Source执行该命令。在官网的介绍中,Exec类型的Source在启动时可以运行指定的Shell 命令,并且期望该进程在标准输出上持续生成数据。Exec Source的常用配置属性介绍如表所示。
假设日志文件为/home/hadoop/data.log ,因此本例中Source组件可以如下配置:
a1.sources.r1.type =exec
a1.sources.r1.command= tail -F /home/hadoop/data.log
(3)配置Channel组件。
我们可以将日志数据存储于内存队列中,以加快传输速度,因此可选用Memory类型的Channel组件。
Memory Channel的常用配置属性介绍如表所示。
因此本例中Channel组件可以如下配置:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
(4) 配置 Sink组件。
Logger类型的Sink可以在 INFO级别上记录日志数据,通常用于测试/调试目的。
Logger Sink的常用配置属性介绍如表所示。
因此本例中 Sink组件可以如下配置:
a1.sinks.k1.type= logger
(5)绑定三大组件。
最后,将Source和 Sink 绑定到Channel 上:
a1.sources.r1.channels =c1
a1.sinks.k1.channel =c1
2、操作步骤
(1)在 Flume的安装目录下的conf文件夹中新建配置文件 flume-conf2.properties
,然后在该文件中写入以下内容:
#配置组件名称
a1.sources =r1
a1.sinks = k1
a1.channels =c1
#配置Source组件
a1.sources.r1.type =exec
a1.sources.r1.command = tail -F /home/hadoop/data.log
#配置Channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100
#配置Sink组件
a1.sinks.k1.type = logger
#将Source和 Sink绑定到Channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2)执行以下命令,启动Agent:
$ flume-ng agent \
--name a1 \
--conf conf \
--conf-file $FLUME_HOME/conf/flume-conf2.properties \
-Dflume.root.logger=INFO,console
上述命令启动Agent,读取配置文件 flume-conf2.properties
,并将日志信息输出到控制台。从启动日志信息可以看到,Source、Channel和Sink 三个组件都已启动,如图所示。
( 3)新开启一个连接centos01节点的SSH 窗.口,执行以下命令,向日志文件/home/hadoop/data.log中写入内容“hello flume”:
$ echo 'hello flume' >> /home/hadoop/data.log
回到启动Agent 的SSH 窗口,可以看到控制台输出了信息“hello flume”,如图所示。
[root@centos01 ~]# echo 'hello flume' >> /home/hadoop/data.log
[root@centos01 ~]# echo 'DA WAN QU' >> /home/hadoop/data.log
[root@centos01 ~]# cat /home/hadoop/data.log
hello flume
DA WAN QU
[root@centos01 ~]#
到此,Flume实时采集日志数据并输出到控制台的例子就完成了。
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)