大数据之 Hadoop-15-Flume

一、Flume概述

Apache Flume是一个分布式的、可靠和易用的日志收集系统,用于将大量日志数据从许多不同的源进行收集、聚合,最终移动到一个集中的数据中心进行存储。Flume的使用不仅仅限于日志数据聚合,由于数据源是可定制的,Flume可以用于传输大量数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息和几乎所有可能的数据源。

架构原理

单节点架构

Flume中最小的独立运行单位是Agent,Agent是一个JVM进程,运行在日志收集节点(服务器节点),其包含三个组件---Source(源)、Channel(通道〉和 Sink(接收地)。数据可以从外部数据源流入到这些组件,然后再输出到目的地。一个 Flume单节点架构如图所示。
file

Flume中传输数据的基本单位是event(如果是文本文件,通常是一行记录),event包括event头( headers)和 event体( body) 。event头是一些key-value键值对,存储在Map集合中,就好比HTTP的头信息,用于传递与体不同的额外信息。event 体为一个字节数组,存储实际要传递的数据。event的结构如图所示。

file

组件介绍

Source用于消费外部数据源中的数据(event,例如Web系统产生的日志),一个外部数据源(如 Web服务器)可以以 Source 识别的格式向Source发送数据。

  • Channel用于存储Source传入的数据,当这些数据被Sink消费后则会自动删除。
  • Sink 用于消费Channel 中的数据,然后将其存放在外部持久化的文件系统中(例如 HDFS、HBase和 Hive等)。
  • Flume可以在一个配置文件中指定一个或者多个Agent,每个Agent都需要指定Source .Channel和Sink三个组件以及它们的绑定关系,从而形成一个完整的数据流。
    Source、Channel和 Sink根据功能的不同有不同的类型。Source组件根据数据源的不同,常用类型与描述如表所示。

file

Channel组件根据存储方式的不同,常用类型与描述如表所示。
file

Sink 组件根据输出目的地的不同,常用类型与描述如表12-3所示。
file

多节点架构

如图所示,Flume除了可以单节点直接采集数据外,也提供了多节点共同采集数据的功能,多个Agent位于不同的服务器上,每个 Agent 的Avro Sink将数据输出到另一台服务器上的同一个Avro Source进行汇总,最终将数据输出到 HDFS文件系统中。

例如一个大型网站,为了实现负载均衡功能,往往需要部署在多台服务器上,每台服务器都会产生大量日志数据,如何将每台服务器的日志数据汇总到一台服务器上,然后对其进行分析呢?这个时候可以在每台网站所在的服务器上安装一个Flume,每个 Flume启动一个Agent对本地日志进行收集,然后分别将每个Agent 收集到的日志数据发送到同一台装有Flume的服务器进行汇总,最终将汇总的日志数据写入本地 HDFS文件系统中。

为了能使数据流跨越多个Agent,前一个Agent的Sink和当前Agent 的Source需要同样是Avro类型,并且 Sink需要指定Source的主机名(或者IP地址)和端口号。

file

一个Source可以对应多个Channel,一个Channel也可以对应多个Source。一个Channel可以对应多个Sink,但一个Sink只能对应一个Channel。

二、安装与简单实用

Flume依赖于Java环境,安装Flume之前需要先安装好JDK,JDK的安装此处不再赘述。下面讲解在集群中的centos01节点上安装Flume的操作步骤,并配置Flume从指定端口采集数据,将数据输出到控制台。

1、下载解压

到清华镜像 mirrors.tuna.tsinghua.edu.cn 找到 flume ,然后找到 HBase-2.1.x 地址下载,由于没有 2.1的版本,找 hbase-2.2.7 版本的下载。

$ cd /opt/softwares
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz --no-check-certificate
$ tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /opt/modules/
$ cd /opt/modules/apache-flume-1.8.0-bin

2、配置环境变量

执行以下命令,修改 /etc/profile 文件:

> sudo vi /etc/profile

在该文件中加入以下代码,使 flume命令可以在任意目录下执行:

export FLUME_HOME=/opt/modules/apache-flume-1.8.0-bin
export PATH=$PATH:$FLUME_HOME/bin

刷新 /etc/profile 文件使修改生效:

source /etc/profile

在任意目录下执行 flume-ng 命令,若能成功输出如下参数信息,说明环境变量配置成功:

[root@centos01 bin]# flume-ng
Error: Unknown or unspecified command ''

Usage: /opt/modules/apache-flume-1.8.0-bin/bin/flume-ng <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info

global options:
  --conf,-c <conf>          use configs in <conf> directory
  --classpath,-C <cp>       append to the classpath
  --dryrun,-d               do not actually start Flume, just print the command
  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                            plugins.d section in the user guide for more details.
                            Default: $FLUME_HOME/plugins.d
  -Dproperty=value          sets a Java system property value
  -Xproperty=value          sets a Java -X option

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)
  --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
  --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
  --no-reload-conf          do not reload config file if changed
  --help,-h                 display help text

