pgcapture - DDL 捕捉

延續上一篇講 pgcapture 的架構介紹與使用方式,接下來會開一系列文章講解 pgcapture 重要的實作細節。今天這篇文章先來講講 pgcapture 是如何做到 DDL 捕捉的。

pgcapture 在使用之前本身需要裝兩個 extension,一個是 pglogical 一個是 pgcapture,我們知道 pglogical 是為了要用到 pglogical_output 來做 logical decoding,而 pgcapture extension 就是為了做到 DDL 補捉而誕生的。

pgcapture extension 實作細節

pgcapture extension 所有的實作 code 都在 https://github.com/rueian/pgcapture/tree/master/hack/postgres/extension 裡面。

核心要看的就是 pgcapture.c 跟 pgcapture–0.1.sql:

以下是 pgcapture.c 全部的程式碼,為了方便解釋直接在城市馬上用註解說明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// pgcapture.c 的實作是參考 https://github.com/enova/pgl_ddl_deploy 的實作
// 其實 pgl_ddl_deploy 本身就是在做 DDL logical replication 的行為,可以 support 原生的 logical
// replication 加入可以複製 DDL 的行為,此外也做了很多特製的功能,例如可以設定某些 table 才需要 DDL replication
// 之類的,這邊就不細講。
// 之所以不直接使用 pgl_ddl_deploy 的實作是因為,pgcapture 的功能是為了實作整套的 CDC 並不只是單純的 pg2pg 的
// 這種 logical replication 加上通常需要複製全部 Table DDL replication,所以才不用原本的 pgl_ddl_deploy 實作
// source: https://github.com/enova/pgl_ddl_deploy/blob/master/pgl_ddl_deploy.c

// 這些全部就是實作 pgcapture extension 所需要 lib
#include "postgres.h"
#include "fmgr.h"
#include "catalog/pg_type.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "parser/parser.h"

PG_MODULE_MAGIC;

PG_FUNCTION_INFO_V1(pgl_ddl_deploy_current_query);
PG_FUNCTION_INFO_V1(sql_command_tags);

// 這個常數是為了存 debug_query_string 而存在的,至於什麼是 debug_query_string 後面會說
/* Our own version of debug_query_string - see below */
const char *pgl_ddl_deploy_debug_query_string;

/*
* A near-copy of the current_query postgres function which caches the value, ignoring
* the change if the string is changed to NULL, as is being done in pglogical 2.2.2.
* This allows multiple subsequent calls to pglogical.replicate_ddl_command without
* losing access to current_query.
*
* Please revisit if pglogical changes behavior and stop setting debug_query_string to NULL.
*/
// 這個 function 是為了要獲取當下 query 的 SQL 語句,因為如果我們拿到 raw SQL 就可以繼續解析,看是不是屬於 DDL
// SQL 語句
Datum
pgl_ddl_deploy_current_query(PG_FUNCTION_ARGS)
{
// PostgreSQL 在執行 SQL 時會將當前的 SQL string 存在 debug_query_string global variable 中。
// 這個 global variable 是在 PostgreSQL 內部定義的,用途就是儲存當前的原始 SQL 字串
// 所以這邊先判斷 debug_query_string 有沒有設定,有的話將值 cache 在
// pgl_ddl_deploy_debug_query_string,並且 return debug_query_string 的值
/* If debug_query_string is set, we always want the same value */
if (debug_query_string)
{
pgl_ddl_deploy_debug_query_string = debug_query_string;
PG_RETURN_TEXT_P(cstring_to_text(pgl_ddl_deploy_debug_query_string));
}
// 這邊主要是為了解決 debug_query_string 可能會被設定成 null 是因為 pglogical 本身的實作關係
// 所以如果 debug_query_string is NULL 但是 pgl_ddl_deploy_debug_query_string 有值
// 那就直接將 pgl_ddl_deploy_debug_query_string 的值回傳即可
/* If it is NULL, we want to take pgl_ddl_deploy_debug_query_string instead,
which in most cases in this code path is used in pgl_ddl_deploy we expect
is because pglogical has reset the string to null. But we still want to
return the same value in this SQL statement we are executing.
*/
else if (pgl_ddl_deploy_debug_query_string)
{
PG_RETURN_TEXT_P(cstring_to_text(pgl_ddl_deploy_debug_query_string));
}
// 如果 debug_query_string == null && pgl_ddl_deploy_debug_query_string == null
// 那就回傳 null 即可。
else
/* If both are NULL, that is legit and we want to return NULL. */
{
PG_RETURN_NULL();
}
}

