Summary
這篇文章探討了如何利用Flink CDC和Apache Paimon搭建一個能夠應對Schema演化挑戰的IoT數據管道,並分享了我在實作過程中的一些啟發與體會。 Key Points:
- 探索如何運用Confluent Schema Registry來管理Avro Schema,提升IoT數據管道的兼容性與效率。
- 深入剖析Flink CDC在Schema演化過程中的容錯機制,確保數據一致性與穩定性。
- 展示Apache Paimon如何與Flink CDC及Schema Registry協同工作,構建支援Schema演化的高效數據湖。
從Kafka到Flink CDC的概述
在這篇部落格的第三部分中,我們將探討如何通過Flink CDC Action框架,將來自Kafka的IoT數據以JSON格式包裝,經Avro字節流序列化後,推送至Kafka主題,再利用Flink CDC提取並存儲到Apache Paimon上。這一系列操作最終將數據寫入到S3上的Apache Parquet文件中。
回顧之前的文章,我們曾經展示過如何將JSON格式的IoT生成數據插入到MongoDB集合中,隨後使用Apache Flink的flink_paimon_action框架進行變更數據捕獲(CDC),從而將數據轉存至Apache Paimon資料庫。最近,我們又把同樣的数据发布到了Confluent Kafka主题,并利用Flink CDC再次提取数据。这次,我们直接从Kafka主题获取数据,并将其存储为基于Apache Paimon的表中的Parquet文件,这些文件最終會被寫入由MinIO服務器提供的S3物件儲存中。
值得注意的是,之前僅僅處理了簡單的JSON字節流,但這次涉及到了更為複雜和豐富的IOT數據。在這個過程中,我們看到Kafka與Flink CDC之間是如何高效地協同工作的,它們共同實現了即時數據處理,使得業務能夠快速響應市場需求。此外,通過具體使用案例,可以讓讀者更加明白這種技術在實際應用中的價值。而且,在考慮資料的一致性和容錯機制方面,也彰顯出使用Flink CDC所帶來的優勢和保障措施。
回顧之前的文章,我們曾經展示過如何將JSON格式的IoT生成數據插入到MongoDB集合中,隨後使用Apache Flink的flink_paimon_action框架進行變更數據捕獲(CDC),從而將數據轉存至Apache Paimon資料庫。最近,我們又把同樣的数据发布到了Confluent Kafka主题,并利用Flink CDC再次提取数据。这次,我们直接从Kafka主题获取数据,并将其存储为基于Apache Paimon的表中的Parquet文件,这些文件最終會被寫入由MinIO服務器提供的S3物件儲存中。
值得注意的是,之前僅僅處理了簡單的JSON字節流,但這次涉及到了更為複雜和豐富的IOT數據。在這個過程中,我們看到Kafka與Flink CDC之間是如何高效地協同工作的,它們共同實現了即時數據處理,使得業務能夠快速響應市場需求。此外,通過具體使用案例,可以讓讀者更加明白這種技術在實際應用中的價值。而且,在考慮資料的一致性和容錯機制方面,也彰顯出使用Flink CDC所帶來的優勢和保障措施。
如何使用Avro序列化改進IoT數據流
在這個真實世界的物聯網流程中,我們總是希望能夠改進、加快或優化。因此,這次我們將對之前的數據載荷進行「打磨」,首要步驟是使用Avro進行序列化。為了達成這一目標,我們需要在Confluent Schema Registry上註冊基於Avro的模式和鍵。此時,我們生成的數據仍然來自於一個位於_app_iot1/_中的Python IoT程式(現在已經容器化),該程式是在上篇博客中創建的。
首先,在_app_iot1_中,我們創造出最簡單的IoT JSON載荷。這一步驟通過設定_TSHUMAN、STRUCMOD與DEVICETYPE等值為0來完成。而在接下來的_app_iot2_中,我們則擴展了載荷,其中_TSHUMAN被設置為1,而_STRUCMOD也設置為1。
值得注意的是,使用Avro序列化有幾個顯著優勢:首先,它具備自描述特性,使得當數據結構變更時仍能保持良好的兼容性;其次,Avro支持壓縮技術,可以有效降低數據傳輸量,提高整體效率。此外,它與Flink CDC的集成讓即時數據處理更加順暢。因此,引入Schema Registry來管理不同版本的模式,有助於確保在維護和演進過程中的一致性,也是一種值得考慮的方法。
首先,在_app_iot1_中,我們創造出最簡單的IoT JSON載荷。這一步驟通過設定_TSHUMAN、STRUCMOD與DEVICETYPE等值為0來完成。而在接下來的_app_iot2_中,我們則擴展了載荷,其中_TSHUMAN被設置為1,而_STRUCMOD也設置為1。
值得注意的是,使用Avro序列化有幾個顯著優勢:首先,它具備自描述特性,使得當數據結構變更時仍能保持良好的兼容性;其次,Avro支持壓縮技術,可以有效降低數據傳輸量,提高整體效率。此外,它與Flink CDC的集成讓即時數據處理更加順暢。因此,引入Schema Registry來管理不同版本的模式,有助於確保在維護和演進過程中的一致性,也是一種值得考慮的方法。
Extended Perspectives Comparison:
結論標題 | 內容 |
---|---|
數據管道概述 | 本篇文章探討如何利用Apache Flink實現Kafka與Paimon之間的數據同步,強調有效載荷的處理及模式演變。 |
CDC支持 | 使用Debezium Avro格式以捕捉變更數據,並確保系統在面對不同測試模式下仍保持穩定性能。 |
檢查點機制 | 配置檢查點以確保數據的一致性和可靠性,是高可用性系統中不可或缺的重要步驟。 |
模式註冊重要性 | 正確設置schema registry URL及相關參數是成功運行流程的關鍵,有助於自動化管理資料結構變更。 |
最佳實踐建議 | 在進入生產環境之前,詳細測試各項配置,以達成最佳效果並避免潛在問題。 |

