kafka生產者你不得不知的那些事兒

2023-05-23 12:00:27

前言

kafka生產者作為訊息傳送中很重要的一環,這裡面可是大有文章,你知道生產者訊息傳送的流程嗎?知道訊息是如何發往哪個分割區的嗎?如何保證生產者訊息的可靠性嗎?如何保證訊息傳送的順序嗎?如果對於這些問題還比較模糊的話,那麼很有必要看看這篇文章了,本文主要是基於kafka3.x版本講解。

生產者流程

kafka生產者最重要的就是訊息傳送的整個流程,我們來看下究竟是怎麼一回事把。

在訊息傳送的過程中,涉及到了兩個執行緒——main 執行緒和 Sender 執行緒。在 main 執行緒中建立了一個雙端佇列 RecordAccumulatormain 執行緒將訊息傳送給 RecordAccumulatorSender 執行緒不斷從 RecordAccumulator 中拉取訊息傳送到 Kafka Broker

  1. 在主執行緒中由 kafkaProducer 建立訊息,然後通過可能的攔截器、序列化器和分割區器的作用之後快取到訊息累加器(RecordAccumulator, 也稱為訊息收集器)中。
  • 攔截器: 可以用來在訊息傳送前做一些準備工作,比如按照某個規則過濾不符合要求的訊息、修改訊息的內容等,也可以用來在傳送回撥邏輯前做一些客製化化的需求,比如統計類工作。
  • 序列化器: 用於在網路傳輸中將資料序列化為位元組流進行傳輸,保證資料不會丟失。
  • 分割區器: 用於按照一定的規則將資料分發到不同的kafka broker節點中
  1. Sender 執行緒負責從 RecordAccumulator 獲取訊息並將其傳送到 Kafka 中。
  • RecordAccumulator 主要用來快取訊息以便 Sender 執行緒可以批次傳送,進而減少網路傳輸的資源消耗以提升效能。
  • RecordAccumulator 快取的大小可以通過生產者使用者端引數 buffer.memory 設定,預設值為 33554432B ,即 32M
  • 主執行緒中傳送過來的訊息都會被迫加到 RecordAccumulator 的某個雙端佇列( Deque )中,RecordAccumulator 內部為每個分割區都維護了一個雙端佇列,即 Deque<ProducerBatch>, 訊息寫入快取時,追加到雙端佇列的尾部。
  • Sender 讀取訊息時,從雙端佇列的頭部讀取。ProducerBatch 是指一個訊息批次;與此同時,會將較小的 ProducerBatch 湊成一個較大 ProducerBatch ,也可以減少網路請求的次數以提升整體的吞吐量。ProducerBatch 大小可以通過batch.size 控制,預設16kb
  • Sender 執行緒會在有資料積累到batch.size,預設16kb,或者如果資料遲遲未達到batch.sizeSender執行緒等待linger.ms設定的時間到了之後就會獲取資料。linger.ms單位ms,預設值是0ms,表示沒有延遲。
  1. SenderRecordAccumulator 獲取快取的訊息之後,會將資料封裝成網路請求<Node,Request> 的形式,這樣就可以將 Request 請求發往各個 Node 了。
  2. 請求在從 sender 執行緒發往 Kafka 之前還會儲存到 InFlightRequests 中,它的主要作用是快取了已經發出去但還沒有收到伺服器端響應的請求。InFlightRequests預設每個分割區下最多快取5個請求,可以通過設定引數為max.in.flight.request.per. connection修改。
  3. 請求Request通過通道Selector傳送到kafka節點。
  4. 傳送後,需要等待kafka的應答機制,取決於設定項acks.
  • 0:生產者傳送過來的資料,不需要等待資料落盤就應答。
  • 1:生產者傳送過來的資料,Leader 收到資料後應答。
  • -1(all):生產者傳送過來的資料,Leader和副本節點收齊資料後應答。預設值是-1,-1 和all 是等價的。
  1. Request請求接受到kafka的響應結果,如果成功的話,從InFlightRequests 清除請求,否則的話需要進行重發操作,可以通過設定項retries決定,當訊息傳送出現錯誤的時候,系統會重發訊息。retries表示重試次數。預設是 int 最大值,2147483647
  2. 清理訊息累加器RecordAccumulator 中的資料。

生產者重要引數

現在我們來看看kafka生產者中常用且關鍵的設定引數。

  1. bootstrap.servers

生產者連線叢集所需的 broker 地 址 清 單 。 例 如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以設定 1 個或者多個,中間用逗號隔開。注意這裡並非需要所有的 broker 地址,因為生產者從給定的 broker裡查詢到其他 broker 資訊。

  1. key.serializer value.serializer

指定傳送訊息的 key 和 value 的序列化型別。一定要寫全類名。

  1. buffer.memory

RecordAccumulator 緩衝區總大小,預設 32m。

  1. batch.size

緩衝區一批資料最大值,預設 16k。適當增加該值,可以提高吞吐量,但是如果該值設定太大,會導致資料傳輸延遲增加。

  1. linger.ms

如果資料遲遲未達到 batch.size,kafka等待這個時間之後就會傳送資料。單位 ms,預設值是 0ms,表示沒有延遲。生產環境建議該值大小為 5-100ms 之間。

  1. max.request.size

這個引數用來限制生產者使用者端能傳送的訊息的最大值,預設值為 1048576B ,即 lMB 一般情況下,這個預設值就可以滿足大多數的應用場景了。

  1. compression.type

這個引數用來指定訊息的壓縮方式,預設值為「none ",即預設情況下,訊息不會被壓縮。該引數還可以設定為 "gzip","snappy" 和 "lz4"。對訊息進行壓縮可以極大地減少網路傳輸、降低網路 I/O,從而提高整體的效能 。訊息壓縮是一種以時間換空間的優化方式,如果對時延有一定的要求,則不推薦對訊息進行壓縮;

  1. acks

acks的值為0,1和-1或者all。

  • 0表示Producer 往叢集傳送資料不需要等到叢集的返回,不確保訊息傳送成功。安全性最低但是效率最高。
  • 1表示Producer 往叢集傳送資料只要 Leader 成功寫入訊息就可以傳送下一條,只確保 Leader 接收成功。
  • -1 或 all表示Producer 往叢集傳送資料需要所有的 ISR Follower 都完成從 Leader 的同步才會傳送下一條,確保Leader 傳送成功和所有的副本都成功接收。安全性最高,但是效率最低。
  1. max.in.flight.requests.per.connection

允許最多沒有返回 ack 的次數,預設為 5,開啟冪等性要保證該值是 1-5 的數位。

  1. retriesretry.backoff.ms

當訊息傳送出現錯誤的時候,系統會重發訊息。retries表示重試次數。在kafka3.4.0預設是 int 最大值,2147483647。如果設定了重試,還想保證訊息的有序性,需要設定max.in.flight.requests.per.connection=1否則在重試此失敗訊息的時候,其他的訊息可能傳送成功了。另外retry.backoff.ms控制兩次重試之間的時間間隔,預設是 100ms。

更多kafka生產者的設定可以查閱官網https://kafka.apache.org/documentation/#producerconfigs

生產者傳送訊息API

生產者傳送demo

通常情況下,生產者傳送訊息分為以下4個步驟:

(1)設定生產者使用者端引數及建立相應的生產者範例

(2)構建待傳送的訊息

(3)傳送訊息

(4)關閉生產者範例

我們直接上程式碼。

  1. 引入maven依賴
<dependency>
 		<groupId>org.apache.kafka</groupId>
 		<artifactId>kafka-clients</artifactId>
 		<version>3.3.0</version>
</dependency>
  1. 核心傳送邏輯
public static void main(String[] args) {
    // 1. 建立 kafka 生產者的設定物件
    Properties properties = new Properties();
    // 2. 給 kafka 設定物件新增設定資訊:bootstrap.servers
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // key,value 序列化(必須):key.serializer,value.serializer
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    // 3. 建立 kafka 生產者物件
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    // 4. 呼叫 send 方法,傳送訊息
    for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new
                ProducerRecord<>("first", Integer.toString(i), "hello " + i));
    }
    // 5. 關閉資源
    kafkaProducer.close();
}
  1. 訊息物件ProducerRecord

kafka傳送時主要構造出ProducerRecord物件,包含傳送的主題,partition,key,value等。

public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}

三種傳送模式

kafka提供了3種傳送訊息的模式,發後即忘,同步傳送和非同步傳送,我們直接上程式碼。

  1. 發後即忘( fire-and-forget

發後即忘,它只管往 Kafka 傳送,並不關心訊息是否正確到達。 在大多數情況下,這種傳送方式沒有問題。 不過在某些時候(比如發生不可重試異常時)會造成訊息的丟失。 這種傳送方式的效能最高,可靠性最差。

Future<RecordMetadata> send = producer.send(rcd);
  1. 同步傳送( sync ****)

只需在上面種傳送方式的基礎上,再呼叫一下 get()方法即可,該方法時阻塞的。

// 同步傳送
 kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
  1. 帶回撥非同步傳送( async ****)

回撥函數會在 producer 收到 ack 時呼叫,為非同步呼叫,該方法有兩個引數,分別是 RecordMetadataException,如果 Exceptionnull,說明訊息傳送成功,如果 Exception 不為 null,說明訊息傳送失敗。

注意:訊息傳送失敗會自動重試,不需要我們在回撥函數中手動重試。

for (int i = 0; i < 5; i++) {
    kafkaProducer.send(new
            ProducerRecord<>("first", Integer.toString(i), "hello " + i), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                // 沒有異常,輸出資訊到控制檯
                System.out.println(" 主題: " +
                        metadata.topic() + "->" + "分割區:" + metadata.partition());
            } else {
                // 出現異常列印
                exception.printStackTrace();
            }
        }
    });
}

