Common Commands
Topic
Creating a topic
Describing a topic
$ ./kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test
Topic: test TopicId: Zi5yXIXcTZuLE_MkTyC-Wg PartitionCount: 4 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: test Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
[--describe] shows detailed information about the topic
Listing all topics
./kafka-topics.sh --list --zookeeper localhost:2181
Producer
Console producer
./kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
Consumer
Consume starting after the last message's offset (i.e., only new messages)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
[--from-beginning] consume from the beginning of the log
[--consumer-property group.id={id}] set the consumer group. Within a single consumer group subscribed to the same topic, only one consumer receives each message (unicast). Across different consumer groups, every group receives each message (multicast).
Listing the consumer groups in Kafka:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Describing a group in detail:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP      TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                 HOST        CLIENT-ID
test-group test-topic 0          29              29              0    consumer-test-group-1-ed58cfd7-73ac-48c6-8f36-cbb58c833c96  /127.0.0.1  consumer-test-group-1
Describing a topic's state:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic
Topic: my-topic TopicId: nlRdagkNQ3-2Rfr6FEP9sA PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 2,1,0 Isr: 0,1,2
Topic: my-topic Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
Different partitions of the same topic may have different leaders.
Finding the broker process: ps aux | grep server1.properties
./kafka-consumer-groups.sh --bootstrap-server iot-kafka-service:9092 --describe --all-groups  shows the consumption status of all groups
Synchronous vs. asynchronous sending
When sending synchronously, the producer blocks until the broker returns an ack (here: a 3-second timeout and up to 3 retries). The ack level is configurable:
- 0: the producer does not wait for any acknowledgment; the message may be lost without notice
- 1: the leader has received the message and written it to its log
- -1/all: the leader writes the message to its log and waits until it is replicated to the in-sync followers; min.insync.replicas sets the minimum number of in-sync replicas (leader included) that must have the message, default 1, recommended >= 2 so that a new leader can be elected without data loss
Each producer has a 32 MB buffer by default. Messages are first placed into this buffer, and a local sender thread drains it in 16 KB batches; if a batch has not reached 16 KB within 10 ms, it is sent anyway.
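The ack and batching behavior above corresponds to standard producer configuration keys. A minimal sketch, with a placeholder broker address and the values taken from the notes above; note that min.insync.replicas is a broker/topic-level setting, not a producer one:

```java
import java.util.Properties;

// Sketch: producer settings matching the notes above.
// "localhost:9092" is a placeholder broker address.
public class ProducerTuning {
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // acks: "0" = don't wait, "1" = leader has written to its log,
        // "all"/"-1" = leader plus in-sync followers have the message.
        props.put("acks", "all");
        props.put("retries", "3");              // retry count for retriable errors
        props.put("buffer.memory", "33554432"); // 32 MB record accumulator
        props.put("batch.size", "16384");       // 16 KB per batch
        props.put("linger.ms", "10");           // send a partial batch after 10 ms
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps());
    }
}
```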
Poll
If the interval between two polls exceeds 30 s, Kafka considers the consumer too slow to keep up and evicts it from the consumer group.
Consumption modes
Consume from a specified partition; from the beginning; from a specified offset; from a specified point in time. A newly started consumer group by default consumes only messages produced after the consumer starts; it can instead be told to consume from the beginning. On subsequent startups it resumes from the recorded offset.
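The "new group starts from messages produced after startup" default corresponds to the consumer's auto.offset.reset setting, which only applies when the group has no committed offset. A sketch (broker address and group id are placeholders):

```java
import java.util.Properties;

// Sketch: where a consumer group with no committed offset starts reading.
public class OffsetResetConfig {
    public static Properties buildProps(boolean fromBeginning) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "demo-group");              // placeholder
        // "latest" (default): only messages produced after the consumer starts.
        // "earliest": start from the beginning of each partition.
        props.put("auto.offset.reset", fromBeginning ? "earliest" : "latest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps(true).getProperty("auto.offset.reset"));
        System.out.println(buildProps(false).getProperty("auto.offset.reset"));
    }
}
```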
Controller
When a partition's leader replica fails, the controller is responsible for electing a new leader. Each broker, on startup, registers an ephemeral sequential node in ZooKeeper; the broker with the smallest sequence number becomes the controller. The topic's ISR list orders the brokers, and the broker ranked right after the failed leader in the ISR is chosen as the new leader.
When the ISR changes or a topic's partition count increases, the controller notifies the other brokers.
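The "smallest sequence number wins" rule can be sketched as taking the minimum over the registered node names: ZooKeeper appends a fixed-width numeric suffix to sequential znodes, so lexicographic order matches creation order. The node names below are hypothetical examples:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch: controller election by smallest ephemeral sequential znode.
// Node names are hypothetical; real paths live under ZooKeeper.
public class ControllerElection {
    public static String electController(List<String> nodes) {
        // The fixed-width 10-digit suffix makes lexicographic order
        // equal to creation order, so min() picks the earliest registrant.
        return nodes.stream()
                .min(Comparator.naturalOrder())
                .orElseThrow(IllegalStateException::new);
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList(
                "broker-0000000003", "broker-0000000001", "broker-0000000002");
        System.out.println(electController(nodes)); // broker-0000000001
    }
}
```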
When a new event is published to a topic, it is actually appended to one of the topic's partitions. A common production setting is a replication factor of 3, i.e., there will always be three copies of your data. This replication is performed at the level of topic partitions.
Background reading on the log abstraction underlying Kafka: https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
Kafka clients
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "producer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, "Hello, Kafka!");
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close(); // flush pending records and release resources
        }
    }
}
ProducerRecord contains several fields:
public class ProducerRecord<K, V> {
    private final String topic;      // topic
    private final Integer partition; // partition number
    private final Headers headers;   // message headers
    private final K key;             // key
    private final V value;           // value
    private final Long timestamp;    // message timestamp
    // other members and constructors omitted
}
The message key carries extra information about the message and is also used to compute the partition number.
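Key-to-partition mapping can be sketched as hashing the key modulo the partition count. Kafka's default partitioner murmur2-hashes the serialized key bytes; the String.hashCode() below is a simplified stand-in, not the client's actual hash:

```java
// Sketch of key-based partition selection. Kafka's default partitioner
// uses murmur2 on the serialized key; String.hashCode() is a stand-in here.
public class KeyPartitioner {
    public static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) throw new IllegalArgumentException("numPartitions");
        // Mask off the sign bit so the result is non-negative, then take modulo.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition,
        // which preserves per-key ordering within that partition.
        System.out.println(partitionFor("order-42", 4));
        System.out.println(partitionFor("order-42", 4));
    }
}
```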
There are three message-sending modes:
- fire-and-forget: send and never check the result
- sync
- async
Synchronous sending can be implemented with the Future object returned by send():
try {
    producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {
    e.printStackTrace();
}
// or
try {
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get();
    System.out.println(metadata.topic() + "-" +
            metadata.partition() + ":" + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    e.printStackTrace();
}
Exceptions fall into retriable and non-retriable ones. Common retriable exceptions include NetworkException, LeaderNotAvailableException (the leader replica is offline and a new leader has not yet been elected), UnknownTopicOrPartitionException, NotEnoughReplicasException, and NotCoordinatorException. Retriable exceptions can be retried automatically via the retries property; if a retry succeeds, no exception is thrown.
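The retries semantics can be illustrated with a generic retry loop: retriable failures are retried up to a limit, while anything non-retriable propagates immediately. This is a hand-rolled sketch of the idea, not the producer's internal code; RetriableException here stands in for Kafka's retriable-exception hierarchy:

```java
import java.util.function.Supplier;

// Sketch: retry a task on "retriable" failures, up to maxAttempts total
// attempts, mimicking what the producer's retries setting does internally.
public class RetrySketch {
    // Stand-in for Kafka's retriable exception types.
    public static class RetriableException extends RuntimeException {
        public RetriableException(String msg) { super(msg); }
    }

    public static <T> T callWithRetries(Supplier<T> task, int maxAttempts) {
        RetriableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RetriableException e) {
                last = e; // transient failure: try again
            }
            // Any other exception is non-retriable and propagates immediately.
        }
        if (last == null) throw new IllegalArgumentException("maxAttempts must be >= 1");
        throw last; // all attempts exhausted
    }

    public static void main(String[] args) {
        int[] failures = {2}; // fail twice, then succeed
        String result = callWithRetries(() -> {
            if (failures[0]-- > 0) throw new RetriableException("transient");
            return "ok";
        }, 3);
        System.out.println(result); // succeeds on the third attempt: "ok"
    }
}
```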
Asynchronous sending is done by passing a Callback:
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // On success, metadata is non-null and exception is null;
        // on failure, metadata is null and exception is non-null.
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println(metadata.topic() + "-" +
                    metadata.partition() + ":" + metadata.offset());
        }
    }
});
Within a single partition, callbacks are invoked in the same order as the messages were sent.
Consumer groups
Kafka consumer
// Configure the consumer client parameters and create the consumer instance.
// Subscribe to topics.
// Poll and process messages.
// Commit the consumed offsets.
// Close the consumer instance.
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("client.id", "consumer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // If subscribe() is called multiple times, only the last call takes effect.
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic()
                            + ", partition = " + record.partition()
                            + ", offset = " + record.offset());
                    System.out.println("key = " + record.key()
                            + ", value = " + record.value());
                    // do something to process record.
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
Rebalance
A rebalance transfers partition ownership from one consumer to another. It is what gives a consumer group high availability and scalability, letting us safely add and remove consumers. During a rebalance, however, the consumers in the group cannot consume (the group is unavailable).
When a partition is reassigned to another consumer, the previous consumer's state is lost. For example, if a consumer has pulled messages but not yet committed its offsets when a rebalance occurs, the partition's new owner will consume those messages again, i.e., duplicate consumption. In general, unnecessary rebalances should be avoided.
Among the subscribe overloads:
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
subscribe(Pattern pattern, ConsumerRebalanceListener listener)
there is a ConsumerRebalanceListener parameter for doing preparation and cleanup work around a rebalance. The ConsumerRebalanceListener interface has two methods:
interface ConsumerRebalanceListener {
    // Called after the consumer stops consuming and before the rebalance starts;
    // can be used to commit offsets and so avoid duplicate consumption.
    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    // Called after the rebalance and before the consumer resumes consuming;
    // the parameter holds the partitions assigned after the rebalance.
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(currentOffsets);
        currentOffsets.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // do nothing.
    }
});
try {
    while (isRunning.get()) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // process the record.
            currentOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} finally {
    consumer.close();
}
Alternatively, store the offsets in a database in onPartitionsRevoked, then read them back in onPartitionsAssigned and call seek().
Rebalances are only triggered when consumers have not been assigned specific partitions explicitly.
- If a consumer does not respond to heartbeats for more than 10 s, or the interval between two polls exceeds 30 s, Kafka considers it too weak to keep up and evicts it from the consumer group; since its topic partitions would otherwise be left without a consumer, Kafka hands them to other consumers, triggering a rebalance.
- Adding a new consumer to the group also triggers a rebalance.
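The eviction thresholds above map to standard consumer configuration keys. A sketch using the numbers from these notes (the client's real defaults may differ, so treat the values as assumptions, not Kafka defaults):

```java
import java.util.Properties;

// Sketch: consumer settings governing when the group coordinator evicts a
// consumer. Values mirror the notes above, not the client's actual defaults.
public class EvictionConfig {
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "10000");   // evicted if no heartbeat within 10 s
        props.put("heartbeat.interval.ms", "3000"); // heartbeat cadence, well below the session timeout
        props.put("max.poll.interval.ms", "30000"); // evicted if two polls are more than 30 s apart
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps());
    }
}
```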