// 這個 function 主要是將接收到 SQL 語句進行 command tag 的分類
// 分類的意思是說 parse 整個 sql statement 並且分類出是這個 SQL statement 是 CREATE TABLE 還是 INSERT 的
// command tag
/*
* Return a text array of the command tags in SQL command
*/
Datum
sql_command_tags(PG_FUNCTION_ARGS)
{
text *sql_t = PG_GETARG_TEXT_P(0);
char *sql;
List *parsetree_list;
ListCell *parsetree_item;
const char *commandTag;
ArrayBuildState *astate = NULL;

/*
* Get the SQL parsetree
*/
sql = text_to_cstring(sql_t);
parsetree_list = pg_parse_query(sql);

/*
* Iterate through each parsetree_item to get CommandTag
*/
foreach(parsetree_item, parsetree_list)
{
Node *parsetree = (Node *) lfirst(parsetree_item);
#if PG_VERSION_NUM >= 130000
commandTag = CreateCommandName(parsetree);
#else
commandTag = CreateCommandTag(parsetree);
#endif
astate = accumArrayResult(astate, CStringGetTextDatum(commandTag),
false, TEXTOID, CurrentMemoryContext);
}
if (astate == NULL)
elog(ERROR, "Invalid sql command");
PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate, CurrentMemoryContext));
}

看完這兩個 function 接著來看 pgcapture.sql:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pgcapture" to load this file. \quit

CREATE TABLE pgcapture.ddl_logs (id SERIAL PRIMARY KEY, query TEXT, tags TEXT[], activity JSONB);
CREATE TABLE pgcapture.sources (id TEXT PRIMARY KEY, commit pg_lsn, seq int, commit_ts timestamptz, mid bytea, apply_ts timestamptz DEFAULT CURRENT_TIMESTAMP);

CREATE FUNCTION pgcapture.current_query()
RETURNS TEXT AS
'MODULE_PATHNAME', 'pgl_ddl_deploy_current_query'
LANGUAGE C VOLATILE STRICT;

CREATE FUNCTION pgcapture.sql_command_tags(p_sql TEXT)
RETURNS TEXT[] AS
'MODULE_PATHNAME', 'sql_command_tags'
LANGUAGE C VOLATILE STRICT;

CREATE FUNCTION pgcapture.log_ddl() RETURNS event_trigger AS $$
declare
qstr TEXT;
tags TEXT[];
acti JSONB;
begin
qstr = pgcapture.current_query();
tags = pgcapture.sql_command_tags(qstr);
select row_to_json(a.*) into acti from (select datname,usename,application_name,client_addr,backend_start,xact_start from pg_stat_activity where pid = pg_backend_pid()) a;
insert into pgcapture.ddl_logs(query, tags, activity) values (qstr,tags,acti);
end;
$$ LANGUAGE plpgsql STRICT;