生產者傳送核心機制

生產者分割區機制

kafka設計上存在分割區的,它有下面兩個好處:

  • 便於合理使用儲存資源,每個Partition在一個Broker上儲存,可以把海量的資料按照分割區切割成一塊一塊資料儲存在多臺Broker上。合理控制分割區的任務,可以實現負載均衡的效果。
  • 提高並行度和吞吐量,生產者可以以分割區為單位傳送資料;消費者可以以分割區為單位進行消費資料。

那究竟生產者是按照什麼樣的策略發往到不同的分割區呢?

根據生產者的傳送流程,其中會經過分割區器,預設情況下是使用DefaultPartitioner,具體邏輯如下:

  1. 按指定分割區傳送

kafka傳送訊息的時候構造訊息物件ProducerRecord,可以傳入指定的partition, 那麼訊息就會傳送這個指定的分割區。例如partition=0,所有資料寫入分割區0。

 // 傳送訊息到0號分割區
kafkaProducer.send(new
        ProducerRecord<>("first", 0, Integer.toString(i), "hello " + i));
  1. 沒有指明partition值但有key的情況下,將keyhash值與topicpartition數進行取餘得到partition值;

例如:key1hash值=5, key2hash值=6 ,topicpartition數=2,那麼key1 對應的value1寫入1號分割區,key2對應的value2寫入0號分割區。

  1. 既沒有partition值又沒有key值的情況下,Kafka採用Sticky Partition(黏性分割區器),會隨機選擇一個分割區,並儘可能一直使用該分割區,待該分割區的batch已滿或者已完成,Kafka再隨機一個分割區進行使用(和上一次的分割區不同)。

例如:第一次隨機選擇0號分割區,等0號分割區當前批次滿了(預設16k)或者linger.ms設定的時間到, Kafka再隨機一個分割區進行使用(如果還是0會繼續隨機)。

  1. 自定義分割區器

如果預設的分割區規則不滿足需求,我們也可以自定義一個分割區器。比如我們實現一個分割區器實現,傳送過來的資料中如果包含 alvin,就發往 0 號分割區,不包含 alvin,就發往 1 號分割區。

  • 實現分割區器介面Partitioner
/**
 * 1. 實現介面 Partitioner
 * 2. 實現 3 個方法:partition,close,configure
 * 3. 編寫 partition 方法,返回分割區號
 */
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 獲取訊息
        String msgValue = value.toString();
        // 建立 partition
        int partition;
        // 判斷訊息是否包含 alvin
        if (msgValue.contains("alvin")){
            partition = 0;
        }else {
            partition = 1;
        }
        // 返回分割區號
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • 設定分割區器
// 新增自定義分割區器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.alvin.kafka.producer.MyPartitioner");

// 傳送訊息 略~~

如何提高生產者吞吐量?

對比著前面kafka生產者的傳送流程,kafka生產者提供的一些設定引數可以有助於提高生產者的吞吐量。

引數名稱 描述
buffer.memory RecordAccumulator 緩衝區總大小,預設 32m。適當增加該值,可以提高吞吐量。
batch.size 緩衝區一批資料最大值,預設 16k。適當增加該值,可以提高吞吐量,但是如果該值設定太大,會導致資料傳輸延遲增加。
linger.ms 如果資料遲遲未達到 batch.sizesender執行緒等待 linger.time之後就會傳送資料。單位 ms,預設值是 0ms,表示沒有延遲。生產環境建議該值大小為 5-100ms 之間。
compression.type 指定訊息的壓縮方式,預設值為「none ",即預設情況下,訊息不會被壓縮。該引數還可以設定為 "gzip","snappy" 和 "lz4"。對訊息進行壓縮可以極大地減少網路傳輸、降低網路 I/O,從而提高整體的效能 。

如何保證生產者訊息的可靠性?

為了保證訊息傳送的可靠性,kafkaproducer 裡面提供了訊息確認機制。我們可以通過設定來決 定訊息傳送到對應分割區的幾個副本才算訊息傳送成功。可以在定義 producer 時通過 acks 引數指定。

  • acks=0

生產者傳送過來的資料,不需要等資料落盤應答。

  • acks=1(預設值)

生產者傳送過來的資料,Leader收到資料後應答。

  • acks=-1或者all

