一、基础概念
-
传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列
最新定义:kafka是一个开源的分布式事件流平台,用于高性能数据管道、流分析、数据集成和关键任务应用。
-
kafka是一个分布式的发布-订阅消息系统,能够支持海量数据传递,它将消息持久化到磁盘中,并对消息创建了备份保证数据的安全。kafka在保证了较高处理速度的同时,又能保证数据的低延迟和零丢失。
-
应用场景:
-
缓冲/消峰
-
解耦
-
异步通信举例
(发送短信)
-
-
消息队列模式
-
点对点模式
- 一个生产者对应一个消费者
- 一个topic主题
- 消费完删除
-
发布订阅模式
- 一个生产者对应多个消费者
- 多个topic主题
- 消费完不删除,默认7天删除
-
二、特性
-
高吞吐、低延迟:每秒可以处理几十万条消息,延迟低,只有几毫秒,每个主题可以有多个分区、消费组。
-
可扩展性:集群支持热扩展。
-
持久性、可靠性:消息持久化到本地,并支持数据备份防止丢失。
-
容错性:副本机制,允许集群中节点失败,若副本数量为n,允许n-1个失败。
-
高并发:支持数千个客户端同时读写。
- 相对于其他消息队列的优势:
- 【待补充】
-
版本选择
版本 功能 备注 0.7 只提供了最基础的消息队列功能,甚至连副本机制都没有 不推荐 0.8 引入副本机制, 成了真正意义上完备的分布式高可靠消息队列解决方案 (老版本客户端api)需要需要指定 ZooKeeper 的地址而非 Broker 的地址; 生产者api默认使用同步方式; 推荐最低版本v0.8.2.0, 搭配old client api(此时虽有了New produder api, 但bug还很多) 0.9(2015.11) 增加了基础的安全认证 / 权限功能,用 Java 重写了新版本消费者 API,还引入了 Kafka Connect 组件用于实现高性能的数据抽取 此时V0.9以上new producer api已经算比较稳定了, 线上可以使用了; 但new consumer api还有很多bug, 千万不要用 0.10(里程碑) 引入Kafka streams, 正式成为分布式流处理平台 推荐至少V0.10.2.2再使用new consumer api, 而且修复了一个可能导致producer性能降低的bug 0.11(重量级变更) 一个是提供幂等性 Producer API 以及事务(Transaction) API;另一个是对 Kafka 消息格式做了重构。 Producer 实现幂等性以及支持事务都是 Kafka 实现流处理结果正确性的基石。此时的事务 API 有一些 Bug,不算十分稳定. 如果线上不敢用V1.0, 推荐至少V0.11.0.3(3个补丁版本之后), 此时功能已经非常完善(最主流版本之一)! 1.0&2.0&3.0 Kafka Streams 的各种改进 如果你是 Kafka Streams 的用户,至少选择 V2.0.0 版本吧
三、技术名词
- topic : 主题,kafka处理的消息分为不同的分类,分类就是按照主题来划分,可理解为数据库中的表。
- broker:消息服务器的代理。kafka集群中的一个节点一般我都门都叫做一个broker;主要是用来存储消息。存在硬盘中。生产环境中,broker的数量最好大于topic中partition分区的数量,保证每个broker存储一个partition。
- partition:分区。Topic的在物理上的分组。一个topic在broker上被分为1个或者多个partition。分区在创建主题的时候指定的。
- message:消息,通信的基本单位,每个消息属于某一个partition
- Producer: 生产者,消息和数据都是由这个组件产生的,由它发送到kafka集群中的。
- Consumer:消费者,消息和数据都是由这个组件来消费的。
- Zookeeper: 他需要zk来做分布式协调,通知生产者和消费者。
-
replica:副本,只要为了防止数据丢失,不会被用来消费。每个partition有多个副本,其中仅有一个作为leader,leader是当前负责数据读写的partition。
-
完整逻辑图:
【自我总结】生产者和消费者按照topic去进行发布和订阅,topic属于逻辑分区,partition属于物理分区。一个topic可以分在多个parition中,而每个partition又可以在各自的broker中。
为什么要分为多个partition?
答:为了提高吞吐量,将topic进行分区,分为多个partition,producer将消息按照一定规则推到不同的partition中,消费者再按照一定的规则去partition中消费数据。见下图,图一是不分区的,图二是分区的。
-
segment
四、配置
-
kafka重要的配置说明(/conf/server.properties):
############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. # broker编号,如果有集群,编号要不同 broker.id=0 ############################# Socket Server Settings ############################# # The address the socket server listens on. If not configured, the host name will be equal to the value of # java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 # broker默认地址,默认是9092 对外提供服务时需要配置 listeners=PLAINTEXT://119.91.29.168:9092 # port port=9092 ############################# Log Basics ############################# # A comma separated list of directories under which to store log files # 消息日志文件地址 log.dirs=/tmp/kafka-logs ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. # zk服务地址 zookeeper.connect=localhost:2181 # 服务器接收单个消息的大小,默认1000012,约976.6kb message.max.bytes=1000012
由于3.6版本已经内置zookeeper,所以没有进行额外的安装。zk默认端口是2181,kafka默认服务端口是9092。
Kafka3.0完全不需要Zookeeper了吗?
例如以前 Consumer 的位移数据是保存在 ZooKeeper 上的,所以当提交位移或者获取位移的时候都需要访问 ZooKeeper ,这量一大 ZooKeeper 就顶不住(Zookeeper集群同步数据、选举不算快)。没了 Zookeeper 的 Kafka 就把元数据存储到自己内部了,利用之前的 Log 存储机制来保存元数据。
五、使用命令
首先cd到kafka安装目录下,云服务器的安装地址为:cd /www/server/kafka/kafka_2.13-3.6.0
-
启动
nohup bin/kafka-server-start.sh ./config/server.properties > /dev/null 2>&1 &
-
创建主题
- –bootstrap-server:kafka服务地址
- –topic:主题名词
- –partitions:分区个数
- —-replication-factor:副本数
- 副本数不能大于broker个数(每个副本必须在不同节点,不允许2个副本在同一节点)
- –create:创建主题操作指令
需要注意的是,新版本的kafka创建主题的命令不再需要zookeeper参数,相反,需要kafka服务地址参数,后续的其他命令亦是如此,如下
版本2.2+:使用–bootstrap-server [kafka服务地址]
[root@VM-8-9-centos kafka_2.13-3.6.0]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic hhw --partitions 2 --replication-factor 1 Created topic hhw.
other:使用–zookeeper [zookeeper服务地址]
[root@VM-8-9-centos kafka_2.13-3.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic hhw --partitions 2 --replication-factor 1
-
展示所有topic主题
[root@VM-8-9-centos kafka_2.13-3.6.0]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --list hhw yangmd
-
展示主题详情
展示了2个分区的信息,因为目前只有1个节点(节点0),所以leader、Replicas都是节点0
[root@VM-8-9-centos kafka_2.13-3.6.0]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hhw Topic: hhw TopicId: 3AuIbtlpQd-7iakvj8xW7Q PartitionCount: 2 ReplicationFactor: 1Configs: Topic: hhw Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: hhw Partition: 1 Leader: 0 Replicas: 0 Isr: 0
-
消费端接收消息
注意,消费者要先连接到topic,生产者再发生消息,空白字符也会被发送
[root@VM-8-9-centos kafka_2.13-3.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hhw hello nihao,i am hhw,this is the first one of the kafka message either the blank char will be send 中文尝试
-
生产端发送消息
[root@VM-8-9-centos kafka_2.13-3.6.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hhw > >hello >nihao,i am hhw,this is the first one of the kafka message > >either the blank char will be send >中文尝试 >
-
增加分区(只能增加分区,不能减少)
[root@VM-8-9-centos kafka_2.13-3.6.0]# bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic hhw --partitions 3
减少分区报错:
ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 1. (kafka.admin.TopicCommand$)
-
kafka重新平衡
默认是开启自动再平衡的,但是根据测试环境观察,暂未确定何时会触发?
当broker停止或崩溃时,这个broker中的所有分区的leader会转移到其他副本。这意味着当这个broker重新启动后,他的分区都将仅作为follower,不再用于客户端的读写操作。Kafka中,当一个broker挂掉并重新启动时,partition的leader确实可能不会立即重新分配。这是因为Kafka采用一种分布式的、容错的消息系统架构,称为分区(partition)和副本(replica)机制。
Kafka中的每个主题(topic)都被分为多个分区,每个分区可以有多个副本,其中一个是主副本(leader),其余是从副本(follower)。当一个broker挂掉后重新启动,Kafka会尽力保持分区的leader不变,以确保数据的可靠性和一致性。
如果你想强制重新分配partition的leader,你可以考虑以下方法:
- 手动触发Reassignment: 使用Kafka提供的工具手动触发分区的重新分配。你可以使用
kafka-reassign-partitions.sh
脚本来执行分区重新分配。这样可以强制Kafka重新评估分区的leader。 - 故障转移: 如果你的副本设置得当,而且有足够数量的副本,Kafka会在发现leader不可用时自动进行故障转移。这可能需要一些时间,但Kafka会尽力确保数据的可靠性。
bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092
注意:在最新的 Kafka 版本中已经不提供该文件,可以考虑使用 Kafka 自带的
kafka-topics.sh
工具触发首选副本选举。(测试无效,参考另一途径:https://blog.csdn.net/qq_34306010/article/details/121736194)bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/client.properties --alter --topic <your_topic_name> --replica-assignment <json_replica_assignment>
- 手动触发Reassignment: 使用Kafka提供的工具手动触发分区的重新分配。你可以使用
-
查看消费组
需要先启动一个消费组,才能查询到信息。
查询所有消费组
[root@VM-8-9-centos kafka_3]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --list
查询具体某个消费组
[root@VM-8-9-centos kafka_3]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --describe --group group.demo GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID group.demo multiHhw 2 105 105 0 - - - group.demo hhw 1 6 0 -6 - - - group.demo multiHhw 1 12 12 0 - - - group.demo hhw 0 1196 1196 0 - - - group.demo hhw 3 0 0 0 - - - group.demo multiHhw 0 0 0 0 - - - group.demo hhw 2 0 0 0 - - -
-
更多命令参考:https://kafka.apache.org/documentation/#clientconfig
六、接入Java程序
6.1快速接入
源码见:https://gitee.com/sensationhhw/springboot-kafka-demo.git
生产者:
Properties properties = new Properties();
// 设置key序列化,防止乱码
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置值序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 10);
// 集群地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
int i = 0;
while(i <= 100) {
// 封装的消息对象
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC, "kafka-demo", "hello,hhw-kafka-" + i);
kafkaProducer.send(producerRecord);
i ++;
Thread.sleep(1000);
}
kafkaProducer.close();
消费者:
Properties properties = new Properties();
// 设置key反序列化,防止乱码
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置值反序列化
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 集群地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
// groupId
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton(TOPIC));
while (true) {
// 1s接收一次
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key() + "----" + record.value());
}
}
示意图:
6.2发送方式
-
同步发送
// 同步发送 // 封装的消息对象 ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC, "kafka-demo", "hello,hhw-kafka-" + i); Future<RecordMetadata> send = kafkaProducer.send(producerRecord); // 获取返回值 RecordMetadata recordMetadata = send.get(); System.out.println("topic:" + recordMetadata.topic()); System.out.println("partition:" + recordMetadata.partition()); System.out.println("offset:" + recordMetadata.offset());
-
异步发送
kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { // 回调 if (Objects.isNull(e)) { // 正常 System.out.println("topic:" + recordMetadata.topic()); System.out.println("partition:" + recordMetadata.partition()); System.out.println("offset:" + recordMetadata.offset()); } else { // 异常 } } });
6.3序列化
- 将字符串或其他类型的消息数据转为byte数组。
- 当需要传输自定义类时,可自定义实现序列化器,要求实现Serializer<自定义的类>接口。自定义的类>
- 自带的序列化器可满足大部分要求(面对String字符串来说):StringSerializer。
6.4分区器
-
kafka自带分区策略,如果未指定,就会使用默认的分区策略(DefaultPartitioner)。
-
默认分区器策略
-
获取当前topic的分区数numPartitions
- key不为null:hash(key)%numPartitions来获取分区
- key为null:获取当前topic的nextValue,以及当前topic活跃的分区数,如果活跃的分区数为空,那么根据nextValue哈希值%numPartitions。如果活跃的分区数不为空,那么和活跃的分区数取%
-
-
实现Partitioner可自定义分区策略
6.5拦截器
-
作用
- 按照某个规则过滤掉不符合要求的消息
- 修改消息内容
- 统计类需求
-
自定义拦截器demo:
-
实现ProducerInterceptor接口
-
举例:给所有消息增加
prefix-hhw
前缀 -
代码:
拦截器:
@Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) { // 可用于实现业务逻辑,序列化键和值并分配分区之前调用 // 允许此方法修改记录,在这种情况下,将返回新记录(record) // 这里模拟给所有消息加前缀 String newValue = "prefix-hhw-" + producerRecord.value(); return new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(), newValue, producerRecord.headers()); } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { // 当已确认发送到服务器的记录时,或者当发送记录在发送到服务器之前发送失败时,将调用此方法。 if (Objects.isNull(e)) { // 没有异常 } else { // 出现异常 } } @Override public void close() { // 调用kafkaProducer.close()时会调用,此方法主要用于执行一些资源的清理工作。 System.out.println("[info]发送成功===="); } @Override public void configure(Map<String, ?> map) { // 用来初始化此类的方法,一般不需要关注 }
注册拦截器:
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptor.class.getName());
拦截器实现思路(查看源代码得出结论):在new kafkaProducer时传入配置参数,而我们又在配置参数中设置了拦截器的类路径,在new kafkaProducer时,kafka会去get拦截器的路径,并把他设置到参数
interceptors
中,完整示例this.interceptors = new ProducerInterceptors<>(interceptorList);
,而每次调用send方法时,就会遍历this.interceptors
,如果有拦截器,就调用拦截器中对应的方法。
-
6.6发送顺序
-
kafkaProducer–拦截器–序列化器–分区器
-
kafkaProducer线程安全,多个线程可共享同一个kafkaProducer对象
6.7其他重要参数
- acks:这个参数用来确定消息是否写入成功的判断逻辑
- ack=0,生产者不等待任何来自服务器的响应,意味着生产者不知道消息是否发生成功(吞吐量最大)
- ack=1,默认值=1,只要集群的leader收到消息,就返回成功响应
- ack=-1,只有当所有主从节点都收到消息,生产者才会收到成功响应(最慢,但最安全)
- retries:重试次数,大于重试次数才会抛出异常
- batch.size:当有多个消息要被分到同一分区时,生产者会将他们放到同一批次里,该参数指定了一个批次可以使用的内存大小,按字节数计算,当批次被填满时,批次的所有消息会被发送出去。不过不一定是等满了才发送出去,半满或者只有一条也有可能。所以该参数不会造成延迟,只是会占用更多内存,如果设置太小,生产者会因为频繁发送消息而增加一些额外的开销。
- max.request.size:控制生产者发送的请求大小,指定的是单个消息的最大值。broker对可接受的消息也有自己的限制(message.max.size)
七、消费者详解
本章内容包括:
深入学习kafka数据消费的大致流程
如何创建并使用kafka消费者
kafka消费者常用配置
7.1概念入门
-
消费者和消费组
Kafka 消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个 T1 主题,该主题有 4 个分区;同时我们有一个消费组 G1, 这个消费组只有一个消费者 C1。那么消费者 C1 将会收到这 4 个分区的消息,如下所示:
- 如果有多个消费组,那么如下所示:
7.2必要参数配置
Properties properties = new Properties();
// 设置key反序列化,防止乱码
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置值反序列化
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 集群地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
// groupId,消费者隶属消费组,如果设置为空会抛出异常,可设置具有业务意义的名称
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
7.3订阅主题和分区
-
指定topic
consumer.subscribe(Collections.singleton(TOPIC));
-
正则表达式
consumer.subscribe(Pattern.compile("hhw*")); // 支持正则表达式
-
指定分区
consumer.assign(Arrays.asList(new TopicPartition("hhw", 0)));
7.4offset
对于kafka的分区而言,每条消息都有位移的offset,用来表示消息在分区中的位置。可以保证分区有序,但不能保证topic有序。
当我们调用poll时,该方法会返回我们没有消费的消息,当消息从broker返回时,broker并不跟踪这些消息是否被消费者接收到,kafka会让消费者自身来管理offset,并向消费者提供更新offset的接口,这种更新位移方式成为提交commit。
扩展:
kafka0.8.1.1以前,offset保存在zk中,存放在/consumers节点下。但是由于频繁访问zk,zk需要一个一个节点更新offset,不能批量或分组更新,导致offset更新成了瓶颈。后续两个过渡版本增加了参数“offsets.storage”,该参数可配置为“zookeeper”或“kafka”分别表示offset的保持位置在zk或是broker,默认保留在zk,0.9版本以后offset就默认保存在broker下。若配置的“kafka”,当设置了“dual.commit.enabled”参数时,offset仍然可以提交到zk。 zk中保存offset结构为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
broker中存在一个offset manager 实例负责接收处理offset提交请求,并返回提交操作结果。
可能遇到的问题:
-
重复消费:
-
引发的原因:已经消费了数据,但是offset没有成功提交。
其中很大原因都在于发生了再均衡
- 消费者宕机、重启。导致消息已消费但是没有提交offset
- 分区中的消费者发生变更
- 消费者使用自动提交offset,但当还没有提交的时候,有新的消费者加入或者移除,发生了rebalance。再次消费的时候,消费者会根据提交的偏移量来,于是重复消费了数据。
- 消息处理耗时,或者消费者拉取的消息量太多,处理耗时,超过了max.poll.interval.ms的配置时间,导致认为当前消费者已经死掉,触发再均衡。
-
由谁解决,如何解决:
- 消费者需要实现消费幂等。
- 消息表、数据库唯一索引、缓存消费过的消息id。
-
-
消息丢失:
三种情况:
-
生产者到broker:通过设置acks来解决,设置-1时保证不丢失。如果消息发生失败时会触发retries机制,设置重试次数。
消息重发引起的消息顺序性问题
要注意,消息发送失败进行重发不能保证消息发送的顺序性,
这里的顺序性是单分区顺序性
,如果服务对于消息的顺序性有严格的要求,那么我们可以通过设置属性max.in.flight.requests.per.connection=1
来保证消息的顺序性,这个配置对应的是kafka中InFlightRequests
,max.in.flight.requests.per.connection
代表请求的个数,kafka在创建Sender的时候会判断,如果maxInflightRequests为1,那么guaranteeMessageOrder就为true,就能保证消息的顺序性。 -
broker到磁盘:设置多副本,当broker故障的时候,如果还有其他副本,那么数据就不会丢失。
-
消费者:kakfa的消费模式是拉模式,拉取的消息消费后需要提交offset。当我们收到消息后对消息进行处理,如果在处理的过程中发生异常,而又设置为自动提交offset,那么消息没有处理成功,offset已经提交了,当下次获取消息的时候,由于已经提交过offset,所以之前的消息就获取不到了,所以应该改为手动提交offset,当消息处理成功后,再进行手动提交offset(同步或异步)。
- 手动提交有一个缺点,就是当发起提交调用时应用会阻塞,当然我们可以减少手动提交的频率, 但是这个会增加消息重复的概率,另外一个解决方法是,使用异步提交的API
- 异步提交也有一个缺点,就是服务器返回提交失败,异步提交是不会进行重试,同步提交会进行重试直到成功或者最后抛出异常给应用.
相关参数:
// 设置自动提交 enable.auto.commin = true // 设置自动提交时每隔5秒提交一次 auto.commit.interval.ms = 5
java实现:
// 配置手动提交offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 业务处理完手动提交 while (true) { // 1s接收一次 ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { // 业务处理 } // 手动-同步提交offset consumer.commitAsync(); // 手动-异步提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { } }); }
-
7.5指定位移消费
-
通过
seek(分区, offset)
方法追溯之前的消息,之间将消费者的offset重置到指定值,之后的消费都从该offset开始。Properties properties = new Properties(); // 设置key反序列化,防止乱码 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 设置值反序列化 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 集群地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST); // groupId,消费者隶属消费组,如果设置为空会抛出异常,可设置具有业务意义的名称 properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singleton(TOPIC)); // 1.获取消费者所分配到的分区 Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0) { // 如果没有分配到分区,就一直循环下去(注意:需要等待一会才会得到assignment) // kafka的分区逻辑是在poll方法里执行的,所以执行seek方法之前先执行一次poll方法 consumer.poll(100L); assignment = consumer.assignment(); } System.out.println("assignment--" + assignment); for (TopicPartition topicPartition : assignment) { // 2.每个分区的offset都回溯到10 consumer.seek(topicPartition, 10); } while (true) { // 3.接收一次 ConsumerRecords<String, String> records = consumer.poll(2000); for (ConsumerRecord<String, String> record : records) { System.out.println(record.key() + "----" + record.value() + "----" + record.offset()); } }
7.6再均衡
-
再均衡是指分区的所属从一个消费者转移到另一个消费者的行为。它为消费组的高可用和高伸缩性提供了保障,使得我们方便且安全地删除消费者或往消费组中添加消费者。
-
再均衡期间消费者无法拉取消息。
-
kafka提供了再均衡监听器,可做业务处理,通过实现ConsumerReblanceListener。
-
开启再均衡
auto.leader.rebalance.enable=true
7.6消费者拦截器
-
消费消息或提交offset时进行一些定制操作。
-
implement ConsumerInterceptor
@Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { // 消费消息前执行,可做过滤、数据预处理等操作 System.out.println("===== 消费消息前执行 ====="); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> recordList = records.records(partition); for (ConsumerRecord<String, String> record : recordList) { // record.key(); // record.value(); } } return records; } @Override public void close() { // 关闭消费时前执行 } @Override public void onCommit(Map offsets) { // 提交offset前执行 System.out.println("===== 提交offset前执行 ====="); }
八、KafkaAdmin
使用java通过KakfaAdmin来管理Kafka(一般中台系统会进行管理,应用系统用的较少)
8.1增加topic的partition数量
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
AdminClient adminClient = AdminClient.create(properties);
NewPartitions newPartitions = NewPartitions.increaseTo(4);
// 需要扩展partition的topic,格式 => topic:partition个数
Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
newPartitionsMap.put(TOPIC, newPartitions);
CreatePartitionsResult result = adminClient.createPartitions(newPartitionsMap);
// 结果
result.all().get();
adminClient.close();
8.2查询topic
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
AdminClient adminClient = AdminClient.create(properties);
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC);
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Collections.singleton(configResource));
Config config = describeConfigsResult.all().get().get(configResource);
// 结果
System.out.println(config);
adminClient.close();
九、分区
本章内容:Kafka分区管理、优先副本的选举,分区重新分配等
Kafka可以将主题划分为多个partition,根据分区规则把消息存储到具体的分区中去。只要规则合理,理论上所有消息都会均匀分布到不同分区,这样就实现了负载均衡和扩展。多个订阅者可以从一个或多个分区同时消费数据,以支撑海量数据并发。
kafka的消息是追加到分区中的,多个分区顺序写磁盘的总效率比随机写内存还要高,是kafka高吞吐率的重要保证之一。
9.1副本
一个分区可以有多个副本,这些副本保存在不同的broker上。每个分区的副本中都会有一个作为Leader。当一个broker失败时,这台broker上的leader都会变得不可用,kafka会自动移除Leader,再其他副本中选一个作为新的Leader。 在通常情況下,增加分区可以提供kafka集群的吞吐量。然而,也应该意识到集群的总分区数或是单台服务器上的分区数过多,会增加不可用及延迟的风险。
红色是leader,绿色是副本。
9.2leader选举
略,后续补充
9.3分区重新分配
-
为什么需要分区重新分配?
我们往已经部署好的Kafka集群里面添加机器是最正常不过的需求,而且添加起来非常地方便,我们需要做的事是从已经部署好的Kafka节点中复制相应的配置文件,然后把里面的broker id修改成全局唯一的,最后启动这个节点即可将它加入到现有Kafka集群中。 但是问题来了,新添加的Kafka节点并不会自动地分配数据,已有的数据不会被分配到新的broker中,所以无法分担集群的负载,除非我们新建一个topic。 但是现在我们想手动将部分分区移到新添加的Kafka节点上,Kafka内部提供了相关的工具来重新分布某个topic的分区。
例如:当前有4个分区,分布在3个节点中,如果此时再增加一个节点,之前的4个分区仍然分布在之前的3个节点,第4个分区就没有真正用起来。
-
kafka提供了
kafka-reassign-partitions.sh
脚本来执行分区重分配的工作,它可以在集群扩容、broker节点失效的场景下对分区进行迁移。接上述案例,现在我们希望将4个分区均匀地分散到4个节点中,需要借助kafka提供的脚本进行处理。步骤如下:
-
编写一个配置reassign.json,然后借助
kafka-reassign-partitions.sh
工具生成reassign plan。配置内容如下,包括需要对哪些topic进行重新分配:
{ "topic":[ { "topic":"multiHhw" } ], "version":"1" }
-
执行命令:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file reassign.json --broker-list "0,1,2,3" --generate
“0,1,2,3” 表示要分配到的broker,这里全选,表示topic要重新分配到所有broker中。
输出结果的第一行是当前分区的副本分配情况,第二行是重新分配的候选方案,注意这里只是生产方案,并还没有去执行重新分配的动作。
-
执行候选方案
将候选方案的json串拷贝到一个文件中-result.json,准备用于执行。
-
9.4分区策略
- 按照kafka默认消费逻辑,一个分区只能被同一个消费者组内的一个消费者消费。
-
如果消费者组中的消费者大于分区数量,那么就会存在有的消费者因为分配不到任何分区而无法消费。
-
上图是按照kakfa的默认分区策略,也就是
org.apache.kafka.clients.consumer.RangeAssignor
。即RangeAssignor分配策略。除此之外,kafak还提供RoundRobinAssignor和StickyAssignor。消费者可通过partition.assignment.strategy配置多个分区策略,多个逗号隔开。 -
默认的RangeAssignor:
十、Kafka存储(重点)
本章内容:在完成kafka应用开发基础上,知道文件存储机制;kafka为什么使用磁盘作为存储介质、分析文件存储格式、快速检索消息。
10.1存储结构
-
每个partition中的数据都被平均分配到大小相等的segment数据文件中。默认情况下,每个segment为1G。每个partition仅仅需要支持顺序读写,segment文件的生命周期由服务端参数配置决定。
-
segment文件结构
-
由2大部分组成。分别为index file和data file(还有一个时间戳文件)。2个文件一一对应。分别表示segment的索引文件和数据文件。
-
segment命名规则:partition全局的第一个segment从0开始,后续每一个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位的long,19位数字字符长度,没有的数字用0填充。
-
10.2日志索引
1.数据文件分段
如何快速定义partition中的数据?根据segment的命名,使用二分法快速找到segment文件。
Kafka解决查询效率的手段之一是将数据文件分段(segment),比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段(segment)中。
2.偏移量索引
找到segment文件后,如何快速定位内容?使用偏移量索引。
Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。索引文件中记录了offset对应的位置关系,kakfa利用二分法可以快速找到数据。
比如:要查找offset为7的Message: 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。
一句话,kafka的messge存储采用了分区(partition)、分段(segment)和稀疏索引这几个手段来达到高效性。10.3日志清理
kafka日志管理器允许定制删除策略。目前的策略是删除修改时间在N天前的日志(按时间删除)。也可以选择另外的策略,保留最后的N GB数据(按大小删除)。为了避免在删除时阻塞读操作,采用了copy on write形式的视线,删除操作进行时,读取操作的二分查找功能实际上是在一个静态的快照副本上进行的,类似于java的CopyOnWriteArrayList。
kafka消费日志删除思想,kafka把topic中的一个partition大文件分为多个segment文件,通过多个小文件段,就容易清除已消费的文件,减少磁盘占用。
默认的删除策略是关的,需手动配置
log.cleanup.policy=delete #启动删除策略,可配置如下两个策略
# 清理超过指定时间的
log.retention.hours=16
# 超过指定大小的,删除旧消息
log.retention.bytes=1073741824
10.4磁盘存储优势
Kafka在设计的时候,采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且不允许修改已经写入的消息,这种方式属于典型的顺序写入,所以就算是Kafka使用磁盘作为存储介质,所能实现的额吞吐量也非常可观。
Kafka中大量使用页缓存,这也是Kafka实现高吞吐的重要因素之一。
十一、稳定性
深入学习kafka在保证高性能、高吞吐的同时通过各种机制来保证高可用性。
- kafka稳定性相关操作、幂等性、事务的处理,同时对可靠性和一致性做一些了解
生产者发生消息流程:
- Kafka的消息传输保障机制非常直观。当producer向broker发送消息时,一旦这条消息被commit,由于副本机制(replication)的存在,它就不会丢失(acks参数)。
- 但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经提交(commit)。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以retry多次,确保消息已经正确传输到broker中,所以目前Kafka实现的是at least once。
11.1幂等性
-
在生产者写入出现失败时,将会进行retry,这时有可能会产生重复数据。为了应对这种问题,kafka支持消费者写入数据的幂等性,避免了写入重复数据,但消费者的幂等性还需自己实现。
-
通过如下配置实现,默认是true,已经开起了幂等性。
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG = true
支持在一个topic下单个partition实现幂等性,跨partition无法实现。
11.2事务
-
幂等性配置不能跨分区操作,而事务可以弥补这个缺陷。子性是指多个操作要么全部成功,要么全部失败,不存在部 为了实现事务,应用程序必须提供唯一的
transactionalld
,这个参数通过客户端程序来进行设定。properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId); // 字符串
-
开启事务的前提是要求生产者开启幂等性配置,如果幂等性=false,kafka会抛出异常。
-
事务步骤
- init事务
- 开启事务
- 提交事务或回滚事务
Properties properties = new Properties();
// 设置key序列化,防止乱码
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置值序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 10);
// 集群地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
// 定义transactionId
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 1、初始化事务
kafkaProducer.initTransactions();
// 2、开启事务
kafkaProducer.beginTransaction();
try {
// ...处理业务逻辑
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC, "kafka-transaction",
"hello,hhw-transaction-1");
kafkaProducer.send(producerRecord);
// 模拟错误异常
System.out.println(1/0);
ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>(TOPIC, "kafka-transaction",
"hello,hhw-transaction-2");
kafkaProducer.send(producerRecord2);
ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>(TOPIC, "kafka-transaction",
"hello,hhw-transaction-3");
kafkaProducer.send(producerRecord3);
// 3、提交事务
kafkaProducer.commitTransaction();
System.out.println("发送完成");
} catch (Exception e) {
// 4、回滚事务
kafkaProducer.abortTransaction();
throw new RuntimeException("出现异常:" + e);
}
kafkaProducer.close();
11.3控制器
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh
脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka中的控制器选举的工作依赖于Zookeeper(老版本,新版本未知),成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL) 节点。
在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选,如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerld。
具备控制器身份的broker需要比其他普通broker多一些职责:
- 监听partition相关的变化
- 监听topic相关的变化
- 监听broker相关的变化
- 从Zookeeper中读取获取当前所有的topic、partition以及broker有关的信息并进行管理
11.4可靠性
-
可靠性保证:确保系统在各种不同环境下能够发生一致的行为
- kafka的保证:
- 保证分区消息的顺序
- 如果使用同一个生产者往同一个分区写入消息,而且消息B在消息A之后写入,那么Kafka可以保证消息B的偏移量比消息A偏移量大,而且消费者会先读取消息A在读取消息B。
-
只有当消息被写入分区的所有副本时,他才被认为已提交(通过acks设置)
-
只要还有一个副本是活跃的,那么已提交的消息就不会丢失
-
消费者只能读取已经提交的消息
-
副本复制:
-
Kafka 中的每个主题分区都被复制了n次,其中的n是主题的复制因子(replication factor)。这允许 Kafka 在集群服务器发生故障时自动切换到这些副本,以便在出现故障时消息仍然可用。Kafka的复制是以分区为粒度的分区的预写日志被复制到n个服务器。在n 个副本中,一个副本作为 leader,其他副本成为 followers。顾名思义,producer只能往leader分区上写数据(读也只能从 leader分区上进行),followers 只按顺序从 leader上复制日志。
-
副本同步队列(ISR) - 所谓同步,必须满足如下两个条件:
- 副本节点必须能与zookeeper保持会话(心跳机制)
- 副本能复制leader上的所有写操作,并且不能落后太多。(卡住或滞后的副本控制是由 replica.lag.time.max.ms 配置)
-
-
默认情况下Kafka对应的topic的replica数量为1,即每个partition都有一个唯一的leader,为了确保消息的可靠性,通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,比如3。 所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
-
副本不同步的异常情况
-
慢副本:在一定周期时间内follower不能追赶上leader。最常见的原因之一是I / O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。
-
卡住副本:在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。
-
新启动副本:当用户给主题增加副本因子时,新的follower不在同步副本列表中,直到他们完全赶上了leader日志。
-
11.5一致性
-
CAP定理,C:一致性,A:可用性,P:分区容错性。
-
Kakfa的高水位机制(HW:High Watermark),用来提升一致性
-
Kafka 在不同的副本之间维护了一个水位线的机制,消费者只能读取到水位线以下的的数据。 也就是说,消费者一开始在消费 Leader 的时候,虽然 Leader 副本中已经有 (a,b,c,d,e,f,g),但是也只能消费到(a,b,c)的数据。
-
如果 Leader 挂了,其他 Follower 怎么知道之前的高水位线在哪里? 高水位线是保存在什么地方了吗? 还是它们之间有一个消息同步机制?
答案是:高水位是在 Follower 向 Leader 同步数据的时候,完成高水位数据的更新的。
这里要引入一个概念:LEO(Log End Offset),日志末端位移,表示下一条待写入消息的 offset,每个 Partition 副本都会记录自己的 LEO。
Leader会计算HW的最小值,=所有节点LEO的最小值。
参考:https://blog.csdn.net/qq_24434251/article/details/129259835
-
-
在leader宕机后,只能从ISR(活跃的副本同步队列)列表中选取新的leader,无论ISR中哪个副本被选为新的leader,它都知道HW之前的数据,可以保证在切换了leader后,消费者可以继续看到HW之前已经提交的数据。
-
HW的截断机制:选出了新的leader,而新的leader并不能保证已经完全同步了之前leader的所有数据,只能保证HW之前的数据是同步过的,此时所有的follower都要将数据截断到HW的位置,再和新的leader同步数据,来保证数据一致。 当宕机的leader恢复,发现新的leader中的数据和自己持有的数据不一致,此时宕机的leader会将自己的数据截断到宕机之前的hw位置,然后同步新leader的数据。宕机的leader活过来也像follower一样同步数据,来保证数据的一致性。
-
数据丢失场景1
HW为所有节点的LEO最小值,此时为0-1,这时A宕机,B成为leader,之后A启动,A作为副本同步B的数据,A中的LEO也恢复到0-1,1-2的数据丢失。
- 新版本引入了epoch和offset来解决数据一致性的问题。
11.6kafka是CA系统吗?(重点)
- CAP是“一致性(Consistency)、可用性 (Availability)以及分区容忍性(Partition Tolerance)”的缩写,接下来就来详细的阐述一下CAP原理。
- C即一致性(Consistency、统之后,所有访问数据的请求不管是访问分布式存储的那个节点上一致性就是要求分布式系统要保障,一旦数据写入到分布式存储来查到到该写入的数据都是一致的,不能出现3个副本中有的副本有该条数据,有的副本没有该条数据(插入问题),更不能是有的副本该条数据和另外一个副本该条数据是不一样的(更新问题)
- A 即可用性 (Availability)统之后,所有访问该数据的请求都可以正常响应,不管该数据能不们可用性就是要求分布式系统要保障,一旦数据写入到分布式存储系查到,又或者该条数据查出来的一不一致,不能出现查询该数据时出现长期等待或者报错的发生
- P 即分区容错性 (Partition Tolerance) 分区容忍性时要求分布式系统要保障,一旦数据写入到分布式存价依然能够对外提供服务,网络在分系统的主本文件后,因为网络的的问题无法同步到副本的时候,系统布式系统来讲是不敢绝对保障的,如果因为网络问题,导致写入娄[靠从科学角度上来讲是无法做到(据无法向副本同步,这时候就是分区的情况出现,但网络的绝对的的,因此,所有分布式系统必须是满足“P”的存在,不然就只能使用单机系统来解决,那就不是分布式系统了。因此,综上所述,分布式系统基本上所有的都必须满足“P”,在“A'和“C”之间来选择,要么是AP,要么是CP
-
Kafka的开发人员申明kafka是CA系统,但实际情况却没有那么简单。
- kafka设计是运行在一个数据中心,网络分区问题基本不会发生,所以是CA系统。
- 但实际情况网络问题即使是数据中心,也可能发生。
-
定制配置
-
方案1:
下面这种配置,就保证强一致性,使得Kafka满足CP。任意写入一条数据,都需要等到replicate到所有节点之后才响应;接下来,在任意节点都可以消费到这条数据,即是在有节点宕机的情况下,包括主节点。
replication.factor = 3 // 副本数量3 min.insync.replicas = 3 // 最小同步副本数 acks = all // 需要所有副本收到消息才返回
-
方案2:
下面的配置,就主要保证可用性,使得Kafka满足AP。对于任意写入一条数据,当主节点commmit了之后就返回ack;如果主节点在数据被replicate到从节点之前就宕机,这时,重新选举之后,消费端就读不到这条数据。这种配置,保证了availability,但是损失了consistency。
replication.factor = 3 // 副本数量3 min.insync.replicas = 3 // 最小同步副本数 acks = 1 // 主节点收到消息就返回
-
十二、消息重复场景及解决方案
12.1生产者重复
引发:生产者发送消息没有收到正确的响应(可能是网络等原因),触发重试
解决:
- 方案一:启动kafka幂等性:修改配置文件,默认为关闭:
enable.idempotence=true
,同时要求ack=all且retries>1。(ps,acks=all 或 -1 意义相同) - 方案二:ack=0,不重试。可能导致丢失消息,适用于吞吐量优先的环境,例如:日志收集。
12.2消费者重复
引发:数据消费完没有成功提交offset
解决:
- 使用手动commit:每次消费完或程序退出时手动提交,但没法保证100%不重复。
- 下游做业务的幂等性校验。(数据库表、缓存记录消费的id或offset)
十三、集成SpringBoot
13.1快速集成
-
配置maven
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--kafka相关开始--> <!--<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.6.0</version> </dependency>--> <!-- spring-kafka,如果要集成到spring,直接引入这个就行了,上面是单单测试kafka用的 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.9.13</version> </dependency> <!--kafka相关结束--> </dependencies>
PS:要注意
spring-kafka
搭配boot的版本,大版本尽量保持一致,2.x对应2.x -
配置文件
spring: kafka: bootstrap-servers: 119.91.29.168:9092,119.91.29.168:9093,119.91.29.168:9094
-
编写代码
@Autowired private KafkaTemplate<String, String> kafkaTemplate; public static final String TOPIC = "hhw"; private static final Logger logger = LoggerFactory.getLogger(KafkaController.class); /** kafka生产者 */ @GetMapping(path = "/send/{msg}") public String send(@PathVariable String msg) { kafkaTemplate.send(TOPIC, msg); return "send success" + msg; } /** kafka消费者 */ @KafkaListener(id = "listener1", topics = TOPIC, groupId = "group.demo") public void listen(String msg) { logger.info("receive message: " + msg); }
13.2事务
-
配置事务id
spring: kafka: # 119.91.29.168:9092,119.91.29.168:9093, bootstrap-servers: 119.91.29.168:9092,119.91.29.168:9093,119.91.29.168:9094 producer: retries: 3 # 开启事务时必须要配置为all acks: all # 事务支持(设置一个非空字符串即可开启事务) transaction-id-prefix: kafka_tx
-
代码示例
第一种事务运行方式:
/** kafka生产者(事务版) */ @GetMapping(path = "/sendTransaction/{msg}") public String sendTransaction(@PathVariable String msg) { // 事务支持,方式一 String result = kafkaTemplate.executeInTransaction(kafkaOperations -> { kafkaTemplate.send(TOPIC, msg); if ("error".equals(msg)) { throw new RuntimeException("生产异常"); } kafkaTemplate.send(TOPIC, msg + "2"); return "生产发送成功"; }); return result; }
第二种事务运行方式:注解
/** kafka生产者(【推荐】事务版2,使用@Transaction) */ @GetMapping(path = "/sendTransaction2/{msg}") @Transactional(rollbackFor = RuntimeException.class) public String sendTransaction2(@PathVariable String msg) { kafkaTemplate.send(TOPIC, msg); if ("error".equals(msg)) { throw new RuntimeException("生产异常"); } kafkaTemplate.send(TOPIC, msg + "2"); return "生产发送成功"; }
13.3实际开发遇到问题
-
pcmsz推送思路:
生产者:监听变化-保存日志-推送到kafka,推送成功:变更状态;推送失败:等待定时任务补偿推送
消费者:消费成功(保存日志+调用业务方法成功):变更状态;消费失败,进入重试,保存日志,等待定时任务补偿调用业务方法。
PS:消费时需要注意消息顺序,可使用时间进行比较,只更新比当前记录晚的数据(日志需全量记录)。
PS:如果是更新消息,但新增消息还未执行时,认定该消息失败,等待后续处理。
-
kafka意外关闭后无法启动,报错:The Cluster ID yKfsjtowTyu1zHuKkInesw doesn’t match stored clusterId Some(finN2zUTRWaXMomXCknRew) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
解决:删除配置文件配置的log.dirs下的所有文件
-
手动启动内置的Zookeeper:
bin/kafka-server-start.sh config/server.properties
-
消费者异常处理器
当消费方法抛出异常后,会进入到该异常处理器中,可以进行后续的业务处理
-
在@KafkaListener上配置errorHandler = “consumerAwareErrorHandler”
-
编写consumerAwareErrorHandler
@Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return (message, exception, consumer) -> { logger.error("进入异常。。。。。消费异常:" + message.getPayload()); //do something return null; }; }
-
-
消费者配置手动提交offset(可能导致阻塞)
-
消费者入参遵循如下规则:
入参需要有:ConsumerRecord<String, String> record, Acknowledgment acknowledgment
@KafkaListener(id = "listener3", topics = TOPIC, groupId = "group.demo", errorHandler = "consumerAwareErrorHandler") public void listen3(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) { // 可从record获取该条信息的topic、partition、offset等信息 // 使用acknowledgment手动提交offset(前提,配置文件配置: // enable-auto-commit: false、 // listener: // #Acknowledgment作为接收参数时,必须设置为manual // ack-mode: manual) logger.warn("接收kafka消息:{}", record.key() + "----offset: " + record.offset() + "----partition:" + record.partition()); // 业务处理..... // ..... // 手动提交offset acknowledgment.acknowledge(); }
-
配置文件配置
consumer: enable-auto-commit: false listener: #Acknowledgment作为接收参数时,必须设置为manual ack-mode: manual
-
-
配置消费者失败后的重试策略:
kafka没有重试机制不支持消息重试,也没有死信队列,因此使用kafka做消息队列时,需要自己实现消息重试的功能。
- 记录失败的消息并跳过:当消费者无法处理某条消息时,可以将该消息记录到日志或其他持久化存储中,并继续处理下一条消息。这样可以确保消费者不会因为单个消息的处理失败而停止消费。
- 重试机制:在某些情况下,可以对失败的消息进行重试,直到消息成功处理或达到最大重试次数。可以使用重试策略来控制重试的次数和间隔。如果消息在重试达到最大次数后仍然无法处理成功,可以考虑将消息记录到死信队列中等待进一步处理。
- 发送到死信队列:对于无法处理的消息,可以将其发送到死信队列(Dead Letter Queue,DLQ),以便稍后进一步处理。死信队列是一种专门用于存放无法正常处理的消息的队列,通常会有专门的消费者来处理这些消息。
- 手动处理
- 回退策略
- 监控和警告
以下为使用spring retry进行重试处理的策略
@KafkaListener(id = "listener3", topics = TOPIC, groupId = "group.demo", errorHandler = "consumerAwareErrorHandler") public void listen3(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) { // 使用spring自带的retry进行重试 RetryTemplate retryTemplate = new RetryTemplate(); // 设置重试策略:最多重试3次 SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); retryPolicy.setMaxAttempts(3); retryTemplate.setRetryPolicy(retryPolicy); // 设置回退策略:每次重试之间固定等待2000毫秒 FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(2000L); retryTemplate.setBackOffPolicy(backOffPolicy); // 执行 try { retryTemplate.execute(context -> { // 从kafka拉取日志并写入数据库、解析 logger.warn("接收kafka消息:{}", record.key() + "----offset: " + record.offset() + "----partition:" + record.partition()); // 业务处理..... // ..... throw new RuntimeException("测试失败"); //return null; }); } catch (Exception e) { // 达到重试次数后抛出异常,进入errorHandler处理 throw new RuntimeException(e); } // 手动提交offset acknowledgment.acknowledge(); }
十四、部署&集群
1、掌握kafka集群创建
2、参数调优
-
集群特点
- 可扩展性:新的服务可以随时加入,从而增强性能(负载均衡)。
- 高可用性:当某个节点发生故障时,其他节点可以保证系统的整体正常运行,增加可靠性。
-
配置kafka集群
-
复制整个kafka文件夹
-
删除目录logs/*,否则启动会冲突
-
修改配置文件中broker.id
-
修改log.dirs地址
-
zk地址配置,多个zk逗号隔开
-
修改port
-
注意listener配置
-
按指定配置启动:
bin/kafka-server-start.sh conf/server.properties
-
Zookeeper配置(如有需要)
-
Zookeeper的clientPort设置、dataDir配置,zoo.cfg ;
-
zoo.cfg配置server信息
server.0=127.0.0.1:2181:3888 server.0=127.0.0.1:2182:3889
格式:server.服务器id=服务器ip地址:端口:投票选举端口
-
在data目录下创建myid文件,标识不同节点(类似brokerId)
-
集群实现topic分区负载均衡案例:
3个集群,1个topic分为3个分区,每个分区有3个副本,每个分区的leader均匀分布在3个broker中,实现了负载均衡。例如:分区0的副本在2,0,1机器中,其中leader是机器2
-
-
部署问题:
-
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed;
原因:机器配置不足,修改kafka启动配置
vim bin/kafka-server-start.sh export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" ==> export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
-
后台启动kafka命令
nohup bin/kafka-server-start.sh ./config/server.properties > /dev/null 2>&1 &
-
复制文件夹
cp -r source dest
-
测试环境部署地址:/www/server/kafka ,端口分别是9092、9093、9094
-
测试kafka是否成功部署集群,通过创建topic尝试
如下,创建一个新的topic,3个分区,3个副本
[root@VM-8-9-centos kafka_3]# bin/kafka-topics.sh --bootstrap-server localhost:9094 --create --topic multiHhw --partitions 3 --replication-factor 3 Created topic multiHhw.
查询topic信息
如下:3个分区的leader分散在3个节点中,每个分区的副本也均分在节点中。
[root@VM-8-9-centos kafka_3]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic multiHhw Topic: multiHhw TopicId: FjR5-dLyTTSrLRYi_f6B1w PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: multiHhw Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: multiHhw Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: multiHhw Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
-
十五、监控
知道kafka监控体系
掌握JMX监控指标
数据异动实时提醒
15.1监控度量指标
-
开启JMX监控
在启动时加上JMX命令,端口随意
nohup JMX_PORT=9999 bin/kafka-server-start.sh ./config/server.properties > /dev/null 2>&1 &
-
使用JDK自带的Jconsole监控进程
-
第三方工具
略
十六、MQ对比
16.1消息队列的作用
1、应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;
2、异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;
3、限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;
4、消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理;
16.2MQ选型标准(重要)
- 开源:问题方便搜寻
- 流行:问题方便搜寻
- 兼容性强
- 消息不丢失
- 性能:能够满足日志这种数据量较大的场景
16.3各个MQ优劣
RabbitMQ
优点
1、RabbitMQ 的特点 Messaging that just works,“开箱即用的消息队列”。 RabbitMQ 是一个相对轻量的消息队列,非常容易部署和使用;
2、多种协议的支持:支持多种消息队列协议,算的上是最流行的消息队列之一;
3、灵活的路由配置,和其他消息队列不同的是,它在生产者 (Producer)和队列(Queue)之间增加了一个Exchange模块,你可以理解为交换机。这个Exchange模块的作用和交换机也非常相似,根据配置的路由规则将生产者发出的消息分发到不同的队 列中。路由的规则也非常灵活,甚至你可以自己来实现路由规则。
4、健壮、稳定、易用、跨平台、支持多种语言、文档齐全,RabbitMQ的客户端支持的编程语言大概是所有消息队列中最多的;
5、管理界面较丰富,在互联网公司也有较大规模的应用;
6、社区比较活跃。
缺点
1、RabbitMQ 对消息堆积的处理不好,在它的设计理念里面,消息队列是一个管道,大量的消息积压是一种不正常的情况,应当尽量去避免。当大量消息积压的时候,会导致RabbitMQ的性能急剧下降;
2、性能上有瓶颈,它大概每秒钟可以处理几万到十几万条消息,这个对于大多数场景足够使用了,如果对需求对性能要求非常高,那么就不太合适了。
3、RabbitMQ 使用 Erlang。开发,Erlang 的学习成本还是很高的,如果后期进行二次开发,就不太容易了。
RocketMQ
RocketMQ出自阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上比 Kafka 更好。经历过多次双十一的考验,性能和稳定性还是值得信赖的,RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
优点
1、单机吞吐量:十万级;
2、可用性:非常高,分布式架构;
3、消息可靠性:经过参数优化配置,消息可以做到0丢失,RocketMQ 的所有消息都是持久化的,先写入系统 PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据;
4、功能支持:MQ功能较为完善,还是分布式的,扩展性好;
5、支持10亿级别的消息堆积,不会因为堆积导致性能下降;
6、源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控。
缺点
1、支持的客户端语言不多,目前是 java 及 c++,其中 c++ 不成熟;
2、社区活跃度一般,作为国产的消息队列,相比国外的比较流行的同类产品,在国际上还没有那么流行,与周边生态系统的集成和兼容程度要略逊一筹;
3、没有在 mq 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码。
Kafka
Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),之后成为Apache项目的一部分。
这是一款为大数据而生的消息中间件,在数据采集、传输、存储的过程中发挥着举足轻重的作用。
优点
1、性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高;
2、性能卓越,单机写入TPS约在百万条/秒,消息大小10个字节;
3、可用性:非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用;
4、消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;
5、有优秀的第三方Kafka Web管理界面Kafka-Manager;
6、在日志领域比较成熟,被多家公司和多个开源项目使用;
7、功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
缺点
由于“攒一波再处理”导致延迟比较高
16.4为什么选择Kafka(重要)
- 吞吐量高,适合处理海量的业务日志。
- 为什么吞吐量高?
- 高效的磁盘存储:每个分区的数据都是以文件的形式存在,并且都是采用追加的方式-顺序存储,每个分区下又分为多个segment,segment包含数据信息以及索引信息,得益于segment文件的命名方式外加上偏移量索引,使得速度非常快,
- 支持批量处理:支持多条消息组合批次发送
- 为什么吞吐量高?
- 分布式架构:少数机器宕机不会丢失数据(多副本机制、acks配置保证数据存储成功等)
- 之前在别的项目中有接触过kafka,在分析需求后,发现kafka已经可以满足需求,并且也具备未来扩展的能力,选择kafka是保证工作时效的最好选择。
- 缺点:存在部分延迟,对延迟要求不高的场景可使用。