介紹 postgres logical decoding 的概念與示範
在之前的文章有介紹過 logical replication 的概念,但其實 logical 這個概念是從 Postgres 9.4 開始提供 logical decoding 功能而來的,也是因為在 9.4 提供這個功能後,開始能夠利用這個特性實現自己的 logical replication。因此今天來介紹一下,什麼是 logical decoding,它的運作原理是什麼以及示範一下用法。
logical decoding concept
概念說起來可以很簡單:
-
Decode WAL 資料來取得 DML changes,也就是可以拿到 INSERT、DELETE、UPDATE SQL 操作的資料,有點類似 trigger event 的意思,透過接收這種 event 你可以拿到相關的資料
-
將 changes stream 傳給 receiver,這邊的 receiver 也就是接收 replication conn 的 client,後面會在提到更詳細。
-
而 receiver 拿到 changes 後就可以選擇傳給下游的 external consumers,這就是為何可以透過這種概念來實作自己的 logical replication。而官方提供的 publish & subscribe 的 logical replication 機制是在 11 才推出的,因此在 11 之前就有透過這樣的方式來實現 logical replication,例如知名的 pglogical,那比 9.4 更早的版本則是會透過 trigger event 來實現,例如知名的 slony。
logical replication slot
那我們來一一拆解這三個步驟是怎麼運作的:
首先要先認識 replication slots 的概念,如果有用過 Postgres 原生提供的 Physical Replication,就會知道有個難題要處理 archive 的機制,為什麼要處理?是因為當今天 replica 如果跟不上 primary 的速度,但 primary 就已經將過期的 wal 給清掉,那 replica 就無法拿到之前的資料了。因此有兩個方式是透過 archive_command 或是 replication slot 的機制。這邊的關於 physical replication 的機制可以參考這篇 Postgres - 設定 physical replication 教學 | Kenny’s Blog
。replication slot 本身就是存放 WAL 的地方,只有當 replica 有去 consume 後,replication slot 才會把對應的 WAL 給清掉。
因此,回過來說,有 physical replication slot 就會有 logical replication slot,因此 primary 做了任何的 DML changes 就會將對應的 WAL 存在 logical replication slot 裡面。一個 primary 裡面可以有包含多個 logical replication slot,每個 replication slot 有自己的狀態,通常一個 logical replication slot 只會被一個 replica receiver 所接收,你也可以多個 receiver 去對應一個 logical replication slot,但是同一時間只能有一個 receiver 可以去接收 slot 給出來的 changes。
而 logical decoding 的使用時機就是 replication slot -> logical decoding -> replica receiver,而 replica receiver 並不用真的要我們去建立一個新的 PG 去做設定,而是將 receiver 這段可以換成我們自己實作的 external consumer,所以你如果要藉此同步另外一個 PG 也是可行的,一樣的道理。
你可能會問,今天如果當 receiver 這邊斷線了,想要從上次斷掉的數據開始讀 changes 是有辦法的嗎?答案是可以的,原因在於 replication slot 本身有一個 identifier,這是一個標誌且在 PostgreSQL cluster 是唯一的。在啟動 receiver 之前要先連到 postgres://postgres@postgres_source:5432/postgres?sslmode=disable&replication=database
透過帶上 replication=database
參數,讓 PG 知道該連線是關於 replication connection 的連線。其實就是參考這個 Streaming Replication Protocol),PG 收到這個 connection 會進去 wal sender 的模式,接著你可以對 PG 送 IDENTIFY_SYSTEM
命令,PG 會給你當前 Xlogpos
,也就是當前 WAL 的 flush location,這個就是 receiver 該拿這個值去跟 PG 説,我要拿這個位置之後的 WAL 資料,而 receiver 的 identifier 如果不符合對應的 replication slot 的 identifier 就無法成功開始 replication 機制。
總的來說這邊 receiver 的實際運作流程會是:
-
setup replication conn
-
create replication slot 這邊要帶上要使用哪個
logical decoding plugin name
,而如果已經建立的話,則判斷 pg error code 不等於42710
,這邊可參考 PostgreSQL Error Codes例如:
1
SELECT * FROM pg_create_logical_replication_slot('logical_slot', 'test_decoding');
在 primary 的 postgresql.conf 需要設定
max_replication_slots > 0
跟wal_level = logical
。 -
identify system
-
start replication 這邊要帶上當前的
Xlogpos
、replication slot name
、還要對應的logical decoding plugin 的相關參數
-
stream receive changes
-
send changes to external consumers
這邊唯一還沒解釋概念是,那 logical decoding plugin 這段是要怎麼處理?是已經有現成官方工具嗎?答案是的。官方目前有提供 test_decoding
及 pgoutput
,後面會來示範怎麼用。而在之前有名的 decoding 有 wal2json 還有現在 pglogical 本身提供的 pglogical_output。
-
test_decoding:test_decoding 本身主要是拿來 debug 或是展示用的而已,可以將 WAL decode 成 text 的格式,source code 可參考:https://github.com/postgres/postgres/blob/master/contrib/test_decoding/test_decoding.c
-
pgoutput:pgoutput 則是官方主要拿來實作在 Postgres 11 之後提供的 logical replication 的 publication & subscription 的使用方式,因此使用 pgoutput 就一定要搭配 publication
-
wal2json:顧名思義是 decode 成 json 的格式,屬於第三方的套件
-
pglogical_output:這個是來自於 pglogical 實現 logical replication 的 decode plugin,其實實作類似於 pgoutput
後面會在做這些 decoding 的示範。
replica identity 概念
在用 logical replication 可能會冒出一個問題是,有沒有辦法知道 update 跟 delete 之前的 old value,方便去做新舊之間的比對的應用?是可以透過 ALTER 每一個 TABLE 的 Replica Identity 的值去設定說有什麼資訊要寫入 WAL Log 來幫助去識別是 update /delete 哪一個 row,而 Replica Identity 有以下四種 mode:
-
DEFAULT:記錄該舊 row 的 pk 去識別要 update /delete 哪一個舊 row,但是無法拿到其他非 pk 欄位的舊值,只能拿到要更新的新值。預設非 system table 都是採用這種 mode。
-
USING INDEX index_name:記錄該舊 row 的 index 去識別 update /delete 哪一個舊 row,因此該 index 必須是 unique 的,且不能是 partial 及 deferrable 且 index 的 column 必須是 not null,如果該 index 被刪掉,那行為就會變成是 NOTHING Mode。所以該 mode 只能拿到 index 欄位的 old value 而無法拿到其他非 index 欄位的舊值,只能拿到要更新的新值。
-
FULL:記錄所有 column 的舊值,想當然這個 mode 既能拿到舊 value 也能拿到新的 value,但就是會更花空間及資源,對於大量更新的表,會消耗不少資源。
-
NOTHING:這種則是相較於 DEFAULT 少了 old value 的值,只會出現 new value 的值。預設是 system table 都是採用這種 mode。
看到這裡,可能會想到那 toast 性質的 column 呢?詳細可參考: https://www.postgresql.org/docs/current/storage-toast.html
簡單來說,因為 column 本身的資料過大,postgres 會選擇將其資料存在另外一個 table 上,而如果在 Default mode 的情況下,除非是更新對應的欄位,才會將 new value 帶過去,但是 old value 不會帶過去。要解決這個問題一樣是透過開啟 full mode 來迫使會帶上 toast column 的 old value 過去。
test_decoding 示範
官方本身就有提供一些 test_decoding 的範例了,所以這邊的示範就從那邊拿來用。
首先要先將 postgresql.conf 的 wal_level 改為 logical。
1 | -- 建立 logica |
pgoutput 示範
pgoutput 比較特別它需要先建立 publication,是因為 pgoutput 本身就是拿來做 pub & sub 的 logical replication 的機制的
1 | -- 建立 publication |
因此在 PG 14 之前其實 binary format 是不支援在 logical replication,它的效能會比較差。
總結
這篇文章是介紹了 logical decoding 的概念,要先知道有這個概念才可以知道之後的 logical replication 是怎樣變化而來的,而透過 logical decoding 的方式,我們就可以撰寫自己的 plugin,同時也可以自己捕捉 CDC event 去實作自己的 feature。
但根據這個特性我們可以知道 logical replication 始終有個硬傷就是無法支援 DDL 操作的複製,因此下篇文章,我們會先來帶帶如和透過 go 來實現 CDC event 的捕捉。