設置Confluent Schema Registry的步驟
這段文字中,我們為元數據標籤部分新增了一個可讀的日期欄位及位置物件。在_app_iot3_當中,我們更進一步將_DEVICETYPE=1_加入到有效載荷之中,這樣做可以為設備類型定義一個文本字符串。這展示了JSON在物聯網數據中的靈活性,以及我們如何能夠在整個流程中,從數據來源到數據存儲,都適應動態的數據結構。對於目錄服務,我們將使用[Apache Hive]及其[Metastore]功能,這是在之前的博客中已經介紹過的(不過最近進行了一些版本更新)。如果您有關注我之前的文章,可能會注意到我已經升級了我的[Confluent] Kafka集群(現在是7.7.1),以及[Apache Flink]環境(現在是1.19.1)。同時,[Apache Paimon]套件也升級到了0.9.0。
利用Python應用生成IoT數據流
如往常一樣,所有的程式碼都可以在[GIT儲存庫]中找到。是的,我們仍然使用大量的Makefile、Docker-compose.yml和Dockerfile。我決定不再偏離主題,這次將集中於如何利用我們的Python應用來創建IoT數據流,模擬在工廠內設備上讀取感測器的情況。這些[JSON]文件將被推送到我們名為_factory_iot_的Kafka主題上,而主題的_key_則設定為各個工廠的_siteId_。我把這些工廠分成三組:北區、南區和東區,以此來模擬某種區域分佈。
接下來,我們需要了解一些關鍵概念。首先,**數據生成原理**是核心所在,我們會解釋如何模擬現實世界中的IoT設備行為,以便生成高效且可靠的數據流。例如,可以考慮傳輸溫度、濕度等不同類型資料,每種資料都有其特定範例代碼以供參考。
然後,在**材質選擇**方面,不同場景適合不同數據類型,因此我們會提供相關示例,幫助更好地理解如何選擇合適的數據格式。此外,我們也會討論到**參數化設計**,說明該如何通過配置文件或命令列參數靈活調整數據產生頻率與內容,使之能夠更好地適應不斷變化的需求。
最後,我們將利用Kafka CDC功能透過action frame直接將這些資料複製到Apache Paimon表中。
接下來,我們需要了解一些關鍵概念。首先,**數據生成原理**是核心所在,我們會解釋如何模擬現實世界中的IoT設備行為,以便生成高效且可靠的數據流。例如,可以考慮傳輸溫度、濕度等不同類型資料,每種資料都有其特定範例代碼以供參考。
然後,在**材質選擇**方面,不同場景適合不同數據類型,因此我們會提供相關示例,幫助更好地理解如何選擇合適的數據格式。此外,我們也會討論到**參數化設計**,說明該如何通過配置文件或命令列參數靈活調整數據產生頻率與內容,使之能夠更好地適應不斷變化的需求。
最後,我們將利用Kafka CDC功能透過action frame直接將這些資料複製到Apache Paimon表中。

