常见命令

Topic

创建Topic

查看topic

$ ./kafka-topics.sh --zookeeper localhost: 2181/kafka --describe --topic topic-demo
Topic: test     TopicId: Zi5yXIXcTZuLE_MkTyC-Wg PartitionCount: 4       ReplicationFactor: 3    Configs:
        Topic: test     Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: test     Partition: 1    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: test     Partition: 2    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
        Topic: test     Partition: 3    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0

[–describe] 展示主题的更多具体信息

查看所有topic

./kafka-topics.sh --list --zookeeper localhost:2181

Producer

控制台生产者

./kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

Consumer

从最后一条消息的偏移量+1开始消费

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic 

[–from-beginning] 从头开始消费 [–consumer-property] group.id={id} 设置消费者组。同一个消费者组中的消费者,如果订阅了相同的topic,只能有一个消费者收到消息(单播消息)。不同消费者组的消费者只能有一个消费者收到消息(多播)

查看消费者组 ./kafka-consumer-groups.sh –bootstrap-server localhost:9092 –list 查看kafka中的消费者组

./kafka-consumer-groups.sh –bootstrap-server localhost:9092 –describe –group test-group 查看组的详细信息: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test-group test-topic 0 29 29 0 consumer-test-group-1-ed58cfd7-73ac-48c6-8f36-cbb58c833c96 /127.0.0.1 consumer-test-group-1

./kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-topic 产看消息状态

Topic: my-topic TopicId: nlRdagkNQ3-2Rfr6FEP9sA PartitionCount: 2 ReplicationFactor: 3 Configs: Topic: my-topic Partition: 0 Leader: 0 Replicas: 2,1,0 Isr: 0,1,2 Topic: my-topic Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2

同一topic 的不同分区Leader可能不同

ps -aux grep server1.properties

./kafka-consumer-groups.sh –bootstrap-server iot-kafka-service:9092 –describe –all-groups 查看所有消息的消费状态

同步与异步

同步发送消息是,生产者会阻塞,直到broker发送ack消息,超时时间为3秒,重试次数为3次。ack是可以设置的,

  • 0,发送给cluster后直接返回ack,不管broker有没有收到
  • 1,Leader收到消息,并写入log文件
  • -1/all,leader写入log文件,并且同步给其它follower,可以通过min.insync.replicas指定最少由多少个follower收到消息,默认为1,推荐为大于等于2,方便重新选举leader

每个生产者默认有个32m的缓冲区,消息首先被塞入缓冲区,生产者本地线程每次拉取16k的消息。如果缓冲区没有16k,10ms后自动拉取。

Polll

两次poll时间超过30s,Kafka会认为消费者消费能力不足,改消费者会被踢出消费组

消费方式

指定分区消费 从头消费 指定offset消费 指定时间消费 新的消费组启动后,默认从消费者启动后开始消费,也可指定从头开始消费,以后启动时从记录到的offset处开始消费。

Controller

当分区的Leader副本出现故障时,由controller负责选举新的分区副本。。broker在创建的时候会向zookeeper注册创建临时序号节点,谁最小谁就是controller。Topic中有个isr属性会对broker集群的状态做排序,选取排在Leader后面的broker作为新的Leader。

当isr发生变化、topic分区数增加时,由controller负责通知其他broker

When a new event is published to a topic, it is actually appended to one of the topic’s partitions.A common production setting is a replication factor of 3, i.e., there will always be three copies of your data. This replication is performed at the level of topic-partitions.

To get hands-on experience with Kafka https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying

Kafka客户端

public class KafkaProducerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";

    public static Properties initConfig(){
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "producer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, "Hello, Kafka!");
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

ProducerRecord中含有多种属性

public class ProducerRecord<K, V> {
    private final String topic; //主题
    private final Integer partition; //分区号
    private final Headers headers; //消息头部
    private final K key; //键
    private final V value; //值
    private final Long timestamp; //消息的时间戳
    //省略其他成员方法和构造方法
}

消息的键是消息的附加信息,同时也用来计算分区号。

消息发送有三种模式:

  • 发后即忘(fire-and-forget)。只管发,其他都不管
  • 同步(sync)
  • 异步(async)。

要实现同步的发送方式,可以利用返回的 Future 对象实现。

try {
    producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {
    e.printStackTrace();
}
// 或
try {
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get();
    System.out.println(metadata.topic() + "-" +
            metadata.partition() + ":" + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    e.printStackTrace();
}

异常分为可重试异常和不可重试异常。常见的可重试异常有:NetworkException(网络异常)、LeaderNotAvailableException(eader 副本下线而新的 leader 副本选举完成之前)、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。可重试异常可以配置retries 属性进行重试,如果重试成功则不抛异常。

异步发送可通过指定Callback的形式进行:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // 消息发送成功时,metadata 不为 null 而 exception 为 null;消息发送异常时,metadata 为 null 而 exception 不为 null。
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println(metadata.topic() + "-" +
                    metadata.partition() + ":" + metadata.offset());
        }
    }
});

回调函数的调用在一个分区中也会保持和发送顺序相同的顺序调用。

消费者组

Kaka消费者

// 配置消费者客户端参数及创建相应的消费者实例。
// 订阅主题。
// 拉取消息并消费。
// 提交消费位移。
// 关闭消费者实例。
public class KafkaConsumerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig(){
        Properties props = new Properties();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("client.id", "consumer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic)); // 多次调用以最后一次subcribe,以最后一次调用为准
        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() 
                            + ", partition = "+ record.partition() 
                            + ", offset = " + record.offset());
                    System.out.println("key = " + record.key()
                            + ", value = " + record.value());
                    //do something to process record.
                }
            }
        } catch (Exception e) {
            log.error("occur exception ", e);
        } finally {
            consumer.close();
        }
    }
}

rebalance

分区所有权从一个消费者转移到另一个消费者,为消费者组具备高可用和可伸缩性提供保障,可以为我们方便的向消费者组安全地删除和添加消费者,rebalance期间,消费者组内的消费者无法消费数据(消费者组不可用)。

当分区分配给另一个消费者,消费者当前状态会丢失。比如消费者拉取消息后还没来的及提交,就发生了rebalance,这个分区被分配给另一个消费者,原来的消息会被重新消费一遍,也就是发生了重复消费。一般情况下,应尽量避免不必要的rebalance的发生。

在subscribe方法中:

subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
subscribe(Pattern pattern, ConsumerRebalanceListener listener)

有ConsumerRebalanceListener参数,用来监听rebalance前后的准备和收尾工作。ConsumerRebalanceListener接口中有两个方法:

interface ConsumerRebalanceListener {
    // 会在rebalance之前和消费者停止消费之后调用,
    // 可用来处理位移提交,防止重复消费
    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    // 会在rebalance之后和消费者开始消费之前调用。
    // 参数标识rebalance后的分区
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(currentOffsets);
	        currentOffsets.clear();
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //do nothing.
    }
});

try {
    while (isRunning.get()) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            //process the record.
            currentOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} finally {
    consumer.close();
}

也已通过在onPartitionsRevoked中将offset存储到数据库,然后onPartitionsAssigned时将数据取出并调用seek方法。

触发Rebalance机制的前提是消费者没有指定分区消费。

  • 如果消费者心跳回复超过10s、或两次poll之间超过30s,Kafka会认为该消费者消费能力太弱,会将其踢出消费者组,topic的风区在消费者组中可能没有消费者消费,Kafka会将分区交给其他消费者,触发Rebalance机制。
  • 新加消费者也会触发Rebalance机制