kafka目前已升级到1.1.0版本,Broker的在线upgrade比较麻烦,所以我建议大家新项目直接基于最新版本(1.1.0),旧版本不同程度存在一些bug,比较难以解决。
1、broker配置样例(未声明的配置项,建议保持默认):server.properties
############################# Server Basics ############################# # brokerId,同一集群id必须唯一,Integer类型 broker.id=0 auto.create.topics.enable=true #允许客户端直接删除,admin工具可以。 delete.topic.enable=true auto.leader.rebalance.enable=true # 单条消息最大尺寸,10M message.max.bytes=10000120 replica.fetch.max.bytes=10485760 # ISR,当Producer Ack模式为“all”时必须等待ISR列表中replicas全部写入成功(否则抛出异常,无法继续write); # 此值适用于repicas 可靠性担保。 # 如果replicas为3,此值可以设置为2 + ACK="all"可以极高的确保消息可靠性。 min.insync.replicas=2 # 默认replication个数,broker=3,replication=2 default.replication.factor=2 ############################# Socket Server Settings ############################# listeners=PLAINTEXT://10.0.34.121:9092 # 接收客户端请求的IO线程数,建议保持默认 num.network.threads=3 # 处理请求包括磁盘IO的线程数,建议保持默认,调优方式受制于磁盘并发能力、CPU个数等。 num.io.threads=8 # SO_SNDBUF socket.send.buffer.bytes=102400 # SO_RCVBUF socket.receive.buffer.bytes=102400 # 单次请求,Server允许接收的最大数据量,避免OOM。默认值为100M(104857600) socket.request.max.bytes=104857600 # 亟待处理的请求个数,超过阈值将会阻塞网络IO线程(类似于backlog) queued.max.requests=500 # 客户端等待broker响应最大等待时间(包括重试),Client可配置。默认30000秒。 request.timeout.ms=15000 # 单个IP允许建立的连接数,默认为Integer最大值,此处修正为256 max.connections.per.ip=256 ############################# Log Basics ############################# # 底层日志文件保存的文件路径,多个磁盘路径则以“,”分割,多磁盘可以提高IO并发能力。 log.dirs=/data/kafka # Topic默认的partition个数,此值越大理论上可以提高并发消费能力,不同的partitions将会分布在多个broker上。 # 可以通过AdminClient修改指定Topic的partition数量。 # partition个数,通常不建议大于Broker节点的个数。 num.partitions=1 # 在logs回复或者关闭刷盘时,每个dir所使用的IO线程个数。 num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # 目前已知kafka内部状态数据将保存在特定的topic中, # 比如offsets(由以前的kafka迁移到内部topic中)保存在“__consumer_offsets, # 每个producer的事务,保存在“__transaction_state”。 # 对于内部topic,其replicas个数,建议为“大多数派”,且最多不要超过3。 # 比如: # broker<3,replicas=1; # brokers = 3,replicas=2; # broker>=5,replicas=3; # 线上配置,broker=3;事务功能暂时关闭 offsets.topic.replication.factor=2 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# # 日志刷盘策略,我们有两个重要参数可以可以权衡,分别适用在“吞吐量优先”、“数据可靠性优先”。 # 需要明确表达,如果你对数据可靠性有一定的要求,那么尚需要开启replication来保障,仅仅依赖刷盘尚且无法完全意义上确保数据的安全性。 # 如果你对系统确实是“吞吐量优先”,则可以关闭replicaiton(=1)、同时启用“按消息量”刷怕策略。 # 如下两个参数满足其一,即可Flush # 吞吐量优先,默认为关闭(Long最大值) log.flush.interval.messages=10000 # 可靠性 + 延迟优先,每个1秒刷盘,通用高优策略。 log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # 日志保留策略,存储日志可以根据“保留时间长”、“存量数据大小”两个维度,对于超出阈值的日志将会被清理,以确保存储层面更加可控。 log.cleanup.policy=delete log.cleaner.enable=true # 满足retention条件之外的“文件”,保留多久则会被cleanup。默认值为12小时。 log.cleaner.delete.retention.ms=43200000 # 日志保留 7 天:保留时长,取决于每天数据量、消费者回溯需求等。 log.retention.hours=168 ## 350G log.retention.bytes=375809638400 # 底层为LSM树,每个逻辑topic、partition都会有多个segments文件序列构成,每个segments大小,默认为1G。 # 此值建议保持默认,过小、过大都有一定的影响。 log.segment.bytes=1073741824 # 检测周期,5分钟 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # 可以包含chroot,比如:host:ip/kafka zookeeper.connect=10.0.11.107:2181,10.0.12.107:2181,10.0.11.108:2181/kafka zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# # 分组协调 (broker端) # rebalance通常对于“单组 + 多个消费者”场景,此值适用于“消费者入组后,首次relance之前等待的时长” # (消费者启动可能逐个进行,等待足够多的consumers入组后进行rebalance) group.initial.rebalance.delay.ms=3000 group.max.session.timeout.ms=300000 group.min.session.timeout.ms=6000
1)broker.id:如果你手动声明,则整个集群不能重复,数字类型,不能为负数。
2)min.insync.replicas:即ISR的个数,这个是数据可靠性的保障策略之一,配合Producer端“acks”参数。
如果你的数据可靠性要求低于producer性能要求,可以设置为1。
如果可靠性要求较高,建议设置为>=2的值,此值应该应该大于broker节点个数的“一半”;通常我们权衡的策略为,如果broker个数N == 3,此值设置为2;N >= 5,设置为3即可。过大的值,将极其影响producer效率,增加producer响应延迟,而且并不能获得更高的可靠性收益。
3)日志刷盘策略:kafka的数据保存在本地一序列segement文件中(包括用于提高查询效率的index文件),消息提交给broker之后,最终写入磁盘才能确保数据不丢失;kafka提供了“基于时间间隔”、“基于累计消息条数”两种同步刷盘策略。
基于时间间隔:每隔X毫秒,执行一次文件的fsync操作。基于累计消息条数:每X条消息后,同步执行一次fsync操作。
如果你关注“吞吐量优先”,比如一些普通的日志采集、监控消息等业务类型,你完全关闭“基于时间间隔”策略(即注释点log.flush.interval.ms),此时将采用文件系统默认刷盘机制;此外你还需要适度调整IRS个数(包括replication的个数)。
如果你关注“数据可靠性优先”,除了合适的IRS个数、以及producer端合理的“acks”时机,我们还应该权衡这两种刷盘策略的合理阈值,建议使用“每秒刷盘”,同时评估一个broker正常的TPS,并将其作为“刷盘条数”的阈值(此值不能太小,比如TPS为20000,你设置“log.flush.interval.messages=1000”,这以为这broker仍然在不停的刷盘,性能极大降级并最终影响SLA)。
4)num.partitions:每个Topic的partition个数,默认值为1,尽管admin工具可以修改此值,但是仍然建议大家在部署时就设置合理。
如果你的集群是存储一些时序性有严格要求的消息,建议设置为1,或者设置为其他值、但是需要producer端使用合理的Partitioner确保消息一致性,以及consumer消费时不要混合消费多个partitions。
如果你的集群对发送、消费的效率要求很高,那么partition的个数,可以设置的较大,具体partitions的个数应该设置为多少,尚没有"完美"的参考值,通常需要考虑broker集群的个数、每个broker磁盘并发的能力、broker上Topic的个数(以及它们各自partitions的个数总和)等;一个比较有粗略的参考值为,建议初步设置为broker节点的个数。
5)日志保留策略:通常我们需要开启,设定kafka保存消息(数据文件)的时长,也限定consumer允许数据回溯的最大时间。查出时间阈值的日志文件,可以“压缩备份”,也可以删除。
6)分组协调:主要适用于消费者,通常production环境,消费者时分组的,每组多个消费者使用相同的groupId;虽然消费者处理消息的效率基本差异不大,但是仍然存在消费者失效(或者block)等问题,此时Broker则可以决策哪些是“慢消费者”、“失效消费者”,以决定适时平衡(由其他在线的消费者接管消费服务)。
2、Broker JVM参数修改(kafka-server-start.sh)
base_dir=$(dirname $0) ## 在base_dir参数之后增加 export KAFKA_HEAP_OPTS="-Xms12G -Xmx12G -XX:NewRatio=2 -XX:SurvivorRatio=8 -XX:MaxMetaspaceSize=512M -XX:CompressedClassSpaceSize=512M" ## 开启JMX监控,默认端口为5760 export JMX_PORT=${JMX_PORT:-5760} ## 如果你希望基于jolokia + HTTP方式输出监控数据,以被采集器获取,可以增加jolokia配置 ## export KAFKA_OPTS="$KAFKA_OPTS -javaagent:/opt/jolokia/jolokia-jvm-1.5-agent.jar"
只需要增加有关内存的相关参数即可。
3、kafka-server-stop.sh
#将原来PIDS的获取方式,修改为“kafka\.kafka”,否则无法获取实际进程的ID PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
主要是解决,此脚本无法正确找到kafka进程的问题。
4、kafka broker与client兼容关系描述
参考:https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
简述:0.9以上版本的client和broker,低版本client可以访问高版本的broker,但是高版本的client访问低版本的broker时可能存在兼容性问题。
5、kafka consumer group的几种状态
1)Empty:此group中没有任何消费者在线(曾经提交过offset),可能是新建的group。当一个“Empty”的group过期之后,其状态会迁移为“Dead”。
2)PrepareingRebalance:准备进行rebalance,通常是此group中有消费者加入或者离开时(探测周期)触发,这意味着topic/partition在组内多个消费者之间重新分配。
3)AwaitingSync:等待Group Leader重新分配topic/partitions。
4)Stable:正常状态。
5)Dead:Group内已经没有任何消费者(members),且其offset记录等meta信息即将被删除。