librdkafka配置参数详解

librdkafka配置参数整理。

全局配置参数

原文地址

Property C/P Range Default Description
builtin.features * gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins 标示该librdkafka的支持的内建特性。应用程序可以查看或设置这些值来检查是否支持这些特性.
Type: CSV flags
client.id * rdkafka 客户端标识.
Type: string
metadata.broker.list * 初始化的broker列表,host:port格式, 应用程序可以使用rd_kafka_brokers_add()动态添加broker.
Type: string
bootstrap.servers * 参考 metadata.broker.list
message.max.bytes * 1000 .. 1000000000 1000000 消息发送最大字节数.
Type: integer
message.copy.max.bytes * 0 .. 1000000000 65535 消息缓冲区最大字节数,多出的消息将通过引用传递,但是会消耗更多的内存(struct iovec).
Type: integer
receive.message.max.bytes * 1000 .. 1000000000 100000000 kafka消息接收最大字节数,这事一种安全机制,防止在最坏的情况耗尽内存问题。建议值:fetch.message.max.bytes 分区数 + 消息的最大字节数.
Type: integer*
max.in.flight.requests.per.connection * 1 .. 1000000 1000000 broker连接数最大值,针对每个broker配置。主要用于发送消息,但是其他机制将会限制每个broker未处理消息消费数量为1个.
Type: integer
max.in.flight * 参考 max.in.flight.requests.per.connection
metadata.request.timeout.ms * 10 .. 900000 60000 没有数据操作的超时时间,单位毫秒.
Type: integer
topic.metadata.refresh.interval.ms * -1 .. 3600000 300000 数据刷新时间间隔,单位毫秒,自动刷新错误和连接,设置为-1则关闭刷新时间间隔.
Type: integer
metadata.max.age.ms * 1 .. 86400000 -1 元数据缓存最大生命周期. 默认值为数据刷新时间间隔 metadata.refresh.interval.ms 3
Type: integer*
topic.metadata.refresh.fast.interval.ms * 1 .. 60000 250 topic主题失去leader领导者时,元数据请求发送间隔. 用户快速恢复broker leader.
Type: integer
topic.metadata.refresh.fast.cnt * 0 .. 1000 10 保留,未启用.
Type: integer*
topic.metadata.refresh.sparse * true, false true 极少的元数据请求,消费者的网络带宽很小
Type: boolean
topic.blacklist * topic内名单,逗号分割增则表达式列表.
Type: pattern list
debug * generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, all 一个逗号分割的调试上下文列表,包括:生产者:broker、topic、message; 消费者:cgroup、topic、tetch等
Type: CSV flags
socket.timeout.ms * 10 .. 300000 60000 默认的网络请求超时时间, Producer:ProduceRequests将使用批处理中第一条消息的socket.timeout.ms和其余message.timeout.ms的较小值。 使用者:FetchRequests将使用fetch.wait.max.ms + socket.timeout.ms.
Type: integer
socket.blocking.max.ms * 1 .. 60000 1000 socket套接字可能阻塞的最大时间,比较小的值提高了响应速度,但是CPU负载比较大. 已废弃. Deprecated
Type: integer
socket.send.buffer.bytes * 0 .. 100000000 0 broker端发送缓冲区大小,0则使用系统默认值.
Type: integer
socket.receive.buffer.bytes * 0 .. 100000000 0 broker端接收缓冲区大小,0则使用系统默认值.
Type: integer
socket.keepalive.enable * true, false false 启用TCP keep-alives (SO_KEEPALIVE) on broker sockets
Type: boolean
socket.nagle.disable * true, false false 禁用nagle 算法TCP_NODELAY
Type: boolean
socket.max.fails * 0 .. 1000000 1 发送失败的最大次数,超过该次数后断开与broker的连接,0 禁用;注意:连接会自动重连.
Type: integer
broker.address.ttl * 0 .. 86400000 1000 broker地址解析结果缓存值(毫秒).
Type: integer
broker.address.family * any, v4, v6 any any ipv4 ipv6
Type: enum value
reconnect.backoff.jitter.ms * 0 .. 3600000 500 通过该配置参数+-50%调整broker重连.
Type: integer
statistics.interval.ms * 0 .. 86400000 0 librdkafka统计时间间隔,应用程序也应该通过注册回调函数来实现对统计指标的监控,0禁用,单位1000ms.
Type: integer
enabled_events * 0 .. 2147483647 0 See rd_kafka_conf_set_events()
Type: integer
error_cb * 发送错误的回调函数, (set with rd_kafka_conf_set_error_cb())
Type: pointer
throttle_cb * 调整回调函数 (set with rd_kafka_conf_set_throttle_cb())
Type: pointer
stats_cb * 统计回调函数 (set with rd_kafka_conf_set_stats_cb())
Type: pointer
log_cb * 日志回调函数 (set with rd_kafka_conf_set_log_cb())
Type: pointer
log_level * 0 .. 7 6 日志级别 (syslog(3) levels)
Type: integer
log.queue * true, false false 禁用内部librdkafka线程中的自发log_cb,而是使用rd_kafka_set_log_queue()将队列中的日志消息排入队列,并通过标准轮询API提供日志回调或事件。 注意:日志消息将一直存在于临时队列中,直到日志队列被设置.
Type: boolean
log.thread.name * true, false true 在日志中记录线程名称,经常用于调试librdkafka内部问题
Type: boolean
log.connection.close * true, false true 记录broker关闭事件,受0.9版本的connection.max.idle.ms参数影响,一般建议关闭该参数.
Type: boolean
socket_cb * 为socket套接字创建CLOEXEC提供回调函数
Type: pointer
connect_cb * socket连接的回调函数
Type: pointer
closesocket_cb * socket关闭的回调函数
Type: pointer
open_cb * 打开文件时CLOEXEC的回调函数
Type: pointer
opaque * 应用程序设置的上下文参数,一般用与消息发送后的回调、librdkafka注册函数回调的上下文,主要用于参数传递,由c++到c在转会c++指针寻址上 (set with rd_kafka_conf_set_opaque())
Type: pointer
default_topic_conf * 自动订阅主题的默认配置参数
Type: pointer
internal.termination.signal * 0 .. 128 0 370/5000librdkafka将用于快速终止rd_kafka_destroy()的信号。 如果未设置此信号,则会在rd_kafka_wait_destroyed()返回true之前发生延迟,因为内部线程正在超时执行其系统调用。 如果这个信号被设置,但延迟将是最小的。 应用程序应该在安装内部信号处理程序时屏蔽此信号。
Type: integer
api.version.request * true, false true 请求broker支持的API版本以调整可用协议功能的功能。 如果设置为false,或者ApiVersionRequest失败,则将使用备用版本broker.version.fallback。 注意:取决于broker版本> = 0.10.0。 如果(较旧的)代理不支持请求,则使用broker.version.fallback回退.
Type: boolean
api.version.request.timeout.ms * 1 .. 300000 10000 broker的api版本请求超时时间.
Type: integer
api.version.fallback.ms * 0 .. 604800000 1200000 Dictates how long the broker.version.fallback fallback is used in the case the ApiVersionRequest fails. NOTE: The ApiVersionRequest is only issued when a new connection to the broker is made (such as after an upgrade).
Type: integer
broker.version.fallback * 0.9.0 Older broker versions (<0.10.0) provides no way for a client to query for supported protocol features (ApiVersionRequest, see api.version.request) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for api.version.fallback.ms. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. Any other value, such as 0.10.2.1, enables ApiVersionRequests.
Type: string
security.protocol * plaintext, ssl, sasl_plaintext, sasl_ssl plaintext 用于和broker通信的协议,默认plaintext.
Type: enum value
ssl.cipher.suites * 根据SSL/TLS规范,客户端提交使用的加密算法. See manual page for ciphers(1) and SSL_CTX_set_cipher_list(3).
Type: string
ssl.key.location * 客户端私钥的路径.
Type: string
ssl.key.password * 私钥密码
Type: string
ssl.certificate.location * 公钥地址.
Type: string
ssl.ca.location * CA证书路径.
Type: string
ssl.crl.location * CRL 路径,用于 broker 的证书校验.
Type: string
ssl.keystore.location * keystore存储路径.
Type: string
ssl.keystore.password * keystore 密码.
Type: string
sasl.mechanisms * GSSAPI 使用 SASL 机制鉴权。 支持:GSSAPI, PLAIN. 提示: 只能配置一种机制名.
Type: string
sasl.mechanism * Alias for sasl.mechanisms
sasl.kerberos.service.name * kafka Kafka 运行的 Kerberos 首要名, not including /hostname@REALM
Type: string
sasl.kerberos.principal * kafkaclient 客户端的 Kerberos 首要名. (Not supported on Windows, will use the logon user’s principal).
Type: string
sasl.kerberos.kinit.cmd * 完整的 kerberos kinit 命令串,%{config.prop.name} 替换为与配置对象一直的值,%{broker.name} broker 的主机名.
Type: string
sasl.kerberos.keytab * Kerberos keytab 文件的路径。如果不设置,则使用系统默认的。提示:不会自动使用,必须在 sasl.kerberos.kinit.cmd 中添加到模板 … -t %{sasl.kerberos.keytab}.
Type: string
sasl.kerberos.min.time.before.relogin * 1 .. 86400000 60000 Key 恢复尝试的最小时间,毫秒.
Type: integer
sasl.username * 使用 PLAIN 机制时,SASL 用户名
Type: string
sasl.password * 使用 PLAIN 机制时,SASL 密码
Type: string
plugin.library.paths * 使用;分割的插件库列表,如果没有标明库文件后缀,则根据平台自动配置dll或者so后缀名.
Type: string
interceptors * 通过rd_kafka_conf_interceptoes添加拦截器.
Type:
group.id * 客户端分组编号id
Type: string
partition.assignment.strategy * range,roundrobin 分区策略名称(轮询、范围).
Type: string
session.timeout.ms * 1 .. 3600000 30000 客户端会话超时时间.
Type: integer
heartbeat.interval.ms * 1 .. 3600000 1000 组会话超时时间.
Type: integer
group.protocol.type * consumer 组协议类型
Type: string
coordinator.query.interval.ms * 1 .. 3600000 600000 多久查询一次当前的客户端组协调器。 如果当前分配的协调器已关闭,则在协调器重新分配的情况下,配置的查询时间间隔将除以10以更快地恢复.
Type: integer
enable.auto.commit C true, false true 在后台周期性的自动提交偏移量. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign().
Type: boolean
auto.commit.interval.ms C 0 .. 86400000 5000 消费者偏移量提交(写入)到存储的频率,毫秒。(0 = 不可用) . (0 = disable). This setting is used by the high-level consumer.
Type: integer
enable.auto.offset.store C true, false true Automatically store offset of last message provided to application.
Type: boolean
queued.min.messages C 1 .. 10000000 100000 Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue.
Type: integer
queued.max.messages.kbytes C 1 .. 2097151 1048576 Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes. This property has higher priority than queued.min.messages.
Type: integer
fetch.wait.max.ms C 0 .. 300000 100 Maximum time the broker may wait to fill the response with fetch.min.bytes.
Type: integer
fetch.message.max.bytes C 1 .. 1000000000 1048576 Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
Type: integer
max.partition.fetch.bytes C Alias for fetch.message.max.bytes
fetch.min.bytes C 1 .. 100000000 1 Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires the accumulated data will be sent to the client regardless of this setting.
Type: integer
fetch.error.backoff.ms C 0 .. 300000 500 How long to postpone the next fetch request for a topic+partition in case of a fetch error.
Type: integer
offset.store.method C none, file, broker broker Offset commit store method: ‘file’ - local file store (offset.store.path, et.al), ‘broker’ - broker commit store (requires Apache Kafka 0.8.2 or later on the broker).
Type: enum value
consume_cb C Message consume callback (set with rd_kafka_conf_set_consume_cb())
Type: pointer
rebalance_cb C Called after consumer group has been rebalanced (set with rd_kafka_conf_set_rebalance_cb())
Type: pointer
offset_commit_cb C Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb())
Type: pointer
enable.partition.eof C true, false true Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition.
Type: boolean
check.crcs C true, false false Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.
Type: boolean
queue.buffering.max.messages P 1 .. 10000000 100000 Maximum number of messages allowed on the producer queue.
Type: integer
queue.buffering.max.kbytes P 1 .. 2097151 1048576 Maximum total message size sum allowed on the producer queue. This property has higher priority than queue.buffering.max.messages.
Type: integer
queue.buffering.max.ms P 0 .. 900000 0 Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency.
Type: integer
linger.ms P Alias for queue.buffering.max.ms
message.send.max.retries P 0 .. 10000000 2 How many times to retry sending a failing MessageSet. Note: retrying may cause reordering.
Type: integer
retries P Alias for message.send.max.retries
retry.backoff.ms P 1 .. 300000 100 The backoff time in milliseconds before retrying a protocol request.
Type: integer
queue.buffering.backpressure.threshold P 0 .. 1000000 10 The threshold of outstanding not yet transmitted requests needed to backpressure the producer’s message accumulator. A lower number yields larger and more effective batches.
Type: integer
compression.codec P none, gzip, snappy, lz4 none compression codec to use for compressing message sets. This is the default value for all topics, may be overriden by the topic configuration property compression.codec.
Type: enum value
compression.type P Alias for compression.codec
batch.num.messages P 1 .. 1000000 10000 Maximum number of messages batched in one MessageSet. The total MessageSet size is also limited by message.max.bytes.
Type: integer
delivery.report.only.error P true, false false Only provide delivery reports for failed messages.
Type: boolean
dr_cb P Delivery report callback (set with rd_kafka_conf_set_dr_cb())
Type: pointer
dr_msg_cb P Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())
Type: pointer