avro-client options:
  --rpcProps,-P <file>   RPC client properties file with server connection params
  --host,-H <host>       hostname to which events will be sent
  --port,-p <port>       port of the avro source
  --dirname <dir>        directory to stream to avro source
  --filename,-F <file>   text file to stream to avro source (default: std input)
  --headerFile,-R <file> File containing event headers as key/value pairs on each new line
  --help,-h              display help text

  Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first
in the classpath.

/etc/profile 完整文件:

[root@centos01 apache-flume-1.8.0-bin]# cat /etc/profile
# /etc/profile

# System wide environment and startup programs, for login setup
# Functions and aliases go in /etc/bashrc

# It's NOT a good idea to change this file unless you know what you
# are doing. It's much better to create a custom.sh shell script in
# /etc/profile.d/ to make custom changes to your environment, as this
# will prevent the need for merging in future updates.

pathmunge () {
    case ":${PATH}:" in
        *:"$1":*)
            ;;
        *)
            if [ "$2" = "after" ] ; then
                PATH=$PATH:$1
            else
                PATH=$1:$PATH
            fi
    esac
}

if [ -x /usr/bin/id ]; then
    if [ -z "$EUID" ]; then
        # ksh workaround
        EUID=`/usr/bin/id -u`
        UID=`/usr/bin/id -ru`
    fi
    USER="`/usr/bin/id -un`"
    LOGNAME=$USER
    MAIL="/var/spool/mail/$USER"
fi

# Path manipulation
if [ "$EUID" = "0" ]; then
    pathmunge /usr/sbin
    pathmunge /usr/local/sbin
else
    pathmunge /usr/local/sbin after
    pathmunge /usr/sbin after
fi

HOSTNAME=`/usr/bin/hostname 2>/dev/null`
HISTSIZE=1000
if [ "$HISTCONTROL" = "ignorespace" ] ; then
    export HISTCONTROL=ignoreboth
else
    export HISTCONTROL=ignoredups
fi

export PATH USER LOGNAME MAIL HOSTNAME HISTSIZE HISTCONTROL

# By default, we want umask to get set. This sets it for login shell
# Current threshold for system reserved uid/gids is 200
# You could check uidgid reservation validity in
# /usr/share/doc/setup-*/uidgid file
if [ $UID -gt 199 ] && [ "`/usr/bin/id -gn`" = "`/usr/bin/id -un`" ]; then
    umask 002
else
    umask 022
fi

