教育行业A股IPO第一股(股票代码 003032)

全国咨询/投诉热线:400-618-4000

Kafka中消费者与消费者组的关系是什么?

更新时间:2023年10月20日10时32分 来源:传智教育 浏览次数:

好口碑IT培训

  在Apache Kafka中,消费者(Consumers)和消费者组(Consumer Groups)是核心概念,用于处理消息的订阅和处理。接下来笔者将详细解释它们之间的关系,并提供一个简单的代码示例来演示它们的用法。

  1.消费者(Consumers):

  消费者是Kafka中的客户端应用程序,它负责订阅主题并处理从主题中生产的消息。消费者可以独立订阅一个或多个主题,并且可以以不同的速度处理消息。它们可以在不同的分区中并行地处理消息。

  2.消费者组(Consumer Groups):

  消费者组是消费者的逻辑集合,它们一起协作处理主题中的消息。每个消费者组可以包含一个或多个消费者。消费者组的关键特性是它可以协调多个消费者来消费主题中的消息,确保每个分区的消息只被组内的一个消费者处理。这有助于实现负载均衡和提高容错性。

  接下来我们看一个使用Java语言的Kafka消费者和消费者组示例:

  (1)创建消费者:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class MyKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-broker-list");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName);

        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅一个主题
        consumer.subscribe(Collections.singletonList("my-topic"));

        while (true) {
            // 拉取消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息
                System.out.printf("消费者: key=%s, value=%s%n", record.key(), record.value());
            }
        }
    }
}

  (2)创建消费者组:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class MyKafkaConsumerGroup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-broker-list");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组名称
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName);

        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅一个主题
        consumer.subscribe(Collections.singletonList("my-topic"));

        while (true) {
            // 拉取消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息
                System.out.printf("消费者组成员: key=%s, value=%s%n", record.key(), record.value());
            }
        }
    }
}

  在上述示例中,两个消费者(可以是同一消费者组的成员)订阅了同一个主题,但消费者组确保每个分区的消息只被一个消费者处理,实现了负载均衡和高可用性。这是Kafka中消费者和消费者组的基本关系和用法。

0 分享到:
和我们在线交谈!