pgcapture - 介紹與示範

上次我們講到如何透過 pgoutput plugin 加上 pgx 來實作我們自己的 CDC 功能,但是其缺點是無法複製 DDL 且直到 pg 14 才支援 pgoutput 有 binary format 的功能。

因此今天來介紹我前同事開發的 pgcapture 是如何去解決以上提到的兩個缺點的,且如何使用。

什麼是 pgcapture

pgcapture 主要根據 Netflix 的 DBLog 的文章 去實作的,其目的就是要實作 CDC 的 framework,而 pgcapture 就是針對 Postgres 的 CDC 實作。

我們來看一下整體的架構圖:

overview

我們先看上半部 SourcePG -> Logical Replica 這段:

  1. Source PG 指的是你想要捕捉的資料來源 PG
  2. Event Store 是用來存放每一筆 PG 的 change 資料
  3. Logical Replica 跟 Event Store 拿取 Source PG 的 change 資料進而 apply 到自己 PG 身上,達到 Source PG 可以與 Logical Replica 進行資料同步的操作。因此才叫做 Logical Replica (Logical Replication 的 Replica)。

我們再來看下半部的部分,這邊其實涵蓋兩大功能,建議分開去看比較清楚:

首先是 DBLog Gateway -> Consumers 的部分:

  1. DBLog Gateway 作用是跟 Event Store 讀取 Source PG 的 Change 資料,將這些 Change 資料透過 gRPC streaming 傳給下游的 Consumer
  2. Consumer 的功用,可以是你的 Application 或是叫做 Service,可以透過 gRPC streaming 去拿到 PG 的 change 資料,那你的 Application 就可以做很多應用了:
    1. create/update/delete cache By CDC,例如你今天聽 posts table 的 change event, 聽到 insert 的操作你可以建立 cache,聽到 update 的操作你可以更新 cache,聽到 delete 的操作你可以 invalidate cache。
    2. 資料 sync By CDC,如果今天你需要聽 Source PG 的某個 table 資料變化,一但有變化也要記在自己 Service 的 DB 上,就可以利用這樣的方式達到,其實就如同我們上面說的 Source PG -> Logical Replica,但不同的是你只是想要聽某一個 Table Event 而已。

我們再來看前面 Dump Admin -> DBLog Control API -> DBLog Gateway -> Consumers 這段:

  1. Dump Admin 這邊的 Dump 指的是,今天你想要 Dump 某一段在 Logical Replica 的資料送給 Consumers,因此你可以在 Dump Admin 去決定你想要 Dump 的資料 Range 是哪一些 Page。
  2. DBLog Control 就負責接收你選好的 Page Range,並紀錄現在 client 要做 Schedule Dump 的操作,而 DBLog Gateway 會 streaming 的問 Control 有沒有需要 Dump Range,所以 Control 就會將 Page Range 傳給 DBLog Gateway,Gateway 會根據這些 Page Range 去跟 Logical Replica 拿對應的 Range Page 的所有資料以及該傳給下游哪一個 Consumer,接著將這些資料 streaming 給 Consumer
  3. Consumer 拿到這些 Page 的資料就能做之前資料的回放操作,例如你想要 CDC 過去資料,來做應用的話,就可以 Dump 來做到

因為 Dump 這段是跟 Logical Replica 拿資料,所以不會影響 Source PG 的 Performance,而 Logical Replica 本身就可以開多台來負荷 CDC 的 Dump,假設現在有很多 Client 都要做 Dump 操作的話。另外 Dump 的部分因為可以針對某一個 Consumer 做 Dump CDC 的操作,因此不會影響到其他 Consumers。

而 event store 的部分,在 pgcapture 是採用 pulsar 來進行 event store 來儲存,用 pulsar 的好處是底下的 storage 可以換成 GCS / S3,另外 pulsar 也保證了 message consume 的順序性。關於 pulsar 有機會會想多寫文章講更細,我還需要多加鑽研 ><

而我們提到 pgcapture 解決了兩個大缺點:

  1. 可以複製 DDL
  2. logical decoding plugin 支援 binary format