CREATE EVENT TRIGGER pgcapture_ddl_command_start ON ddl_command_start WHEN tag IN (
'SELECT INTO',
'DROP TRIGGER',
'DROP FUNCTION',
'CREATE TABLE AS'
) EXECUTE PROCEDURE pgcapture.log_ddl();
CREATE EVENT TRIGGER pgcapture_ddl_command_end ON ddl_command_end WHEN TAG IN (
'CREATE TABLE AS',
'ALTER AGGREGATE',
'ALTER COLLATION',
'ALTER CONVERSION',
'ALTER DOMAIN',
'ALTER DEFAULT PRIVILEGES',
'ALTER EXTENSION',
'ALTER FOREIGN DATA WRAPPER',
'ALTER FOREIGN TABLE',
'ALTER FUNCTION',
'ALTER LANGUAGE',
'ALTER LARGE OBJECT',
'ALTER MATERIALIZED VIEW',
'ALTER OPERATOR',
'ALTER OPERATOR CLASS',
'ALTER OPERATOR FAMILY',
'ALTER POLICY',
'ALTER SCHEMA',
'ALTER SEQUENCE',
'ALTER SERVER',
'ALTER TABLE',
'ALTER TEXT SEARCH CONFIGURATION',
'ALTER TEXT SEARCH DICTIONARY',
'ALTER TEXT SEARCH PARSER',
'ALTER TEXT SEARCH TEMPLATE',
'ALTER TRIGGER',
'ALTER TYPE',
'ALTER USER MAPPING',
'ALTER VIEW',
'COMMENT',
'CREATE ACCESS METHOD',
'CREATE AGGREGATE',
'CREATE CAST',
'CREATE COLLATION',
'CREATE CONVERSION',
'CREATE DOMAIN',
'CREATE EXTENSION',
'CREATE FOREIGN DATA WRAPPER',
'CREATE FOREIGN TABLE',
'CREATE FUNCTION',
'CREATE INDEX',
'CREATE LANGUAGE',
'CREATE MATERIALIZED VIEW',
'CREATE OPERATOR',
'CREATE OPERATOR CLASS',
'CREATE OPERATOR FAMILY',
'CREATE POLICY',
'CREATE RULE',
'CREATE SCHEMA',
'CREATE SEQUENCE',
'CREATE SERVER',
'CREATE TABLE',
'CREATE TEXT SEARCH CONFIGURATION',
'CREATE TEXT SEARCH DICTIONARY',
'CREATE TEXT SEARCH PARSER',
'CREATE TEXT SEARCH TEMPLATE',
'CREATE TRIGGER',
'CREATE TYPE',
'CREATE USER MAPPING',
'CREATE VIEW',
'DROP ACCESS METHOD',
'DROP AGGREGATE',
'DROP CAST',
'DROP COLLATION',
'DROP CONVERSION',
'DROP DOMAIN',
'DROP EXTENSION',
'DROP FOREIGN DATA WRAPPER',
'DROP FOREIGN TABLE',
'DROP INDEX',
'DROP LANGUAGE',
'DROP MATERIALIZED VIEW',
'DROP OPERATOR',
'DROP OPERATOR CLASS',
'DROP OPERATOR FAMILY',
'DROP OWNED',
'DROP POLICY',
'DROP RULE',
'DROP SCHEMA',
'DROP SEQUENCE',
'DROP SERVER',
'DROP TABLE',
'DROP TEXT SEARCH CONFIGURATION',
'DROP TEXT SEARCH DICTIONARY',
'DROP TEXT SEARCH PARSER',
'DROP TEXT SEARCH TEMPLATE',
'DROP TYPE',
'DROP USER MAPPING',
'DROP VIEW',
'GRANT',
'IMPORT FOREIGN SCHEMA',
'REVOKE',
'SECURITY LABEL'
) EXECUTE PROCEDURE pgcapture.log_ddl();

核心的話就是兩個 Table,分別是 ddl_logs 跟 sources,這兩者的作用分別是:

  1. ddl_logs table 主要是為了紀錄所有 DDL 操作,藉此丟到下游的 consumer 達到可以複製 DDL 的操作
  2. sources 則是下游的 PG 紀錄當前 apply 到哪一個 message 是為了方便做之後 resume 知道從哪一個 message 開始 apply 下去

前面我們提到那兩個 function 就是為了 ddl_logs table 做鋪路,你可以看到下面有兩個 trigger 分別是:pgcapture_ddl_command_start 跟 pgcapture_ddl_command_end 就是設定當聽到有 DDL 的操作的時候,啟動 trigger 並且執行 pgcapture.log_ddl () 這個 stored procedure,而其 stored proceduer 其實用到就是前面提到那兩個 c function。

這邊可能會疑惑為什麼 DDL 操作的 trigger 需要分 start 跟 end 的兩種情況。

原因是 DDL 操作裡面有比較特別兩種操作:CREATE TABLE AS 跟 SELECT INTO。這兩個 SQL 操作是包含三步驟的:

  1. CREATE TABLE
  2. SELECT 其他的 TABLE 的資料
  3. INSERT 這些資料到新建立的 TABLE

因為 pgcapture 設計方式是這樣的,希望在聽 logical decoding 的時候,總是先聽到 DDL 操作在聽到 DML 操作,原因在於 pgcapture 中有一個 PGXSchemaLoader 會用來存所有 table 底下的每一個 column 對應的 oid,而 oid 是來自於 pg_catalog schema 中的 pg_type table 的 oid 其紀錄就是 oid 所對應的 type 類型,例如 bool, bytea 等等的。

並且 oid 會跟著 change 傳到下游,根據 source code 可以發現 oid 是拿來做下游的 INSERT/UPDATE/DELETE 的時候,會呼叫 pgx 的 ExecParams function,這邊的 ExecParams 是 low level 包裝 pg connection 的,可以直接提供 oid 讓 PG 不用處理 parse 這個 column 是什麼 type 的操作以及可以指令每一個參數要用 binary 還是 text format。

詳細的 message protocol 可以參考:https://www.postgresql.org/docs/12/protocol-message-formats.html

