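Fluentd can subscribe to data from Kafka and can also publish data to Kafka. Both features ship in a single plugin, fluent-plugin-kafka. Below we refer to them as the input plugin and the output plugin, respectively.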
td-agent-gem install fluent-plugin-kafka
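This plugin requires Ruby 2.1 or later. The input plugin requires the source Kafka to be version 0.9 or later, and the output plugin requires the destination Kafka to be version 0.8 or later.
To use the plugin's ZooKeeper-related parameters, you need to install the zookeeper gem. You may also need Linux development tools such as ruby-devel, gcc, and make.
In single-consumer mode, the Kafka input plugin is configured as follows: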
<source>
@type kafka
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
topics <listening topics(separate with comma',')>
format <input text type (text|json|ltsv|msgpack)> :default => json
message_key <key (Optional, for text format only, default is message)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
# Optionally, you can manage topic offset by using zookeeper
offset_zookeeper <zookeeper node list (<zookeeper1_host>:<zookeeper1_port>,<zookeeper2_host>:<zookeeper2_port>,..)>
offset_zk_root_node <offset path in zookeeper> default => '/fluent-plugin-kafka'
# ruby-kafka consumer options
max_bytes (integer) :default => nil (Use default of ruby-kafka)
max_wait_time (integer) :default => nil (Use default of ruby-kafka)
min_bytes (integer) :default => nil (Use default of ruby-kafka)
</source>
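The Kafka input plugin uses the topic name as the tag of the Fluentd internal event. If the subscribed topic is app_event, the tag produced by the input plugin is app_event.
add_prefix and add_suffix can be used to modify the tag. For example: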
add_prefix kafka
changes app_event to kafka.app_event.
Single-consumer mode also supports setting the read offset of each topic individually.
The configuration is as follows:
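As a concrete sketch of the tag rewrite described above, the following config subscribes to app_event and routes the resulting kafka.app_event tag to standard output. The broker address kafka1:9092 is a placeholder, and the stdout match is included only to make the rewritten tag visible.

```
<source>
  @type kafka
  # placeholder broker address
  brokers kafka1:9092
  topics app_event
  format json
  # the event tag becomes kafka.app_event
  add_prefix kafka
</source>

<match kafka.app_event>
  # Fluentd's built-in stdout output, used here only for inspection
  @type stdout
</match>
```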
<source>
@type kafka
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
format <input text type (text|json|ltsv|msgpack)>
<topic>
topic <listening topic>
partition <listening partition: default=0>
offset <listening start offset: default=-1>
</topic>
<topic>
topic <listening topic>
partition <listening partition: default=0>
offset <listening start offset: default=-1>
</topic>
</source>
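A filled-in version of the template above might look like the following. This is only a sketch: the broker address, topic names, partitions, and offsets are illustrative values, not recommendations.

```
<source>
  @type kafka
  # placeholder broker address
  brokers kafka1:9092
  format json
  <topic>
    # hypothetical topic, read from partition 0 starting at offset 0
    topic app_event
    partition 0
    offset 0
  </topic>
  <topic>
    # hypothetical topic, read from partition 1 starting at offset 1500
    topic access_log
    partition 1
    offset 1500
  </topic>
</source>
```

Each <topic> block carries its own partition and offset, so the two topics can start reading from different positions.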
In consumer-group mode, the Kafka input plugin is configured as follows:
<source>
@type kafka_group
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
consumer_group <consumer group name, must set>
topics <listening topics(separate with comma',')>
format <input text type (text|json|ltsv|msgpack)> :default => json
message_key <key (Optional, for text format only, default is message)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
retry_emit_limit <Wait retry_emit_limit x 1s when BufferQueueLimitError happens. The default is nil and it means waiting until BufferQueueLimitError is resolved>
time_source <source for message timestamp (now|kafka|record)> :default => now
time_format <string (Optional when use_record_time is used)>
# ruby-kafka consumer options
max_bytes (integer) :default => 1048576
max_wait_time (integer) :default => nil (Use default of ruby-kafka)
min_bytes (integer) :default => nil (Use default of ruby-kafka)
offset_commit_interval (integer) :default => nil (Use default of ruby-kafka)
offset_commit_threshold (integer) :default => nil (Use default of ruby-kafka)
fetcher_max_queue_size (integer) :default => nil (Use default of ruby-kafka)
start_from_beginning (bool) :default => true
</source>
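For reference, a minimal consumer-group setup could be sketched as below. The broker address, group name, and topic names are assumptions made for illustration. Only parameters listed in the template above are used.

```
<source>
  @type kafka_group
  # placeholder broker address
  brokers kafka1:9092
  # required: hypothetical consumer group name
  consumer_group fluentd_consumers
  # hypothetical topics, separated by commas
  topics app_event,access_log
  format json
  add_prefix kafka
  # skip messages that already exist when the group first starts
  start_from_beginning false
</source>
```

Running several Fluentd instances with the same consumer_group lets Kafka distribute the partitions of these topics among them.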
The output plugin is configured as follows:
<match app.**>
@type kafka2
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
topic_key (string) :default => 'topic'
partition_key (string) :default => 'partition'
partition_key_key (string) :default => 'partition_key'
message_key_key (string) :default => 'message_key'
default_topic (string) :default => nil
default_partition_key (string) :default => nil
default_message_key (string) :default => nil
exclude_topic_key (bool) :default => false
exclude_partition_key (bool) :default => false
exclude_partition (bool) :default => false
exclude_message_key (bool) :default => false
get_kafka_client_log (bool) :default => false
headers (hash) :default => {}
headers_from_record (hash) :default => {}
use_default_for_unknown_topic (bool) :default => false
<format>
@type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
</format>
# Optional. See https://docs.fluentd.org/v/1.0/configuration/inject-section
<inject>
tag_key tag
time_key time
</inject>
# See fluentd document for buffer related parameters: https://docs.fluentd.org/v/1.0/configuration/buffer-section
# The buffer chunk key should be the same as topic_key. If the value is not found in the record, default_topic is used.
<buffer topic>
flush_interval 10s
</buffer>
# ruby-kafka producer options
idempotent (bool) :default => false
sasl_over_ssl (bool) :default => true
max_send_retries (integer) :default => 1
required_acks (integer) :default => -1
ack_timeout (integer) :default => nil (Use default of ruby-kafka)
compression_codec (string) :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)
</match>
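A pared-down output section using only parameters from the template above might look like this. It is a sketch under assumptions: the broker address and the topic name app_logs are placeholders, and gzip is chosen simply as one of the codecs ruby-kafka documents.

```
<match app.**>
  @type kafka2
  # placeholder broker address
  brokers kafka1:9092
  # records are routed by their "topic" field (the default topic_key)
  topic_key topic
  # hypothetical fallback topic for records without a "topic" field
  default_topic app_logs
  <format>
    @type json
  </format>
  # the chunk key matches topic_key
  <buffer topic>
    flush_interval 10s
  </buffer>
  # wait for all in-sync replicas to acknowledge each write
  required_acks -1
  compression_codec gzip
</match>
```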
topic_key category
<buffer category> # topic_key should be included in buffer chunk key
# ...
</buffer>
If you set topic_key to category, the buffer chunk key must also include category, as the fragment above shows.
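<format>: sets the output message format. json, ltsv, and other formatter plugins are supported.

default_partition_key | partition_key_key | Message load-balancing behavior |
---|---|---|
Not set | Absent | Messages are assigned a partition at random |
Set | Absent | Messages are assigned to the partition specified by default_partition_key |
Not set | Present | Messages that contain the partition_key_key field are assigned to the partition specified by that field; other messages are assigned a partition at random |
Set | Present | Messages that contain the partition_key_key field are assigned to the partition specified by that field; other messages are assigned to the partition specified by default_partition_key |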
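
To make the last row of the table concrete, here is a minimal sketch of an output section with both keys in play. The broker address, the topic name app_logs, and the key value fallback are placeholders chosen for illustration.

```
<match app.**>
  @type kafka2
  # placeholder broker address
  brokers kafka1:9092
  # hypothetical topic used when a record carries no topic field
  default_topic app_logs
  # records with a partition_key field are partitioned by that field's value
  partition_key_key partition_key
  # all other records fall back to this key (illustrative value)
  default_partition_key fallback
  <format>
    @type json
  </format>
  <buffer topic>
    flush_interval 10s
  </buffer>
</match>
```

With this config, a record such as {"partition_key":"user-42","msg":"..."} would be partitioned by user-42, while a record without that field would use fallback.
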
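When ruby-kafka or fluent-plugin-kafka is not compatible with the Kafka version in use, there are two solutions:
Upgrade the Kafka cluster to the latest version, which is faster and more robust.
Downgrade ruby-kafka or fluent-plugin-kafka to match the Kafka version currently in use.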