Spring Boot Apache Kafka


本教學演示了如何從Spring Kafka傳送和接收訊息。 首先建立一個Spring Kafka Producer,它能夠將訊息傳送到Kafka主題。 接下來建立一個Spring Kafka Consumer,它能夠收聽傳送給Kafka的訊息。使用適當的鍵/值序列化器和反序列化器來組態它們。 最後,使用簡單的Spring Boot應用程式演示應用程式。

下載並安裝Apache Kafka

要下載並安裝Apache Kafka,請閱讀此處的官方文件。本教學假定使用預設組態啟動伺服器,並且不更改任何伺服器埠。

注意:在使用 Kafka 之前,需要安裝好

專案設定

  • Spring Kafka:2.1.4.RELEASE
  • Spring Boot:2.0.0.RELEASE
  • Apache Kafka:kafka_2.11-1.0.0
  • Maven:3.5

專案結構

請參考以下專案結構來構建專案。

Maven依賴

在這個專案中,使用Apache Maven來管理專案依賴項。確保以下依賴項存在於類路徑上。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yiibai.spring.kafka</groupId>
    <artifactId>producer-consumer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>/20/238/8840.html-boot</url>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Spring Kafka將訊息傳送到主題

這個專案是從傳送訊息開始,使用KafkaTemplate類來包裝Producer並提供高階操作以將資料傳送到Kafka主題。 提供非同步和同步方法,非同步方法返回Future

package com.yiibai.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message){
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}

使用ProducerFactory的實現來組態KafkaTemplate,更具體地說是DefaultKafkaProducerFactory。可以使用Map <String,Object>初始化這個生產者工廠。使用從ProducerConfig類中獲取鍵。

  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG指定用於建立與Kafka群集的初始連線的主機/埠對列表。用戶端將使用所有伺服器,而不管此處指定哪些伺服器進行引導/此列表僅影響用於發現整套伺服器的初始主機。此列表應採用host1:port1,host2:port2,....的形式。由於這些伺服器僅用於初始連線以發現完整的叢集成員資格(可能會動態更改),因此此列表不需要包含完整集 伺服器(但是,如果伺服器關閉,可能需要多個伺服器)。
  • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG指定用於實現org.apache.kafka.common.serialization.Serializer介面的鍵的序列化程式類。
  • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG指定用於實現org.apache.kafka.common.serialization.Serializer介面的值的序列化程式類。

有關組態選項的完整列表,請檢視ProducerConfig類。

package com.yiibai.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

Spring Kafka監聽來自主題的訊息

接下來,將演示如何從Kafka主題中收聽訊息。 Receiver類將使用Kafka主題訊息。建立一個Listen()方法並使用@KafkaListener注釋對其進行了注釋,該注釋將該方法標記為指定主題上的Kafka訊息偵聽器的目標。

package com.yiibai.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        LOG.info("received message='{}'", message);
    }

}

此機制需要在其中一個@Configuration類和偵聽器容器工廠上使用@EnableKafka注釋,該工廠用於組態基礎ConcurrentMessageListenerContainer。使用SenderConfig類中相同型別的鍵/值反序列化器。

  • ConsumerConfig.GROUP_ID_CONFIG指定一個唯一字串,用於標識此使用者所屬的組。
  • ConsumerConfig.AUTO_OFFSET_RESET_CONFIG指定當Kafka中沒有初始偏移量或伺服器上當前偏移量不再存在時要執行的操作(例如,因為該資料已被刪除):
    • earliest: 自動將偏移重置為最早的偏移量
    • latest: 自動將偏移重置為最新的偏移量
    • none: 如果沒有找到消費者組的先前偏移量,則向消費者丟擲異常
    • anything else: 向消費者丟擲異常。

消費者使用消費者組名稱標記自己,並且發布到主題的每個記錄被傳遞到每個訂閱消費者組中的一個消費者範例。 消費者範例可以在單獨的進程中,也可以在不同的機器。
如果所有使用者範例具有相同的使用者組,則記錄將有效地在使用者範例上進行負載平衡。 如果所有消費者範例具有不同的消費者組,則每個記錄將被廣播到所有消費者進程。

有關組態選項的完整列表,請檢視ConsumerConfig類。

package com.yiibai.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

使用 application.yml 組態應用程式

需要建立了一個application.yml 屬性檔案,該檔案位於src/main/resources 檔案夾中。 這些屬性通過spring boot在組態類中注入。

spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG

執行應用程式

現在,編寫一個簡單的Spring Boot應用程式來演示應用程式。 為了使這個演示工作,需要前先在埠9092上執行localhost的Kafka伺服器(Kafka的預設組態)。

package com.yiibai.kafka;

import com.yiibai.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerConsumerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ProducerConsumerApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        sender.send("Spring Kafka Producer and Consumer Example");
    }
}

使用 Maven 命令構建專案:

mvn clean install

看到構建成功後,執行以下Java命令,執行Jar程式:

java -jar target\producer-consumer-1.0.0-SNAPSHOT.jar

當執行應用程式時,應該會得到類似以下的結果: