一文教會你用Apache SeaTunnel Zeta離線把資料從MySQL同步到StarRocks

2023-05-26 18:00:19

在上一篇文章中,我們介紹瞭如何下載安裝部署SeaTunnel Zeta服務(3分鐘部署SeaTunnel Zeta單節點Standalone模式環境),接下來我們介紹一下SeaTunnel支援的第一個同步場景:離線批次同步。顧名思意,離線批次同步需要使用者定義好SeaTunnel JobConfig,選擇批次處理模式,作業啟動後開始同步資料,當資料同步完成後作業完成退出。

下面以MySQL離線同步到StarRocks為例,介紹如何使用SeaTunnel進行離線同步作業的定義和執行。

1. 定義作業組態檔

SeaTunnel使用組態檔來定義作業,在這個範例中,作業的組態檔如下,檔案儲存路徑~/seatunnel/apache-seatunnel-incubating-2.3.1/config/mysql_to_sr.config

#定義一些作業的執行引數,具體可以參考 https://seatunnel.apache.org/docs/2.3.1/concept/JobEnvConfig
env {
	job.mode="BATCH"  #作業的執行模式,BATCH=離線批同步,STREAMING=實時同步
	job.name="SeaTunnel_Job"
	checkpoint.interval=10000 #每10000ms進行一次checkpoint,後面會詳細介紹checkpoint對JDBC Source和StarRocks Sink這兩個聯結器的影響
}
source {
	Jdbc {
    	parallelism=5 # 並行度,這裡是啟動5個Source Task來並行的讀取資料
    	partition_column="id" # 使用id欄位來進行split的拆分,目前只支援數位型別的主鍵列,而且該列的值最好是離線的,自增id最佳
    	partition_num="20" # 拆分成20個split,這20個split會被分配給5個Source Task來處理
    	result_table_name="Table9210050164000"
    	query="SELECT `id`, `f_binary`, `f_blob`, `f_long_varbinary`, `f_longblob`, `f_tinyblob`, `f_varbinary`, `f_smallint`, `f_smallint_unsigned`, `f_mediumint`, `f_mediumint_unsigned`, `f_int`, `f_int_unsigned`, `f_integer`, `f_integer_unsigned`, `f_bigint`, `f_bigint_unsigned`, `f_numeric`, `f_decimal`, `f_float`, `f_double`, `f_double_precision`, `f_longtext`, `f_mediumtext`, `f_text`, `f_tinytext`, `f_varchar`, `f_date`, `f_datetime`, `f_timestamp` FROM `sr_test`.`test1`"
    	password="root@123"
    	driver="com.mysql.cj.jdbc.Driver"
    	user=root
    	url="jdbc:mysql://st01:3306/sr_test?enabledTLSProtocols=TLSv1.2&rewriteBatchedStatements=true"
	}
}
transform {
# 在本次範例中我們不需要做任務的Transform操作,所以這裡為空,也可以將transform整個元素刪除
}
sink {
	StarRocks {
    	batch_max_rows=10240 # 
    	source_table_name="Table9210050164000"
    	table="test2"
    	database="sr_test"
    	base-url="jdbc:mysql://datasource01:9030"
    	password="root"
    	username="root"
    	nodeUrls=[
        	"datasource01:8030" #寫入資料是通過StarRocks的Http介面
    	]
	}
}

2. 作業設定說明

在這個作業定義檔案中,我們通過env定義了作業的執行模式是BATCH離線批次處理模式,同時定義了作業的名稱是"SeaTunnel_Job"。checkpoint.interval引數用來定義該作業過程中多久進行一次checkpoint,那什麼是checkpoint,以及checkpoint在Apache SeaTunnel中的作用是什麼呢?

2.1 checkpoint

檢視官方檔案中對Apache SeaTunnel Zeta引擎checkpoint的介紹: https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/checkpoint-storage 發現checkpoint是用來使執行在Apache SeaTunnel Zeta中的作業能定期的將自己的狀態以快照的形式儲存下來,當任務意外失敗時,可以從最近一次儲存的快照中恢復作業,以實現任務的失敗恢復,斷點續傳等功能。其實checkpoint的核心是分散式快照演演算法:Chandy-Lamport 演演算法,是廣泛應用在分散式系統,更多是分散式計算系統中的一種容錯處理理論基礎。這裡不詳細介紹Chandy-Lamport 演演算法,接下來我們重點說明在本範例中checkpoint對這個同步任務的影響。

Apache SeaTunnel Zeta引擎在作業啟動時會啟動一個叫CheckpointManager的執行緒,用來管理這個作業的checkpoint。SeaTunnel Connector API提供了一套checkpoint的API,用於在引擎觸發checkpoint時通知具體的Connector進行相應的處理。SeaTunnel的Source和Sink聯結器都是基於SeaTunnel Connector API開發的,只是不同的聯結器對checkpoint API的實現細節不同,所以能實現的功能也不同。