生產者傳送過來的資料,LeaderISR佇列裡面的所有節點收齊資料後應答。

ISR 概念:(同步副本)。每個分割區的 leader 會維護一個 ISR 列表,ISR 列表裡面就是 follower 副本 的 Borker 編 號 , 只 有 跟 得 上 Leaderfollower 副 本 才 能 加 入 到 ISR 裡 面 , 這 個 是 通 過 replica.lag.time.max.ms =30000(預設值)引數設定的,只有 ISR 裡的成員才有被選為 leader 的可能。

如果Leader收到資料,所有Follower都開始同步資料,但有一個Follower ,因為某種故障,遲遲不能與Leader進行同步,那這個問題怎麼解決呢?

Leader維護了一個動態的in-sync replica setISR),意為和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。如果Follower長時間未向Leader傳送通訊請求或同步資料,則該Follower將被踢出ISR。該時間閾值由replica.lag.time.max.ms引數設定,預設30s

小結:資料完全可靠條件 = ACK級別設定為-1 + 分割區副本大於等於2 + ISR裡應答的最小副本數量大於等於2。

  • acks=0,生產者傳送過來資料就不管了,可靠性差,效率高;
  • acks=1,生產者傳送過來資料Leader應答,可靠性中等,效率中等;
  • acks=-1或者all,生產者傳送過來資料LeaderISR佇列裡面所有Follwer應答,可靠性高,效率低;

在生產環境中,acks=0很少使用;acks=1,一般用於傳輸普通紀錄檔,允許丟個別資料;acks=-1,一般用於傳輸和錢相關的資料,對可靠性要求比較高的場景。

如何保證訊息只傳送一次?

kafka作為分散式訊息系統,難免會出現重複訊息或者丟訊息的情況,會存在3種資料傳遞語意。

  • 最多一次(At Most Once)

ack級別設定為0, 可以保證資料不重複,但是不能保證資料不丟失, 所以叫做最多一次。

  • 至少一次(At Least Once)

ack級別設定為-1 + 分割區副本大於等於2 + ISR裡應答的最小副本數量大於等於2可能會出現至少一次的訊息。比如下圖中在傳送過程Leader節點宕機,訊息就會重試,就有可能出現訊息的重複。

At Least Once可以保證資料不丟失,但是不能保證資料不重複。

  • 精確一次(Exactly Once)

對於一些非常重要的資訊,比如和錢相關的資料,要求資料既不能重複也不丟失。這在kafka中可以通過冪等性和事務的特性實現。

精確一次(Exactly Once) = 冪等性 + 至少一次( ack=-1 + 分割區副本數>=2 + ISR最小副本數量>=2) 。

冪等性,簡單來說,就是一個操作重複做,每次的結果都一樣。開啟冪等性功能,引數enable.idempotence 設定為 true即可,在3.x版本中預設情況下也是true。具體實現原理如下:

  1. 每一個 producer 在初始化時會生成一個 producer_id,併為每個目標 partition 維護一個「序列號」。
  2. producer 每傳送一條訊息,會將<producer_id,分割區>對應的「序列號」加 1。
  3. broker 伺服器端端會為每一對<producer_id,分割區>維護一個序列號,對於每收到的一條訊息,會判斷伺服器端 的 SN_old 和接收到的訊息中的 SN_new 進行對比:
    • 如果 SN_OLD+1 = SN_NEW,正常情況
    • 如果 SN_old+1>SN_new,說明是重複寫入的資料,直接丟棄
    • 如果 SN_old+1<SN_new,說明中間有資料尚未寫入,或者是發生了亂序,或者是資料丟失,將丟擲嚴重異常:OutOfOrderSequenceException

如何保證生產者訊息的順序?

根據前面的生產者傳送流程可以知道,要想保證訊息投遞的順序性:

  1. 首先要保證單分割區,因為單分割區內是有序的,多分割區,分割區與分割區間無序。
  2. kafka在1.x版本之前保證資料單分割區有序,條件如下:
  • max.in.flight.requests.per.connection=1
  1. kafka在1.x及以後版本保證資料單分割區有序,條件如下:
  • 未開啟冪等性,max.in.flight.requests.per.connection需要設定為1。
  • 開啟冪等性,max.in.flight.requests.per.connection需要設定小於等於5。

因為在kafka1.x以後,啟用冪等後,kafka伺服器端會快取producer發來的最近5個request的後設資料,故無論如何,都可以保證最近5個request的資料都是有序的。

總結

本文總結了kafka生產者整個訊息傳送的流程,只有明白了這個流程以後,那麼我們對於一些生產者訊息傳送的一些問題才有更加深刻的理解。

歡迎關注個人公眾號【JAVA旭陽】交流學習