柴少鹏的官方网站 技术在分享中进步,水平在学习中升华

大数据(十二)flume的source、sink、channel详解

http://blog.51niux.com/?id=196    #已经对flume进行了介绍,也参照官网搭建了简单的agent端,这里还是参照官网,将配置文件的各种参数记录一下。

一、Flume Sources配置参数详解

1.1 Avro Source

       监听Avro端口并接收来自外部Avro客户端流的事件。 当与另一个(前一跳)Flume agent内置的Avro Sink配对时,它可以创建分层收集拓扑。下面是所需属性为下面加黑部分。

属性名称                          默认      描述

channels                           -                      

type                                  -          组件类型名称,需要avro                  

bind                                  -          主机名或IP地址进行监听

port                                  -          要绑定的端口号

threads                                          产生的工作线程的最大数量

selector.type                   

selector.*                    

interceptors                      -            空格分隔的拦截列表

interceptors.*

compression-type            none       这可以是“none”或“deflate"。压缩类型必须匹配AvroSource匹配的压缩类型

ssl                                     false        将其设置为true以启用SSL加密。 还必须指定“keystore”和“keystore-password”。

keystore                            -              这是Java密钥库文件的路径。 SSL需要。

keystore-password           -               Java密钥库的密码。 SSL需要。

keystore-type                    JKS          Java密钥库的类型。 这可以是“JKS”或“PKCS12”。  

exclude-protocols             SSLv3      要排除的空格分隔的SSL/TLS协议列表。除了指定的协议之外,SSLv3将始终被排除。

ipFilter                               false        将其设置为true以启用ipFiltering为netty

ipFilterRules                        -             用这个配置定义N个netty ipFilter模式规则。

agent名称为a1的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

ipFilterRules的例子,ipFilterRules定义N个netty ipFilters,用逗号分隔,模式规则必须采用这种格式:<’allow’ or deny>:<’ip’ or ‘name’ for computer name>:<pattern> or allow/deny:ip/name:pattern

例如:

ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

请注意,匹配的第一条规则将适用于下面的示例从本地主机上的客户端显示

这将允许本地主机上的客户端拒绝任何其他IP的客户端:“allow:name:localhost,deny:ip:”

这将拒绝本地客户端上的客户端允许来自任何其他IP的客户端:“deny:name:localhost,allow:ip:“

1.2 Thrift Source

       监听Thrift端口并接收来自外部Thrift客户端流的事件。 当与另一个(前一跳)Flume agent内置的ThriftSink配合使用时,它可以创建分层收集拓扑。 通过启用Kerberos身份验证,可将Thrift source配置为以安全模式启动。 agent-principal和agent-keytab是Thrift source用来向kerberos KDC进行身份验证的属性。 所需的属性以粗体显示。

属性名称                           默认        描述

channels                           -                      

type                                  -            组件类型名称,需要thrift        

bind                                  -            主机名或IP地址进行监听

port                                  -             要绑定的端口号

threads                                            产生的工作线程的最大数量

selector.type                   

selector.*                    

interceptors                     -              空格分隔的拦截列表

interceptors.*

ssl                                    false         将其设置为true以启用SSL加密。 还必须指定“keystore”和“keystore-password”。

keystore                           -              这是Java密钥库文件的路径。 SSL需要。

keystore-password          -               Java密钥库的密码。 SSL需要。

keystore-type                 JKS            Java密钥库的类型。 这可以是“JKS”或“PKCS12”。  

exclude-protocols          SSLv3        要排除的空格分隔的SSL/TLS协议列表。除了指定的协议之外,SSLv3将始终被排除。

kerberos                         false          设置为true以启用Kerberos身份验证。 在kerberos模式下成功认证需要agent-principal和agent-keytab。

       处于安全模式的Thrift source将仅接受来自Thrift客户端的连接,该Thrift客户端已启用kerberos并已成功通过kerberos KDC的身份验证。

agent-principal               -               由Thrift Source使用的Kerberos委托人对Kerberos KDC进行验证。

agent-keytab                  -               Thrift Source使用的keytab位置与agent-principal结合使用以向kerberos KDC进行身份验证。

agent名称为a1的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

1.3 Exec Source

       Exec源在启动时运行一个给定的Unix命令,并期望该过程持续在标准输出上生成数据(除非将属性logStdErr设置为true,否则stderr将被简单地丢弃)。 如果该过程因任何原因而退出,则该来源也退出并且不会产生进一步的数据。 这意味着像cat [named pipe] 或者 tail -F [file]这样的配置将产生所需的结果,而date可能不会--前两个命令产生数据流,在后者产生单个事件并退出。所需的属性以粗体显示:

属性名称                 默认         描述

channels                 -                      

type                        -             组件类型名称,需要exec

command              -             要执行的命令

shell                        -             用于运行该命令的shell调用。例如/bin/sh - c。只需要依赖于shell特性的命令,如通配符、回号、管道等。

restartThrottle      10000       尝试重新启动之前的等待时间(以毫秒为单位)

restart                   false         执行的cmd是否应该在它死后重新启动

logStdErr              false         是否应记录命令的stderr

batchSize              20            一次读取和发送到频道的最大行数

batchTimeout       3000        在数据被推向下游之前,如果未达到缓冲区大小,则等待的时间(以毫秒为单位)

selector.type      replicating   replicating(复制) or multiplexing(多路复用)

selector.*                                取决于selector.type的值

interceptors          -                拦截器的列表,用空格分隔

interceptors.*

警告ExecSource和其他异步源的问题是,源无法保证如果没有将事件放到客户机所知道的通道中。在这种情况下,数据将丢失。例如,最常被请求的特性之一是tail -F [file]样用例,其中应用程序写到磁盘上的日志文件,然后将文件发送到文件,将每一行发送为事件。
虽然这是可能的,但有一个明显的问题;如果信道填充和Flume不能发送事件,会发生什么?Flume无法指示应用程序编写日志文件,它需要保留日志,或者由于某种原因还没有发送该事件。如果这没有意义,你只需要知道这一点:
您的应用程序无法保证在使用诸如ExecSource这样的单向异步接口时收到的数据。作为警告的延伸 - 要完全清楚 - 当使用这个源时,事件传递绝对没有保证。为了更强的可靠性保证,可以考虑Spooling Directory Source,Taildir Source或者通过SDK直接与Flume集成。

agent名称为a1的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

      “shell”配置用于通过命令shell调用“命令”(如Bash或Powershell)。“command”作为执行的参数传递给“shell”。这允许“command”使用来自shell的特性,例如通配符、返回标记、管道、循环、条件等。在“shell”配置的缺失中,“命令”将直接调用。“shell”的公共值:“/ bin/sh - c”、“/ bin/ksh - c”、“cmd / c”、“powershell - command”等。

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

1.4 JMS Source

        JMS Source从JMS目标(如队列或主题)读取消息。 作为一个JMS应用程序,它应该与任何JMS提供程序一起工作,但是只能使用ActiveMQ进行测试。JMS源提供可配置的batch size, message selector, user/pass, and message到flume的events转换器。请注意,供应商提供的JMS jar应该包含在使用plugins.d目录(首选)的Flume类路径中, - 在命令行上的类路径,或者在flume -env. sh中通过FLUME_CLASSPATH变量。

       所需的属性以粗体显示:

属性名称                    默认         描述

channels                    -                      

type                            -              组件类型名称,需要jms

initialContextFactory   -              Inital Context Factory例如:org.apache.activemq.jndi.ActiveMQInitialContextFactory

connectionFactory       -              连接factory应该出现的JNDI名称

providerURL                 -              JMS提供程序URL

destinationName         -              目的地名称

destinationType           -              目标类型(队列(queue)或主题(topic))

messageSelector          -              在创建消费者时使用的消息选择器

userName                     -              目的地/提供者的用户名

passwordFile                 -              包含目标/提供者密码的文件

batchSize                    100            一次要消费的消息数量

converter.type            DEFAULT    用于将消息转换为flume事件的类。见下文

converter.*                    -                转换器属性。

converter.charset        UTF-8          仅默认转换器。 将JMS文本消息转换为字节数组时使用的字符集。

createDurableSubscription  false     是否创建持久订阅。持久订阅只能与destinationType主题一起使用。如果是true,则必须指定“clientId”和“durableSubscriptionName”。

clientId                          -                  JMS客户端标识符在连接权限创建后设置。需要持久订阅。

durableSubscriptionName    -          用于标识持久订阅的名称。需要持久订阅。

Converter

       JMS源允许可插入的转换器,尽管默认的转换器很可能在大多数情况下工作。默认转换器能够将字节、文本和对象消息转换为FlumeEvents。在所有情况下,消息中的属性被添加到FlumeEvent中。

BytesMessage:

消息的字节被复制到FlumeEvent的主体。 每条消息不能转换超过2GB的数据。

TextMessage:

消息文本被转换为字节数组并复制到FlumeEvent的主体。 默认转换器默认使用UTF-8,但这是可配置的。

