介紹 postgres logical decoding 的概念與示範

在之前的文章有介紹過 logical replication 的概念,但其實 logical 這個概念是從 Postgres 9.4 開始提供 logical decoding 功能而來的,也是因為在 9.4 提供這個功能後,開始能夠利用這個特性實現自己的 logical replication。因此今天來介紹一下,什麼是 logical decoding,它的運作原理是什麼以及示範一下用法。

logical decoding concept

概念說起來可以很簡單:

  1. Decode WAL 資料來取得 DML changes,也就是可以拿到 INSERT、DELETE、UPDATE SQL 操作的資料,有點類似 trigger event 的意思,透過接收這種 event 你可以拿到相關的資料

  2. 將 changes stream 傳給 receiver,這邊的 receiver 也就是接收 replication conn 的 client,後面會在提到更詳細。

  3. 而 receiver 拿到 changes 後就可以選擇傳給下游的 external consumers,這就是為何可以透過這種概念來實作自己的 logical replication。而官方提供的 publish & subscribe 的 logical replication 機制是在 11 才推出的,因此在 11 之前就有透過這樣的方式來實現 logical replication,例如知名的 pglogical,那比 9.4 更早的版本則是會透過 trigger event 來實現,例如知名的 slony

logical replication slot

那我們來一一拆解這三個步驟是怎麼運作的:

首先要先認識 replication slots 的概念,如果有用過 Postgres 原生提供的 Physical Replication,就會知道有個難題要處理 archive 的機制,為什麼要處理?是因為當今天 replica 如果跟不上 primary 的速度,但 primary 就已經將過期的 wal 給清掉,那 replica 就無法拿到之前的資料了。因此有兩個方式是透過 archive_command 或是 replication slot 的機制。這邊的關於 physical replication 的機制可以參考這篇 Postgres - 設定 physical replication 教學 | Kenny’s Blog

。replication slot 本身就是存放 WAL 的地方,只有當 replica 有去 consume 後,replication slot 才會把對應的 WAL 給清掉。

因此,回過來說,有 physical replication slot 就會有 logical replication slot,因此 primary 做了任何的 DML changes 就會將對應的 WAL 存在 logical replication slot 裡面。一個 primary 裡面可以有包含多個 logical replication slot,每個 replication slot 有自己的狀態,通常一個 logical replication slot 只會被一個 replica receiver 所接收,你也可以多個 receiver 去對應一個 logical replication slot,但是同一時間只能有一個 receiver 可以去接收 slot 給出來的 changes。

而 logical decoding 的使用時機就是 replication slot -> logical decoding -> replica receiver,而 replica receiver 並不用真的要我們去建立一個新的 PG 去做設定,而是將 receiver 這段可以換成我們自己實作的 external consumer,所以你如果要藉此同步另外一個 PG 也是可行的,一樣的道理。

你可能會問,今天如果當 receiver 這邊斷線了,想要從上次斷掉的數據開始讀 changes 是有辦法的嗎?答案是可以的,原因在於 replication slot 本身有一個 identifier,這是一個標誌且在 PostgreSQL cluster 是唯一的。在啟動 receiver 之前要先連到 postgres://postgres@postgres_source:5432/postgres?sslmode=disable&replication=database 透過帶上 replication=database 參數,讓 PG 知道該連線是關於 replication connection 的連線。其實就是參考這個 Streaming Replication Protocol),PG 收到這個 connection 會進去 wal sender 的模式,接著你可以對 PG 送 IDENTIFY_SYSTEM 命令,PG 會給你當前 Xlogpos,也就是當前 WAL 的 flush location,這個就是 receiver 該拿這個值去跟 PG 説,我要拿這個位置之後的 WAL 資料,而 receiver 的 identifier 如果不符合對應的 replication slot 的 identifier 就無法成功開始 replication 機制。

總的來說這邊 receiver 的實際運作流程會是:

  1. setup replication conn

  2. create replication slot 這邊要帶上要使用哪個 logical decoding plugin name,而如果已經建立的話,則判斷 pg error code 不等於 42710 ,這邊可參考 PostgreSQL Error Codes

    例如:

    1
    SELECT * FROM pg_create_logical_replication_slot('logical_slot', 'test_decoding'); 

    在 primary 的 postgresql.conf 需要設定 max_replication_slots > 0wal_level = logical

  3. identify system

  4. start replication 這邊要帶上當前的 Xlogposreplication slot name、還要對應的 logical decoding plugin 的相關參數

  5. stream receive changes

  6. send changes to external consumers

