Kafka入門和使用

2020-10-28 12:00:42

Kafka介紹

Kafka最初是LinkedIn的一個內部基礎設施系統,它可以幫助處理持續資料流的元件。在最初開發設計理念上,開發者不想只是開發一個能夠儲存資料的系統,比如關係型資料庫、Nosql資料庫等,更希望把資料看成一個持續變化和不斷增長的流,因此基於這種想法構建出了一個資料系統。
Kafka作為一種分散式訊息系統,允許訊息釋出和訂閱,但是Kafka又不僅僅是一個訊息中介軟體,它和傳統的訊息系統有很大的差異:
(1)Kafka作為一個分散式系統,以叢集的方式執行,支援節點自由伸縮;
(2)Kafka訊息是記錄在磁碟紀錄檔檔案中,天生持久化,支援按照時間和檔案大小進行訊息儲存;
(3)傳統的訊息系統只傳遞資料,而Kafka提供了流式處理將資料處理的層次提升到了新高度,它可以讓開發者使用很少的程式碼就能動態地處理派生流和資料集。

Kafka基本概念

訊息和批次

訊息作為Kafka的資料單元,由位元組陣列組成,訊息除了傳遞的訊息內容之外,還可以包含訊息鍵,主要用於對訊息選取分割區。並且 Kafka作為一個高效的訊息系統,支援對訊息進行分批次寫入,而批次就是一組訊息,這一組訊息同屬於一個主題和分割區。
對Kafka生產者來說傳送一組訊息,如果每次只傳遞單條訊息,會導致大量的網路開銷,而訊息分批次傳遞,批次中包含的訊息越多,單位時間內處理的訊息也就越多,那麼網路開銷也會越少,但是帶來的是單個訊息的傳輸時間更長,所以是否使用批次需要在時間延遲和吞吐量之間做權衡。
訊息對於Kafka來說是晦澀難懂的位元組陣列,所以對Kafka傳遞和消費訊息需要指定序列化和反序列化器。常用的序列化格式有JSON和XML,還有Avro(Hadoop開發的一款序列化框架),具體怎麼使用依據自身的業務來定。

保留訊息

Kafka中的訊息時存放在磁碟紀錄檔檔案中的,它不會像Mysql之類的永久儲存,而是設定了在一定期限內保留訊息。Kafka預設的保留策略:要麼保留一段時間(預設7天),要麼保留一定大小(比如1G),那麼到了限制之後,舊訊息會過期並刪除,在開發時可以為每個主題根據不同業務特性設定不同的保留策略。

主題和分割區

Kafka裡的訊息是通過主題進行分類的,主題好比資料庫中的表,一個主題下又可以被分為若干個分割區,好比分表的技術。一個分割區本質上是一個提交紀錄檔檔案,對於訊息會以追加的方式寫入分割區紀錄檔檔案中,並按照先入先出的順序讀取。
在這裡插入圖片描述
由於一個主題下會有多個分割區,因此在整個主題的範圍內無法保證訊息的順序,但是對單個分割區來說是可以保證順序的。Kafka通過分割區實現資料冗餘和伸縮性,分割區可以分佈在叢集中不同的伺服器節點上,也就是說一個主題能夠跨域多個伺服器節點,因此對主題下分割區的磁碟讀寫,會分散到多臺伺服器,這也就是Kafka高效能的一個原因。

生產者和消費者、偏移量、消費者群組

生產者和消費者是作為訊息中介軟體的基本概念,在Kafka中生產者負責將一個訊息釋出到指定的的主題上,而如果一個主題下存在多個分割區,預設情況下生產者會把訊息均衡釋出到主題的所有分割區上,而不會關心訊息寫入哪個分割區。當前某些情況需要將訊息寫入到指定分割區時,則可以通過訊息裡的訊息鍵和分割區器來實現。
Kafka中消費者會通過訂閱一個或多個主題,並按照訊息的生成順序進行讀取。消費者通過訊息的偏移量來區分訊息是否被讀取過,而偏移量也屬於Kafka的一種後設資料,一個不斷遞增的整數值,每一個訊息被建立時,Kafka都把它新增到訊息裡。在一個主題的一個分割區裡,每個訊息的偏移量都是唯一的,並且每個分割區最後讀取的訊息偏移量會儲存在Zookeeper或者Kafka上,這樣即使消費者關閉或者重新啟動,分割區的訊息讀取狀態也不會丟失。
消費者群組由共同讀取一個主題的多個消費者構成,群組可以保證主題下的每個分割區只能被一個消費者消費,消費者和分割區之間的這種對映關係叫做消費者對分割區的所有權關係,從下圖可以很明顯看出,一個分割區只有一個消費者,而一個消費者可以有多個分割區。
在這裡插入圖片描述

Broker 和叢集