怎麼解決的呢?細節實作,等下篇文章在做講解,直接講結論是:

  1. 實作一個 postgres plugin -> pgcapture plugin 負責 listen DDL 操作,並且將 DDL 操作記錄到 pgcapture.ddl_logs table 裡面,而在聽 logical decoding 的時候將 ddl_logs table 裡面的資料視為是 DDL 操作,並且將這些 change 也一樣的傳到 pulsar event store 上,所以 logical replica 才可以 apply DDL 操作
  2. logical decoding plugin 使用 pglogical 提供的 pglogical_output plugin 來進行 decode,並且 pglogical_output 本身就支援 binary format,因此可以提高效率。

pgcapture 內的元件介紹

講完了 pgcapture 整體的架構流程圖,那我們實際來看 pgcapture 提供了哪些 cmd 元件可以用:

  1. agent:agent 主要是拿來當作可以接受 remote config 的設定進而去開啟哪些功能的元件,例如 agent 有一個 Configure function 可以用來接受 grpc request 來觸發:

    1. pg2pulsar
    2. pulsar2pg
    3. Status

    這三種功能。

  2. configure:configure 是拿來一直戳 agent 的,也就是說我可以啟動 configure 並且帶上 pg2pulsar 所需要的參數,接著透過 configure 去戳 agent grpc service,達到啟動 pg2pulsar 功能的效果,另外 configure 會一直 for loop 去戳 agent,而 agent 那邊有設計一但 pg2pulsar 或是 pulsar2pg 已經有啟動了,那麼 configure 只會拿到這些功能的 report status。

    其主要用途就是讓你可以透過看 configure 定期檢查其他功能的目前狀態如何。

  3. controller:controller 其用途就是上面我們架構提到的 DBLog Controller 的用途是一樣的。

  4. gateway:gateway 其用途也是等上面我們架構提到的 DBLog Gateway 的用途是一樣的。

  5. pg2pulsar:pg2pulsar 指的是 SourcePG -> Event Store 這塊的實作,用來聽 Postgres logical decoding event 接著送到 Pulsar。

  6. pulsar2pg:pulsar2pg 指的是 Event Store -> Logical Replica 這塊的實作,用來 read pulsar message 並且 apply 到 logical Replica Postgres。

從上面的元件,如果我們想要換 eventStore,假設換成 kafka 好了,我們就可以實作自己的 pg2kafka,或者是今天我們想做 MySQL 的話,可以做自己的 mysql2pulsar 等等。這些 interface 其實都要定義好,所以可以無痛的轉換其他的實作。

pgcapture 示範

這篇文章還不會很詳細的講到裡面的 source code 的實作細節,下一篇我們才會針對重要的實作去做了解,而這邊主要是講解你如何架設一整套的 pgcapture,因為 pgcapture 的元件其實算多也算有點複雜,一開始可能會有點迷惘其用途,以及該如何使用。但如果你有從我前面的 Logical Replication -> Logical Decoding -> pgoutput cdc 實作,一路看下來,那對於 pgcapture 的實作就會清楚了不少。

我自己為了快速了解 pgcapture 實作細節,並且方便自己 debug,因此建立 docker-pgcapiure,方便用 docker 快速架設出 pgcapture 的實驗環境,歡迎大家取用。截至目前爲止,pgcapture 0.40 的版本算是比較穩定的版本。這之間當然加了不少功能及修正一些 bug,之後我在寫幾篇文章講講吧。

我們直接來看 docker-compose.yaml 的內容吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
version: "3.8"
services:
postgres_source:
build: postgres
image: "kennychenfight/postgres:11-logical"
container_name: "postgres_source"
ports:
- "5432:5432"
command: ["postgres", "-c", "config_file=/pgc/postgresql.conf", "-c","hba_file=/pgc/pg_hba.conf"]
environment:
POSTGRES_HOST_AUTH_METHOD: trust
volumes:
- ./postgres:/pgc

postgres_sink:
image: "kennychenfight/postgres:11-logical"
container_name: "postgres_sink"
ports:
- "5433:5432"
command: [ "postgres", "-c", "config_file=/pgc/postgresql.conf", "-c","hba_file=/pgc/pg_hba.conf" ]
environment:
POSTGRES_HOST_AUTH_METHOD: trust
volumes:
- ./postgres:/pgc

pulsar:
image: apachepulsar/pulsar:2.7.1
container_name: pulsar
command: ["bin/pulsar", "standalone"]
ports:
- 6650:6650
- 8080:8080