回過頭來說如果 CREATE TABLE AS 放在 end 的話,會變成 logical decoding 那邊先聽到 INSERT 操作在聽到 ddl_logs 的操作,如此以來會因為無法找到 oid 所以 decode 那邊會 ignore 掉:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (p *PGLogicalDecoder) makePBTuple(rel Relation, src []Field, noNull bool) (fields []*pb.Field) {
if src == nil {
return nil
}
fields = make([]*pb.Field, 0, len(src))
for i, s := range src {
if noNull && s.Datum == nil {
continue
}
// 這段會找不到 oid 而 ignore 掉這個 field
oid, err := p.schema.GetTypeOID(rel.NspName, rel.RelName, rel.Fields[i])
if err != nil {
// TODO: add optional logging, because it will generate a lot of logs when refreshing materialized view
continue
}
fmt.Println("oid:", oid)
switch s.Format {
case 'b':
fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: &pb.Field_Binary{Binary: s.Datum}})
case 'n':
fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: nil})
case 't':
fields = append(fields, &pb.Field{Name: rel.Fields[i], Oid: oid, Value: &pb.Field_Text{Text: string(s.Datum)}})
case 'u':
continue // unchanged toast field should be exclude
}
}
return fields
}

而 DDL 總是希望能在 DML 之前的實作是在這:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if msg := m.GetChange(); msg != nil {
if decode.Ignore(msg) {
return change, nil
} else if decode.IsDDL(msg) {
// 如果是 DDL 會執行 refresh type 將 oid 那些資訊更新在 memory 上
if err = p.schema.RefreshType(); err != nil {
return change, err
}
}
p.currentSeq++
} else if b := m.GetBegin(); b != nil {
p.currentLsn = b.FinalLsn
p.currentSeq = 0
} else if c := m.GetCommit(); c != nil {
p.currentLsn = c.CommitLsn
p.currentSeq++
}

這就是為什麼 CREATE TABLE AS 跟 SELECT INTO 比較特別要放在 start 的原因。而另外兩個 ‘DROP TRIGGER’, ‘DROP FUNCTION’ 應該是可以改放到 end 的,跟原作者討論後感覺沒其他 side effect。

而怎麼判斷是 DDL 操作的 change 呢?其實只要判斷是來自於 ddl_logs 即可:

1
2
3
func IsDDL(m *pb.Change) bool {
return m.Schema == ExtensionSchema && m.Table == ExtensionDDLLogs
}

另外還有一件事情值得討論的是,如果是執行 CREATE TABLE AS 的操作的話,目前的 change 順序會是:

  1. ddl_logs change
  2. insert change

而下游在拿到 ddl_logs change 的時候就會執行 CREATE TABLE AS 指令,所以也跑了 INSERT 了,但問題在於如何避免後面來到 INSERT change 呢?這邊不能繼續接著 insert 會造成 double insert,會造成這個原因是我們無法避免 PG Logical decode 產生既有的 INSERT change,但是我們又希望能複製 DDL 操作,但因為 CREATE TABLE AS 的特性,紀錄在 ddl_logs table 的資料是完整的 query,現階段無法只做 CREATE TABLE 的部分,而不做 INSERT。

所以在 sink postgres 做了 skip 的判斷,來判斷要不要 skip 後續 insert 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
case *pb.Message_Change:
if !p.inTX {
p.log.WithFields(logrus.Fields{
"MessageLSN": change.Checkpoint.LSN,
"MidHex": hex.EncodeToString(change.Checkpoint.Data),
}).Warn("receive incomplete transaction: change")
break
}
if decode.IsDDL(msg.Change) {
err = p.handleDDL(msg.Change)
} else {
if len(p.skip) != 0 && p.skip[fmt.Sprintf("%s.%s", msg.Change.Schema, msg.Change.Table)] {
p.log.WithFields(logrus.Fields{
"MessageLSN": change.Checkpoint.LSN,
"MidHex": hex.EncodeToString(change.Checkpoint.Data),
"Message": change.Message.String(),
}).Warn("message skipped due to previous commands mixed with DML and DDL")
return nil
}
err = p.handleChange(msg.Change)
}
  1. 在拿到 change 的時候會先判斷是否是 DDL 的 change,如果是的話會執行 handleDDL
  2. 如果不是則會判斷 skip 是否需要 ignore change,而 skip 裡面存的是一個 map 其 key 是 change 的 schema + table name

那 skip 的值怎麼拿到的?