一個獨立的Kafka伺服器稱為Broker。Broker負責接收生產者訊息,為訊息設定偏移量,並提交訊息到磁碟進行儲存。Broker為消費者提供服務,對讀取主題下分割區的請求進行響應,返回已經提交到磁碟上的訊息。在合適的硬體基礎上,單個Broker可以處理上千個分割區和每秒百萬級的訊息量。
多個Broker組成一個叢集,每個叢集中所有Broker會通過選舉,選出一個Broker充當叢集控制器的角色,控制器負責管理工作,包括將分割區分配給Broker和監控Broker。在叢集裡,一個分割區從屬於一個Broker,這個Broker稱為首領,但是一個分割區又可以被分配到多個Broker上,這個時候就會發生分割區複製,而叢集中Kafka內部通過使用管道技術進行高效的複製。
在這裡插入圖片描述

Kafka安裝和啟動

Kafka需要Zookeeper儲存叢集的後設資料和消費者資訊,但Kafka本身會自帶Zookeeper,但是從穩定性考慮,應用使用單獨的Zookeeper服務,並建立叢集。

下載Kafka

前往http://kafka.apache.org/downloads 上尋找合適的版本下載,下載後解壓到本地目錄

>tar -xzf kafka_2.11-2.3.0.tgz
>cd kafka_2.11-2.3.0

啟動服務

執行Kafka需要先啟動Zookeeper服務,這裡使用Kafka自帶設定好的Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

再啟動Kafka服務

bin/kafka-server-start.sh config/server.properties

出現以下畫面表示成功
在這裡插入圖片描述

Kafka常用命令列操作

1、列出所有主題

bin/kafka-topics.sh --zookeeper localhost:2181 --list

2、建立主題,建立一個名為test的主題,指定1個副本,1個分割區

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

3、建立生產者(傳送訊息),當執行producer指令碼後,會出現輸入提示符,可以輸入訊息,然後它會傳送到對應的Broker上

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Hello Kafka

4、建立消費者(消費訊息),執行consumer指令碼後,可以看到,消費者會一直處於監聽狀態,每當生產者傳送一條訊息,就會更新一條訊息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

5、增加分割區

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 16

6、列出消費者群組

bin/kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --list

簡單的Kafka應用程式

引入maven依賴包

<dependency>
	 <groupId>org.apache.kafka</groupId>
	 <artifactId>kafka-clients</artifactId>
	 <version>2.3.0</version>
</dependency>

生產者傳送訊息

    // 定義主題
    private static final String TOPIC = "mytopic";

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:8081");//broker地址清單,多個broker可以用逗號隔開
         //指定key和value序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //建立生產者範例
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            ProducerRecord<String,String> record;
            try {
            	// 傳送業務訊息
                 record = new ProducerRecord<String,String>(TOPIC, null,"hello kafka!");
                 producer.send(record);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            producer.close();
        }
    }

消費者接收訊息

    private static final String TOPIC = "mytopic";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:8081");//單節點,kafka多節點時候使用,逗號隔開
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        //可以指定消費者群組
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        try {
        	consumer.subscribe(Arrays.asList(TOPIC));//訂閱主題,支援訂閱多個
            while (true) {
            	//拉取訊息,其中Duration.ofMillis(500)拉取時會把後設資料獲取也記入整個超時時間
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                //預設會自動提交偏移量
            }
		} finally {
            consumer.close();
        }
    }

Spring整合Kafka

引入spring-kafka的maven依賴

<dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
       <version>2.2.0.RELEASE</version>
</dependency>

Spring整合Kafka工程中主要的一些組態檔
在這裡插入圖片描述
kafka.properties檔案中,設定Kafka生產者和消費者的相關屬性

# brokers設定
bootstrap.servers=localhost:9092
# 傳送方確認機制,預設1
kafka.producer.acks = 1
# 失敗重試次數
kafka.producer.retries = 2
# 指定了生產者在傳送批次前等待更多訊息加入批次的時間,  預設0
kafka.producer.linger.ms =  10
# 生產者記憶體緩衝區大小
kafka.producer.buffer.memory = 32 * 1024 * 1024
# 一個批次可以使用的記憶體大小 預設16384(16k)
kafka.producer.batch.size = 16384
# 序列化器
kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer
kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer
 
# Kafka consumer 
kafka.consumer.bootstrap.servers = localhost:9092
kafka.consumer.concurrency = 3
# 是否自動提交偏移量 
kafka.consumer.enable.auto.commit = true
# 自動提交偏移量的週期 
kafka.consumer.auto.commit.interval.ms=1000 
# 指定消費者組
kafka.consumer.group.id= group1
# 反序列化器 
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer

kafka-producer.xml設定資訊

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:context="http://www.springframework.org/schema/context"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd">
    
    <context:property-placeholder location="classpath*:kafka/kafka.properties" />    
    <!-- 定義producer的引數 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                <entry key="retries" value="${kafka.producer.retries}" />
                <entry key="batch.size" value="${kafka.producer.batch.size}" />
                <entry key="linger.ms" value="${kafka.producer.linger.ms}" />
                <entry key="buffer.memory" value="${kafka.producer.buffer.memory}" />
                <entry key="acks" value="${kafka.producer.acks}" />
                <entry key="key.serializer" value="${kafka.producer.key.serializer}" />
                <entry key="value.serializer" value="${kafka.producer.value.serializer}" />
            </map>
        </constructor-arg>
    </bean>    
    
     <!-- 指定使用的Producerfactory -->
    <bean id="producerFactory"
          class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties"/>
        </constructor-arg>
    </bean>
    
    <!-- 指定監聽器 -->
    <bean id="producerSendListener" class="cn.kafka.config.ProducerSendListener" />
    
    <!-- 指定KafkaTemplate這個bean,使用的時候,只需要注入KafkaTemplate,就可以對kafka進行操作 -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <!-- 設定傳送監聽器bean -->
        <property name="producerListener" ref="producerSendListener"></property>
    </bean>
</beans>

實現指定監聽器,提供傳送方確認

/**
 * 自定義監聽器,傳送方確認
 */
public class ProducerSendListener implements ProducerListener {

	
    public void onSuccess(String topic, Integer partition,
                          Object key, Object value, RecordMetadata recordMetadata) {
    	//傳送成功
        System.out.println("topic:"+recordMetadata.topic()+"-offset: "+recordMetadata.offset()
             +"-" +"partition: "+recordMetadata.partition());
    }
    
    public void onError(String topic, Integer partition,
                        Object key, Object value, Exception exception) {
       //訊息傳送失敗,執行其它業務操作,或者重試
    }

    public boolean isInterestedInSuccess() {
        return true;
    }
}

kafka-consumer.xml設定資訊

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:context="http://www.springframework.org/schema/context"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd">
    
    <context:property-placeholder location="classpath*:kafka/kafka.properties" />    
     <!-- 定義Consumer的引數 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.consumer.bootstrap.servers}" />
                <entry key="group.id" value="${kafka.consumer.group.id}" />
                <entry key="enable.auto.commit" value="${kafka.consumer.enable.auto.commit}" />
                <entry key="auto.commit.interval.ms" value="${kafka.consumer.auto.commit.interval.ms}" />
                <entry key="key.deserializer" value="${kafka.consumer.key.deserializer}" />
                <entry key="value.deserializer" value="${kafka.consumer.value.deserializer}" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 建立ConsumerFactory -->
    <bean id="consumerFactory"
          class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <!-- 指定消費者實現類 -->
    <bean id="kafkaConsumerService" class="cn.kafka.service.KafkaConsumerService" />

    <!-- 消費者容器設定資訊 -->
    <bean id="containerProperties"
          class="org.springframework.kafka.listener.ContainerProperties">
        <constructor-arg name="topics">
        <list>
            <value>my-topic</value>
        </list>
        </constructor-arg>
        <property name="messageListener" ref="kafkaConsumerService"></property>
    </bean>
    
    <!-- 消費者並行訊息監聽容器,執行doStart方法 -->
    <bean id="messageListenerContainer"
          class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
          init-method="doStart" >
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
        <property name="concurrency" value="${kafka.consumer.concurrency}" />
    </bean>
</beans>

實現訊息者監聽,接收訊息

/**
 * 消費者監聽,對消費的訊息進行處理
 * 實現MessageListener介面,消費者會預設自動提交偏移量
 * 實現AcknowledgingMessageListener介面,消費者可以手動提交偏移量
 */
public class KafkaConsumerService implements MessageListener<String,String> {

    public void onMessage(ConsumerRecord<String, String> data) {
    	//接收業務訊息,執行響應業務方法
        String name = Thread.currentThread().getName();
        System.out.println(name+"|"+String.format(
                "主題:%s,分割區:%d,偏移量:%d,key:%s,value:%s",
                data.topic(),data.partition(),data.offset(),
                data.key(),data.value()));
    }
}

Spring組態檔中引入Kafka組態檔

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:context="http://www.springframework.org/schema/context"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd">

     <!-- 設定掃描路徑 -->
     <context:component-scan base-package="cn.kafka">
     	<context:exclude-filter type="annotation"
              expression="org.springframework.stereotype.Controller"/>
     </context:component-scan>

    <!-- 引入kafka組態檔,根據個人檔案位置-->
    <import resource="classpath*:kafka/kafka-consumer.xml"/>
    <import resource="classpath*:kafka/kafka-producer.xml"/>
</beans>

向Kafka的指定主題傳送訊息

@Controller
@RequestMapping("/kafka")
public class KafkaController {

	@Autowired
	private KafkaTemplate<String,String> kafkaTemplate;

	private Logger logger = LoggerFactory.getLogger(KafkaController.class);
	
    private  static final String TOPIC="my-topic" ; 
	
	/**
	 * @param message
	 * @return String
	 */
	@ResponseBody
	@RequestMapping("/pushMessage")
	public String queueSender(@RequestParam("message")String message){
		try {
			kafkaTemplate.send(TOPIC,message);
		} catch (Exception e) {
			logger.error("send message error: "+e.getMessage());
			return "failed";
		}
		return "Success";
	}
}