Topic configuration properties

Property C/P Range Default Description
request.required.acks P -1 .. 1000 1 This field indicates how many acknowledgements the leader broker must receive from ISR brokers before responding to the request: 0=Broker does not send any response/ack to client, 1=Only the leader broker will need to ack the message, -1 or all=broker will block until message is committed by all in sync replicas (ISRs) or broker’s min.insync.replicas setting before sending response.
Type: integer
acks P Alias for request.required.acks
request.timeout.ms P 1 .. 900000 5000 The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on request.required.acks being != 0.
Type: integer
message.timeout.ms P 0 .. 900000 300000 Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite.
Type: integer
queuing.strategy P fifo, lifo fifo Producer queuing strategy. FIFO preserves produce ordering, while LIFO prioritizes new messages. WARNING: lifo is experimental and subject to change or removal.
Type: enum value
produce.offset.report P true, false false Report offset of produced message back to application. The application must be use the dr_msg_cb to retrieve the offset from rd_kafka_message_t.offset.
Type: boolean
partitioner P consistent_random Partitioner: random - random distribution, consistent - CRC32 hash of key (Empty and NULL keys are mapped to single partition), consistent_random - CRC32 hash of key (Empty and NULL keys are randomly partitioned), murmur2 - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), murmur2_random - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.).
Type: string
partitioner_cb P Custom partitioner callback (set with rd_kafka_topic_conf_set_partitioner_cb())
Type: pointer
msg_order_cmp P Message queue ordering comparator (set with rd_kafka_topic_conf_set_msg_order_cmp()). Also see queuing.strategy.
Type: pointer
opaque * Application opaque (set with rd_kafka_topic_conf_set_opaque())
Type: pointer
compression.codec P none, gzip, snappy, lz4, inherit inherit Compression codec to use for compressing message sets. inherit = inherit global compression.codec configuration.
Type: enum value
compression.type P Alias for compression.codec
auto.commit.enable C true, false true If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call rd_kafka_offset_store() to store an offset (optional). NOTE: This property should only be used with the simple legacy consumer, when using the high-level KafkaConsumer the global enable.auto.commit property must be used instead. NOTE: There is currently no zookeeper integration, offsets will be written to broker or local file according to offset.store.method.
Type: boolean
enable.auto.commit C Alias for auto.commit.enable
auto.commit.interval.ms C 10 .. 86400000 60000 The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. This setting is used by the low-level legacy consumer.
Type: integer
auto.offset.reset C smallest, earliest, beginning, largest, latest, end, error largest Action to take when there is no initial offset in offset store or the desired offset is out of range: ‘smallest’,’earliest’ - automatically reset the offset to the smallest offset, ‘largest’,’latest’ - automatically reset the offset to the largest offset, ‘error’ - trigger an error which is retrieved by consuming messages and checking ‘message->err’.
Type: enum value
offset.store.path C . Path to local file for storing offsets. If the path is a directory a filename will be automatically generated in that directory based on the topic and partition.
Type: string
offset.store.sync.interval.ms C -1 .. 86400000 -1 fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing, and 0 for immediate sync after each write.
Type: integer
offset.store.method C file, broker broker Offset commit store method: ‘file’ - local file store (offset.store.path, et.al), ‘broker’ - broker commit store (requires “group.id” to be configured and Apache Kafka 0.8.2 or later on the broker.).
Type: enum value
consume.callback.max.messages C 0 .. 1000000 0 Maximum number of messages to dispatch in one rd_kafka_consume_callback() call (0 = unlimited)
Type: integer*

C/P legend: C = Consumer, P = Producer, * = both

文章目录
  1. 1. 全局配置参数
  2. 2. Topic configuration properties
    1. 2.1. C/P legend: C = Consumer, P = Producer, * = both