大数据(十三)flume筛选器
一、Flume channel Selectors(筛选器)
如果没有指定类型,则默认为replicating(“复制”)。
1.1 Replicating Channel Selector (default)
selector.type #默认值是replicating 组件类型名称需要replicating selector.optional #将被标记为可选的通道集合
agent名称为a1,source为r1的示例:
a1.sources = r1 a1.channels = c1 c2 c3 a1.sources.r1.selector.type = replicating a1.sources.r1.channels = c1 c2 c3 a1.sources.r1.selector.optional = c3
在上面的配置中,c3是一个可选通道。 未能写入c3简单地被忽略。 由于c1和c2没有标记为可选,因此写入这些通道的失败将导致事务失败。
1.2 Multiplexing Channel Selector
selector.type #默认值是replicating 组件类型名称需要复用multiplexing selector.header flume.selector.header selector.default – selector.mapping.* –
agent名称为a1,source为r1的示例:
a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4
1.3 Custom Channel Selector
自定义通道选择器是您自己的ChannelSelector接口的实现。 在启动Flume代理时,自定义通道选择器的类及其相关性必须包含在代理的类路径中。 自定义渠道选择器的类型是其FQCN。
selector.type #组件类型名称,需要是您的FQCN
agent名称为a1及其source为r1的示例:
a1.sources = r1 a1.channels = c1 a1.sources.r1.selector.type = org.example.MyChannelSelector
二、Flume Sink Processors
Sink groups允许用户将多个接收器分组到一个实体中。可以使用Sink处理器来为组中的所有接收器提供负载平衡能力,或者在发生时间故障时从一个接收器到另一个接收器上实现失败。
sinks #参与该组的空格分隔列表,这个属性是必须的 processor.type #默认值是default 组件类型名称需要是default, failover or load_balance
下面是例子:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance
2.1 Default Sink Processor
默认接收处理器只接受一个接收器。 用户不必强制为单个接收器创建处理器(接收器组)。 相反,用户可以按照本用户指南中所述的source - channel - sink模式进行操作。
2.2 Failover Sink Processor
Failover Sink Processor维护一个优先级的汇聚列表,保证只要有一个可用的事件将被处理(传送)。故障转移机制的工作原理是将失败的sinks转移到一个池中,在那里他们被分配了一个冷却时间,在重新尝试之前增加了连续的失败。Sinks有优先级,更大的数字,更高的优先级。如果一个接收器在发送一个事件时失败,那么下一个优先级的接收器将被尝试发送事件。例如,一个具有优先级100的接收器在具有优先级80的接收器之前被激活。如果没有指定优先级,则根据配置中指定的Sinks的顺序来确定优先级。
要进行配置,请将接收器组处理器设置为failover(故障转移),并为所有单个接收器设置优先级。 所有指定的优先级必须是唯一的 此外,使用maxpenalty属性可以设置故障切换时间的上限(以毫秒为单位)。
下面是属性介绍,前三个是必须的属性。
sinks #多个sinks组用空格隔开 processor.type #默认是default 组件类型名称需要default processor.priority.<sinkName> #优先级值。<sinkName >必须是与当前sink组关联的一个接收器实例,一个更高的优先级值接收器会在更早的时候被激活。更大的绝对值意味着更高的优先级 processor.maxpenalty #默认值是30000 发生故障的接收器的最大回退周期(以毫秒为单位)
下面是例子:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000
博文来自:www.51niux.com
2.3 Load balancing Sink Processor
负载平衡接收器处理器提供了对多个接收器进行负载平衡的能力。 它维护一个负载必须在其上分布的活动汇的索引列表。实现支持使用通过分发负载round_robin or random选择机制。选择机制的选择默认为round_robin类型,但可以通过配置覆盖。 自定义选择机制通过从AbstractSinkSelector继承的自定义类来支持。
当被调用时,这个选择器使用其配置的选择机制来选择下一个接收器并调用它。 For round_robin and random如果所选接收器未能传送事件,则处理器通过其配置的选择机制选择下一个可用接收器。这个实现并没有将失败的接收器列入黑名单,而是继续乐观地尝试所有可用的接收器。如果所有的sink调用都导致失败,则选择器将失败传播给sink runner。
如果启用了backoff,sink处理器将会将失败的接收器列入黑名单,删除它们以选择给定的超时。当超时结束时,如果sink仍然没有响应超时,则会以指数方式增加,以避免可能陷入长时间等待无响应的接收器。在此禁用的情况下,在循环中,所有失败的下沉负荷将被传递到下一个sinks,因此不平衡
下面是属性信息,前两个是必须的:
processor.sinks #多个sinks组用空格隔开 processor.type #default 组件类型名称,需要load_balance processor.backoff #false 如果失败的sinks能够成倍backed off。 processor.selector #round_robin 选择机制。 必须是从AbstractSinkSelector继承的自定义类的round_robin,random或FQCN processor.selector.maxTimeOut #默认值是30000 backoff选择器用来限制指数后退的方法(以毫秒为单位)
下面是例子:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = random
#另外Custom Sink Processor现在不被支持
三、Event Serializers
file_roll接收器和hdfs接收器都支持EventSerializer接口。 下面提供了Flume附带的EventSerializer的细节。
3.1 Body Text Serializer
别名:文本。 这个拦截器将事件的主体写入到输出流中,而不进行任何转换或修改。 事件标题被忽略。 配置选项如下:
appendNewline #默认值是true 写入时是否在每个事件中附加换行符。 出于传统原因,缺省值假定事件不包含换行符。
下面是例子:
a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume a1.sinks.k1.sink.serializer = text a1.sinks.k1.sink.serializer.appendNewline = false
3.2 “Flume Event” Avro Event Serializer
别名:avro_event。
这个拦截器将Flume事件序列化成一个Avro容器文件。 所使用的模式与Avro RPC机制中用于Flume事件的模式相同。
该序列化程序继承自AbstractAvroEventSerializer类。
配置选项如下:
syncIntervalBytes #默认值是2048000 Avro同步间隔,以近似的字节数。 compressionCodec #默认值是null Avro压缩编解码器。 有关支持的编解码器,请参阅Avro的CodecFactory文档。
下面是例子:
a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.serializer = avro_event a1.sinks.k1.serializer.compressionCodec = snappy
3.3 Avro Event Serializer
Alias:这个序列化器没有别名,必须使用完全合格的类名来指定。这将Flume事件序列化为Avro容器文件,如“Flume事件”Avro事件序列化器,但是记录模式是可配置的。记录模式可以指定为Flume配置属性,也可以在事件头中传递。要将记录模式作为Flume配置的一部分传递,请使用下面列出的属性schemaURL。
要在事件标头中传递记录模式,请指定事件头文件flume.avro.schema。包含模式或flume.schema的json格式表示。url有一个可以找到模式的url(hdfs:/…支持uri)。这个序列化器从AbstractAvroEventSerializer类继承。
配置选项如下:
syncIntervalBytes #默认2048000 Avro同步间隔,近似字节。 compressionCodec #默认null Avro压缩编解码器。 有关支持的编解码器,请参阅Avro的CodecFactory文档。 schemaURL #默认null Avro模式URL。 标题中指定的模式将覆盖此选项。
下面是例子:
a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder a1.sinks.k1.serializer.compressionCodec = snappy a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
四、Flume Interceptors
Flume有能力 modify/drop在in-flight中的events。这是在拦截器的帮助下完成的。 拦截器是实现org.apache.flume.interceptor.Interceptor接口的类。拦截器可以根据拦截器开发人员选择的任何标准修改甚至删除事件。Flume支持拦截拦截器。通过在配置中指定截取程序生成器类名称的列表,可以实现这一点。在源配置中,拦截器被指定为空白分隔的列表。拦截器的指定顺序是调用它们的顺序。一个拦截器返回的事件列表将传递给链中的下一个拦截器。拦截器可以修改或删除事件。 如果拦截器需要删除事件,它只是不会返回它返回的列表中的事件。 如果是放弃所有的事件,那么它只是返回一个空的列表。 拦截器是命名组件,下面是通过配置创建它们的示例:
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder a1.sources.r1.interceptors.i1.preserveExisting = false a1.sources.r1.interceptors.i1.hostHeader = hostname a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d a1.sinks.k1.channel = c1
#请注意拦截器构建器被传递给类型配置参数。 拦截器本身是可配置的,并且可以像传递给其他可配置组件一样传递配置值。 在上面的例子中,事件首先被传递给HostInterceptor,然后HostInterceptor返回的事件被传递给TimestampInterceptor。 您可以指定全限定类名(FQCN)或timestamp。 如果您有多个收集器写入相同的HDFS路径,那么您也可以使用HostInterceptor。
4.1 Timestamp Interceptor
这个拦截器插入到事件头中,处理事件的时间是毫秒。 这个拦截器插入一个带有关键时间戳(或者由header属性指定)的标题,其值是相关的时间戳。 如果拦截器已经存在于配置中,该拦截器可以保留现有的时间戳。
type #组件类型名称必须是timestamp或FQCN header #默认值是timestamp 要在其中放置生成的时间戳的标题的名称。 preserveExisting #默认值是false 如果时间戳已经存在,是否应该保留 - true或false
下面是例子:
a1.sources = r1 a1.channels = c1 a1.sources.r1.channels = c1 a1.sources.r1.type = seq a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp
4.2 Host Interceptor
此拦截器插入该代理正在运行的主机的主机名或IP地址。它插入一个带有密钥主机或配置键的头,其值是主机的主机名或IP地址,基于配置。
type #组件类型名称,必须是host preserveExisting #默认值是false 如果主机头已经存在,是否应该保留 - true或false useIP #默认值是true 如果为true,请使用IP地址,否则使用hostname。 hostHeader #默认值是host 要使用的头键。
下面是例子
a1.sources = r1 a1.channels = c1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = host
4.3 Static Interceptor
静态拦截器允许用户将具有静态值的静态标题附加到所有事件。
目前的实现不允许一次指定多个头文件。 相反,用户可能链接多个静态拦截器,每个拦截器定义一个静态头。
type #组件类型名称,必须是static preserveExisting #默认值是true 如果配置的头文件已经存在,是否应该保留 - true或false key #默认值是key 应该创建的头的名称 value #默认值是value 应该创建的静态值
下面是例子:
a1.sources = r1 a1.channels = c1 a1.sources.r1.channels = c1 a1.sources.r1.type = seq a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static a1.sources.r1.interceptors.i1.key = datacenter a1.sources.r1.interceptors.i1.value = NEW_YORK
博文来自:www.51niux.com
4.4 Remove Header Interceptor
这个拦截器通过删除一个或多个头来操纵Flume事件头。 它可以删除静态定义的标题,基于正则表达式的标题或列表中的标题。 如果这些都没有定义,或者如果没有标题符合标准,Flume事件不会被修改。
请注意,如果只需要删除一个头文件,那么通过名称来指定它可以提供比其他两种方法更好的性能。
type #组件类型名称必须是remove_header withName #要删除标题的名称 fromList #要移除的标题列表,用fromListSeparator指定的分隔符分隔 fromListSeparator #默认值是\s*,\s* 正则表达式用于在fromList指定的列表中分隔多个标头名称。默认值是一个逗号,周围有许多空格字符 matching #所有命名匹配这个正则表达式的头都被删除了
4.5 UUID Interceptor
这个拦截器为所有被拦截的事件设置一个通用的唯一标识符。 一个示例UUID是b5755073-77a9-43c1-8fad-b7a586fc1b97,它表示一个128位的值。
如果没有适用于该事件的应用程序级别唯一密钥,请考虑使用UUIDInterceptor为事件自动分配一个UUID。 一旦进入Flume网络,将UUID分配给事件就很重要; 也就是流量的第一个流量源。 这样可以在面向复制和重新传输的情况下对事件进行后续重复数据消除,而Flume网络旨在实现高可用性和高性能。 如果应用程序级别密钥可用,则优先于自动生成的UUID,因为它使用所述众所周知的应用程序级别密钥启用后续更新和删除数据存储中的事件。
type #组件类型名称必须是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder headerName #默认值是id 要修改的Flume头的名称 preserveExisting #默认值是true 如果UUID头已经存在,是否应该保留 - true或false prefix #默认值是"" 将前缀字符串常量预先添加到每个生成的UUID
4.6 Morphline Interceptor
这个拦截器通过一个morphline配置文件(http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html)来过滤事件,这个配置文件定义了一条从一个命令到另一个命令管道记录的转换命令链。 例如,morphline可以通过基于正则表达式的模式匹配来忽略某些事件,或者改变或插入某些事件头,或者可以通过Apache Tika自动检测和设置MIME类型,以侦听被拦截的事件。 例如,这种包嗅探可以用于Flume拓扑中基于内容的动态路由。 MorphlineInterceptor还可以帮助实现到多个Apache Solr集合的动态路由(例如,用于多租户)。
目前,拦截器的morphline不得为每个输入事件生成多个输出记录。 这个拦截器不适用于重型ETL处理 - 如果你需要这个考虑将ETL处理从Flume Source移动到Flume Sink, 到一个MorphlineSolrSink。
type #组件类型名称必须是org.apache.flume.sink.solr.morphline.MorphlineInterceptor $ Builder morphlineFile #本地文件系统到morphline配置文件的相对或绝对路径。 例如:/etc/flume-ng/conf/morphline.conf morphlineId #默认值是null 如果在morphline配置文件中有多个morphlines,可选名称用于识别morphline
示例flume.conf文件:
a1.sources.avroSrc.interceptors = morphlineinterceptor a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
4.7 Search and Replace Interceptor
这个拦截器提供了基于Java正则表达式的简单的基于字符串的搜索和替换功能。 Backtracking/group也是可用的。 这个拦截器使用与Java Matcher.replaceAll()方法相同的规则。
type #组件类型名称必须是search_replace searchPattern #搜索和替换的模式。 replaceString #替换字符串。 charset #默认值是UTF-8 事件体的字符集。 默认情况下是UTF-8。
配置示例:
a1.sources.avroSrc.interceptors = search-replace a1.sources.avroSrc.interceptors.search-replace.type = search_replace # Remove leading alphanumeric characters in an event body. a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+ a1.sources.avroSrc.interceptors.search-replace.replaceString =
另一个例子:
a1.sources.avroSrc.interceptors = search-replace a1.sources.avroSrc.interceptors.search-replace.type = search_replace # Use grouping operators to reorder and munge words on a line. a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+) a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1
4.8 Regex Filtering Interceptor
该拦截器通过将事件正文解释为文本并将文本与配置的正则表达式进行匹配来选择性地过滤事件。 提供的正则表达式可用于包含事件或排除事件。
type #组件类型名称必须是regex_filter regex #默认值是".*" 用于匹配事件的正则表达式 excludeEvents #默认值是false 如果为true,则regex确定要排除的事件,否则regex确定要包含的事件。
4.9 Regex Extractor Interceptor
这个拦截器使用指定的正则表达式提取regex匹配组,并将匹配组附加到事件的头部。它还支持可插入序列化器来格式化匹配组,然后将它们添加为事件标题。
type #组件类型名称必须是regex_extractor regex #用于匹配事件的正则表达式 serializers #空格分隔的序列化程序列表,用于将匹配映射到标题名称并对其值进行序列化。 (请参阅下面的示例)Flume为以下序列化程序提供了内置支持:org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer serializers.<s1>.type #默认值是default 必须是默认的(org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer),org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer,或者实现org.apache.flume.interceptor.RegexExtractorInterceptorSerializer的自定义类的FQCN serializers.<s1>.name serializers.* #序列化器特定的属性
序列化器用于将匹配映射到标题名称和格式化的标题值; 默认情况下,您只需指定标头名称,将使用默认的org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer。 这个序列化器只是将匹配映射到指定的头名称,并通过由正则表达式提取的值传递。 您可以使用完全限定的类名称(FQCN)将自定义序列化器实现插入到提取器中,以便按照您喜欢的方式格式化匹配。
如果Flume事件主体包含1:2:3.4foobar5并且使用了以下配置:
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d) a1.sources.r1.interceptors.i1.serializers = s1 s2 s3 a1.sources.r1.interceptors.i1.serializers.s1.name = one a1.sources.r1.interceptors.i1.serializers.s2.name = two a1.sources.r1.interceptors.i1.serializers.s3.name = three
#提取的事件将包含相同的主体,但以下头文件将被添加one => 1,two => 2,three => 3
如果Flume事件正文包含2012-10-18 18:47:57,614某些日志行和以下配置被使用:
a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d) a1.sources.r1.interceptors.i1.serializers = s1 a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
#提取的事件将包含相同的主体,但以下标题将被添加timestamp=> 1350611220000
4.10 Flume Properties
flume.called.from.service #如果指定了这个属性,那么即使配置文件没有在预期的位置找到,Flume代理也会继续轮询配置文件。 否则,如果配置不在预期位置,则Flume代理将终止。 设置此属性时不需要属性值(例如,只需指定-Dflume.called.from.service就足够了)
属性:flume.called.from.service:
Flume每30秒钟定期轮询指定配置文件的更改。 如果现有文件是第一次轮询,或者现有文件的修改日期自上次轮询以来已更改,则Flume agent将从配置文件加载新配置。 重命名或移动文件不会更改其修改时间。 当Flume agent轮询不存在的文件时,会发生以下两种情况之一:1.agent第一次轮询不存在的配置文件时,agent根据flume.called.from.service属性进行操作。 如果该属性已设置,则agent将继续轮询(始终以相同的周期 - 每30秒)。 如果该属性没有设置,那么agent立即终止。 ...或... 2.当agent程序轮询一个不存在的配置文件,并且这不是第一次轮询文件时,agent程序不会在此轮询期间更改配置。 agent继续投票而不是终止。
五、Log4J Appender
将Log4j事件附加到flume代理的avro源。使用这个appender的客户机必须在类路径中有flume-ng-sdk(例如,flume-ng-sdk-1.8.0.jar)。所需属性是前两个:
Hostname #使用avro源运行远程Flume agent的主机名 Port #远程Flume agent的avro源正在侦听的端口。 UnsafeMode #默认值是false 如果为true,appender不会在发送事件失败时抛出异常。 AvroReflectionEnabled #默认值是false 使用Avro Reflection来序列化Log4j事件。 (当用户记录字符串时不要使用) AvroSchemaUrl #可以检索Avro模式的URL。
示例log4j.properties文件:
#... log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname = example.com log4j.appender.flume.Port = 41414 log4j.appender.flume.UnsafeMode = true # configure a class's logger to output to the flume appender log4j.logger.org.example.MyClass = DEBUG,flume #...
默认情况下,每个事件通过调用toString()或者使用Log4j布局(如果指定)转换为字符串。
如果事件是org.apache.avro.generic.GenericRecord,org.apache.avro.specific.SpecificRecord的实例,或者如果AvroReflectionEnabled属性设置为true,则事件将使用Avro序列化进行序列化。
使用Avro模式序列化每个事件效率不高,所以最好提供一个模式URL,通过这个模式URL可以通过下游接收器(通常是HDFS接收器)检索模式。 如果没有指定AvroSchemaUrl,那么该模式将作为一个Flume头被包含。
配置为使用Avro序列化的示例log4j.properties文件:
#... log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname = example.com log4j.appender.flume.Port = 41414 log4j.appender.flume.AvroReflectionEnabled = true log4j.appender.flume.AvroSchemaUrl = hdfs://namenode/path/to/schema.avsc # configure a class's logger to output to the flume appender log4j.logger.org.example.MyClass = DEBUG,flume #...
博文来自:www.51niux.com
六、Load Balancing Log4J Appender
将Log4j事件追加到flume代理的avro源列表中。 使用这个appender的客户端必须在类路径中包含flume-ng-sdk(例如,flume-ng-sdk-1.8.0.jar)。 该appender支持循环和随机方案来执行负载平衡。 它还支持可配置的退避超时,以便临时从主机组中删除临时代理。所需属性就下面一个Hosts。
Hosts #Flume(通过AvroSource)监听事件的主机:端口的空格分隔列表 Selector #默认值是ROUND_ROBIN 选择机制。 必须是ROUND_ROBIN,RANDOM或从LoadBalancingSelector继承的类的自定义FQDN。 MaxBackoff #表示负载平衡客户端将从未能使用事件的节点退避的最大时间量(以毫秒为单位)的长整型值。默认为没有backoff UnsafeMode #默认值是false 如果为true,appender不会在发送事件失败时抛出异常。 AvroReflectionEnabled #默认值是false 使用Avro Reflection来序列化Log4j事件。 AvroSchemaUrl #可以检索Avro模式的URL。
使用默认值配置的示例log4j.properties文件:
#... log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender log4j.appender.out2.Hosts = localhost:25430 localhost:25431 # configure a class's logger to output to the flume appender log4j.logger.org.example.MyClass = DEBUG,flume #...
使用RANDOM负载平衡配置的示例log4j.properties文件:
#... log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender log4j.appender.out2.Hosts = localhost:25430 localhost:25431 log4j.appender.out2.Selector = RANDOM # configure a class's logger to output to the flume appender log4j.logger.org.example.MyClass = DEBUG,flume #...
示例log4j。使用backoff配置的属性文件:
#... log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432 log4j.appender.out2.Selector = ROUND_ROBIN log4j.appender.out2.MaxBackoff = 30000 # configure a class's logger to output to the flume appender log4j.logger.org.example.MyClass = DEBUG,flume #...
七、安全
HDFS接收器,HBase接收器,Thrift源,Thrift接收器和Kite数据集接收器均支持Kerberos身份验证。 请参阅相应章节以配置Kerberos相关选项。
Flume代理将对Kerberos KDC进行身份验证,作为单个主体,这将由需要kerberos身份验证的不同组件使用。 为Thrift源,Thrift接收器,HDFS接收器,HBase接收器和DataSet接收器配置的主键和密钥表应该相同,否则组件将无法启动。
八、监控
在Flume的监测工作仍在进行中。变化经常发生。几个Flume组件向JMX平台MBean服务器报告指标。可以使用Jconsole查询这些指标。
8.1 JMX Reporting
JMX报告可以通过使用flume-env.sh在JAVA_OPTS环境变量中指定JMX参数来启用:
export JAVA_OPTS=”-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false”
#注意:上面的示例禁用安全性。 要启用安全性,请参阅http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html
8.2 Ganglia Reporting
Flume也可以将这些指标报告给Ganglia 3或Ganglia 3.1 metanodes。为了向Ganglia报告指标,必须从这个支持开始flume agent。Flume agent必须通过传递以下参数来启动,这是由Flume.monitoring控制的系统属性。,并且可以在flume-env. sh中指定(下面的属性前两个是必须的):
type #组件类型名称,必须是ganglia hosts #Ganglia服务器hostname:port的逗号分隔列表 pollFrequency #默认值是60 连续报告给Ganglia服务器之间的时间,以秒为单位 isGanglia3 #默认值是false Ganglia服务器版本为3.默认情况下,Flume以Ganglia 3.1格式发送
我们可以用Ganglia支持启动Flume,如下所示:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
8.3 JSON Reporting
Flume还可以以JSON格式报告指标。 为了以JSON格式启用报告功能,Flume在可配置端口上托管一台Web服务器。 Flume以下列JSON格式报告指标:
{ "typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"}, "typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"} }
这里是一个例子:
{ "CHANNEL.fileChannel":{"EventPutSuccessCount":"468085", "Type":"CHANNEL", "StopTime":"0", "EventPutAttemptCount":"468086", "ChannelSize":"233428", "StartTime":"1344882233070", "EventTakeSuccessCount":"458200", "ChannelCapacity":"600000", "EventTakeAttemptCount":"458288"}, "CHANNEL.memChannel":{"EventPutSuccessCount":"22948908", "Type":"CHANNEL", "StopTime":"0", "EventPutAttemptCount":"22948908", "ChannelSize":"5", "StartTime":"1344882209413", "EventTakeSuccessCount":"22948900", "ChannelCapacity":"100", "EventTakeAttemptCount":"22948908"} }
下面是属性type是必须属性:
type #组件类型名称必须是http port #默认是41414 启动服务器的端口。
我们可以使用JSON报告支持启动Flume,如下所示:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
Metrics将在http://<hostname>:<port>/metrics网页上提供。 自定义组件可以报告上面Ganglia部分中提到的度量。
8.4 Custom Reporting
通过编写报告服务器可以向其他系统报告度量标准。 任何报告类都必须实现接口org.apache.flume.instrumentation.MonitorService。 这样的类可以使用GangliaServer用于报告的相同方式。 他们可以轮询平台mbean服务器来轮询mbeans的指标。 例如,如果可以使用名为HTTPReporting的HTTP监视服务,如下所示:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
属性就是:type #组件类型名称必须是FQCN
8.5 从自定义组件报告指标
任何自定义的flume组件应该从org.apache.flume.instrumentation.MonitoredCounterGroup类继承。 然后这个类应该为它提供的每个指标提供getter方法。 请参阅下面的代码。 MonitoredCounterGroup需要一个属性列表,这些属性的度量值由这个类显示。 到目前为止,这个类只支持将度量值公开为long值。
public class SinkCounter extends MonitoredCounterGroup implements SinkCounterMBean { private static final String COUNTER_CONNECTION_CREATED = "sink.connection.creation.count"; private static final String COUNTER_CONNECTION_CLOSED = "sink.connection.closed.count"; private static final String COUNTER_CONNECTION_FAILED = "sink.connection.failed.count"; private static final String COUNTER_BATCH_EMPTY = "sink.batch.empty"; private static final String COUNTER_BATCH_UNDERFLOW = "sink.batch.underflow"; private static final String COUNTER_BATCH_COMPLETE = "sink.batch.complete"; private static final String COUNTER_EVENT_DRAIN_ATTEMPT = "sink.event.drain.attempt"; private static final String COUNTER_EVENT_DRAIN_SUCCESS = "sink.event.drain.sucess"; private static final String[] ATTRIBUTES = { COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED, COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY, COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE, COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS }; public SinkCounter(String name) { super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES); } @Override public long getConnectionCreatedCount() { return get(COUNTER_CONNECTION_CREATED); } public long incrementConnectionCreatedCount() { return increment(COUNTER_CONNECTION_CREATED); } }
九、组件汇总
org.apache.flume.Channel #memory org.apache.flume.channel.MemoryChannel org.apache.flume.Channel #jdbc org.apache.flume.channel.jdbc.JdbcChannel org.apache.flume.Channel #file org.apache.flume.channel.file.FileChannel org.apache.flume.Channel #– org.apache.flume.channel.PseudoTxnMemoryChannel org.apache.flume.Channel #– org.example.MyChannel org.apache.flume.Source #avro org.apache.flume.source.AvroSource org.apache.flume.Source #netcat org.apache.flume.source.NetcatSource org.apache.flume.Source #seq org.apache.flume.source.SequenceGeneratorSource org.apache.flume.Source #exec org.apache.flume.source.ExecSource org.apache.flume.Source #syslogtcp org.apache.flume.source.SyslogTcpSource org.apache.flume.Source #multiport_syslogtcp org.apache.flume.source.MultiportSyslogTCPSource org.apache.flume.Source #syslogudp org.apache.flume.source.SyslogUDPSource org.apache.flume.Source #spooldir org.apache.flume.source.SpoolDirectorySource org.apache.flume.Source #http org.apache.flume.source.http.HTTPSource org.apache.flume.Source #thrift org.apache.flume.source.ThriftSource org.apache.flume.Source #jms org.apache.flume.source.jms.JMSSource org.apache.flume.Source #– org.apache.flume.source.avroLegacy.AvroLegacySource org.apache.flume.Source #– org.apache.flume.source.thriftLegacy.ThriftLegacySource org.apache.flume.Source #– org.example.MySource org.apache.flume.Sink #null org.apache.flume.sink.NullSink org.apache.flume.Sink #logger org.apache.flume.sink.LoggerSink org.apache.flume.Sink #avro org.apache.flume.sink.AvroSink org.apache.flume.Sink #hdfs org.apache.flume.sink.hdfs.HDFSEventSink org.apache.flume.Sink #hbase org.apache.flume.sink.hbase.HBaseSink org.apache.flume.Sink #asynchbase org.apache.flume.sink.hbase.AsyncHBaseSink org.apache.flume.Sink #elasticsearch org.apache.flume.sink.elasticsearch.ElasticSearchSink org.apache.flume.Sink #file_roll org.apache.flume.sink.RollingFileSink org.apache.flume.Sink #irc org.apache.flume.sink.irc.IRCSink org.apache.flume.Sink #thrift org.apache.flume.sink.ThriftSink org.apache.flume.Sink #– org.example.MySink org.apache.flume.ChannelSelector #replicating org.apache.flume.channel.ReplicatingChannelSelector org.apache.flume.ChannelSelector #multiplexing org.apache.flume.channel.MultiplexingChannelSelector org.apache.flume.ChannelSelector #– org.example.MyChannelSelector org.apache.flume.SinkProcessor #default org.apache.flume.sink.DefaultSinkProcessor org.apache.flume.SinkProcessor #failover org.apache.flume.sink.FailoverSinkProcessor org.apache.flume.SinkProcessor #load_balance org.apache.flume.sink.LoadBalancingSinkProcessor org.apache.flume.SinkProcessor #– org.apache.flume.interceptor.Interceptor #timestamp org.apache.flume.interceptor.TimestampInterceptor$Builder org.apache.flume.interceptor.Interceptor #host org.apache.flume.interceptor.HostInterceptor$Builder org.apache.flume.interceptor.Interceptor #static org.apache.flume.interceptor.StaticInterceptor$Builder org.apache.flume.interceptor.Interceptor #regex_filter org.apache.flume.interceptor.RegexFilteringInterceptor$Builder org.apache.flume.interceptor.Interceptor #regex_extractor org.apache.flume.interceptor.RegexFilteringInterceptor$Builder org.apache.flume.channel.file.encryption.KeyProvider$Builder #jceksfile org.apache.flume.channel.file.encryption.JCEFileKeyProvider org.apache.flume.channel.file.encryption.KeyProvider$Builder #– org.example.MyKeyProvider org.apache.flume.channel.file.encryption.CipherProvider #aesctrnopadding org.apache.flume.channel.file.encryption.AESCTRNoPaddingProvider org.apache.flume.channel.file.encryption.CipherProvider #– org.example.MyCipherProvider org.apache.flume.serialization.EventSerializer$Builder #text org.apache.flume.serialization.BodyTextEventSerializer$Builder org.apache.flume.serialization.EventSerializer$Builder #avro_event org.apache.flume.serialization.FlumeEventAvroEventSerializer$Builder org.apache.flume.serialization.EventSerializer$Builder #– org.example.MyEventSerializer$Builder