ObjectMessage:

将对象写入包装在ObjectOutputStream中的ByteArrayOutputStream,并将生成的数组复制到FlumeEvent的主体中。

agent名称为a1的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

1.5 Spooling Directory Source

       通过将文件放入磁盘上的“spooling”目录中,该信息源可以让您摄取数据。此消息源将监视新文件的指定目录,并在新文件出现时解析事件。事件解析逻辑是可插入的。在将一个给定的文件完全读入通道后,它被重命名为表示完成(或可选删除)。

       与Exec源不同,该源是可靠的,不会错过数据,即使Flume重新启动或被杀。作为这种可靠性的交换,只有不可变的、独特的文件必须被放入到spooling目录中。Flume试图检测这些问题条件,如果违反,将会提示失败:

如果将文件写入到spooling目录中,Flume将会将错误输出到其日志文件并停止处理。
如果文件名称在以后重用,Flume将会将错误打印到日志文件并停止处理。

      为了避免上述问题,在将文件名移到spooling目录时,添加一个惟一的标识符(例如时间戳)来记录文件名可能很有用。

      尽管该数据源有可靠的保证,但仍然有一些情况,如果某些下游故障发生,事件可能会被复写。这与其他Flume组件提供的保证是一致的。

属性名称                    默认             描述

channels                    -                      

type                            -                组件类型名称,需要spooldir

spoolDir                      -                从该目录读取文件的目录。

fileSuffix           .COMPLETED        后缀追加到完全读取的文件 

deletePolicy           never              何时删除完成的文件:从不(never)或立即(immediate)

fileHeader              false               是否添加存储绝对路径文件名的头文件。

fileHeaderKey         false              将绝对路径文件名附加到事件标题时使用的标题键。

basenameHeader   false              是否添加存储文件基本名称的头文件。

basenameHeaderKey   basename   将文件的基本名称附加到事件标题时使用的标题键。

includePattern        ^.*$               正则表达式指定要包含哪些文件。 它可以与ignorePattern一起使用。 如果文件匹配ignorePattern和includePattern正则表达式,则该文件将被忽略。

ignorePattern          ^$                正则表达式指定要忽略的文件(跳过)。 它可以和includePattern一起使用。 如果文件匹配ignorePattern和includePattern正则表达式,则该文件将被忽略。

trackerDir           .flumespool       目录存储与文件处理相关的元数据。 如果此路径不是绝对路径,则将其解释为相对于spoolDir。

consumeOrder      oldest             oldest, youngest and random三种模式

pollDelay               500                对新文件进行轮询时使用的延迟(以毫秒为单位)。

recursiveDirectorySearch  false   是否监视子文件夹以读取新文件。

maxBackoff           4000              如果通道已满,连续尝试写入通道的最长时间(以毫秒为单位)。每次通道抛出ChannelException时,源将从一个低的backoff开始,并以指数方式增加它,upto这个参数指定的值。

batchSize              100                批量传送到通道的粒度

inputCharset         UTF-8            用于将输入文件作为文本处理的反序列化器使用的字符集。

decodeErrorPolicy  FAIL             当我们在输入文件中看到一个不可解码的字符时该怎么办。FAIL:抛出异常,无法解析文件。REPLACE:用“替换字符”替换字符,通常是Unicode U+ FFFD。IGNORE:删除不可分字符序列。

deserializer          LINE                指定用于将文件解析为事件的反序列化程序。默认解析每一行作为事件。指定的类必须实现EventDeserializer.Builder。

deserializer.*                               每个event不同的deserializer

bufferMaxLines       -                   (已过时)此选项现在被忽略。

bufferMaxLineLength  5000        (弃用)提交缓冲区中一行的最大长度。使用反序列化器。maxLineLength代替。

selector.type          replicating       replicating(复制) or multiplexing(多路复用)

selector.*                                        取决于selector.type的值

interceptors             -                     空格分隔的列表的拦截器

interceptors.*

名为agent-1的agent的示例: 

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true

事件反序列化器:

      Flume附带以下事件反序列化器。

LINE(这个反序列化器在每行文本输入中生成一个事件。):

deserializer.maxLineLength    #在单个事件中包含的最大字符数。如果一条线超过了这个长度,它就会被截断,并且在随后的事件中会出现其余的字符。默认是2048
deserializer.outputCharset    #将事件放入通道中的字符集。默认是UTF-8。

AVRO:

     这个反序列化器能够读取Avro容器文件,并在文件中每个Avro记录生成一个事件。每个事件都有一个标题,表示使用的模式。事件的主体是二进制Avro记录数据,不包括模式或容器文件元素的其余部分。

     请注意,如果spool目录源必须重试将其中一个事件放到一个通道上(例如,因为通道是满的),那么它将重新设置和重试从最新的Avro容器文件同步点。为了减少这种失败场景中的潜在事件重复,可以在Avro输入文件中更频繁地编写同步标记。

deserializer.schemaType   #模式是如何表示的。默认情况下,或当指定值散列时,Avro模式被HASH,并在事件头“flume . avro.schema.hash”中存储在每个事件中。
                          #如果指定KITERAL,json编码的模式本身存储在每一个事件的事件标题“flume.avro.schema.literal”。与哈希模式相比,使用文字模式相对低效。默认是HASH模式。

BlobDeserializer:

      这个反序列化程序读取每个事件的二进制大对象(BLOB),通常为每个文件的一个BLOB。例如一个PDF或JPG文件。注意,这种方法不适用于非常大的对象,因为整个BLOB都在RAM中缓冲。

deserializer   #这个类的FQCN:org.apache.flume.sink.solr.morphline.BlobDeserializer $ Builder
deserializer.maxBlobLength   #给定请求的最大读取和缓冲字节数。默认是100000000

博文来自:www.51niux.com

1.6 Taildir Source

       注意此源提供作为预览功能。 它不适用于Windows。

       watch指定的文件,并在几乎是实时的情况下跟踪这些文件,并在每个文件中发现新的行。如果正在编写新的行,这个源将重试阅读它们,等待写的完成。

       这个来源是可靠的,即使tailing files  rotate,也不会丢失数据。 它定期以JSON格式将每个文件的最后读取位置写入给定的位置文件。 如果Flume由于某种原因停止或关闭,则可以从现有位置文件上写入的位置重新开始拖尾。

      在其他用例中,该源也可以使用给定的位置文件从任意位置开始跟踪每个文件。当在指定路径上没有位置文件时,默认情况下将从每个文件的第一行开始tailing。

      文件将按其修改时间顺序使用。使用最老的修改时间的文件将首先被使用。

      此源不会重命名或删除或对被tail的文件做任何修改。目前该源不支持tail二进制文件。它逐行读取文本文件。

属性名称                    默认             描述

channels                    -                      

type                            -                组件类型名称,需要TAILDIR

filegroups                   -                 文件组的空间分隔列表。每个文件组指示要跟踪的一组文件。

filegroups.<filegroupName>   -   文件组的绝对路径。正则表达式(而不是文件系统模式)只能用于文件名。

positionFile    ~/.flume/taildir_position.json    用JSON格式记录inode、绝对路径和每个tailing文件的最后位置。

headers.<filegroupName>.<headerKey>  -   标题值是用标题键设置的标题值。 可以为一个文件组指定多个标题。

byteOffsetHeader      false              是否将加尾字符的字节偏移量添加到名为“byteoffset”的头部。

skipToEnd                  false              在文件没有写在位置文件上的情况下是否跳到EOF的位置

idleTimeout                120000         时间(毫秒)关闭不活动的文件。 如果关闭的文件附加了新行,该源将自动重新打开它。

writePosInterval          3000            间隔时间(ms)写入位置文件上每个文件的最后一个位置。

batchSize                     100             一次最多可以读取和发送到频道的行数。 使用默认通常很好。

backoffSleepIncrement  1000        在重新尝试轮询新数据之前,最后一次尝试未找到任何新数据之前的时间延迟增量。

maxBackoffSleep          5000           每次重新尝试轮询新数据时,最后一次尝试都没有找到任何新数据时,最大时间延迟。

cachePatternMatching   true          列出目录并应用文件名正则表达式模式对于包含数千个文件的目录可能是耗时的。 缓存匹配文件的列表可以提高。性能消耗文件的顺序也将被缓存,要求文件系统以至少1秒的粒度记录修改时间。

fileHeader                    false            是否添加存储绝对路径文件名的头文件。

fileHeaderKey               file              将绝对路径文件名附加到事件标题时使用的标题键。

agent名称为a1的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true

1.7  Twitter 1% firehose Source

官网收还是实验阶段:使用可能会稳定这里就不记录了。

官网文档链接:http://flume.apache.org/FlumeUserGuide.html#twitter-1-firehose-source-experimental

1.8 Kafka Source

        Kafka源是一个Apache Kafka消费者,它从Kafka主题中读取消息。如果您有多个Kafka源,您可以使用相同的消费者组来配置它们,因此每个人都将为主题读取一组独特的分区。