來自於 handleDDL 裡面會做 parseDDL 的步驟,接著 parseDDL 會去解析裡面是否有 DDL + DML mixed 的情況,並且將裡面的每一個 statement 的 schema + table name 存在 skip 裡面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func (p *PGXSink) parseDDL(fields []*pb.Field) (command string, relations map[string]bool, count int, err error) {
for _, field := range fields {
switch field.Name {
case "query":
command = string(field.GetBinary())
goto parse
}
}

parse:
tree, err := pg_query.Parse(command)
if err != nil {
return "", nil, 0, err
}

var stmts []*pg_query.RawStmt
// remove preceding dml commands, they were already present in previous change messages
// remove refresh materialized view stmt, otherwise they will block logical replication
// 這邊的註解寫的情況是像是這樣:
// INSERT INTO foo ...
// CREATE TABLE foo1
// 因為第一個 INSERT INTO 理論上會在前面的 change 就先執行過了,這邊應該要把它忽略掉。
// 而 refresh view 這個忽略則是怕會 block 下游太多時間而選擇拿掉
preceding := true
for _, stmt := range tree.Stmts {
preceding = preceding && (stmt.Stmt.GetInsertStmt() != nil || stmt.Stmt.GetUpdateStmt() != nil || stmt.Stmt.GetDeleteStmt() != nil)
if preceding || stmt.Stmt.GetRefreshMatViewStmt() != nil {
continue
}
stmts = append(stmts, stmt)
}

// record relations appeared in the statement, following change messages should be ignore if duplicated with them
relations = make(map[string]bool)
// 這邊的 relation 就是判斷 DDL + DML mixed 的特別的情況,所以收集這些 relations
for _, stmt := range stmts {
var relation *pg_query.RangeVar
switch node := stmt.Stmt.Node.(type) {
case *pg_query.Node_InsertStmt:
relation = node.InsertStmt.Relation
case *pg_query.Node_UpdateStmt:
relation = node.UpdateStmt.Relation
case *pg_query.Node_DeleteStmt:
relation = node.DeleteStmt.Relation
case *pg_query.Node_CreateTableAsStmt:
relation = node.CreateTableAsStmt.Into.Rel
case *pg_query.Node_SelectStmt:
if node.SelectStmt.IntoClause != nil {
relation = node.SelectStmt.IntoClause.Rel
}
}
if relation == nil {
continue
}
if relation.Schemaname == "" {
relations[fmt.Sprintf("public.%s", relation.Relname)] = true
} else {
relations[fmt.Sprintf("%s.%s", relation.Schemaname, relation.Relname)] = true
}
}

sb := strings.Builder{}
for _, stmt := range stmts {
if stmt.StmtLen == 0 {
sb.WriteString(command[stmt.StmtLocation:])
} else {
sb.WriteString(command[stmt.StmtLocation : stmt.StmtLocation+stmt.StmtLen])
}
sb.WriteString(";")
}

return sb.String(), relations, len(stmts), nil
}

最後在 handleDDL 裡面會將這些 relations 存在 skip 上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (p *PGXSink) handleDDL(m *pb.Change) (err error) {
command, relations, count, err := p.parseDDL(m.New)
if err != nil {
return err
}
if count == 0 {
return nil
}

checksum := crc32.ChecksumIEEE([]byte(command))
if p.prevDDL != checksum {
p.skip = relations
err = p.performDDL(count, command)
}
p.prevDDL = checksum

return err
}

這邊也講一下為什麼會需要判斷 checksum,原因是為了避免 double 執行 DDL 行為,這種情況是因為 DDL 操作是來自於 event trigger 的設計,假設你今天是一個 query 內含多個 DDL 行為,其 ddl_logs 就會跟著多個 record,可是就會一併傳給下遊,為了避免重複執行導致資料有錯,或是 PG 噴錯,因此才透過 checksum 的方式去避免。

pgcapture extension 的限制

前面我們介紹了 pgcapture extension 之所以可以複製 DDL 的原理,那現在來說說現在版本的限制:

  1. 如果你是 DDL + DML mixed 的情況:

    1
    2
    3
    4
    5
    6
    7
    CREATE TABLE IF NOT EXISTS foo (
    id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
    name TEXT NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
    );
    INSERT INTO foo (name) VALUES ('kenny');

    如果是這種 case 的話,因為 ddl_logs 會把整個 query 記錄下來,來讓下遊執行這個 query 會導致 default value 的值無法被確定。

    一般來說 logical decoding 拿到的 change 是可以拿到實值的,所以不會有 default value 的問題,但因為 DDL 這邊複製的實作關係,沒辦法將 DML 與 DDL 拆開,所以導致 default value 會上下游不一致的問題產生

  2. stored procedure 中一樣很難去防範 DDL + DML 的操作

總結

這篇文章主要講解了 pgcapture extension 是怎麼做到複製 DDL 操作的,也講了當前版本的一些限制,不過相信還是有改善的空間的。

下一篇文章來談談 logical decoding 那邊處理的細節。