2.1.1 checkpoint對JDBC Source的影響

在本範例中我們通過JDBC Source聯結器的官方檔案https://seatunnel.apache.org/docs/2.3.1/connector-v2/source/Jdbc 可以發現如下內容:

這說明JDBC Source聯結器實現了checkpoint相關的介面,通過原始碼我們可以得知,當checkpoint發生時,JDBC Source會將自己還未處理的split做為狀態的快照傳送給CheckpointManager進行持久化儲存。這樣當作業失敗並恢復時,JDBC Source會從最近一次儲存的快照中讀取哪些split還未處理,然後接著處理這些split。

在該作業中通過partition_num=20,會將query引數中指定的sql語句的結果分成20個split進行處理,每個split會生成讀取它負責的資料的sql,這個sql是由query中指定的sql再加上一些where過濾條件組成的。這20個split會被分配給5個Source Task進行處理,理想情況下,每個Source Task會分配到4個split。假設在一次checkpoint時每個Source Task都只剩下一個split沒有處理,這個split的資訊會被儲存下來,如果這之後作業掛掉了,作業會自動進行恢復,恢復時每個Source Task都會獲取到那個還未處理的split,並接著進行處理。如果作業不再報錯,這些split都處理完成後,作業執行完成。如果作業還是報錯(比如目標端StarRocks掛了,無法寫入資料),最終作業會以失敗狀態結束。

斷點續傳:

如果在作業失敗後,我們修復了問題,並且希望該作業接著之前的進度執行,只處理那些之前沒有被處理過的split,可以使用 sh seatunnel.sh -r jobId來讓作業ID為jobId的作業從斷點中恢復。

回到主題,checkpoint.interval=10000對於從Mysql中讀取資料意味著每過10s,SeaTunnel Zeta引擎就會觸發一次checkpoint操作,然後JDBC Source Task會被要求將自己還未處理的split資訊儲存下來,這裡需求注意的是,JDBC Source Task讀取資料是以split為單位的,如果checkpoint觸發時一個split中的資料正在被讀取還未完全傳送給下游的StarRocks,它會等到這個split的資料處理完成之後才會響應這次checkpoint操作。這裡一定要注意,如果MySQL中的資料量比較大,一個split的資料需要很長的時候才能處理完成,可能會導致checkpoint超時。關於checkpoint的超時時長可以引數https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/checkpoint-storage, 預設是1分鐘。

2.1.2 checkpoint對StarRocks Sink的影響

在Sink聯結器的檔案上,我們也能看到如下圖中的標識:

這個標識代表該Sink聯結器是否實現了精確處理一次的語意,如果該標識被選中,說明這個Sink聯結器能保證發給它的資料它只會往目標端寫入一次,不會漏掉導致目標端資料丟失 ,也不會重複往目標端寫入。這一功能常見的實現方式是兩階段提交,支援事務的聯結器一般會先開啟事務進行資料的寫入。當checkpoint發生時,將事務ID返回給CheckManager進行持久化,當作業中的所有Task都響應了CheckManager的checkpoint請求後,第一階段完成。然後Apache SeaTunnel Zeta引擎會呼叫AggregateCommit的方法讓Sink對其事務進行提交,這個過程被稱為第二階段,第二階段完成後該次checkpoint完成。如果第二階段提交失敗,作業會失敗,然後自動恢復,恢復後會再次從第二階段開始,要求對事務進行提交,直到該事務提交完成,如果事務一直失敗,作業也將失敗。

並不是只有實現了exactly-once特性的Sink聯結器才能保證目標端的資料不丟失不重複,如果目標端的資料庫支援以主鍵去重,那隻要Sink聯結器保證傳送給它的資料至少往目標端寫入一次,無論重複寫入多少次,最終都不會導致目標端資料丟失或重複。在該範例中StarRocks Sink聯結器即是使用了這種方式,StarRocks Sink聯結器會將收到的資料先快取在記憶體中,當快取的行數達到batch_max_rows設定的10240行,就會發起一次寫入請求,將資料寫入到StarRocks中。如果MySQL中的資料量很小,達不到10240行,那就會在checkpoint觸發時進行StarRocks的寫入。

3. 執行作業

我們使用Apache SeaTunnel Zeta引擎來執行該作業

cd ~/seatunnel/apache-seatunnel-incubating-2.3.1
sh bin/seatunnel.sh --config config/mysql_to_sr.config

作業執行完成後可以看到如下資訊,說明作業狀態為FINISHED,讀取20w行資料,寫入StarRocks也是20w行資料,用時6s。