属性名称                         默认             描述

channels                          -                      

type                                 -                 组件类型名称,需要org.apache.flume.source.kafka.KafkaSource

kafka.bootstrap.servers -                source使用的Kafka集群中的brokers列表

kafka.consumer.group.id  flume        唯一确定的消费者群体。在多个源或agent中设置相同的id,表明它们是同一消费群体的一部分

kafka.topics                      -                kafka消费者将从逗号分隔的主题列表中读取消息。

kafka.topics.regex            -                 正则表达式定义源订阅的主题集。 该属性具有比kafka.topics更高的优先级,并覆盖kafka.topics(如果存在)。

batchSize                          1000            在一个批次中写入通道的最大消息数量

batchDurationMillis          1000            批次写入通道的最大时间(毫秒)只要达到第一个大小和时间,就会写入该批次。

backoffSleepIncrement     1000           kafka主题显示为空时触发的初始和增量等待时间。 等待期将会减少对Kafka主题的攻击。 一秒钟是摄取用例的理想选择,但对于拦截器的低延迟操作可能需要较低的值。

maxBackoffSleep               5000           kafka主题显示为空时触发的最长等待时间。 5秒钟对于摄取用例来说是理想的,但对于拦截器的低延迟操作可能需要较低的值。

useFlumeEventFormat       false           默认情况下,事件是从Kafka主题直接进入事件主体的字节。 设置为true,以Flume Avro二进制格式读取事件。

                                与KafkaSink上的相同属性或Kafka通道上的parseAsFlumeEvent属性一起使用,将保留在生产端发送的任何Flume头。

setTopicHeader                 true            设置为true时,将检索到的消息的主题存储到由topicHeader属性定义的标题中。

topicHeader                     topic         如果setTopicHeader属性设置为true,则定义在其中存储从中接收消息的主题名称的标头名称如果与Kafka Sink topicHeader属性相结合,应该小心,以避免发送消息回到同一个主题的循环

migrateZookeeperOffsets  true         当找不到Kafka存储的偏移量时,在Zookeeper中查找偏移量并将它们提交给Kafka。 这应该是true,以支持从旧版本的Flume无缝的Kafka客户端迁移。 一旦迁移,这可以设置为false,

                                          但通常不需要。 如果找不到Zookeeper偏移量,则Kafka配置kafka.consumer.auto.offset.reset定义如何处理偏移量。

kafka.consumer.security.protocol   PLAINTEXT  如果使用某种安全级别写入Kafka,则设置为SASL_PLAINTEXT,SASL_SSL或SSL。