這邊唯一還沒解釋概念是,那 logical decoding plugin 這段是要怎麼處理?是已經有現成官方工具嗎?答案是的。官方目前有提供 test_decodingpgoutput,後面會來示範怎麼用。而在之前有名的 decoding 有 wal2json 還有現在 pglogical 本身提供的 pglogical_output

  1. test_decoding:test_decoding 本身主要是拿來 debug 或是展示用的而已,可以將 WAL decode 成 text 的格式,source code 可參考:https://github.com/postgres/postgres/blob/master/contrib/test_decoding/test_decoding.c

  2. pgoutput:pgoutput 則是官方主要拿來實作在 Postgres 11 之後提供的 logical replication 的 publication & subscription 的使用方式,因此使用 pgoutput 就一定要搭配 publication

  3. wal2json:顧名思義是 decode 成 json 的格式,屬於第三方的套件

  4. pglogical_output:這個是來自於 pglogical 實現 logical replication 的 decode plugin,其實實作類似於 pgoutput

後面會在做這些 decoding 的示範。

replica identity 概念

在用 logical replication 可能會冒出一個問題是,有沒有辦法知道 update 跟 delete 之前的 old value,方便去做新舊之間的比對的應用?是可以透過 ALTER 每一個 TABLE 的 Replica Identity 的值去設定說有什麼資訊要寫入 WAL Log 來幫助去識別是 update /delete 哪一個 row,而 Replica Identity 有以下四種 mode:

  1. DEFAULT:記錄該舊 row 的 pk 去識別要 update /delete 哪一個舊 row,但是無法拿到其他非 pk 欄位的舊值,只能拿到要更新的新值。預設非 system table 都是採用這種 mode。

  2. USING INDEX index_name:記錄該舊 row 的 index 去識別 update /delete 哪一個舊 row,因此該 index 必須是 unique 的,且不能是 partial 及 deferrable 且 index 的 column 必須是 not null,如果該 index 被刪掉,那行為就會變成是 NOTHING Mode。所以該 mode 只能拿到 index 欄位的 old value 而無法拿到其他非 index 欄位的舊值,只能拿到要更新的新值。

  3. FULL:記錄所有 column 的舊值,想當然這個 mode 既能拿到舊 value 也能拿到新的 value,但就是會更花空間及資源,對於大量更新的表,會消耗不少資源。

  4. NOTHING:這種則是相較於 DEFAULT 少了 old value 的值,只會出現 new value 的值。預設是 system table 都是採用這種 mode。

看到這裡,可能會想到那 toast 性質的 column 呢?詳細可參考: https://www.postgresql.org/docs/current/storage-toast.html

簡單來說,因為 column 本身的資料過大,postgres 會選擇將其資料存在另外一個 table 上,而如果在 Default mode 的情況下,除非是更新對應的欄位,才會將 new value 帶過去,但是 old value 不會帶過去。要解決這個問題一樣是透過開啟 full mode 來迫使會帶上 toast column 的 old value 過去。

test_decoding 示範

官方本身就有提供一些 test_decoding 的範例了,所以這邊的示範就從那邊拿來用。

首先要先將 postgresql.conf 的 wal_level 改為 logical。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
-- 建立 logica
l slot
SELECT * FROM pg_create_logical_replication_slot('myslot', 'test_decoding');
slot_name | lsn
-----------+-----------
myslot | 0/17582A8
(1 row)


-- 檢查目前有哪些 slot
SELECT * FROM pg_replication_slots;
-[ RECORD 1 ]-------+--------------
slot_name | myslot
plugin | test_decoding
slot_type | logical
datoid | 13364
database | postgres
temporary | f
active | f
active_pid |
xmin |
catalog_xmin | 581
restart_lsn | 0/1758270
confirmed_flush_lsn | 0/17582A8


-- 取得當前 slot 的所有 changes 並 consume
SELECT * FROM pg_logical_slot_get_changes('myslot', NULL, NULL);
(0 rows)


-- 建立 data table
CREATE TABLE data(id serial primary key, data text);
CREATE TABLE

