Postgres - 使用 Golang 及 pgx 實現 CDC

上一篇文章我們知道 logical decoding 的概念,這次我們來示範如何用 golang + pgx 來實現 CDC 的效果。我們知道從 Postgres 10 開始支援用 pub & sub 這種 logical replication 的方式,因此也提供 pgoutput 的 plugin 來實現 logical decoding。

因此這邊文章是使用 pgoutput + golang client 實現 logical replication 進而實現捕捉 CDC Event 的效果。

此外我們知道 Postgres 14 開始支援 pgoutput binary format 的功能,會讓效能會更好,因為之前只支援 text format。所以今天會拿 Postgres 14 來做示範。

Show me the code

logical replication client 實現

首先在做之前,要先想想如何做 logical replication client 呢?萬幸社群的大神太多,pgx 的作者就做了這個 library 出來:https://github.com/jackc/pglogrepl

有了這個 lib 的幫助,我們不必去處理底層 logical replication protocol 解析的處理。但是會建議對照官方文件提供的 logical replication protocol 來對照 code 會更理解。

那我們來一步一步頗析如何使用:

  1. 建立 streaming replication protocol 的連線:

    1
    2
    3
    4
    5
    conn, err := pgconn.Connect(context.Background(), "postgres://[email protected]/postgres?replication=database")
    if err != nil {
    log.Fatalf("failed to connect to PostgreSQL server: %v", err)
    }
    defer conn.Close(context.Background())

    很簡單在 url 後面多帶 replication=database 會告訴 PG 進入 walsender 模式,這樣才可以讓之後 client 端可以收到 wal data。

  2. 建立測試用的 table

    1
    2
    3
    if _, err := conn.Exec(context.Background(), createTable).ReadAll(); err != nil {
    log.Fatalf("failed to create table: %v", err)
    }

    為了之後要做 CDC event 的捕捉,因此先建立一個 table 來做示範:

    1
    CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, name TEXT, toast TEXT);

    toast 這個欄位是等等要方便示範 toast 性質的 column 的效果。

  3. 建立 publication

    1
    2
    3
    4
    5
    if _, err := conn.Exec(context.Background(), fmt.Sprintf(createPublication, slotName)).ReadAll(); err != nil {
    if pge, ok := err.(*pgconn.PgError); !ok || pge.Code != "42710" {
    log.Fatalf("failed to create publication: %v", err)
    }
    }

    我們知道 pgoutput 的 plugin 使用限制就是必須要搭配 publication,因為其實 pgoutput 是拿來做 pub & sub 的 logical replication 而出來的,但在這邊我們可以只用到 pub 的部分,並不需要用到 sub 的部分。這麼說好了,你可以想像 sub 的部分是只能做在其他 PG 上,也就是省去現在我們自己要做 CDC event 捕捉的實作。但今天我們這個實作是希望可以自己捕捉 CDC event 而轉發給 external 的 consumer。

    create publication 的細節如下:

    1
    CREATE PUBLICATION %s FOR ALL TABLES;
  4. 發出 identify system

    1
    2
    3
    4
    5
    ident, err := pglogrepl.IdentifySystem(context.Background(), conn)
    if err != nil {
    log.Fatalf("failed to identify system: %v", err)
    }
    log.Printf("SystemID: %s Timeline: %d XLogPos: %d DBName: %s\n", ident.SystemID, ident.Timeline, ident.XLogPos, ident.DBName)

    透過先 IdentifySystem 除了可以確保過來的 pg connection 是來自於相同的 pg cluster identifier,簡單來說在做每一次的 initdb 的時候,PG 會自己產生一個 identifier 用來確保是 unique 的,這個特性很重要的是,如果你今天要建立 physical replication,必須確保 replica & primary 的 identifier 必須是一致的,也就是在做 physical replication 的建立時,第一個就會先檢查這部分。

    除了這個重要特性外,還能拿到 XLogPos 這個是代表當前 WAL 刷新位置。這個位置很重要,假設今天連線中斷了,我並不知道之前 replication slot 的 WAL LSN 位置是多少,透過 identify system 我可以知道目前 replication slot 的 WAL 的 LSN 位置是多少,那我就可以拿這個 LSN 繼續做 logical replication。

  5. 建立 Replication Slot

    1
    2
    3
    4
    5
    6
    if _, err = pglogrepl.CreateReplicationSlot(context.Background(), conn, slotName, outputPlugin, pglogrepl.CreateReplicationSlotOptions{}); err != nil {
    if pge, ok := err.(*pgconn.PgError); !ok || pge.Code != "42710" {
    log.Fatalf("failed to create logical replication slot: %v", err)
    }
    }
    log.Println("Created temporary replication slot:", slotName)

    建立 replication slot 是為了將 WAL 資料存在這裡面,再經過 pgoutput decode 的結果後再傳給 client 端。

  6. 開始 Replication 傳輸

    1
    2
    3
    4
    if err = pglogrepl.StartReplication(context.Background(), conn, slotName, ident.XLogPos, pglogrepl.StartReplicationOptions{PluginArgs: []string{"proto_version '1'", fmt.Sprintf("publication_names '%s'", slotName), "binary 'true'"}}); err != nil {
    log.Fatalf("failed to start replication: %v", err)
    }
    log.Println("Logical replication started on slot", slotName)

    這邊要注意的是 ident.XLogPos 就是我們前面提到拿 identify system 拿到的 LSN 的值,透過這樣我們可以從上次斷掉的部分繼續 consume,但如果你想要回放更早之前的 LSN 的資料可不可以?答案是可以的,這邊的 LSN 你不一定要拿 ident.XLogPos 的。

    但是要注意的是,你這邊給的 LSN 必須不是 Primary 認為是過時 replication slot 的資料,原因在於 Primary 會清掉過時 replication slot 的資料,如果清掉了你就不能回放舊資料了。準確來說你給 LSN 不能是小於 confirmed_flush_lsn。這個 LSN 是當 consumer 跟 PG 説,嘿!我現在的 LSN 已經讀到這裡了,跟你說一聲,PG 收到了,就會認為該 LSN 之前的資料都已經過時了,我可以清掉了,釋放硬碟空間。

    confirmed_flush_lsn 的值怎麼來呢?可以透過 query pg_replication_slots 知道:

    https://www.postgresql.org/docs/current/view-pg-replication-slots.html

    The address (LSN) up to which the logical slot’s consumer has confirmed receiving data. Data older than this is not available anymore. NULL for physical slots.

    那 Consumer 要怎麼跟 Primary 説我的 LSN 進度到哪了呢?下面我們會細講。

    再來,這邊要注意的是 PluginArgs 的值到底該給什麼?首先,PluginArgs 必須根據你所使用的 plugin 去給,以我們要用的 pgouput 到底該給什麼值,可以先來看 PG 文件:https://www.postgresql.org/docs/current/protocol-logical-replication.html

    55.5.1. Logical Streaming Replication Parameters

    The logical replication START_REPLICATION command accepts following parameters:

    • proto_version

      Protocol version. Currently versions 1, 2, and 3 are supported.Version 2 is supported only for server version 14 and above, and it allows streaming of large in-progress transactions.Version 3 is supported only for server version 15 and above, and it allows streaming of two-phase commits.

    • publication_names

      Comma separated list of publication names for which to subscribe (receive changes). The individual publication names are treated as standard objects names and can be quoted the same as needed.

    只有 proto_version 跟 publication_names 是必給的參數,其中 version 2 只能用在 PG 14 以上,而 version 3 只能支援 PG 15 以及 streaming 跟 two-phase commits。

    這邊根據我們需求只需要給 version 1 就可以了。

    另外文件有說明 Logical Decoding Output Plugins 的運作流程:

    https://www.postgresql.org/docs/current/logicaldecoding-output-plugin.html

    根據這兩個文件內容,可以來對照 pgoutput 的 source code

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    static void
    parse_output_parameters(List *options, PGOutputData *data)
    {
    ListCell *lc;
    bool protocol_version_given = false;
    bool publication_names_given = false;
    bool binary_option_given = false;
    bool messages_option_given = false;
    bool streaming_given = false;
    bool two_phase_option_given = false;
    bool origin_option_given = false;
    ...

    // 這邊會檢查 binary option 有沒給
    else if (strcmp(defel->defname, "binary") == 0)
    {
    if (binary_option_given)
    ereport(ERROR,
    (errcode(ERRCODE_SYNTAX_ERROR),
    errmsg("conflicting or redundant options")));
    binary_option_given = true;

    data->binary = defGetBoolean(defel);
    }
    }


    在這邊可以找到所有可以用 plugin arguement 有哪些。就我們的情境我們只需要給 protocol_versionpublication_namesbinary_option 就可以。其中 protocol_version 跟 publication_names 是必須給的,而 binary_option 這是代表 pgouput 是否開啟 binary format,如果不給預設是 text format 效能較差。

    而在開始 replication 之後馬上就會 callback 這個 pgoutput_startup function:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    // 在 Start Replication 後會馬上 callback 這個 pgoutput_startup function
    static void
    pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
    bool is_init)
    {
    ...
    if (!is_init)
    {
    // 這邊會進行 parse plugin argument 的實作
    // 也就是上面那段 code
    parse_output_parameters(ctx->output_plugin_options, data);

    // 當前面 parse 完後,這邊就會檢查所需要 plugin argument 是否符合需求
    // 檢查 LOGICALREP_PROTO_MAX_VERSION_NUM <= proto_version <= LOGICALREP_PROTO_MIN_VERSION_NUM
    // 而 min 是 1, max 是 4
    if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
    ereport(ERROR,
    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
    data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));

    if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
    ereport(ERROR,
    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
    data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));

    // 檢查 publication_names 必須不為空
    if (data->publication_names == NIL)
    ereport(ERROR,
    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    errmsg("publication_names parameter missing")));
    ...
    }
    else
    {
    /*
    * Disable the streaming and prepared transactions during the slot
    * initialization mode.
    */
    ctx->streaming = false;
    ctx->twophase = false;
    }
    }

    所以我們可以知道要給的參數是:

    1
    []string{"proto_version '1'", fmt.Sprintf("publication_names '%s'", slotName), "binary 'true'"}}
  7. 進行 client 接收 message 的解析流程

    這段很長,建議要搭配這兩份官方文件的內容:

    https://www.postgresql.org/docs/current/protocol-replication.html

    其中 START_REPLICATION 有說到會收到哪些 message,且這些 message 內容是被包在 CopyData 裡面的有:

    1. XLogData
    2. Primary keepalive message
    3. Standby status update
    4. Hot standby feedback message

    https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html

    其中 XLogData 裡面有哪些 type 的 message 可以收。

    由於程式碼過長,我直接在下方寫成註解解釋會比較容易。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    for {
    // 首先一開始就要檢查是否需要送 StandbyStatusUpdate 這個就是我們前面所說的 client 端如何跟 pg 説,我目前已經讀到哪個 LSN 了。因此透過這樣的實作可以定期的跟 PG 説我讀到哪裡了,過時的 LSN 就幫我清掉吧。
    if time.Now().After(nextStandbyMessageDeadline) {
    // 要送給 PG 的 LSN 就是 clientXLogPos
    if err = pglogrepl.SendStandbyStatusUpdate(context.Background(), conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: clientXLogPos}); err != nil {
    log.Fatalf("failed to send standby status update: %v", err)
    }
    log.Println("sent standby status update")
    nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
    }

    // 收 message 會設定 timeout 是為了上面的 StandbyStatusUpdate 可以定期的發送。
    ctx, cancel := context.WithDeadline(context.Background(), nextStandbyMessageDeadline)
    msg, err := conn.ReceiveMessage(ctx)
    cancel()
    if err != nil {
    // timeout 的話就重來
    if pgconn.Timeout(err) {
    log.Printf("receive message but timeout: %v\n", err)
    continue
    }
    log.Fatalf("failed to receive message: %v", err)
    }

    // 如果有收到 message 就可以檢查 message type 是哪種
    switch msg := msg.(type) {
    // replication protocol 給出的 message 會包在 CopyData message 裡面
    case *pgproto3.CopyData:
    switch msg.Data[0] {
    // 如果你前面有 SendStandbyStatusUpdate,就會收到 PG 給你這個 message
    // 主要是可以告訴你 PG 目前 ServerWaLEnd LSN 為何,這個指的就是 replication slot 的 LSN 目前的位置。
    // 如果 PG 太久沒收到 client 給過來的消息也會給你這個 message,並且要求你馬上 reply
    // 這就是為什麼這邊會檢查 ReplyRequested 並且將 nextStandbyMessageDeadline 設定為
    // time.Time{}
    // 這樣下一個 for loop 就可以馬上 SendStandbyStatusUpdate
    case pglogrepl.PrimaryKeepaliveMessageByteID:
    var pkm pglogrepl.PrimaryKeepaliveMessage
    if pkm, err = pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:]); err == nil && pkm.ReplyRequested {
    nextStandbyMessageDeadline = time.Time{}
    }
    log.Printf("primary keepalive message: ServerWaLEnd: %d ServerTime: %v ReplyRequested: %v\n", pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested)
    // 當收到這個 message 就代表是收到 CDC change event
    // 這邊才是我們所需要的真正 message 內容
    case pglogrepl.XLogDataByteID:
    // pglogrepl 幫我們做好了所有 Parse 的細節處理
    xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
    if err != nil {
    log.Fatalf("failed to parse xlog data: %v", err)
    }
    log.Printf("xlog data: WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)

    // 解析原生的 WAL Data
    logicalMsg, err := pglogrepl.Parse(xld.WALData)
    if err != nil {
    log.Fatalf("failed to parse logical replication message: %v", err)
    }

    // 接著就能判斷這個 message 是屬於哪種 type
    // 正常來說 message 給出的順序是一個 transaction 為單位的
    // 也就是他會將 transaction 裡面的 begin, relation, change, change, commit
    // 這些各自拆分為一個 message
    // 所以你會先收到 begin, 下一次 for loop 會收到 relation,下一次 for loop 才會收到裡面
    // 的 SQL 語句內容,再下一次才會收到 commit
    // 所以如果你想做 pg2pg 的同步行為,你就必須在收到 begin 的同時就開啟 transaction 才能保
    // 持 ACID 的特性。
    // 那 Relation 的 message 作用是幹什麼用的呢? 存的內容就是你下一個 change 會更動的 table
    // 及 namespace 等相關內容
    // 假設你今天的 transaction 是 begin, insert t1 table, insert t2 table, commit
    // 那你收到的 message 順序會是:begin, relation, insert, relation, insert, commit
    // 有了這個 relation 你才知道是對哪個 table 做操作,因為 insert/update/delete message
    // 會給 relation id,這個 relation id 其實就是 OID of the relation 是唯一的。
    switch logicalMsg := logicalMsg.(type) {
    case *pglogrepl.RelationMessage:
    // 所以收到 relation 之後要將 id 與 message 關係存起來方便之後做對照
    relations[logicalMsg.RelationID] = logicalMsg
    log.Println("receive relation message")
    case *pglogrepl.BeginMessage:
    log.Printf("receive begin message: xid: %d finalLSN: %d commitTime: %v\n", logicalMsg.Xid, logicalMsg.FinalLSN, logicalMsg.CommitTime)
    case *pglogrepl.CommitMessage:
    log.Printf("receive commit message: commitLSN: %d transactionEndLSN: %d commitTime: %v\n", logicalMsg.CommitLSN, logicalMsg.TransactionEndLSN, logicalMsg.CommitTime)
    // 收到 insertMessage,可以透過 logicalMsg.Tuple 拿到 insert 什麼資料
    case *pglogrepl.InsertMessage:
    rel, ok := relations[logicalMsg.RelationID]
    if !ok {
    log.Fatalf("unknown relation ID %d", logicalMsg.RelationID)
    }
    values := getValues(rel, logicalMsg.Tuple)
    log.Printf("receive insert message: newValues: %v\n", values)
    // 收到 updateMessage,這邊注意 oldValues 只有當 replica identity 設定 full 才會有
    // 而 newValues 裡面如果有 toast 性質的 column 會有值除非 replica identity 設定 full
    // 或是有更新 toast columns 的值才會有
    case *pglogrepl.UpdateMessage:
    rel, ok := relations[logicalMsg.RelationID]
    if !ok {
    log.Fatalf("unknown relation ID %d", logicalMsg.RelationID)
    }

    // oldValues will only have values when the Replica Identity of the Table is set to full mode.
    oldValues := getValues(rel, logicalMsg.OldTuple)
    // toast columns will only have values when the replica identity is set to full mode or update toast columns
    newValues := getValues(rel, logicalMsg.NewTuple)
    log.Printf("receive update message: oldValues: %v newValues: %v\n", oldValues, newValues)
    // 收到 deleteMessage,這邊注意 oldValues 只有當 replica identity 設定 full 才會有
    // 不然 oldValues 只有 PK 才會有值
    case *pglogrepl.DeleteMessage:
    rel, ok := relations[logicalMsg.RelationID]
    if !ok {
    log.Fatalf("unknown relation ID %d", logicalMsg.RelationID)
    }

    // oldValues except primary key column will only have values when the Replica Identity of the Table is set to full mode.
    oldValues := getValues(rel, logicalMsg.OldTuple)
    log.Printf("receive delete message: oldValues: %v\n", oldValues)
    case *pglogrepl.TruncateMessage:
    log.Println("receive truncate message")
    case *pglogrepl.TypeMessage:
    log.Println("receive type message")
    case *pglogrepl.OriginMessage:
    log.Println("receive origin message")
    default:
    log.Printf("receive unknown message type in pgoutput stream: %v\n", logicalMsg)
    }
    // 這邊更新 clientXLogPos 可以拿 xld.WALStart + pglogrepl.LSN(len(xld.WALData))
    // 當作 client 端讀取到的 LSN 位置
    // 但是你也可以拿 commitLSN 去當作最新的 clientXLogPos 也可以,這樣回放會比較方便
    // 而目前更新 clientXLogPos 照理說會是 > commitLSN
    clientXLogPos = xld.WALStart + pglogrepl.LSN(len(xld.WALData))
    log.Printf("clientXLogPos: %d\n", clientXLogPos)
    }
    default:
    log.Fatalf("receive unexpcted message: %v", msg)
    }
    }

    除了上面我解釋的細節還有幾點要注意:

    1. relation message 如果以前同個 table 送過了,下一次就不會送了,除非 table 的 schema 改變了才會再送一次 relation message 出來

    2. begin message 的 FinalLSN 會等於 commit message 的 commitLSN 而 commit message 的 TransactionEndLSN 會 > commitLSN 跟 finalLSN,之所以 FinalLSN 跟 CommitLSN 要一樣是因為要判斷說這是屬於同一個 transaction 的,而送的 message 本身就是有順序性的,也就是送出來的 transaction 本身 PG 就會先重排過了。

      因此這個 commitLSN 一定會比下一個 commitLSN 還要小。

    3. clientXLogPos 到底要怎麼更新比較好:

      • 可以跟上面的程式碼一樣拿 xld.WALStart + pglogrepl.LSN (len (xld.WALData))

      • 也可以拿 commitLSN,原因在於如果你想要回放上一個 transcation 的資料,你就可以拿 commitLSN 去 Start Replication。但是要注意如果你這個 commitLSN 已經透過 SendStandbyStatusUpdate 給 PG 的話,PG 就會認為這 LSN 之前的資料是過時的可以清掉,如果這時候你在想要回放資料有可能是找不到資料而無法回放的。

        因此通常你收到 CDC event 如果你又想回放舊資料,建議是收到 CDC event 確保資料已經存在其他 storage 你再去 SendStandbyStatusUpdate 才是最保險的。

    4. 如果想要 restart 並且從斷掉 LSN 來 Start Replication,需要拿上次斷掉的 LSN,或者直接拿 identify system 給過來的 LSN 也可以。

    整段的程式碼可以參考我的 GitHub:https://github.com/KennyChenFight/pgx-cdc-demo

總結

今天這篇文章主要介紹如何使用 golang + pgx + pgoutput 來實現 CDC,基本上上面的程式碼就是個雛形了。但到這邊一定會有缺點是無法做到的。

首先:

  1. DDL change 抓不到,這是硬傷
  2. Postgres 14 才支援 pgoutput binary format

那下篇文章我們來講講 pgcapture 我強者朋友發明的 CDC Library 如何解決這些缺點。