推送數據到Kafka主題的操作流程
一旦腳手架搭建完成,接下來的步驟就是這些了。要運行整個堆疊,可以按照以下順序執行命令:首先進入開發實驗室目錄 _cd devlab_,然後運行 _make run_,在執行下一個命令之前,建議等待30秒到1分鐘。接著輸入 _make deploy_,然後轉到 creTopics 目錄,再依次執行 _./creTopics.sh_、_./reg_key.sh_ 和 _./reg_value.sh_。
**接下來的步驟:**首先啟動 _app_iot_。這可以通過容器化版本來完成(在 devlab 內部執行 _make rp1_),或者使用在
**接下來的步驟:**首先啟動 _app_iot_。這可以通過容器化版本來完成(在 devlab 內部執行 _make rp1_),或者使用在
/app_iot1/ 目錄中的 ./run1.sh 腳本。如果選擇通過腳本啟動應用,你還可以類似地運行 _app_iot2_ 和 _app_iot3_,只需分別執行 ./run2.sh 和 ./run3.sh;若是使用 make 指令,那麼則可透過運行 _make rp2_ 和 _make rp3_ 啟動它們。停止應用時,可以使用 make 指令並替換 # 為 1、2 或 3。
以下是一個範例 JSON 負載格式:
{
"ts": 123421452622,
"siteId": 1009,
"metadata": {
"deviceId": 1042,
"sensorId": 10180,
"unit": "Psi",
"ts_human": "2024-10-02T00:00:00.869Z"
},
"measurement": 1013.3997
}
這段代碼將會被發布至我們的 Kafka 主題上。在此,我們先仔細看看這份負載資料。你會注意到,它與之前的版本有些不同。我們需要對原始負載做一些修改,以符合目前展示的格式,而這也是我們在 Python 程式碼中進一步打包時需要考慮的部分。此外,在推送數據至 Kafka 主題時,也強烈建議關注消息格式的一致性與擴展性,例如採用 Avro 或 JSON 格式。同樣重要的是,要詳細說明如何配置 Flink 與 Kafka 的連接,包括重試次數及超時設置等參數,以提高系統穩定性。同時,也值得探討流量控制和錯誤處理機制,以增強整體架構的健壯性。
理解CDC字段和Avro序列化的要求
這一切都是為了滿足CDC字段的要求,以配合我們在引入Avro序列化時,對於有效負載和架構的處理,以及使用模式註冊中心的需求。其中一個主要改動是將_siteId_移到根層級,因為它被指定為有效負載的鍵,同時新增了“op”標籤,預設為“c”,表示創建該記錄,所有這些都包裹在“before”和“after”的區塊中。正如所說,這一小部分花了我相當長的時間才弄明白(最後一步多虧了ChatGPT,哈哈,我們必須善用手邊的資源)……结合下面修改過的/_opt/flink/bin/flink_命令。**請參見** _app_iot1/simulate.py行93到120。
接下來的一點……我不是Java專家,所以這些jar文件如何協同工作對我來說真的是一次探索之旅,有時讓我感到快要崩潰。需要注意的是,在_
接下來的一點……我不是Java專家,所以這些jar文件如何協同工作對我來說真的是一次探索之旅,有時讓我感到快要崩潰。需要注意的是,在_
/devla_b/data/flink/lib/*資料夾中增加了一些jar文件,以便讓這一切能運作,而不再像以前那樣。完整的檔案清單可以在_/devlab/getlibs.sh_找到。**在此我要特別感謝:** [Giannis Polyzos] - Fluss首席流媒體架構師 | [Ververica] 主管 | Apache Flink &> [Jark Wu] - [Fluss] 和阿里巴巴雲[Ververica] 的Flink SQL主管。他們幫助我指出那些「你缺少了什麼」的信息。
等系統運行起來後,只需在_/devlab_目錄中執行_make jm_命令,就能啟動Flink任務管理器並進入命令行界面。

啟動Flink作業管理器並訂閱Kafka主題
要開始我們的“訂閱”流程,將資料從來源主題(_factory_iot_)推送到目標的Apache Paimon數據庫/表格中,你可以直接複製以下指令並執行。在此之前,請確保你已經啟動了資料生成器,因為這樣一來在執行命令時,就會有一些結構化的有效載荷可供推斷架構。如果不這麼做,我們就必須先手動創建表格結構。
這段指令主要是利用Flink來實現Kafka與Paimon之間的數據同步。透過設置相關參數,如Kafka伺服器地址、主題名稱及序列化格式等,我們能夠即時處理和傳輸資料到指定的位置。值得注意的是,在選擇序列化格式時,我們使用了Debezium Avro格式,以便更好地處理變更數據捕捉(CDC)場景。此外,也配置了檢查點機制以確保數據的一致性和可靠性,對於需要高可用性的系統而言,這是一個不可或缺的重要步驟。
整個流程中的每一步都相當關鍵,包括如何管理偏移量、設定消費者組ID及對應schema註冊中心的URL等,都影響著最終服務的穩定性與性能。因此,在進入生產環境前,不妨多花些時間詳細瞭解並測試各項配置,以達到最佳效果。
/opt/flink/bin/flink run \
/opt/flink/lib/paimon/paimon-flink-action-0.9.0.jar \
kafka_sync_table \
-Dexecution.checkpointing.interval=10s \
-Dexecution.checkpointing.num-retained=5 \
-Dstate.checkpoints.num-retained=10 \
-Dpipeline.name='sync-kafka-topic-to-paimon-s3' \
--kafka_conf properties.bootstrap.servers=broker:29092 \
--kafka_conf topic=factory_iot \
--kafka_conf value.format=debezium-avro \
--kafka_conf key.format=debezium-avro \
--kafka_conf key.field=siteId \
--kafka_conf properties.group.id=123456 \
--kafka_conf schema.registry.url=http://schema-registry:9081 \
--kafka_conf scan.startup.mode=earliest-offset \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://metastore:9083 \
--warehouse s3a://warehouse/paimon/ \
--database iot \
--table factory_iot \
--table_conf changelog-producer=input \
--table_conf write-mode=append-only \
--table_conf sink.parallelism=4
這段指令主要是利用Flink來實現Kafka與Paimon之間的數據同步。透過設置相關參數,如Kafka伺服器地址、主題名稱及序列化格式等,我們能夠即時處理和傳輸資料到指定的位置。值得注意的是,在選擇序列化格式時,我們使用了Debezium Avro格式,以便更好地處理變更數據捕捉(CDC)場景。此外,也配置了檢查點機制以確保數據的一致性和可靠性,對於需要高可用性的系統而言,這是一個不可或缺的重要步驟。
整個流程中的每一步都相當關鍵,包括如何管理偏移量、設定消費者組ID及對應schema註冊中心的URL等,都影響著最終服務的穩定性與性能。因此,在進入生產環境前,不妨多花些時間詳細瞭解並測試各項配置,以達到最佳效果。
測試模式演變及其在數據流中的影響
即使文檔中未明確要求設定 `scan.startup.mode=earliest-offset`,但請務必包含此參數,即便是在處理新主題時。那些閱讀過之前博客的人會注意到,為了讓這個過程正常運行,有幾個額外的值是必需的。這些都是與使用 Avro 序列化及模式註冊相關的改進,旨在將有效載荷傳輸到 Kafka 主題上。目前,如果你打開 Apache Flink 控制台,你應該能看到類似以下內容。
深入查看後,你會發現一些有趣且簡單的東西。現在我們可以進行模式演變測試。執行 _app_iot2_ 應用程序,如同你之前所見,它會將 `_ts_human_` 和 `_location_` 物件添加至有效載荷中。
這段 JSON 結構清楚地展示了資料隨著時間而演變的狀態。在實際操作中,Flink CDC 能夠捕捉到數據結構的變更,而 Apache Paimon 則支持動態調整 schema,使得系統在面對不同測試模式下仍能保持性能穩定。不妨考慮加入具體案例或實驗結果,以便讀者更加直觀地理解各種測試模式對系統影響。此外,使用可視化圖表來展示資料流如何隨著 schema 的演變而變化,也將大幅提升文章的可讀性和吸引力。
深入查看後,你會發現一些有趣且簡單的東西。現在我們可以進行模式演變測試。執行 _app_iot2_ 應用程序,如同你之前所見,它會將 `_ts_human_` 和 `_location_` 物件添加至有效載荷中。
{
"ts": 123421452622,
"siteId": 1009,
"metadata": {
"deviceId": 1042,
"sensorId": 10180,
"unit": "Psi",
"ts_human": "2024-10-02T00:00:00.869Z",
"location": {
"latitude": -26.195246,
"longitude": 28.034088
}
},
"measurement": 1013.3997
}
這段 JSON 結構清楚地展示了資料隨著時間而演變的狀態。在實際操作中,Flink CDC 能夠捕捉到數據結構的變更,而 Apache Paimon 則支持動態調整 schema,使得系統在面對不同測試模式下仍能保持性能穩定。不妨考慮加入具體案例或實驗結果,以便讀者更加直觀地理解各種測試模式對系統影響。此外,使用可視化圖表來展示資料流如何隨著 schema 的演變而變化,也將大幅提升文章的可讀性和吸引力。

將擴展後的payload發佈至Kafka主題的方法
這樣的操作會產生以下在主題上的有效負載。最後,隨著我們愈加幸運,執行 _app_iot3_ 會將 _devicetype_ 欄位添加到元資料標籤中。
就這樣...相比之前的文章,這次真的短了不少呢。
{
"timestamp": "2024-10-02T00:00:00.869Z",
"siteId": 1009,
"metadata": {
"deviceId": 1042,
"sensorId": 10180,
"unit": "Psi",
"ts_human": "2024-10-02T00:00:00.869Z",
"location": {
"latitude": -26.195246,
"longitude": 28.034088
},
"deviceType": "Oil Pump"
},
"measurement": 1013.3997
}
就這樣...相比之前的文章,這次真的短了不少呢。
總結構建自動化數據管道的關鍵要素
總結來說,我們建立了一個數據管道,從Kafka主題中提取資料,這個管道能夠處理模式演變,並將數據流入我們的Apache Paimon數據存儲,最終以Apache Parquet格式保存。我覺得這真的很有趣……希望你也享受了這次的探索。祝你好運,這一切都充滿了無數的兔子洞,你可能會輕易迷失其中,但那也是樂趣的一部分。為了執行這篇博客,你可以從根目錄中的README.md開始,它會告訴你確切需要執行的步驟,以便下載所有依賴項並構建整個系統。
Reference Articles
Related Discussions
這些主題聽起來真的很有趣,但我有點疑惑,關於如何處理模式演變的部分,你們是不是能再深入探討一下?畢竟在實際應用中,這可是個棘手的問題啊!
這些文章真的很有幫助!我最近也在學習如何利用IoT數據流,想知道大家對於Kafka和Flink的使用經驗是什麼?感覺這些技術能讓我們的生活變得更便利呢!