-- 因為上一個 change 是 DDL 操作,所以這邊只能看到 BEGIN & COMMIT 無法看到實際的內容,所以 Postgres logical replication 是不支援 --DL 的
SELECT * FROM pg_logical_slot_get_changes('myslot', NULL, NULL);
-[ RECORD 1 ]----
lsn | 0/17583D8
xid | 581
data | BEGIN 581
-[ RECORD 2 ]----
lsn | 0/1779670
xid | 581
data | COMMIT 581


-- 因為上一個動作取得 change 並且 consume 了,所以在 get change 會沒有資料
SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
(0 rows)

-- 插入兩筆資料
BEGIN;
INSERT INTO data(data) VALUES('1');
INSERT INTO data(data) VALUES('2');
COMMIT;

-- 再次 get change,可以取得 Begin 到 commit 之間所有的 insert 資料
SELECT * FROM pg_logical_slot_get_changes('myslot', NULL, NULL);
-[ RECORD 1 ]-------------------------------------------------
lsn | 0/177B708
xid | 582
data | BEGIN 582
-[ RECORD 2 ]-------------------------------------------------
lsn | 0/177B770
xid | 582
data | table public.data: INSERT: id[integer]:1 data[text]:'1'
-[ RECORD 3 ]-------------------------------------------------
lsn | 0/177B888
xid | 582
data | table public.data: INSERT: id[integer]:2 data[text]:'2'
-[ RECORD 4 ]-------------------------------------------------
lsn | 0/177B938
xid | 582
data | COMMIT 582

-- 插入第三筆資料
INSERT INTO data(data) VALUES('3');

-- 使用 peek 來取得 change 但是不 consume
SELECT * FROM pg_logical_slot_peek_changes('myslot', NULL, NULL);
-[ RECORD 1 ]-------------------------------------------------
lsn | 0/177B970
xid | 583
data | BEGIN 583
-[ RECORD 2 ]-------------------------------------------------
lsn | 0/177B970
xid | 583
data | table public.data: INSERT: id[integer]:3 data[text]:'3'
-[ RECORD 3 ]-------------------------------------------------
lsn | 0/177BA20
xid | 583
data | COMMIT 583

-- 在 peek 一次一樣可以拿到 change
SELECT * FROM pg_logical_slot_peek_changes('myslot', NULL, NULL);
-[ RECORD 1 ]-------------------------------------------------
lsn | 0/177B970
xid | 583
data | BEGIN 583
-[ RECORD 2 ]-------------------------------------------------
lsn | 0/177B970
xid | 583
data | table public.data: INSERT: id[integer]:3 data[text]:'3'
-[ RECORD 3 ]-------------------------------------------------
lsn | 0/177BA20
xid | 583
data | COMMIT 583

-- 取得 binary format 的 change
SELECT * FROM pg_logical_slot_get_binary_changes('myslot', NULL, NULL);
-[ RECORD 1 ]----------------------------------------------------------------------------------------------------------
lsn | 0/177B970
xid | 583
data | \x424547494e20353833
-[ RECORD 2 ]----------------------------------------------------------------------------------------------------------
lsn | 0/177B970
xid | 583
data | \x7461626c65207075626c69632e646174613a20494e534552543a2069645b696e74656765725d3a3320646174615b746578745d3a273327
-[ RECORD 3 ]----------------------------------------------------------------------------------------------------------
lsn | 0/177BA20
xid | 583
data | \x434f4d4d495420353833

-- update data
UPDATE data set data = 4 where id = 3;

-- 取得 change 可以發現帶過來的值只有 pk 及 new value 是沒有 old value 的
SELECT * FROM pg_logical_slot_get_changes('myslot', NULL, NULL);
-[ RECORD 1 ]-------------------------------------------------
lsn | 0/177BB38
xid | 584
data | BEGIN 584
-[ RECORD 2 ]-------------------------------------------------
lsn | 0/177BB38
xid | 584
data | table public.data: UPDATE: id[integer]:3 data[text]:'4'
-[ RECORD 3 ]-------------------------------------------------
lsn | 0/177BC60
xid | 584
data | COMMIT 584

-- delete data
DELETE FROM data where id = 3;