for i in /etc/profile.d/*.sh /etc/profile.d/sh.local ; do
    if [ -r "$i" ]; then
        if [ "${-#*i}" != "$-" ]; then 
            . "$i"
        else
            . "$i" >/dev/null
        fi
    fi
done

unset i
unset -f pathmunge
#JAVA_HOME
export JAVA_HOME=/opt/modules/jdk1.8.0_211
export PATH=$PATH:$JAVA_HOME/bin

#HADOOP_HOME
export HADOOP_HOME=/opt/modules/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

#SQOOP_HOME
export SQOOP_HOME=/opt/modules/sqoop-1.4.7
export PATH=$PATH:$SQOOP_HOME/bin

#HIVE_HOME
export HIVE_HOME=/opt/modules/apache-hive-2.3.9-bin
export PATH=$PATH:$HIVE_HOME/bin

#Flume
export FLUME_HOME=/opt/modules/apache-flume-1.8.0-bin
export PATH=$PATH:$FLUME_HOME/bin
[root@centos01 apache-flume-1.8.0-bin]# 

3、配置Flume

Flume配置文件是一个 Java属性文件,里面存放键值对字符串,且Flume对配置文件的名称和路径没有固定的要求,但一般都放在Flume安装目录的conf文件夹中。
在Flume安装目录的conf文件夹中新建配置文件 flume-conf.properties,并在其中加入以下内容:

[root@centos01 conf]# cp flume-conf.properties.template flume-conf.properties
[root@centos01 conf]# vi flume-conf.properties

flume-conf.properties 文件:

#单节点Flume配置例子

#给Agent 中的三个组件 Source、Sink和channel 各起一个别名,a1代表为Agent起的别名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#source属性配置信息
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# sink属性配置信息
a1.sinks.k1.type = logger

# channe1属性配置信息
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 绑定source和sink到 channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

上述配置属性解析如下。

  • a1.sources.r1.type: Source的类型。netcat表示打开指定的端口并监听数据,数据的格式必须是换行分割的文本,每行文本会被转换为event发送给Channel。
  • a1.sinks.k1.type: Sink 的类型。logger表示在INFO级别上记录日志数据,通常用于测试/调试目的。
  • a1.channels.c1.type: Channel的类型。memory表示将event存储在内存队列中。如果对数据流的吞吐量要求比较高,可以采用memory类型;如果不允许数据丢失,不建议采用memory类型。
  • a1.channels.c1.capacity:存储在Channel中的最大event数量。
  • a1.channels.c1.transactionCapacity:在每次事务中,Channel从Source接收或发送给Sink的最大event数量。

上述配置信息描述了一个单节点的Flume部署,允许用户生成数据并发送到Flume,Flume接收到数据后会输出到控制台。该配置定义了一个名为a1的 Agent。 al的Source组件监听端口44444上的数据源,并将接收到的event 发送给Channel, al的 Channel组件将接收到的event缓冲到内存,a1的 Sink组件最终将event输出到控制台。

Flume的配置文件中可以定义多个Agent,在启动Flume时可以指定使用哪一个Agent。

4、启动Flume

在任意目录执行以下命令,启动Flume:

$ flume-ng agent --conf conf --conf-file $FLUME_HOME/conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

上述代码中各参数含义如下。

  • --conf-file:指定配置文件的位置。
  • --name:指定要运行的Agent名称。
  • -Dflume.root.logger:指定日志输出级别(INFO)和输出位置(控制台)。部分启动日志如下:
2021-10-10 23:32:42,615 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2021-10-10 23:32:42,615 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2021-10-10 23:32:42,963 INFO node.Application: Starting Sink k1
2021-10-10 23:32:42,964 INFO node.Application: Starting Source r1
2021-10-10 23:32:42,966 INFO source.NetcatSource: Source starting
2021-10-10 23:32:43,053 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

启动成功后,新开一个SSH 窗口,执行以下命令,连接本地44444端口:

> telnet localhost 44444
[root@centos01 ~]# telnet localhost 44444
Trying ::1%1...
telnet: connect to address ::1%1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

若提示找不到telnet命令,则执行以下命令安装telnet组件:

> yum install -y telnet

安装成功后,重新连接44444端口,命令及输出信息如下:

此时继续输入任意字符串(此处输入字符串“hello”)后按回车键,向本地Flume发送数据。然后回到启动Flume的SSH窗口可以看到,控制台成功打印出了接收到的数据“hello”,如图所示。
file

file

三、日志监控

本例实用Flume实时采集日志文件中的数据并输出到控制台或HDFS系统中。

1、设计思路

Flume的开发主要是编写配置文件,配置文件中三大组件Source、Channel和Sink 的类型与属性需要根据具体业务进行选取。
(1)配置组件名称。
给Agent 中的三个组件Source、Sink和 Channel 各起一个别名,通常为r1、k1和c1,a1表示为Agent起的别名:

a1.sources =r1
a1.sinks = k1
a1 .channels = c1

(2) 配置source组件
在本例中,需要实时采集日志文件中的数据,而我们已经知道,Linux Shell命令tail-F可以实时监控文件内容并输出,因此可以配置Source执行该命令。在官网的介绍中,Exec类型的Source在启动时可以运行指定的Shell 命令,并且期望该进程在标准输出上持续生成数据。Exec Source的常用配置属性介绍如表所示。

file

假设日志文件为/home/hadoop/data.log ,因此本例中Source组件可以如下配置:

a1.sources.r1.type =exec
a1.sources.r1.command= tail -F /home/hadoop/data.log

(3)配置Channel组件。
我们可以将日志数据存储于内存队列中,以加快传输速度,因此可选用Memory类型的Channel组件。
Memory Channel的常用配置属性介绍如表所示。
file

因此本例中Channel组件可以如下配置:

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

(4) 配置 Sink组件。
Logger类型的Sink可以在 INFO级别上记录日志数据,通常用于测试/调试目的。
Logger Sink的常用配置属性介绍如表所示。
file

因此本例中 Sink组件可以如下配置:

a1.sinks.k1.type= logger

(5)绑定三大组件。
最后,将Source和 Sink 绑定到Channel 上:

a1.sources.r1.channels =c1
a1.sinks.k1.channel =c1

2、操作步骤

(1)在 Flume的安装目录下的conf文件夹中新建配置文件 flume-conf2.properties,然后在该文件中写入以下内容:

#配置组件名称
a1.sources =r1
a1.sinks = k1
a1.channels =c1

#配置Source组件
a1.sources.r1.type =exec
a1.sources.r1.command = tail -F /home/hadoop/data.log

#配置Channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100

#配置Sink组件
a1.sinks.k1.type = logger

#将Source和 Sink绑定到Channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)执行以下命令,启动Agent:

$ flume-ng agent \
--name a1 \
--conf conf \
--conf-file $FLUME_HOME/conf/flume-conf2.properties \
-Dflume.root.logger=INFO,console

上述命令启动Agent,读取配置文件 flume-conf2.properties,并将日志信息输出到控制台。从启动日志信息可以看到,Source、Channel和Sink 三个组件都已启动,如图所示。

file

( 3)新开启一个连接centos01节点的SSH 窗.口,执行以下命令,向日志文件/home/hadoop/data.log中写入内容“hello flume”:

$ echo 'hello flume' >> /home/hadoop/data.log

回到启动Agent 的SSH 窗口,可以看到控制台输出了信息“hello flume”,如图所示。

file

[root@centos01 ~]# echo 'hello flume' >> /home/hadoop/data.log
[root@centos01 ~]# echo 'DA WAN QU' >> /home/hadoop/data.log
[root@centos01 ~]# cat /home/hadoop/data.log
hello flume
DA WAN QU
[root@centos01 ~]# 

file

到此,Flume实时采集日志数据并输出到控制台的例子就完成了。

为者常成,行者常至