pulsar-ui:
image: apachepulsar/pulsar-manager:v0.2.0
container_name: pulsar-ui
ports:
- 9527:9527
- 7750:7750
depends_on:
- pulsar
environment:
SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties
volumes:
- ./pulsar/application.properties:/pulsar-manager/pulsar-manager/application.properties

pg2pulsar:
image: rueian/pgcapture:v0.0.40
container_name: "pg2pulsar"
command: [ "pg2pulsar", "--PGConnURL=postgres://postgres@postgres_source:5432/postgres?sslmode=disable", "--PGReplURL=postgres://postgres@postgres_source:5432/postgres?sslmode=disable&replication=database", "--PulsarURL=pulsar://pulsar:6650", "--PulsarTopic=persistent://public/pgcapture/postgres"]

pulsar2pg:
image: rueian/pgcapture:v0.0.40
container_name: "pulsar2pg"
command: [ "pulsar2pg", "--PGConnURL=postgres://postgres@postgres_sink:5432/postgres?sslmode=disable", "--PulsarURL=pulsar://pulsar:6650", "--PulsarTopic=persistent://public/pgcapture/postgres" ]

gateway:
image: rueian/pgcapture:v0.0.40
container_name: "gateway"
ports:
- 10001:10001
command: gateway --ControllerAddr=controller:10000 --ResolverConfig='{"postgres_cdc":{"PulsarURL":"pulsar://pulsar:6650","PulsarTopic":"persistent://public/pgcapture/postgres","PulsarSubscription":"postgres_cdc","AgentURL":"agent:10000"}}'

controller:
image: rueian/pgcapture:v0.0.40
container_name: "controller"
command: [ "controller" ]
ports:
- 10000:10000

agent:
image: rueian/pgcapture:v0.0.40
container_name: "agent"
command: [ "agent" ]

configure:
image: rueian/pgcapture:v0.0.40
container_name: configure
command: ["configure", "--AgentAddr=agent:10000", "--AgentCommand=pulsar2pg", "--PGConnURL=postgres://postgres@postgres_sink:5432/postgres?sslmode=disable", "--PulsarURL=pulsar://pulsar:6650", "--PulsarTopic=persistent://public/pgcapture/postgres"]

wait-demo-consumer-deps:
image: dadarek/wait-for-dependencies
depends_on:
- pulsar
- pulsar-ui
- postgres_source
- postgres_sink
- controller
- gateway
command: ["pulsar:8080", "pulsar:6650", "pulsar-ui:9527", "pulsar-ui:7750", "postgres_source:5432", "postgres_sink:5432", "controller:10000", "gateway:10001"]