-- 取得 change 可以發現只會帶上 pk 也沒有 old value 的
SELECT * FROM pg_logical_slot_get_changes('myslot', NULL, NULL);
-[ RECORD 1 ]----------------------------------
lsn | 0/177BC98
xid | 585
data | BEGIN 585
-[ RECORD 2 ]----------------------------------
lsn | 0/177BC98
xid | 585
data | table public.data: DELETE: id[integer]:3
-[ RECORD 3 ]----------------------------------
lsn | 0/177BD08
xid | 585
data | COMMIT 585

-- alter table replica identity = full
ALTER TABLE data REPLICA IDENTITY FULL;

-- update data
UPDATE data set data = 3 where id = 2;

-- 取得 change 可以發現帶過來值直接分為 old & new tuple 因為需要用全部的 column 做標識,所以 old value 都會帶過來
SELECT * FROM pg_logical_slot_get_changes('myslot', NULL, NULL);
-[ RECORD 1 ]--------------------------------------------------------------------------------------------------
lsn | 0/177BE20
xid | 586
data | BEGIN 586
-[ RECORD 2 ]--------------------------------------------------------------------------------------------------
lsn | 0/177CBB8
xid | 586
data | COMMIT 586
-[ RECORD 3 ]--------------------------------------------------------------------------------------------------
lsn | 0/177CBF0
xid | 587
data | BEGIN 587
-[ RECORD 4 ]--------------------------------------------------------------------------------------------------
lsn | 0/177CBF0
xid | 587
data | table public.data: UPDATE: old-key: id[integer]:2 data[text]:'2' new-tuple: id[integer]:2 data[text]:'3'
-[ RECORD 5 ]--------------------------------------------------------------------------------------------------
lsn | 0/177CD48
xid | 587
data | COMMIT 587

-- delete data
delete from data where id = 2;

-- 取得 change,可以發現帶上了 old value
SELECT * FROM pg_logical_slot_get_changes('myslot', NULL, NULL);
-[ RECORD 1 ]-------------------------------------------------
lsn | 0/177CD80
xid | 588
data | BEGIN 588
-[ RECORD 2 ]-------------------------------------------------
lsn | 0/177CD80
xid | 588
data | table public.data: DELETE: id[integer]:2 data[text]:'3'
-[ RECORD 3 ]-------------------------------------------------
lsn | 0/177CDF8
xid | 588
data | COMMIT 588

-- 實驗完畢,drop replication slot
SELECT pg_drop_replication_slot('myslot');
-[ RECORD 1 ]------------+-
pg_drop_replication_slot |

pgoutput 示範

pgoutput 比較特別它需要先建立 publication,是因為 pgoutput 本身就是拿來做 pub & sub 的 logical replication 的機制的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-- 建立 publication
CREATE PUBLICATION pub FOR ALL TABLES;
CREATE PUBLICATION

-- 建立 slot
SELECT * FROM pg_create_logical_replication_slot('myslot', 'pgoutput');
-[ RECORD 1 ]--------
slot_name | myslot
lsn | 0/177CF48

-- 將 table 資料 drop 掉;
DROP table data;

-- 取的 binary change 這邊比較特別的是 pgoutput 支援的是 binary format protocol
-- 但是在 PG 14 之前其實底層還是用 text format,只是到這邊有轉成 binary format
-- 需要指定 proto_version = 1 & publication_names 是指哪一個 pub
SELECT * FROM pg_logical_slot_get_binary_changes('myslot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub');
lsn | xid | data
-----------+-----+--------------------------------------------------------
0/1788150 | 590 | \x420000000001788c00000297f400e3cab70000024e
0/1788C78 | 590 | \x43000000000001788c000000000001788c78000297f400e3cab7

因此在 PG 14 之前其實 binary format 是不支援在 logical replication,它的效能會比較差。

總結

這篇文章是介紹了 logical decoding 的概念,要先知道有這個概念才可以知道之後的 logical replication 是怎樣變化而來的,而透過 logical decoding 的方式,我們就可以撰寫自己的 plugin,同時也可以自己捕捉 CDC event 去實作自己的 feature。

但根據這個特性我們可以知道 logical replication 始終有個硬傷就是無法支援 DDL 操作的複製,因此下篇文章,我們會先來帶帶如和透過 go 來實現 CDC event 的捕捉。