more consumer security props                      如果使用SASL_PLAINTEXT,SASL_SSL或SSL,请参阅Kafka安全性(http://kafka.apache.org/documentation.html#security)以获取需要在使用者上设置的其他属性。

Other Kafka Consumer Properties                 这些属性用于配置Kafka消费者。可以使用Kafka支持的任何消费者属性。 唯一的要求是预先使用前缀kafka.consumer的属性名称。 例如:kafka.consumer.auto.offset.reset

注释Kafka Source覆盖了两个Kafka使用者参数:源代码将auto.commit.enable设置为“false”,并且提交每个批次。 kafka源保证至少一次消息检索策略。 源文件启动时可以存在重复文件。 
Kafka Source还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值。不建议修改这些参数。

通过逗号分隔的主题列表进行主题订阅的示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

正则表达式主题订阅示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

安全和kafka Source:

    Flume和Kafka之间的通信通道支持安全认证和数据加密。 对于安全身份验证SASL / GSSAPI(Kerberos V5)或SSL(即使参数名为SSL,实际协议是TLS实现),可以使用Kafka 0.9.0版本。截至目前,数据加密仅由SSL/TLS提供。

将kafka.consumer.security.protocol设置为以下任何值意味着:

SASL_PLAINTEXT    #无数据加密的Kerberos或纯文本身份验证
SASL_SSL          #使用数据加密的Kerberos或纯文本身份验证
SSL               #基于TLS的加密与可选的认证。

#警告启用SSL时性能会降低,其大小取决于CPU类型和JVM实现。

TLS和Kafka Source:

       请阅读配置Kafka客户端SSL中描述的步骤(http://kafka.apache.org/documentation/#security_configclients),了解其他配置设置,以便进行微调,例如以下任一项:security provider(安全提供程序),cipher suites(密码套件),enabled protocols(启用的协议),truststore(信任库)或keystore types(密钥库类型)。

       服务器端身份验证和数据加密的示例配置:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

注意:默认情况下,属性ssl.endpoint.identification.algorithm没有定义,所以不执行主机名验证。 为了启用主机名验证,请设置以下属性

a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS

启用后,客户端将根据以下两个字段之一验证服务器的完全限定的域名(FQDN):

Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

如果还需要客户端身份验证,则还应在Flume代理配置中添加以下内容。 每个Flume agent必须拥有其客户证书,这个证书必须由kafka brokers单独或通过签名链来信任。 常见的例子是通过一个单一的根CA来签署每个客户端证书,而后者又被kafka brokers信任。

a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>

如果密钥库和密钥使用不同的密码保护,则ssl.key.password属性将为这两个消费者密钥库提供所需的额外密码:

a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>

Kerberos and Kafka Source:

要使用带有Kerberos的Kafka集群来使用Kafka源,请为consumer设置上面提到的consumer.security.protocol属性。 与Kafka代理一起使用的Kerberos密钥表和主体在JAAS文件的“KafkaClient”部分中指定。“Client”部分描述了Zookeeper连接(如果需要的话)。有关JAAS文件内容的信息,请参阅Kafka文档(http://kafka.apache.org/documentation.html#security_sasl_clientconfig)。 这个JAAS文件的位置以及可选的全系统kerberos配置可以通过flume-env.sh中的JAVA_OPTS指定:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

使用SASL_PLAINTEXT的示例安全配置:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

使用SASL_SSL的安全配置示例:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

示例JAAS文件。 有关其内容的参考,请参阅SASL配置的Kafka文档中(http://kafka.apache.org/documentation#security_sasl_clientconfig)所需的身份验证机制(GSSAPI / PLAIN)的客户端配置部分。 由于Kafka Source也可以连接到Zookeeper进行偏移迁移,因此本例中也添加了“Client”部分。 除非需要偏移迁移,否则这将不再需要,或者您需要本节来介绍其他安全组件。 另外,请确保Flume进程的操作系统用户对jaas和keytab文件具有读取权限。

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

1.9 NetCat TCP Source

       一个类似netcat的源代码,它侦听给定的端口并将每行文本转换为一个事件。 像nc -k -l [host] [port]一样行事。 换句话说,它会打开一个指定的端口并侦听数据。 期望的是提供的数据是换行分隔的文本。 每行文本都会变成Flume事件并通过连接的通道发送。

       所需的属性以粗体显示。

属性名称                默认             描述

channels                -                      

type                       -                 组件类型名称,需要netcat

bind                        -                要绑定的主机名或IP地址

port                        -                 要绑定的端口号

max-line-length    512             每个事件主体的最大行长度(以字节为单位)

ack-every-event    true            对每个收到的事件做出“OK”的回应

selector.type    replicating       replicating or multiplexing

selector.*                                  取决于selector.type的值

interceptors           -                 空格分隔的拦截列表

interceptors.*   

agent名称为a1的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

1.10 NetCat UDP Source

根上面类似,可就type换成了netcatudp,参考连接http://flume.apache.org/FlumeUserGuide.html#netcat-udp-source

1.11 Sequence Generator Source

      一个简单的序列发生器,连续产生具有从0开始的计数器的事件,增加1并在totalEvents处停止。 当无法将事件发送到频道时重试。 主要用于测试。 在重试期间,重试消息的主体保持与以前相同,以便唯一事件的数量(在目的地去重复之后)预期等于指定的totalEvents。 所需的属性以粗体显示。参考连接:http://flume.apache.org/FlumeUserGuide.html#sequence-generator-source

1.12 Syslog Sources

读取系统日志数据并生成Flume事件。 UDP源将整个消息视为单个事件。 TCP源为由换行符('n')分隔的每个字符串创建一个新事件。所需的属性以粗体显示。

Syslog TCP Source

原始的,经过验证的系统日志TCP源。

属性名称                默认             描述

channels                -                      

type                       -                 组件类型名称,需要syslogtcp

bind                        -                要绑定的主机名或IP地址

port                        -                 要绑定的端口号

eventSize              2500            单个事件行的最大大小(以字节为单位)

keepFields            none            将其设置为“全部”将保留事件正文中的优先级,时间戳和主机名。 也允许间隔分隔的字段列表。目前,可以包含以下字段:priority, version, timestamp, hostname。以支持“all”和“none”。

selector.type                             replicating or multiplexing

selector.*       replicating          复制取决于selector.type的值

interceptors       -                    空格分隔的拦截列表

interceptors.*

例如,名为a1的agent的系统日志TCP源:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

Multiport Syslog TCP Source

       这是一个更新,更快,多端口的Syslog TCP源版本。 请注意,端口配置设置已经替换了端口。 多端口功能意味着它可以以高效的方式一次侦听多个端口。 这个源使用Apache Mina库来做到这一点。 提供对RFC-3164和许多常见的RFC-5424格式的消息的支持。 还提供了配置每个端口使用的字符集的功能。

属性名称                默认             描述

channels                -                      

type                       -                 组件类型名称,需要syslogtcp

bind                        -                要绑定的主机名或IP地址

port                        -                 要绑定的端口号

eventSize              2500            单个事件行的最大大小(以字节为单位)

keepFields            none            将其设置为“全部”将保留事件正文中的优先级,时间戳和主机名。 也允许间隔分隔的字段列表。目前,可以包含以下字段:priority, version, timestamp, hostname。以支持“all”和“none”。

portHeader           -                 如果指定,则使用此处指定的标题名称将端口号存储在每个事件的标题中。 这允许拦截器和通道选择器根据输入端口来定制路由逻辑。

charset.default     UTF-8           将syslog事件解析为字符串时使用的缺省字符集。  

charset.port.<port>  -              字符集可以基于每个端口进行配置。

batchSize              100              尝试每个请求循环处理的最大事件数。使用默认值通常很好。

readBufferSize      1024            内部Mina读取缓冲区的大小。提供性能调优。使用默认值通常很好。

numProcessors   (auto-detected)   系统处理消息时可用的处理器数量。 默认是使用Java运行时API自动检测CPU数量。 Mina将为每个检测到的CPU产生2个请求处理线程,这通常是合理的。

selector.type     replicating             replicating or multiplexing

selector.*             -                   复制取决于selector.type的值

interceptors        -                    空格分隔的拦截列表

interceptors.*

例如,agent名为a1的多端口syslog TCP源:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port

Syslog UDP Source

跟Syslog TCP Source一致,就是type类型换成了syslogudp

博文来自:www.51niux.com

1.13 HTTP Source

       通过HTTP POST和GET接受Flume事件的源代码。 GET只能用于实验。 HTTP请求通过必须实现HTTPSourceHandler接口的可插入“处理程序”转换为通道事件。 这个处理程序需要一个HttpServletRequest并返回一个flume事件列表。 从一个Http请求处理的所有事件在一个事务中被委托给该通道,从而允许像文件通道一样提高通道的效率。 如果处理程序抛出异常,则此源将返回400的HTTP状态。如果通道已满,或者源无法将事件附加到通道,则源将返回HTTP 503 - 暂时不可用的状态。

        在一个post请求中发送的所有事件都被认为是一个批处理,并在一个事务中插入到通道中。

属性名称                默认             描述

type                       -                 组件类型名称,需要http

port                       -                 应该绑定到source的哪个端口。

bind                   0.0.0.0             要绑定的主机名或IP地址

handler      org.apache.flume.source.http.JSONHandler        处理程序类的FQCN。

handler.*              -                     配置处理程序的参数

selector.type      replicating         replicating or multiplexing

selector.*                                   取决于selector.type的值

interceptors         -                  空格分隔的拦截列表

interceptors.*                         

enableSSL            false             设置属性为true,以启用SSL。HTTP源不支持SSLv3。

excludeProtocols  SSLv3          要排除的空格分隔的SSL/TLS协议列表。 总是排除SSLv3。

keystore                                   密钥库的位置包括密钥库文件名

keystorePassword Keystore password

例如,名为a1的agent的http源:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props

JSONHandler

       提供了一个处理程序,它可以处理以JSON格式表示的事件,并支持UTF-8,UTF-16和UTF-32字符集。 处理程序接受事件数组(即使只有一个事件,事件必须以数组形式发送),并根据请求中指定的编码将其转换为Flume事件。 如果没有指定编码,则假定为UTF-8。 JSON处理程序支持UTF-8,UTF-16和UTF-32。 事件表示如下。

[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com"
             },
  "body" : "random_body"
  },
  {
  "headers" : {
             "namenode" : "namenode.example.com",
             "datanode" : "random_datanode.example.com"
             },
  "body" : "really_random_body"
  }]

      要设置字符集,请求必须具有指定为application / json的内容类型; charset = UTF-8(根据需要用UTF-16或UTF-32替换UTF-8)。

      以这种处理程序所期望的格式创建事件的一种方法是使用Flume SDK中提供的JSONEvent,并使用Gson#fromJson(Object,Type)方法使用Google Gson创建JSON字符串。 作为事件列表的此方法的第二个参数传递的类型标记可以通过以下方法创建:

Type type = new TypeToken<List<JSONEvent>>() {}.getType();

BlobHandler

       默认情况下,HTTPSource将JSON输入拆分成Flume事件。 作为替代,BlobHandler是HTTPSource的处理程序,它返回一个包含请求参数的事件以及与此请求一起上传的二进制大对象(BLOB)。 例如PDF或JPG文件。 请注意,这种方法不适用于非常大的对象,因为它缓冲了RAM中的整个BLOB。

handler   #这个类的FQCN:org.apache.flume.sink.solr.morphline.BlobHandler
handler.maxBlobLength    #给定请求的最大读取和缓冲字节数,100000000

1.14 Stress Source

      StressSource是一个内部的负载生成源实现,这是非常有用的压力测试。 它允许用户配置事件有效负载的大小,并标题为空。 用户可以配置要发送的事件总数以及要发送的成功事件的最大数量。

      所需的属性以粗体显示。

属性名称              默认             描述

type                     -                 组件类型名称,需要org.apache.flume.source.StressSource

size                     500              每个事件的有效载荷大小。单位:字节

maxTotalEvents  -1                最大发送事件数

maxSuccessfulEvents  -1      成功发送的最大事件数  

batchSize           1                 一次要发送的事件数

agent名称为a1的示例:

a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1

1.15 Legacy Sources

      传统资源允许Flume 1.x代理接收来自Flume 0.9.4代理的事件。 它接受Flume 0.9.4格式的事件,将它们转换为Flume 1.0格式,并将它们存储在连接的通道中。 像时间戳,pri,主机,nanos等0.9.4事件属性被转换为1.x事件标题属性。 传统源支持Avro和Thrift RPC连接。 要在两个Flume版本之间使用这个桥接,您需要使用avroLegacy或thriftLegacy源代码启动一个Flume 1.x代理。 0.9.4代理应该有代理Sink指向1.x代理的主机/端口。

     注意Flume 1.x的可靠性语义与Flume 0.9.x的不同。 遗留源不支持Flume 0.9.x代理的E2E或DFO模式。 尽管1.x流程的可靠性设置一旦被遗留源保存到Flume 1.x通道,事件就可以应用于唯一支持的0.9.x模式。

     链接地址:http://flume.apache.org/FlumeUserGuide.html#legacy-sources

1.16 Custom Source

     自定义源是您自己的Source接口的实现。 在启动Flume代理时,自定义源的类及其依赖关系必须包含在代理的类路径中。 自定义源的类型是其FQCN。

属性名称              默认             描述

channels             -

type                     -                 组件类型名称,需要是您的FQCN

selector.type                           replicating or multiplexing

selector.*                                   取决于selector.type的值

interceptors      replicating              空格分隔的拦截列表

interceptors.*       

agent名称为a1的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1

1.17 Scribe Source

        Scribe是另一种类型的摄取系统。 采用现有的Scribe系统,Flume应该使用基于Thrift的ScribeSource兼容的传输协议。对于Scribe的部署,请按照来自Facebook的指导。 所需的属性以粗体显示。

属性名称              默认             描述

type                     -                 组件类型名称,需要是org.apache.flume.source.scribe.ScribeSource

port                     1499            scribe应该被连接的端口

maxReadBufferBytes  16384000  Thrift默认FrameBuffer大小

workerThreads    5                    在thrift中传递中启动的线程数量

selector.type         
selector.*

agent名称为a1的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1

二、Flume Sinks

2.1 HDFS Sink

       此接收器将事件写入Hadoop分布式文件系统(HDFS)。 它目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据所用时间,数据大小或事件数量定期滚动文件(关闭当前文件并创建一个新文件)。 它还通过诸如时间戳或发生事件的机器等属性来存储/分区数据。 HDFS目录路径可能包含格式化转义序列,它们将被HDFS接收器替换以生成存储事件的目录/文件名。 使用此接收器需要安装hadoop,以便Flume可以使用Hadoop罐与HDFS集群进行通信。 请注意,需要支持sync()调用的Hadoop版本。

       以下是支持的转义序列:

%{host}      #替换名为“host”的事件标题的值。 任意标题名称被支持。
%t           #Unix时间以毫秒为单位
%a           #本地的短周日名称(星期一,二,...)
%A           #locale的周日全名(星期一,星期二,...)
%b           #locale的简短月份名称(Jan,Feb,...)
%B           #地区的长月名(1月,2月,...)    
%c           #地点的日期和时间(Thu Mar 3 23:05:25 2005)
%d           #月日(01)
%e           #day of month without padding (1)
%D           #date; same as %m/%d/%y
%H           #hour (00..23)    
%I           #hour (01..12)    
%j           #day of year (001..366)    
%k           #hour ( 0..23)    
%m           #month (01..12)    
%n           #month without padding (1..12)    
%M           #minute (00..59) 
%p           #语言环境相当于am或pm
%S           # 自UTC 时间 1970-01-01 00:00:00 以来所经过的秒数
%S           #second (00..59)
%y           #年份最后两位数位 (00-99)
%Y           #year (例如2010)
%z           # 数字时区(例如,-0400)
%[localhost] #替换agent正在运行的主机的主机名
%[IP]        #替换agent正在运行的主机的IP地址
%[FQDN]      #替换agent正在运行的主机的规范主机名

注意:转义字符串%[localhost],%[IP]和%[FQDN]都依赖于Java获取主机名的能力,而在某些网络环境中,主机名可能会失败。

正在使用的文件的名称将在最后包含“.tmp”。 文件关闭后,该扩展名将被删除。 这允许排除目录中的部分完整文件。 所需的属性以粗体显示。

注意对于所有与时间相关的转义序列,事件标题之间必须存在带“timestamp”键的标题(除非hdfs.useLocalTimeStamp设置为true)。 一种自动添加的方法是使用TimestampInterceptor。

属性名称                   默认             描述

channels                   -

type                           -                 组件类型名称,需要是hdfs

hdfs.path                  -                  HDFS目录路径(例如hdfs:// namenode/flume/webdata/)

hdfs.filePrefix        FlumeData       名称作为由Flume在hdfs目录中创建的文件的前缀

hdfs.fileSuffix             -                  后缀追加到文件(例如.avro - 注:周期不会自动添加)

hdfs.inUsePrefix         -                 用于flume往hdfs里面写文件时候的前缀

hdfs.inUseSuffix        .tmp             用于flume往hdfs里面写文件时候的后缀

hdfs.rollInterval         30                滚动当前文件之前等待的秒数(0 =不基于时间间隔滚动)

hdfs.rollSize              1024            触发滚动的文件大小(以字节为单位)(0:根据文件大小决不滚动)

hdfs.rollCount           10               滚动前写入文件的事件数量(0 =根据事件数量决不滚动)

hdfs.idleTimeout       0                 超时之后,非活动文件关闭(0 =禁止自动关闭空闲文件)

hdfs.batchSize          100              在刷新到HDFS之前写入文件的事件数量

hdfs.codeC                -                 压缩编解码器。 以下之一:gzip,bzip2,lzo,lzop,snappy

hdfs.fileType       SequenceFile     文件格式:目前SequenceFile,DataStream或CompressedStream(1)DataStream不会压缩输出文件,请不要设置codeC(2)CompressedStream需要设置hdfs.codeC与一个可用的codeC

hdfs.maxOpenFiles   5000            只允许此数目的打开文件。如果超过这个数字,最老的文件就会被关闭。                  

hdfs.minBlockReplicas  -              指定每个HDFS块的最小数量的副本。 如果未指定,则它来自类路径中的默认Hadoop配置。

hdfs.writeFormat     Writable       序列文件记录的格式。text或Writable。在使用Flume创建数据文件之前设置为text,否则Apache Impala(incubating)或Apache Hive无法读取这些文件

hdfs.callTimeout     10000           HDFS操作允许的毫秒数,例如打开,写入,刷新,关闭。 如果发生许多HDFS超时操作,则应该增加此数字。

hdfs.threadsPoolSize  10             HDFS IO操作的每个HDFS接收器的线程数(打开,写入等)     

hdfs.rollTimerPoolSize  1             调度定时文件滚动的每个HDFS接收器的线程数

hdfs.kerberosPrincipal  -             用于访问安全HDFS的Kerberos用户主体

hdfs.kerberosKeytab   -               用于访问安全HDFS的Kerberos密钥表

hdfs.proxyUser    

hdfs.round             false              如果时间戳被四舍五入(如果正确,将影响所有基于时间的转义序列,除了% t)

hdfs.roundValue      1                  舍入到最高的倍数(在使用hdfs.roundUnit配置的单元中),小于当前时间。

hdfs.roundUnit       second         舍入值的单位 - second, minute or hour

hdfs.timeZone    Local Time         应该用于解析目录路径的时区的名称,例如America/Los_Angeles。

hdfs.useLocalTimeStamp    false     使用本地时间(而不是来自事件头的时间戳),同时替换转义序列。

hdfs.closeTries         0                   启动关闭尝试后,接收器必须尝试重命名文件的次数。 如果设置为1,则此接收器不会重新尝试失败的重命名(例如,由于NameNode或DataNode失败),并且可能会使文件保持打开状态,扩展名为.tmp。 如果设置为0,接收器将尝试重命名文件,直到文件最终重命名为止(对尝试次数没有限制) 如果关闭调用失败,文件仍可能保持打开状态,但数据将保持不变,在这种情况下,只有在重新启动Flume后,文件才会关闭。

hdfs.retryInterval   180             连续尝试关闭文件之间的时间(以秒为单位)。 每次关闭呼叫都需要多次到Namenode的RPC往返,因此将其设置得太低可能会在名称节点上造成大量负载。 如果设置为0或更小,则在第一次尝试失败时,接收器不会尝试关闭文件,并可能使文件保持打开状态或带有“.tmp”扩展名。

serializer             TEXT             其他可能的选项包括avro_event或EventSerializer.Builder接口实现的完全限定类名。

serializer.*

agent名称为a1的示例:   

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

 以上配置将会将时间戳降至最后10分钟。 例如,具有2012年6月12日11:54:34 AM时间戳的事件将导致hdfs路径变为/flume/events/2012-06-12/1150/00。   

2.2 Hive Sink

     该接收器将包含带分隔符的文本或JSON数据的事件直接放入Hive表或分区中。 事件是使用Hive事务写入的。 只要有一组事件提交给Hive,它们就会立即被Hive查询看到。 水槽将流入的分区既可以预先创建,也可以选择Flume(如果缺失)创建。 传入事件数据的字段映射到Hive表中的相应列。     

属性名称                   默认             描述

channels                   -

type                           -                 组件类型名称,需要是hive

hive.metastore         -                 Hive Metastore URI(例如:thrift://a.b.com:9083)

hive.database           -                 hive 数据库名称

hive.table                  -                 hive表名称

hive.partition             -                 逗号分隔列表标识要写入的分区的分区值。 可能包含转义序列。例如:‘Asia,India,2014-02-26-01-21’ will indicate continent=Asia,country=India,time=2014-02-26-01-21

hive.txnsPerBatchAsk  100          Hive提供了一批事务,而不是像Flume这样的流客户端。此设置配置每个事务批次所需事务的数量。单个批处理中的所有事务的数据将在一个文件中结束。Flume将在批处理中的每个事务中写入最大的批量事件。这个设置与batchSize一起提供了对每个文件大小的控制。注意,最终Hive会透明地将这些文件压缩成更大的文件。

heartBeatInterval      240            (以秒为单位)发送给Hive的连续检测信号之间的间隔,以防止未使用的事务过期。 将此值设置为0可禁用心跳。

autoCreatePartitions  true           Flume会自动创建必要的Hive分区来流向

batchSize               150000          在单个Hive事务中写入Hive的最大事件数

maxOpenConnections   500        只允许这个打开的连接数量。 如果超过这个数字,则关闭最近最少使用的连接。

callTimeout            10000             (以毫秒为单位)Hive和HDFS  I/O操作超时,例如openTxn,写入,提交和中止。

serializer                                     序列化器负责解析事件中的字段并将其映射到配置单元表中的列。 序列化器的选择取决于事件中数据的格式。 支持的序列化器:DELIMITED和JSON

roundUnit             minute           舍入值的单位- second, minute or hour.

hdfs.timeZone    Local Time         应该用于解析目录路径的时区的名称,例如America/Los_Angeles。

hdfs.useLocalTimeStamp    false     使用本地时间(而不是来自事件头的时间戳),同时替换转义序列。

serializer.delimiter          ,                (类型:字符串)传入数据中的字段分隔符。 要使用特殊字符,请用双引号括住它们,如“\ t”

serializer.fieldnames     -               从配置单元表中的输入字段到列的映射。 指定为表格列名称的逗号分隔列表(不含空格),以输入字段的顺序标识输入字段。 要跳过字段,请不要指定列名称。 例如。 'time,ip,message'表示输入映射到hive表中的time,ip和message列的第一,第三和第四个字段。

serializer.serdeSeparator   Ctrl-A          (类型:字符)自定义底层serde使用的分隔符。 如果serializer.fieldnames中的字段与table列的顺序相同,则serializer.delimiter与serializer.serdeSeparator相同,serializer.fieldnames中的字段数小于或等于表的number 列,因为传入事件正文中的字段不需要重新排序以匹配表列的顺序。为"\t"等特殊字符使用单引号。确保输入字段不包含此字符。 注意:如果serializer.delimiter是单个字符,最好设置为相同的字符

以下串行器提供了Hive接收器:

JSON   #处理UTF8编码的Json(严格语法)事件并且不需要配置。 JSON中的对象名称直接映射到Hive表中具有相同名称的列。 内部使用org.apache.hive.hcatalog.data.JsonSerDe,但独立于Hive表的Serde。 该序列化程序需要安装HCatalog。
DELIMITED  #处理简单的分隔文本事件。 内部使用LazySimpleSerde,但独立于Hive表的Serde。

以下是支持的转义序列(跟hdfs差不多,就是没有%n,%[localhost]、%[IP] 、%[FQDN]  )。

示例Hive表:

create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;

agent名称为a1的示例:

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

以上配置将会将时间戳降至最后10分钟。 例如,将时间戳标题设置为2012年6月12日上午11:54:34,将“country”标题设置为“india”的事件将评估为分区(continent ='asia',country ='india',time ='2012-06-12-11-50'。序列化程序配置为接受包含三个字段的选项卡分隔输入,并跳过第二个字段。

2.3 Logger Sink

在INFO级别记录事件。 通常用于测试/调试目的。所需的属性以粗体显示。 此接收器是唯一不需要在“记录原始数据”部分中(http://flume.apache.org/FlumeUserGuide.html#logging-raw-data)说明的额外配置的例外。

参考链接:http://flume.apache.org/FlumeUserGuide.html#logger-sink

2.4 Avro Sink

        这个接收器构成了Flume分层收集支持的一半。发送到这个接收器的Flume事件被转换为Avro事件并发送到配置的主机名/端口对。事件从配置好的批处理大小中从配置的通道中提取。所需的属性以粗体显示。

属性名称                   默认             描述

channels                   -

type                           -                 组件类型名称,需要是avro

hostname                  -                 要绑定的主机名或IP地址。

port                           -                  监听端口

batch-size                100               事件的数量,以便批量发送。

connect-timeout      20000          允许第一个(握手)请求的时间(ms)。

request-timeout      20000          在第一次之后允许请求的时间(ms)。

reset-connection-interval  none     连接到下一跳之前的时间量被重置。 这将迫使Avro Sink重新连接到下一跳。 当添加新闻主机时,这将允许接收器连接到硬件负载均衡器后面的主机,而无需重新启动代理。

compression-type    none            这可能是“none”或“deflate”。压缩型必须符合与压缩型相匹配的AvroSource

compression-level     6                 压缩事件的压缩级别。 0 =不压缩,1-9压缩。 数字越高压缩越多

ssl                              false             设置为true,为此AvroSink启用SSL。 配置SSL时,可以选择设置“truststore”,“truststore-password”,“truststore-type”,并指定是否“trust-all-certs”。

trust-all-certs          false               如果将其设置为true,则不会检查远程服务器(Avro来源)的SSL服务器证书。 这不应该在生产中使用,因为它使得攻击者更容易执行中间人攻击并在加密连接上“监听”。

truststore                 -                     定制Java信任库文件的路径。 Flume使用此文件中的证书颁发机构信息来确定是否应信任远程Avro Source的SSL身份验证凭证。 如果未指定,则将使用默认的Java JSSE证书颁发机构文件(通常是Oracle JRE中的“jssecacerts”或“cacerts”)。

truststore-password   -                     指定的信任库的密码。

truststore-type         JDK                 Java信任库的类型。 这可以是“JKS”或其他支持的Java信任库类型。

exclude-protocols    SSLv3             要排除的空格分隔的SSL/TLS协议列表。 除了指定的协议之外,SSLv3将始终被排除。

maxIoWorkers    2 * the number of available processors in the machine          I/O工作线程的最大数量。 这是在NettyAvroRpcClient NioClientSocketChannelFactory上配置的。

agent名称为a1的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

博文来自:www.51niux.com

2.5 Thrift Sink

      这个sink形成了Flume分层收集支持的一半。 发送到此接收器的流量事件将转换为Thrift事件并发送到配置的主机名/端口对。 这些事件是从已配置的通道中以批量配置的批处理大小获取的。

     通过启用Kerberos身份验证,可将节点接收器配置为以安全模式启动。 为了与安全模式下的Thrift源进行通信,Thrift接收器也应以安全模式运行。 client-principal和client-keytab是由Thrift接收器用来验证kerberos KDC的属性。 server-principal表示此接收器配置为以安全模式连接到的Thrift源的主体。 所需的属性以粗体显示。

好多参数跟上面差不多就不翻译了:http://flume.apache.org/FlumeUserGuide.html#thrift-sink

下面是个例子:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

2.6 File Roll Sink

      在本地文件系统上存储事件。 所需的属性以粗体显示。

属性名称                   默认             描述

channels                   -

type                           -                 组件类型名称,需要是file_roll

sink.directory           -                 文件将被存储的目录

sink.pathManager   DEFAULT      使用的路径管理器实现。

sink.pathManager.extension   -    文件扩展名(如果使用默认的PathManager)。

sink.pathManager.prefix  -             如果使用默认的PathManager,则添加到文件名开头的字符串

sink.rollInterval          30                每30秒滚动一次文件。 指定0将禁用滚动,并导致所有事件被写入到一个文件。

sink.serializer            TEXT              其他可能的选项包括avro_event或EventSerializer.Builder接口实现的FQCN。

batchSize                  100

下面是个例子:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

2.7 Null Sink

       放弃从通道收到的所有事件。 所需的属性以粗体显示。没啥参数一个type设置成null,然后还有一个batchSize.下面是例子:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

2.8 HBaseSinks、AsyncHBaseSink、MorphlineSolrSink

HBaseSinks:http://flume.apache.org/FlumeUserGuide.html#hbasesinks

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1

AsyncHBaseSink : http://flume.apache.org/FlumeUserGuide.html#asynchbasesink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1

MorphlineSolrSink: http://flume.apache.org/FlumeUserGuide.html#morphlinesolrsink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000

2.9 ElasticSearchSink

      该接收器将数据写入到elasticsearch集群。 默认情况下,事件将被写入,以便Kibana图形界面可以显示它们 - 就像logstash(https://logstash.net/)写入它们一样。

      您的环境所需的elasticsearch和lucene-core jar必须放置在Apache Flume安装的lib目录中。 Elasticsearch要求客户端JAR的主要版本与服务器的主要版本匹配,并且两者都运行相同的次要版本的JVM。 如果这是不正确的,SerializationException将会出现。 要选择所需的版本,请首先确定elasticsearch的版本以及目标群集正在运行的JVM版本。 然后选择一个与主要版本匹配的elasticsearch客户端库。 0.19.x客户端可以与0.19.x群集进行通信; 0.20.x可以和0.20.x交谈,0.90.x可以和0.90.x交谈。 一旦确定了elasticsearch版本,然后阅读pom.xml文件以确定要使用的正确的lucene-core JAR版本。 运行ElasticSearchSink的Flume代理还应该将目标集群正在运行的JVM与次要版本匹配。

      事件将每天被写入一个新的索引。名称将< indexName>-yyy - mm - dd,其中< indexName >是indexName参数。接收器将在UTC的午夜开始写入新的索引。
事件由ElasticSearchLogStashEventSerializer默认序列化为elasticsearch。 这个行为可以被serializer参数覆盖。 该参数接受org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer或org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory的实现。 实现ElasticSearchEventSerializer已被弃用,以支持更强大的ElasticSearchIndexRequestBuilderFactory。
类型是FQCN:org.apache.flume.sink.elasticsearch.ElasticSearchSink。

所需的属性以粗体显示。

属性名称                   默认             描述

channels                   -

type                           -                 组件类型名称,需要是org.apache.flume.sink.elasticsearch.ElasticSearchSink

hostname                  -                 主机名:端口的逗号分隔列表,如果端口不存在,将使用默认端口“9300”

indexName             flume             日期将被附加到的索引的名称。 示例'flume' - >'flume-yyyy-MM-dd'支持任意头替换,例如。 %{header}用指定事件头的值替换

indexType              logs               索引文档的类型,默认为'log'支持任意的头部替换,例如。 %{header}用指定事件头的值替换

clusterName         elasticsearch     要连接到的ElasticSearch群集的名称

batchSize                100                 每个txn要写入的事件数量。

ttl                             -                     TTL在几天内设置时会导致过期的文件被自动删除,如果没有设置文件将永远不会被自动删除。 TTL在前面的整数形式中仅被接受。 a1.sinks.k1.ttl = 5以及限定符ms(毫秒),s(秒),m(分钟),h(小时),d(日)和w(星期)。 示例a1.sinks.k1.ttl = 5d将把TTL设置为5天。按照http://www.elasticsearch.org/guide/reference/mapping/ttl-field/了解更多信息。

serializer        org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer                 ElasticSearchIndexRequestBuilderFactory或ElasticSearchEventSerializer使用。 任一类的实现都被接受,但ElasticSearchIndexRequestBuilderFactory是首选。

serializer.*              -                       要传递给序列化程序的属性。


注意:使用事件头的值可以方便地使用头替换来动态决定存储事件时要使用的indexName和indexType。 使用此功能时应谨慎,因为事件提交者现在可以控制indexName和indexType。 而且,如果使用elasticsearch REST客户端,则事件提交者可以控制所使用的URL路径。

下面是一个例子:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

2.10 Kite Dataset Sink

官方文档地址:http://flume.apache.org/FlumeUserGuide.html#kite-dataset-sink

2.11 Kafka Sink   

      这是一个Flume Sink实现,可以将数据发布到Kafka主题。 其中一个目标是将Flume与Kafka集成,以便基于拉式的处理系统可以处理来自各种Flume源的数据。 目前支持Kafka 0.9.x系列版本。Flume的这个版本不再支持Kafka的旧版本(0.8.x)。

所需的属性用粗体字标记。

type   #组件类型名称,需要是org.apache.flume.sink.kafka.KafkaSink

kafka.bootstrap.servers    #Kafka-Sink将连接到broker名单,以获得主题分区列表这可能是一个brokers的部分列表,但我们建议至少两个HA。 格式是主机名:端口的逗号分隔列表。

kafka.topic   #默认值是default-flume-topic。kafka中将要发布信息的主题。 如果配置了此参数,则消息将发布到该主题。 如果事件标题包含“主题”字段,则该事件将发布到该主题,覆盖此处配置的主题。 支持任意头替换,例如。 %{header}被替换为名为“header”的事件头的值。 (如果使用替换,建议将Kafka broker的“auto.create.topics.enable”属性设置为true。)

flumeBatchSize   #默认值是100。一个批处理中要处理多少条消息。较大的批次提高了吞吐量,同时增加了延迟。

kafka.producer.acks  #默认值是1. 有多少副本必须在被认为成功写入之前确认一条信息。接受的值为0(从不等待确认),1(只等待领导),- 1(等待所有副本)将其设置为- 1,以避免在某些失败的情况下出现数据丢失。

useFlumeEventFormat   #默认值是false。默认情况下,事件直接从事件主体作为字节放到Kafka主题上。 设置为true以将事件存储为Flume Avro二进制格式。 与KafkaSource上的相同属性或Kafka通道上的parseAsFlumeEvent属性一起使用时,将保留生产端的任何Flume标头。

defaultPartitionId    #指定要发送到此通道中的所有事件的Kafka分区ID(整数),除非被partitionIdHeader覆盖。 默认情况下,如果这个属性没有设置,事件将由Kafka Producer的分区程序分配 - 包括如果指定key(或由kafka.partitioner.class指定的分区程序)。

partitionIdHeader   #设置时,接收器将从事件标题中获取使用此属性值命名的字段的值,并将消息发送到该主题的指定分区。 如果该值表示一个无效分区,则会抛出EventDeliveryException异常。 如果标题值存在,则此设置将覆盖defaultPartitionId。

allowTopicOverride   #默认值为true。当设置时,接收器将允许将消息生成到topicHeader属性指定的主题(如果提供的话)。

topicHeader    #默认值是topic。 当与allowTopicOverride一起设置时,将会使用该属性的值将消息生成到header的值中。在使用Kafka源topicHeader属性时,应注意避免创建回环。

kafka.producer.security.protocol   #默认值是PLAINTEXT。如果使用某种安全级别写入Kafka,则设置为SASL_PLAINTEXT,SASL_SSL或SSL。 请参阅下面的安全设置的其他信息。

more producer security props   #如果使用SASL_PLAINTEXT,SASL_SSL或SSL,请参阅Kafka安全性以获取需要在生产者上设置的其他属性。

Other Kafka Producer Properties   #这些属性用于配置Kafka生成器。可以使用Kafka支持的任何生产者属性。唯一的要求是使用前缀kafka.producer来预写属性名。例如:kafka.producer.linger.ms

注释Kafka Sink使用FlumeEvent标题中的主题和关键属性将事件发送到Kafka。 如果标题中存在主题,则会将该事件发送到该特定主题,覆盖为接收器配置的主题。 
如果标题中存在密钥,则Kafka将使用该密钥对主题分区之间的数据进行分区。 具有相同密钥的事件将被发送到相同的分区。 如果密钥为空,事件将被发送到随机分区。

Kafka接收器还提供了key.serializer(org.apache.kafka.common.serialization.StringSerializer)和value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值。 不建议修改这些参数。

下面给出一个Kafka接收器的配置示例。 以前缀kafka.producer开始的属性Kafka生产者。 创建Kafka生产者时传递的属性不限于本例中给出的属性。 也可以在这里包含您的自定义属性,并通过作为方法参数传入的Flume Context对象在预处理器中访问它们。

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

Security and Kafka Sink:

       Flume和Kafka之间的通信通道支持安全认证和数据加密。 对于安全身份验证SASL/GSSAPI(Kerberos V5)或SSL(即使参数名为SSL,实际协议是TLS实现),可以使用Kafka 0.9.0版本。截至目前,数据加密仅由SSL/TLS提供。

将kafka.producer.security.protocol设置为以下任一值意味着:

SASL_PLAINTEXT - 无数据加密的Kerberos或纯文本身份验证
SASL_SSL - 使用数据加密的Kerberos或纯文本身份验证
SSL - 基于TLS的加密与可选的认证。

警告:启用SSL时性能会降低,其大小取决于CPU类型和JVM实现。 

TLS and Kafka Sink:

      请阅读配置Kafka客户端SSL中描述的步骤,了解其他配置设置,以便进行微调,例如以下任一项:安全提供程序,密码套件,启用的协议,信任库或密钥库类型。

      服务器端身份验证和数据加密的示例配置。

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

注意:默认情况下,属性ssl.endpoint.identification.algorithm没有定义,所以不执行主机名验证。 为了启用主机名验证,请设置以下属性

a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

启用后,客户端将根据以下两个字段之一验证服务器的完全限定的域名(FQDN):

Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

      如果还需要客户端身份验证,则还应在Flume agent配置中添加以下内容。 每个Flume agent必须拥有其客户证书,这个证书必须由kafka broker单独或通过签名链来信任。 常见的例子是通过一个单一的根CA来签署每个客户端证书,而后者又被kafka broker信任。

a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

如果密钥库和密钥使用不同的密码保护,则ssl.key.password属性将为生产者密钥库提供所需的额外密码:

a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

Kerberos and Kafka Sink:

      要使用Kafka集群和Kerberos安全的Kafka集群,请为生产者设置上述producer.security.protocol属性。 与Kafka代理一起使用的Kerberos密钥表和主体在JAAS文件的“KafkaClient”部分中指定。 “客户端”部分描述了Zookeeper连接(如果需要的话)。 有关JAAS文件内容的信息,请参阅Kafka文档。 这个JAAS文件的位置以及可选的全系统kerberos配置可以通过flume-env.sh中的JAVA_OPTS指定:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

使用SASL_PLAINTEXT的示例安全配置:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

使用SASL_SSL的安全配置示例:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

      示例JAAS文件。 有关其内容的参考,请参阅SASL配置的Kafka文档中所需的身份验证机制(GSSAPI / PLAIN)的客户端配置部分。 与Kafka Source或Kafka Channel不同的是,除非其他连接组件需要,否则不需要“客户端”部分。 另外,请确保Flume进程的操作系统用户对jaas和keytab文件具有读取权限。

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";};

2.12 HTTP Sink

       这个接收器的行为是,它将从通道中获取事件,并使用HTTP POST请求将这些事件发送到远程服务。 事件内容作为POST正文发送。

       该接收器的错误处理行为取决于目标服务器返回的HTTP响应。 backoff(接收器退避)/ready(就绪)状态是可配置的,事务commit(提交)/rollback (回滚)结果以及事件是否有助于成功事件耗用计数。

       服务器返回状态码不可读的任何格式错误的HTTP响应将导致退避信号,并且事件不会从通道中消耗。

所需的属性以粗体显示。

属性名称                   默认             描述

channels                   -

type                           -               组件类型名称,需要是http

endpoint                   -               要发送到的完全限定的URL端点

connectTimeout     5000           套接字连接超时(以毫秒为单位)          

requestTimeout      5000          最大请求处理时间,以毫秒为单位

contentTypeHeader  text/plain    HTTP内容类型标头

acceptHeader       text/plain    HTTP接受标头值

defaultBackoff       true             是否在接收所有HTTP状态码时默认退避

defaultRollback     true           是否在接收所有HTTP状态码时默认回滚

defaultIncrementMetrics  false 是否在接收所有HTTP状态码时默认增加度量值

backoff.CODE                         对单个(即200)代码或组(即2XX)代码配置一个特定的backoff

rollback.CODE                     配置单个(即200)代码或组(即2XX)代码的特定回滚

incrementMetrics.CODE     配置单个(即200)代码或组(即2XX)代码的特定指标增量

请注意,最具体的HTTP状态码匹配用于backoff, rollback and incrementMetrics配置选项。 如果有2XX和200状态码的配置值,则200个HTTP代码将使用200值,而201-299范围内的所有其他HTTP代码将使用2XX值。

任何空或空的事件都会被消耗,而不会对HTTP端点发出任何请求。

下面是例子:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

2.13 Custom Sink

      自定义接收器是您自己的接收器接口的实现。 在启动Flume代理时,自定义接收器的类及其依赖项必须包含在代理的类路径中。 自定义接收器的类型是其FQCN。 所需的属性以粗体显示。

channel 

type–     组件类型名称,需要是您的FQCN

下面是例子:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

三、Flume Channels

       channel是事件在代理上进行的存储库。 Source添加事件,Sink删除它。

3.1 Memory Channel

       事件存储在内存中的队列中,具有可配置的最大大小。 对于需要更高吞吐量并准备在代理故障的情况下丢失暂存数据的流量来说,这是理想之选。 所需的属性以粗体显示。

属性名称                  默认             描述

type                          -               组件类型名称,需要是memory

capacity                  100             存储在通道中的事件的最大数量

transactionCapacity  100          通道将从源接收到的事件的最大数量,或者是每个事务的接收器

keep-alive               3                 以秒为单位添加或删除事件的超时时间

byteCapacityBufferPercentage  20   定义了byteCapacity和该通道中所有事件的估计总大小之间的缓冲区的百分比,以便在header中解释数据。见下文。

byteCapacity    see description     在这个通道中,最大的内存总字节被允许为所有事件的总和。实现只计数活动身体,提供#byteCapacityBufferPercentage配置参数的原因。默认值为JVM可用的最大内存的80%(即在命令行上传递的- xmx值的80%)。请注意,如果在单个JVM上有多个内存通道,并且它们碰巧持有相同的物理事件(例如,如果您使用的是来自单个源的复制通道选择器),那么这些事件大小可能被重复计算为通道byteCapacity目的。将此值设置为0将使该值回落到大约200 GB的硬内限值。

下面是例子:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

3.2 JDBC Channel

       事件存储在由数据库支持的持久性存储中。 JDBC通道当前支持嵌入式Derby。 这是一个持续的渠道,对于可恢复性非常重要的流程来说是理想之选。 所需的属性以粗体显示。

官网文档链接地址:http://flume.apache.org/FlumeUserGuide.html#jdbc-channel

3.3 Kafka Channel

         事件存储在Kafka集群中(必须单独安装)。Kafka提供高可用性和复制,因此如果agent或kafka的broker崩溃,事件将立即可用到其他接收器

kafaka channel可以用于多种场景:

Flume源和sink - 它提供了一个可靠和高度可用的事件通道
使用Flume源代码和拦截器,但没有接收器 - 它允许将Flume事件写入Kafka主题,供其他应用程序使用
有了Flume sink,但是没有source - 这是一种低延迟,容错方式,可以将事件从Kafka发送到Flume接收器,如HDFS,HBase或Solr

    这个版本的Flume需要Kafka 0.9或更高版本,因为依赖于该版本的Kafka客户端。 频道的配置相比以前的flume版本已经改变。

配置参数的组织方式如下:

与通道相关的配置值通常在通道配置级应用,例如:a1.channel.k1.type =
与Kafka相关的配置值或者Channel如何操作都以“kafka”为前缀,(这与CommonClient Configs是分开的),例如:a1.channels.k1.kafka.topic和a1.channels.k1.kafka.bootstrap.servers。 这与hdfs接收器的工作方式并不相同
特定于生产者/消费者的特性由kafka.producer或kafka.consumer作为前缀
在可能的情况下,使用Kafka参数名称,例如:bootstrap.servers和ack

下面是个例子:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

剩下的官方文档链接:http://flume.apache.org/FlumeUserGuide.html#kafka-channel

3.4 File Channel

所需的属性就一个type。

type                  #组件类型名称,需要type。
checkpointDir         #默认值是~/.flume/file-channel/checkpoint   这是检查点文件将被存储的目录
useDualCheckpoints    #默认值是false       备份检查点。如果设置为true,则必须设置backupCheckpointDir
backupCheckpointDir   #检查点备份到的目录。 该目录不能与数据目录或检查点目录相同
dataDirs              #默认值是~/.flume/file-channel/data    用逗号分隔的目录列表来存储日志文件。在不同的磁盘上使用多个目录可以提高文件通道的性能
transactionCapacity   #默认值1000    通道支持的事务的最大大小
checkpointInterval    #默认值30000    检查点之间的时间(毫秒)
maxFileSize           #默认值2146435071  单个日志文件的最大大小(以字节为单位)
minimumRequiredSpace  #默认值524288000    最小必需可用空间(以字节为单位)。 为了避免数据损坏,当可用空间低于此值时,文件通道停止接受接收/放入请求
capacity              #默认值是1000000     通道最大容量
keep-alive            #默认值是3        等待放置操作的时间(以秒为单位)
use-log-replay-v1     #默认值是false      专门:使用旧的重播逻辑
use-fast-replay       #默认是false        专门:不使用队列重播
checkpointOnClose     #默认值是true       控制通道关闭时是否创建检查点。 在关闭时创建一个检查点,避免重播,从而加快文件通道的后续启动。
encryption.activeKey  #用于加密新数据的密钥名称
encryption.cipherProvider  #密码提供者类型,支持类型:AESCTRNOPADDING
encryption.keyProvider    #主要提供程序类型,受支持的类型:JCEKSFILE
encryption.keyProvider.keyStoreFile   #密钥库文件的路径
encrpytion.keyProvider.keyStorePasswordFile   #密钥库密码文件的路径
encryption.keyProvider.keys    #所有键的列表(例如activeKey设置的历史记录)
encyption.keyProvider.keys.*.passwordFile   #可选密钥密码文件的路径

       注意默认情况下,文件通道使用上面指定的位于用户home中的检查点和数据目录的路径。 因此,如果在代理程序中有多个活动的文件通道实例,则只有一个能够锁定目录并导致其他通道初始化失败。 因此,有必要为所有配置的通道提供显式路径,最好在不同的磁盘上。 此外,由于文件通道在每次提交后都会同步到磁盘,因此将多个磁盘连接在一起可能会将事件组合在一起,以便在多个磁盘不可用于检查点和数据目录的情况下提供良好的性能。

下面是一个例子:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

Encryption(加密):

以下是一些示例配置:

从密钥存储区密码生成一个带有密码的密钥:

keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
  -keysize 128 -validity 9000 -keystore test.keystore \
  -storetype jceks -storepass keyStorePassword

生成与密钥存储密码相同的密码的密钥:

keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
  -keystore src/test/resources/test.keystore -storetype jceks \
  -storepass keyStorePassword

下面是有秘钥的配置:

a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = key-provider-0
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0

假设你已经老化了key-0,新文件应该用key-1进行加密:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1

与上面相同的scenerio,但是key-0有自己的密码:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password

3.5 Spillable Memory Channel

       事件存储在内存中的队列和磁盘上。 内存中的队列作为主存储和磁盘溢出。 磁盘存储使用嵌入式文件通道进行管理。 当内存中队列已满时,其他传入事件将存储在文件通道中。 该通道非常适合在正常操作期间需要高吞吐量内存通道的流量,但同时需要较大容量的文件通道,以便更好地容忍间歇式接收端故障或降低排水率。 在这种异常情况下,吞吐量将大约降低到文件通道速度。 如果agent程序崩溃或重新启动,agent程序联机时仅恢复存储在磁盘上的事件。 此频道目前是实验性的,不建议用于生产。

官方文档:http://flume.apache.org/FlumeUserGuide.html#spillable-memory-channel

如果达到memoryCapacity或byteCapacity限制,则内存中队列被视为已满。

代理名称为a1的示例:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

要禁用内存中的队列和功能,如文件通道:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

要禁用使用溢出磁盘并且纯粹作为内存通道的功能:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity =

3.6 Pseudo Transaction Channel

警告伪事务通道仅用于单元测试目的,不适用于生产用途。

3.7 Custom Channel

      自定义通道是您自己实现的通道界面。 在启动Flume代理时,自定义通道的类及其依赖关系必须包含在代理的类路径中。 自定义渠道的类型是其FQCN。 下面是例子:

a1.channels = c1
a1.channels.c1.type = org.example.MyChannel


作者:忙碌的柴少 分类:大数据 浏览:11762 评论:0
留言列表
发表评论
来宾的头像