pgcapture - DDL 捕捉
延續上一篇講 pgcapture 的架構介紹與使用方式,接下來會開一系列文章講解 pgcapture 重要的實作細節。今天這篇文章先來講講 pgcapture 是如何做到 DDL 捕捉的。
pgcapture 在使用之前本身需要裝兩個 extension,一個是 pglogical 一個是 pgcapture,我們知道 pglogical 是為了要用到 pglogical_output 來做 logical decoding,而 pgcapture extension 就是為了做到 DDL 補捉而誕生的。
pgcapture extension 實作細節
pgcapture extension 所有的實作 code 都在 https://github.com/rueian/pgcapture/tree/master/hack/postgres/extension 裡面。
核心要看的就是 pgcapture.c 跟 pgcapture–0.1.sql:
以下是 pgcapture.c 全部的程式碼,為了方便解釋直接在城市馬上用註解說明:
1 | // pgcapture.c 的實作是參考 https://github.com/enova/pgl_ddl_deploy 的實作 |
看完這兩個 function 接著來看 pgcapture.sql:
1 | -- complain if script is sourced in psql, rather than via CREATE EXTENSION |
核心的話就是兩個 Table,分別是 ddl_logs 跟 sources,這兩者的作用分別是:
- ddl_logs table 主要是為了紀錄所有 DDL 操作,藉此丟到下游的 consumer 達到可以複製 DDL 的操作
- sources 則是下游的 PG 紀錄當前 apply 到哪一個 message 是為了方便做之後 resume 知道從哪一個 message 開始 apply 下去
前面我們提到那兩個 function 就是為了 ddl_logs table 做鋪路,你可以看到下面有兩個 trigger 分別是:pgcapture_ddl_command_start 跟 pgcapture_ddl_command_end 就是設定當聽到有 DDL 的操作的時候,啟動 trigger 並且執行 pgcapture.log_ddl () 這個 stored procedure,而其 stored proceduer 其實用到就是前面提到那兩個 c function。
這邊可能會疑惑為什麼 DDL 操作的 trigger 需要分 start 跟 end 的兩種情況。
原因是 DDL 操作裡面有比較特別兩種操作:CREATE TABLE AS 跟 SELECT INTO。這兩個 SQL 操作是包含三步驟的:
- CREATE TABLE
- SELECT 其他的 TABLE 的資料
- INSERT 這些資料到新建立的 TABLE
因為 pgcapture 設計方式是這樣的,希望在聽 logical decoding 的時候,總是先聽到 DDL 操作在聽到 DML 操作,原因在於 pgcapture 中有一個 PGXSchemaLoader 會用來存所有 table 底下的每一個 column 對應的 oid,而 oid 是來自於 pg_catalog schema 中的 pg_type table 的 oid 其紀錄就是 oid 所對應的 type 類型,例如 bool, bytea 等等的。
並且 oid 會跟著 change 傳到下游,根據 source code 可以發現 oid 是拿來做下游的 INSERT/UPDATE/DELETE 的時候,會呼叫 pgx 的 ExecParams function,這邊的 ExecParams 是 low level 包裝 pg connection 的,可以直接提供 oid 讓 PG 不用處理 parse 這個 column 是什麼 type 的操作以及可以指令每一個參數要用 binary 還是 text format。
詳細的 message protocol 可以參考:https://www.postgresql.org/docs/12/protocol-message-formats.html
回過頭來說如果 CREATE TABLE AS 放在 end 的話,會變成 logical decoding 那邊先聽到 INSERT 操作在聽到 ddl_logs 的操作,如此以來會因為無法找到 oid 所以 decode 那邊會 ignore 掉:
1 | func (p *PGLogicalDecoder) makePBTuple(rel Relation, src []Field, noNull bool) (fields []*pb.Field) { |
而 DDL 總是希望能在 DML 之前的實作是在這:
1 | if msg := m.GetChange(); msg != nil { |
這就是為什麼 CREATE TABLE AS 跟 SELECT INTO 比較特別要放在 start 的原因。而另外兩個 ‘DROP TRIGGER’, ‘DROP FUNCTION’ 應該是可以改放到 end 的,跟原作者討論後感覺沒其他 side effect。
而怎麼判斷是 DDL 操作的 change 呢?其實只要判斷是來自於 ddl_logs 即可:
1 | func IsDDL(m *pb.Change) bool { |
另外還有一件事情值得討論的是,如果是執行 CREATE TABLE AS 的操作的話,目前的 change 順序會是:
- ddl_logs change
- insert change
而下游在拿到 ddl_logs change 的時候就會執行 CREATE TABLE AS 指令,所以也跑了 INSERT 了,但問題在於如何避免後面來到 INSERT change 呢?這邊不能繼續接著 insert 會造成 double insert,會造成這個原因是我們無法避免 PG Logical decode 產生既有的 INSERT change,但是我們又希望能複製 DDL 操作,但因為 CREATE TABLE AS 的特性,紀錄在 ddl_logs table 的資料是完整的 query,現階段無法只做 CREATE TABLE 的部分,而不做 INSERT。
所以在 sink postgres 做了 skip 的判斷,來判斷要不要 skip 後續 insert 操作:
1 | case *pb.Message_Change: |
- 在拿到 change 的時候會先判斷是否是 DDL 的 change,如果是的話會執行 handleDDL
- 如果不是則會判斷 skip 是否需要 ignore change,而 skip 裡面存的是一個 map 其 key 是 change 的 schema + table name
那 skip 的值怎麼拿到的?
來自於 handleDDL 裡面會做 parseDDL 的步驟,接著 parseDDL 會去解析裡面是否有 DDL + DML mixed 的情況,並且將裡面的每一個 statement 的 schema + table name 存在 skip 裡面:
1 | func (p *PGXSink) parseDDL(fields []*pb.Field) (command string, relations map[string]bool, count int, err error) { |
最後在 handleDDL 裡面會將這些 relations 存在 skip 上:
1 | func (p *PGXSink) handleDDL(m *pb.Change) (err error) { |
這邊也講一下為什麼會需要判斷 checksum,原因是為了避免 double 執行 DDL 行為,這種情況是因為 DDL 操作是來自於 event trigger 的設計,假設你今天是一個 query 內含多個 DDL 行為,其 ddl_logs 就會跟著多個 record,可是就會一併傳給下遊,為了避免重複執行導致資料有錯,或是 PG 噴錯,因此才透過 checksum 的方式去避免。
pgcapture extension 的限制
前面我們介紹了 pgcapture extension 之所以可以複製 DDL 的原理,那現在來說說現在版本的限制:
-
如果你是 DDL + DML mixed 的情況:
1
2
3
4
5
6
7CREATE TABLE IF NOT EXISTS foo (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
name TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO foo (name) VALUES ('kenny');如果是這種 case 的話,因為 ddl_logs 會把整個 query 記錄下來,來讓下遊執行這個 query 會導致 default value 的值無法被確定。
一般來說 logical decoding 拿到的 change 是可以拿到實值的,所以不會有 default value 的問題,但因為 DDL 這邊複製的實作關係,沒辦法將 DML 與 DDL 拆開,所以導致 default value 會上下游不一致的問題產生
-
stored procedure 中一樣很難去防範 DDL + DML 的操作
總結
這篇文章主要講解了 pgcapture extension 是怎麼做到複製 DDL 操作的,也講了當前版本的一些限制,不過相信還是有改善的空間的。
下一篇文章來談談 logical decoding 那邊處理的細節。