wait-demo-scheduler-deps:
image: dadarek/wait-for-dependencies
depends_on:
- pulsar
- pulsar-ui
- postgres_source
- postgres_sink
- agent
- controller
- gateway
command: [ "pulsar:8080", "pulsar:6650", "pulsar-ui:9527", "pulsar-ui:7750", "postgres_source:5432", "postgres_sink:5432", "agent:10000", "controller:10000", "gateway:10001"]
  1. postgres_source 是拿來當作 Source PG 用的,比較麻煩的是,因為 postgres 需要有 pgcapture & pglogical 這兩個 plugin 才可以,而 pgcapture plugin 需要從 PG Source Code 去 install pgcapture plugin 才可以,那就需要在 Linux 裝 PG develop server 的版本才可以。有鑒於此,我開了一個 Dockerfile 的內容如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    FROM ubuntu:18.04 AS extension_builder

    RUN apt update && \
    apt install -y wget gnupg2 && \
    wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - && \
    echo "deb http://apt.postgresql.org/pub/repos/apt bionic-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \
    apt update && \
    apt install -y postgresql-server-dev-11 build-essential libssl-dev libkrb5-dev zlib1g-dev libedit-dev libxml2-dev libxslt-dev libselinux-dev libpam-dev

    RUN wget https://github.com/rueian/pglogical/archive/REL2_3_4_no_filter.tar.gz && \
    wget https://github.com/rueian/pgcapture/archive/v0.0.22.tar.gz && \
    tar -zxvf REL2_3_4_no_filter.tar.gz && \
    tar -zxvf v0.0.22.tar.gz

    RUN cd /pglogical-REL2_3_4_no_filter && make && \
    cd /pgcapture-0.0.22/hack/postgres/extension && make

    RUN cd /pglogical-REL2_3_4_no_filter && make install && \
    cd /pgcapture-0.0.22/hack/postgres/extension && make install

    FROM postgres:11

    COPY --from=extension_builder /usr/lib/postgresql/11/lib /usr/lib/postgresql/11/lib

    COPY --from=extension_builder /usr/share/postgresql/11/extension /usr/share/postgresql/11/extension

    是從 ubuntu 去 build PG with pglogical & pgcapture 的版本,在將這些 lib 複製到 Postrges 官方的 image 裡面,透過這樣的方式你就可以根據這個 Dockerfile 去 build 你任何版本的 Postgres 了。

  2. postgres_sink 就是我們的 Logical Replica PG

  3. pulsar & pulsar-ui 就是我們的 event store & 對應的 UI 方便我們看 pulsar 上有哪些 topic & message

  4. pg2pulsar 要給的參數包含有 sourcePG 及 pulsar 的連線資訊,還有要送到 pulsar 哪個 topic

  5. pulsar2pg 要給的參數包含有 pulsar & replica pg 的連線資訊,還有要聽 pulsar 哪個 topic

  6. gateway 這邊要給的參數比較特別,吃的是 json 的 format 的 config string,需要給 pulsar 的連線資訊及要吃哪一個 topic 還有 CDC consumer 要聽 pulsar topic 的 subscription 是長怎樣,以及 agent 的連線資訊

    為什麼會需要 agent 呢?

    主要是因為 dump 這段的流程是 gateway 拿到 page range 後會去戳 agent 而 agent 才會去戳 logical replica pg 去拿對應的資料再送給 gateway。

  7. controller 沒什麼特別的參數要給

  8. agent 也是

  9. configure 要給的參數就是你想要戳 agent 什麼指令就帶怎樣的參數,這邊我想要讓 configure 戳 agent 開啟 pulsar2pg 的功能,所以我就給這些參數

  10. wait-demo-consumer-deps 這個是為了 demo CDC consumer 的功能,因此需要先開好這些 dependency,避免有服務還沒起來就 demo consumer 會出錯

  11. wait-demo-scheduler-deps 這個是為了 demo scheduler 的功能,也就是我們前面說 dump 的那段流程,避免有服務還沒起來就 demo scheduler 會出錯

README.md 有寫的很清楚,如果要 demo CDC consumer,只需要跑一下指令即可:

1
2
3
4
5
6
# 這段是 build pgcapture image 出來,因為 pgcapture 目前 docker registry 還沒有更新最新版本,因此提供這個腳本可以 local build pgcapture image 出來
./pgcapture/dockerbuild.sh
# 這段是架設好 demo cdc consumer 所需要的所有 dependency
./demo-consumer.sh
# 這段是跑 consumer 的程式
go run consumer/main.go

接著你在 source pg 去 create table & insert data

你就可以在 consumer 那段程式拿到 insert data 的 event 資料了。

而 demo scheduler 也只需要跑一下指令即可:

1
2
3
4
5
./pgcapture/dockerbuild.sh
# 這個是架設好 demo scheudler 所需要的所有 dependency
./demo-scheduler.sh
# 這段是跑 consumer 的程式
go run consumer/main.go

接著你在 source pg 去 create table & insert data

理論上你 consumer 會拿到這個 event,但此時如果你想要再次回放這個 event 的話,你可以:

1
go run scheduler/main.go

這個 scheduler 程式會去拿某 Table 下所有 Page 資料,並且傳給 Controller,而 Gateway 就會跟 Controller 拿到 page 資料,再去戳 agent 拿到所有 page 下的資料再傳給 gateway 在 streaming 給 Consumer。

因此 Consumer 程式就會再次拿到 event,只是要注意的是 dump 的操作丟給 Consumer 都是 update event 的。所以原本你在 consumer 拿到的會是 insert event 而現在你透過 dump 的話,拿到會是 update event。

總結

這篇文章主要講解了 pgcapture 的架構與 demo 的介紹,下一篇文章會來講講 pgcapture 裡面的重要實作是怎麼做到的。