# The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=3
############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining # segments don't drop below log.retention.bytes. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 # By default the log cleaner is disabled and the log retention policy will default to #just delete segments after their retention expires. # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs #can then be marked for log compaction. log.cleaner.enable=false
#If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead replica.lag.max.messages=4000 #If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead replica.lag.time.max.ms=10000
(本节所讲述内容均基于Kafka consumer high level API)
Kafka保证同一consumer group中只有一个consumer会消息某条消息,实际上,Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定 partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。这样设计的劣势是无法让同一个 consumer group里的consumer均匀消费数据,优势是每个consumer不用都跟大量的broker通信,减少通信开销,同时也降低了分配难度,实现也 更简单。另外,因为同一个partition里的数据是有序的,这种设计可以保证每个partition里的数据也是有序被消费。
如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据,如果consumer 的数量与partition数量相同,则正好一个consumer消费一个partition的数据,而如果consumer的数量多于 partition的数量时,会有部分consumer无法消费该topic下任何一条消息。
如下例所示,如果topic1有0,1,2共三个partition,当group1只有一个consumer(名为consumer1)时,该 consumer可消费这3个partition的所有数据。
<img src="/files/0924-11.jpg">
增加一个consumer(consumer2)后,其中一个consumer(consumer1)可消费2个partition的数据,另外一个consumer(consumer2)可消费另外一个partition的数据。
<img src="/files/0924-12.jpg">
再增加一个consumer(consumer3)后,每个consumer可消费一个partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2
<img src="/files/0924-13.jpg">
再增加一个consumer(consumer4)后,其中3个consumer可分别消费一个partition的数据,另外一个consumer(consumer4)不能消费topic1任何数据。
<img src="/files/0924-14.jpg">
此时关闭consumer1,剩下的consumer可分别消费一个partition的数据。
<img src="/files/0924-15.jpg">
接着关闭consumer2,剩下的consumer3可消费2个partition,consumer4可消费1个partition。
<img src="/files/0924-16.jpg">
再关闭consumer3,剩下的consumer4可同时消费topic1的3个partition。
<img src="/files/0924-17.jpg">
consumer rebalance算法如下:
* Sort PT (all partitions in topic T)
* Sort CG(all consumers in consumer group G)
* Let i be the index position of Ci in CG and let N=size(PT)/size(CG)
* Remove current entries owned by Ci from the partition owner registry
* Assign partitions from iN to (i+1)N-1 to consumer Ci
* Add newly assigned partitions to the partition owner registry
目前consumer rebalance的控制策略是由每一个consumer通过Zookeeper完成的。具体的控制方式如下:
* Register itself in the consumer id registry under its group.
* Register a watch on changes under the consumer id registry.
* Register a watch on changes under the broker id registry.
* If the consumer creates a message stream using a topic filter, it also registers a watch on changes under the broker topic registry.
* Force itself to rebalance within in its consumer group.在这种策略下,每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。
目前(2015-01-19)最新版(0.8.2)Kafka采用的是上述方式。
但该方式有不利的方面:
* Herd effect : 任何broker或者consumer的增减都会触发所有的consumer的rebalance
* Split Brain : 每个consumer分别单独通过Zookeeper判断哪些partition down了,那么不同consumer从Zookeeper“看”到的view就可能不一样,这就会造成错误的reblance尝试。而且有可能所有的 consumer都认为rebalance已经完成了,但实际上可能并非如此。
根据Kafka官方文档,Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(coordinator)。 大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有partition或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance 在所有相关的consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功(这个过程跟replication controller非常类似,所以我很奇怪为什么当初设计replication controller时没有使用类似方式来解决consumer rebalance的问题)。流程如下:
<img src="/files/0924-18.jpg">
Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。Kafka的delivery guarantee semantic非常直接。当producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。 但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经commit。这一点 有点像向一个自动生成primary key的数据库表中插入数据。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以生成一种类似于primary key的东西,发生故障时幂等性的retry多次,这样就做到了Exactly one。截止到目前(Kafka 0.8.2版本,2015-01-25),这一feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从producer和broker是确保了At least once,但可通过设置producer异步发送实现At most once)。
############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 ############################# Socket Server Settings ############################# # The port the socket server listens on port=9092 # Hostname the broker will bind to and advertise to producers and consumers. # If not set, the server will bind to all interfaces and advertise the value returned from # from java.net.InetAddress.getCanonicalHostName(). #host.name=localhost # The number of threads handling network requests num.network.threads=4 # The number of threads doing disk I/O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=1048576 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=1048576 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# # The directory under which to store log files log.dirs=/grid/a/dfs-data/kafka-logs,/grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs # The number of logical partitions per topic per server. More partitions allow greater parallelism # for consumption, but also mean more files. num.partitions=8 ############################# Log Flush Policy ############################# # The following configurations control the flush of data to disk. This is the most # important performance knob in kafka. # There are a few important trade-offs here: # 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. # 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). # 3. Throughput: The flush is generally the most expensive operation. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # Per-topic overrides for log.flush.interval.ms #log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000 ############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining # segments don't drop below log.retention.bytes. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=536870912 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.cleanup.interval.mins=1 ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=esv4-hcl197.grid.linkedin.com:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=1000000 # metrics reporter properties kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter kafka.csv.metrics.dir=/tmp/kafka_metrics # Disable csv reporting by default. kafka.csv.metrics.reporter.enabled=false replica.lag.max.messages=10000000
参考
使用消息队列的 10 个理由
Apache Kafka
Efficient data transfer through zero copy
Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines)