Consuming with the TAILDIR Source

| Parameter | Default | Description |
| --- | --- | --- |
| channels | – | The channel(s) the source writes to. |
| type | – | The component type name; needs to be TAILDIR. |
| filegroups | – | Space-separated list of file groups. Each file group indicates a set of files to be tailed. |
| filegroups.<filegroupName> | – | Absolute path of the file group. A regular expression (not a file system glob) can be used for the filename only. |
| positionFile | ~/.flume/taildir_position.json | File in JSON format that records the inode, the absolute path, and the last read position of each tailed file. |
| headers.<filegroupName>.<headerKey> | – | Header value to set for the given header key. Multiple headers can be specified for one file group. |
| byteOffsetHeader | false | Whether to add the byte offset of a tailed line to a header called 'byteoffset'. |
| skipToEnd | false | Whether to skip to EOF for files that are not recorded in the position file. |
| idleTimeout | 120000 | Time (ms) after which an inactive file is closed. If new lines are appended to a closed file, this source automatically re-opens it. |
| writePosInterval | 3000 | Interval (ms) at which the last position of each file is written to the position file. |
| batchSize | 100 | Max number of lines to read and send to the channel at a time. Using the default is usually fine. |
| maxBatchCount | Long.MAX_VALUE | Controls the number of batches read consecutively from the same file. If the source is tailing multiple files and one of them is written at a fast rate, it can prevent the other files from being processed, because the busy file would be read in an endless loop. In that case, lower this value. |
| backoffSleepIncrement | 1000 | The increment of the time delay before re-attempting to poll for new data, when the last attempt found no new data. |
| maxBackoffSleep | 5000 | The max time delay between re-attempts to poll for new data, when the last attempt found no new data. |
| cachePatternMatching | true | Listing directories and applying the filename regex pattern may be time-consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed is also cached. Requires that the file system track modification times with at least 1-second granularity. |
| fileHeader | false | Whether to add a header storing the absolute path of the file. |
| fileHeaderKey | file | Header key to use when adding the absolute path filename to the event header. |
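For reference, the position file is a JSON array with one entry per tailed file, recording the inode, the last read offset, and the absolute path. A minimal illustrative sketch (the inode numbers, offsets, and paths here are made up):

[
  {"inode": 2496272, "pos": 166, "file": "/var/log/test1/example.log"},
  {"inode": 2496275, "pos": 904, "file": "/var/log/test2/app.log"}
]

Deleting this file makes the source re-read the matched files from the beginning (or skip straight to EOF, if skipToEnd is true).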

Examples

Example 1

Example for an agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000
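The agent can then be launched with the standard flume-ng command (the config file name taildir-example.conf is assumed):

bin/flume-ng agent --conf conf --conf-file taildir-example.conf --name a1 -Dflume.root.logger=INFO,console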

Example 2

tier1.sources = r1 r2
tier1.channels = c1 c2
tier1.sinks = k1 k2

# Describe/configure the source
# The source type is TAILDIR (the type value is case-insensitive here)
tier1.sources.r1.type = TAILDIR
tier1.sources.r1.channels = c1
tier1.sources.r2.type = TAILDIR
tier1.sources.r2.channels = c2
# Position files that store the last tailed position of each file
tier1.sources.r1.positionFile = /opt/logs/flumePosition/taildir_position.json
tier1.sources.r2.positionFile = /opt/logs/flumePosition/errordata_position.json
# Define the tail file groups, separated by spaces
tier1.sources.r1.filegroups = f1
tier1.sources.r2.filegroups = f2
# Set the absolute path for each file group
tier1.sources.r1.filegroups.f1 = /opt/web/logs/access_log.*
tier1.sources.r2.filegroups.f2 = /opt/logs/flumeDataProducer/Error.*
# In the regex, '.' matches any single character except a newline (\n), and '*' matches the preceding expression zero or more times. A pattern like messages.* would also work here.

# Attach a custom interceptor to r1 (a sketch of such a class appears after this example)
tier1.sources.r1.interceptors = i1
tier1.sources.r1.interceptors.i1.type = interceptor.CustomInterceptor$Builder


# Whether to add the file's absolute path to the event headers
tier1.sources.r1.fileHeader = true
tier1.sources.r2.fileHeader = true

# Sink k1: send valid records to Kafka
tier1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.k1.kafka.topic = flumeData
tier1.sinks.k1.kafka.bootstrap.servers = pDataAnalysis01:9092,pDataAnalysis03:9092,pDataAnalysis02:9092
tier1.sinks.k1.kafka.flumeBatchSize = 20
tier1.sinks.k1.kafka.producer.acks = 1
tier1.sinks.k1.kafka.producer.linger.ms = 1
tier1.sinks.k1.kafka.producer.compression.type = snappy


# Configure sink k2: class name, PostgreSQL host/port and credentials, table name, and column names
tier1.sinks.k2.type = org.apache.flume.sink.pgsql.PGSQLSink
tier1.sinks.k2.hostname = 10.176.40.30
tier1.sinks.k2.port = 5432
tier1.sinks.k2.password = ypdGbfl1sOb4YAboI0tmEw52
tier1.sinks.k2.user = ap_prod_rw
tier1.sinks.k2.tablename = flume_flink_error_data
tier1.sinks.k2.column_name = data_time,error_data,json_data,error_type,events,project


# Use a channel which buffers events in memory
tier1.channels.c1.type = memory
tier1.channels.c1.capacity = 1000
tier1.channels.c1.transactionCapacity = 100

tier1.channels.c2.type = memory
tier1.channels.c2.capacity = 1000
tier1.channels.c2.transactionCapacity = 100

# Bind the sources and sinks to the channels

tier1.sources.r1.channels = c1
tier1.sinks.k1.channel = c1

tier1.sources.r2.channels = c2
tier1.sinks.k2.channel = c2
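
Example 2 references a custom interceptor, interceptor.CustomInterceptor$Builder, whose source is not shown in this post. As a rough sketch (not the actual class), a custom interceptor implements org.apache.flume.interceptor.Interceptor plus a nested Builder; the header tagging below is purely illustrative:

package interceptor;

import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // One-time setup; nothing needed for this sketch.
    }

    @Override
    public Event intercept(Event event) {
        // Illustrative logic only: tag each event with a header.
        event.getHeaders().put("intercepted", "true");
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Batch path: apply the single-event logic to each event.
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // Release any resources acquired in initialize().
    }

    // Flume instantiates interceptors through this nested Builder,
    // which is why the config references interceptor.CustomInterceptor$Builder.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
            // Read interceptor-specific properties here if needed.
        }
    }
}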