Postgres - Copy Operation 細節探討

今天要來介紹的是 Copy Operation in Postgres 的用法,此外也比較與 InsertBatch Insert 之間的速度比較。之所以想講這個是因為最近在工作上有需要大量 insert record 的需求,那因為是使用 Postgres 的關係,所以開始研究 Copy 的使用方式,並且深入探討了一下 Copy 使用 simple queryextended query protocol 請求方式的差別。這邊會透過 wireshark 來看封包分析一下~

Simple Query V.S Extended Query Protocol

再深入講 Copy 之前,必須先知道 Postgres 相關的 message flow 的設計。

Message Flow

Postgres 有所謂的 Message Flow,簡單來說這個是指 client 如何對 Postgres Server 進行傳輸,而 Postgres 本身會有定義好的 message type 來做溝通,相關介紹可以看官方文件會更清楚:

https://www.postgresql.org/docs/13/protocol-flow.html

Simple Query

那麼在 message flow 就有幾種 message type 是 for Simple Query 來定義的,Simple Query 可以想成 client 端要對 Postgres 進行 SQL 語句查詢,因此會透過發出 Query message,Postgres 收到 Query Message 就會 response DataRow message type 告訴 client 端有哪些資料。

可以看這張圖會很清楚:

figure1

  • Client 端發出 Query Message 給 Postgres Server,特別注意這邊的 Query Message 都會是 Text Format 的形式呈現,所以可以理解 simple query 的方式,資料的尺寸會比較高。
  • Postgres Server 會回傳 RowDescription 及 DataRow 給 Client 端,Row Description 指的是回傳的 DataRaw 的相關資訊,例如每個欄位的名稱,Type OID,長度等的一些 metadata 資料。
  • 當每一次的 SQL command 被 Postgres Server 完成後也會回傳 CommandComplete and ReadyForQuery
  • 只有當 Postgres Server 確實的發出 ReadyForQuery 後,Postgres Server 才會當作這次的 Simple Query 結束了。所以 client 端要自行判斷這次的 simple query 操作在收到 ReadyForQuery 就結束了,下一次發的新 query 會是新的 simple query。所以通常每個語言的 SQL Driver 會需要針對不同的 database 處理這段的。
  • 而通常 RowDescription and DataRow and CommandComplete and ReadyForQuery (如果沒有 client 端沒有其他 query 指令要傳過來的話) 會一起回傳,所以只會算一次 RTT。
  • 如果中間 SQL 指令有錯誤怎麼辦?Postgres Server 如果這邊處理的時候有檢測到 Error,就會發出 ErrorResponse,接著再發出 ReadyForQuery,代表 Postgres Server 結束了這次的 Simple Query。

Extended Query Protocol

Extended Query 的方式會將 simple query 的步驟拆幾個步驟出來,最主要是為了達到 prepareStatement 的效果:

figure2

為什麼會需要 parpareStatement 的效果呢?對於 client 端而言可以避免 SQL Injection,此外 client 端這邊也可以將被 Postgres Server Parse 成功後的 statement cache 在 client 端上,下一次要使用相同語句的 query 就不必再發出 Parse 等等的操作。對於 Postgres Server 而言也可以重複使用相同的 statement,而不需要對每一次過來的 query 再進行 parse。

所以根據以上這張圖可以知道:

  • client 端會需要先發出 Parse 並讓 Postgres Server 回傳 ParseOK,代表成功 Parse statement。
  • client 端接著會發出 Bind 將參數值與 Parsed Statement 進行綁定,並回傳 BindOK,而通常這時候 Postgres 就會計算該 Query 的 execute plan 並 cache 起來以待之後 client 端發出 Execute Message 就可以直接執行。
  • client 端可以選擇要不要發出 Describe,主要是可以跟 Postgres Server 拿到前面所說的 Row Description 等資訊。
  • client 端會發出 Execute,要求 Postgres Server 執行 Bind 過後的 statement,並回傳 DataRow,接著回傳 CommandComplete。
  • client 端會發出 Sync,來當做 Extended Query 的操作結束,當 Postgres Server 收到後才會回傳 ReadyForQuery,才真的代表該次 Query 正式結束。Sync 的目的是為了當 Postgres Server 這邊遇到錯誤的時候可回復正常狀態,例如當在處理 Extended Query Protocol 時遇到錯誤時,Postrgres Server 會發出 ErrorResponse,接著這時 client 發出的任何 message Postgres Server 這邊都會讀取並忽略,直到有收到 Sync,才會發出 ReadyForQuery 並返回到正常的消息處理。
  • 最後過程中的參數 bind 與 return 的 dataRow 都可以是 Binary Format 而不需要是 Text Format,詳細的 message format 可以參考:https://www.postgresql.org/docs/13/protocol-message-formats.html

最後要知道你所使用程式語言 client library 是使用 simple query 還是 extended query protocol,除了看它們的官方文件也是可以透過 wireshark 的方式去看封包就可以知道了,如果對於 Golang 的一些 library 的比較,可以看大神我同事的文章

而今天的 library 測試 Copy 是 simple query 還是 extended query protocol 就是使用 pgx,用這個 library 好處是會自動選用 extended query protocol 的方式來傳輸以及提供多達 70 多種的 type 都是可以 encode 成 binary format 的,所以可以大幅的節省頻寬。

Copy Operation

在看 Copy Message 傳輸方式,先來講講 Copy 的介紹,Copy 本身指令來由就是 Postgres 為了快速的 input/output data 而用的,所以它的 operation 比起 insert 底層實作是不太一樣的,有進行特別的優化。

Copy 操作本身分為三種方式:

  • Copy-In:用來 import data
  • Copy-Out:用來 export data
  • Copy-Both:用來雙向傳輸資料之用

相關細節可以參考官方文件:https://www.postgresql.org/docs/13/sql-copy.html

優點

  • 速度比 Batch Insert 更快
  • client 不需要組 sql string
  • 用法上更簡單
  • copy 只當作一個 operation

缺點

  • 擷取官方文件說明:COPY stops operation at the first error. This should not lead to problems in the event of a COPY TO, but the target table will already have received earlier rows in a COPY FROM. These rows will not be visible or accessible, but they still occupy disk space. This might amount to a considerable amount of wasted disk space if the failure happened well into a large copy operation. You might wish to invoke VACUUM to recover the wasted space.

    Copy 如果遇到錯誤,例如 conflict (constraint error),之前 insert 進去的 record 不會存在,但是還是會佔據 disk space,如果在大量的 copy 下遇到錯誤,可能要使用 VACUUM 來清理

  • Copy 無法做到 upsert 的操作

那麼 Copy 的 message flow 又長怎樣呢?來看一下:

figure3

  • client 端會需要發出 Copy table FROM STDIN 告訴 Postgres Server 現在要走 copy-in,並回傳 CopyInResponse。
  • client 端接著就會發出多筆的 CopyData 將資料傳輸過去,如果資料傳輸完畢就會再發出 CopyComplete 給 Postgres Server,Postgres Server 收到 CopyData 後就會開始 import data 進去 target table,當收到 CopyComplete 就會發出 CommandComplete 及 ReadyForQuery,來結束這次的 copy operation。

那 copy 在 simple query 與 extended query protocol 差別在哪?

擷取官方文件說明:In the event of a backend-detected error during copy-in mode (including receipt of a CopyFail message), the backend will issue an ErrorResponse message. If the COPY command was issued via an extended-query message, the backend will now discard frontend messages until a Sync message is received, then it will issue ReadyForQuery and return to normal processing. If the COPY command was issued in a simple Query message, the rest of that message is discarded and ReadyForQuery is issued. In either case, any subsequent CopyData, CopyDone, or CopyFail messages issued by the frontend will simply be dropped.

如果是 simple query 遇到錯誤 Postgres Server 會忽略之後過來的 message 並發出 ReadyForQuery 給 client 端之後才可以當作結束 Copy Operation。extended query 則是遇到錯誤之後,除非有收到 client 端的 sync message ,Postgres Server 才會發 ReadyForQuery 給 client 端,否則就會一直忽略過來的 message,就無法結束 Copy Operation。

而 Copy 在 simple query 或是 extended query protocol 下都可以使用 binary format 的方式,與前面我們說的一般的 query 的方式是不同的,主要原因我個人覺得是因為在 Copy command 本身就可以指定 format 的格式,主要是三種:

  1. Text format
  2. CSV format
  3. Binary format

所以其實只要你所使用的 client library 有指定 binary format 並且有幫你 encode 成 binary 基本上不管使用 simple query 還是 extended query protocol 都可以達到節省頻寬的效果。

來看 Copy Operation In pgx library 的效果

我們先透過一段 code 來做 Copy 指令操作,並且開啟 wireshark 來看一下封包的傳輸 message flow 是否符合前面所說的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
const pgURL = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable"

type KV struct {
Key pgtype.UUID
Value pgtype.Bytea
Timestamp pgtype.Timestamptz
}

func main() {
ctx := context.Background()
conn, err := pgx.Connect(ctx, pgURL)
if err != nil {
panic(err)
}
defer conn.Close(ctx)

kvs := genKVs(1)
err = Copy(ctx, conn, kvs)
if err != nil {
panic(err)
}

kvs = genKVs(1)
err = Copy(ctx, conn, kvs)
if err != nil {
panic(err)
}
}

func Copy(ctx context.Context, conn *pgx.Conn, kvs []KV) error {
start := time.Now()
rows := make([][]interface{}, len(kvs))
for i := range kvs {
rows[i] = []interface{}{
kvs[i].Key,
kvs[i].Value,
kvs[i].Timestamp,
}
}
_, err := conn.CopyFrom(ctx, pgx.Identifier{"kvs"}, []string{"key", "value", "timestamp"}, pgx.CopyFromRows(rows))
if err != nil {
return err
}
fmt.Println("Copy timing:", time.Since(start))
return nil
}

func genKVs(length int) []KV {
kvs := make([]KV, 0, length)
for i := 0; i < length; i++ {
key := pgtype.UUID{}
key.Set(uuid.NewString())
value := pgtype.Bytea{Bytes: make([]byte, 10), Status: pgtype.Present}
rand.Read(value.Bytes)
timestamp := pgtype.Timestamptz{}
timestamp.Set(time.Now())
kvs = append(kvs, KV{
Key: key,
Value: value,
Timestamp: timestamp,
})
}
return kvs
}

KV struct 代表的是 kvs table 所包含哪些欄位,這邊可以注意我使用的是 pgtype,這個是由 pgx 所提供的 library,之所以要用這樣是因為它提供的 type 有 encode binary format 的方式,這樣傳輸的過程中才能使用正確的 binary format。

Copy operation 這邊封裝在 Copy func 裡面,這樣需要先 for loop 封裝 row data 來餵給 CopyFrom func。

在 main func 這邊可以注意到我執行兩次 Copy function,這是因為等等要給大家看在 wireshark 擷取到的封包兩次的 copy operation 並且使用同一個 conn 有沒有做 cache 的操作,來加速下一次 copy operation。

執行之後來看一下 wireshark 的封包:

figure4

前面的 No. 47 ~ 56 分別是一開始連接 Postgres 的 message type,真正的 copyFrom func 所帶來的封包從 58 ~ 84,總計兩次 copy operation。

但是 No. 58 的封包卻是:

figure5

這邊 Copy operation,竟然也發出 Parse Message,事實上這個是不必要的,但是 pgx library 卻先發出這個 message。

No. 60 回的封包是:

figure6

因為有 Parse 及 Describe 所以這邊 Postgres 會回傳 Parse completion and Row description。

No. 62 回的封包是:

figure7

從這邊可以知道開始回 Copy Message Flow,並且這邊可以看出來是使用 Simple Query 的方式,且有指定 Copy From 的 data 要是 binary format 的形式。

figure8

接著 Postgres Server 回傳 CopyIn response,並且 Format 指定為 Binary。

figure9

再來是 Copy data,將資料傳輸過去,這邊要注意的是 Length 為 71,但是仔細看下面的資料表示是由 hex 表示,這是 wireshark 為了方便 user 看而轉的,實際上 hex size 會是 71 的兩倍左右,代表 71 是已經轉成 binary format 的長度。

Copy Data 完成後就會發出 Copy completion

figure9

接著 Postgres 就會發出 Command completion and Ready for query

figure9

這樣就結束一整個 copy operation 了。

而第二次的 copy operation,根據前面的圖你會知道又發出相同的 Parse message,由此可知 parse 過後的 statement,pgx library 並沒有進行 cache 的操作,也就是說即使是相同的 statement 也會多花一次 RTT 的時間來進行 Parse。

追蹤 pgx source code 來找原因

那追蹤 pgx source code,就可以知道為什麼需要先發 Parse message:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (ct *copyFrom) run(ctx context.Context) (int64, error) {
quotedTableName := ct.tableName.Sanitize()
cbuf := &bytes.Buffer{}
for i, cn := range ct.columnNames {
if i != 0 {
cbuf.WriteString(", ")
}
cbuf.WriteString(quoteIdentifier(cn))
}
quotedColumnNames := cbuf.String()

sd, err := ct.conn.Prepare(ctx, "", fmt.Sprintf("select %s from %s", quotedColumnNames, quotedTableName))
if err != nil {
return 0, err
}
...
}

這邊是再跑 copy 指令前,先跑了一個 Prepare statement 的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (c *Conn) Prepare(ctx context.Context, name, sql string) (sd *pgconn.StatementDescription, err error) {
if name != "" {
var ok bool
if sd, ok = c.preparedStatements[name]; ok && sd.SQL == sql {
return sd, nil
}
}

if c.shouldLog(LogLevelError) {
defer func() {
if err != nil {
c.log(ctx, LogLevelError, "Prepare failed", map[string]interface{}{"err": err, "name": name, "sql": sql})
}
}()
}

sd, err = c.pgConn.Prepare(ctx, name, sql, nil)
if err != nil {
return nil, err
}

if name != "" {
c.preparedStatements[name] = sd
}

return sd, nil
}

這邊進來之後,發現 prepareStatement 是 unnamed 所以也無法從 map cache 起來,要到 c.pgConn.Prepare 這邊會發出真正的 Parse message 出去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string, paramOIDs []uint32) (*StatementDescription, error) {
if err := pgConn.lock(); err != nil {
return nil, err
}
defer pgConn.unlock()

if ctx != context.Background() {
select {
case <-ctx.Done():
return nil, newContextAlreadyDoneError(ctx)
default:
}
pgConn.contextWatcher.Watch(ctx)
defer pgConn.contextWatcher.Unwatch()
}

buf := pgConn.wbuf
buf = (&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}).Encode(buf)
buf = (&pgproto3.Describe{ObjectType: 'S', Name: name}).Encode(buf)
buf = (&pgproto3.Sync{}).Encode(buf)
...
}

可以發現有發出 Parse,Describe,Sync,如同在 wireshark 看到的一樣。

那為什麼 pgx 要先發出 Parse Message 呢?原因在於 Library 想做貼心的設計:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
moreRows := true
for moreRows {
var err error
moreRows, buf, err = ct.buildCopyBuf(buf, sd)
if err != nil {
w.CloseWithError(err)
return
}

if ct.rowSrc.Err() != nil {
w.CloseWithError(ct.rowSrc.Err())
return
}

if len(buf) > 0 {
_, err = w.Write(buf)
if err != nil {
w.Close()
return
}
}

buf = buf[:0]
}

在 buildCopyBuf func 裡面需要將所有的 values 進行 encoding 的動作,也就是前面所說轉成 binary format 的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (ct *copyFrom) buildCopyBuf(buf []byte, sd *pgconn.StatementDescription) (bool, []byte, error) {

for ct.rowSrc.Next() {
values, err := ct.rowSrc.Values()
if err != nil {
return false, nil, err
}
if len(values) != len(ct.columnNames) {
return false, nil, fmt.Errorf("expected %d values, got %d values", len(ct.columnNames), len(values))
}

buf = pgio.AppendInt16(buf, int16(len(ct.columnNames)))
for i, val := range values {
buf, err = encodePreparedStatementArgument(ct.conn.connInfo, buf, sd.Fields[i].DataTypeOID, val)
if err != nil {
return false, nil, err
}
}

if len(buf) > 65536 {
return true, buf, nil
}
}

return false, buf, nil
}

這邊可以看到透過我們給進去的 rows data,會進行 for loop 並呼叫 encodePreparedStatementArgument func:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
func encodePreparedStatementArgument(ci *pgtype.ConnInfo, buf []byte, oid uint32, arg interface{}) ([]byte, error) {
if arg == nil {
return pgio.AppendInt32(buf, -1), nil
}

switch arg := arg.(type) {
case pgtype.BinaryEncoder:
sp := len(buf)
buf = pgio.AppendInt32(buf, -1)
argBuf, err := arg.EncodeBinary(ci, buf)
if err != nil {
return nil, err
}
if argBuf != nil {
buf = argBuf
pgio.SetInt32(buf[sp:], int32(len(buf[sp:])-4))
}
return buf, nil
case pgtype.TextEncoder:
sp := len(buf)
buf = pgio.AppendInt32(buf, -1)
argBuf, err := arg.EncodeText(ci, buf)
if err != nil {
return nil, err
}
if argBuf != nil {
buf = argBuf
pgio.SetInt32(buf[sp:], int32(len(buf[sp:])-4))
}
return buf, nil
case string:
buf = pgio.AppendInt32(buf, int32(len(arg)))
buf = append(buf, arg...)
return buf, nil
}

refVal := reflect.ValueOf(arg)

if refVal.Kind() == reflect.Ptr {
if refVal.IsNil() {
return pgio.AppendInt32(buf, -1), nil
}
arg = refVal.Elem().Interface()
return encodePreparedStatementArgument(ci, buf, oid, arg)
}

if dt, ok := ci.DataTypeForOID(oid); ok {
value := dt.Value
err := value.Set(arg)
if err != nil {
{
if arg, ok := arg.(driver.Valuer); ok {
v, err := callValuerValue(arg)
if err != nil {
return nil, err
}
return encodePreparedStatementArgument(ci, buf, oid, v)
}
}

return nil, err
}

sp := len(buf)
buf = pgio.AppendInt32(buf, -1)
argBuf, err := value.(pgtype.BinaryEncoder).EncodeBinary(ci, buf)
if err != nil {
return nil, err
}
if argBuf != nil {
buf = argBuf
pgio.SetInt32(buf[sp:], int32(len(buf[sp:])-4))
}
return buf, nil
}

if strippedArg, ok := stripNamedType(&refVal); ok {
return encodePreparedStatementArgument(ci, buf, oid, strippedArg)
}
return nil, SerializationError(fmt.Sprintf("Cannot encode %T into oid %v - %T must implement Encoder or be converted to a string", arg, oid, arg))
}

這裡的 func 滿長的,但是可以看到第一個 switch case 就是 binary format type 比較,基本上只要用了 pgx 提供的 type,就有支援 binary format,所以其實在第一個 case 就會比對成功並 return 結束該 func。但是之所以需要用到 Parse Message,是因為在最下面的時候:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if dt, ok := ci.DataTypeForOID(oid); ok {
value := dt.Value
err := value.Set(arg)
if err != nil {
{
if arg, ok := arg.(driver.Valuer); ok {
v, err := callValuerValue(arg)
if err != nil {
return nil, err
}
return encodePreparedStatementArgument(ci, buf, oid, v)
}
}

return nil, err
}

sp := len(buf)
buf = pgio.AppendInt32(buf, -1)
argBuf, err := value.(pgtype.BinaryEncoder).EncodeBinary(ci, buf)
if err != nil {
return nil, err
}
if argBuf != nil {
buf = argBuf
pgio.SetInt32(buf[sp:], int32(len(buf[sp:])-4))
}
return buf, nil
}

如果前面的 switch case 都不符合,那麼 pgx 這邊就會幫你 setType,而要怎麼設定?就是看當初發 Describe Message 拿回來的 RowDescription 的 Type OID 的值來做判斷。

因為 pgx 本身就有把 Postgres 所定義好的 OID Type 存起來了:

1
2
3
4
5
6
7
8
9
10
11
func NewConnInfo() *ConnInfo {
ci := newConnInfo()

ci.RegisterDataType(DataType{Value: &ACLItemArray{}, Name: "_aclitem", OID: ACLItemArrayOID})
ci.RegisterDataType(DataType{Value: &BoolArray{}, Name: "_bool", OID: BoolArrayOID})
ci.RegisterDataType(DataType{Value: &BPCharArray{}, Name: "_bpchar", OID: BPCharArrayOID})
ci.RegisterDataType(DataType{Value: &ByteaArray{}, Name: "_bytea", OID: ByteaArrayOID})
ci.RegisterDataType(DataType{Value: &CIDRArray{}, Name: "_cidr", OID: CIDRArrayOID})
ci.RegisterDataType(DataType{Value: &DateArray{}, Name: "_date", OID: DateArrayOID})
...
}

所以如果去對照 wireshark 的對應每個欄位的 OID Type,就可以找對應的 type 了,因此 pgx 這樣做可以想成是貼心的設計。但如果對於講求效能而言,就顯得有點多餘了。因為我自己是決定這邊可以不用做 Parse Message 這段並且當 switch case 這邊沒比對到就跳出去就好,並 return error 給 User 看就可以了,這邊的步驟個人覺得不是很必要。

此外,由於 CopyFrom 的操作是透過 pgconn.PgConn type 去做操作的,pgconn.PgConn 是 pgx 封裝更 low-level 的 Postgres Connection 來用的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// PgConn is a low-level PostgreSQL connection handle. It is not safe for concurrent usage.
type PgConn struct {
conn net.Conn // the underlying TCP or unix domain socket connection
pid uint32 // backend pid
secretKey uint32 // key to use to send a cancel query message to the server
parameterStatuses map[string]string // parameters that have been reported by the server
txStatus byte
frontend Frontend

config *Config

status byte // One of connStatus* constants

bufferingReceive bool
bufferingReceiveMux sync.Mutex
bufferingReceiveMsg pgproto3.BackendMessage
bufferingReceiveErr error

peekedMsg pgproto3.BackendMessage

// Reusable / preallocated resources
wbuf []byte // write buffer
resultReader ResultReader
multiResultReader MultiResultReader
contextWatcher *ctxwatch.ContextWatcher

cleanupDone chan struct{}
}

所以,是沒有做 parsed statement cache 操作的,但是一般使用的話,都是使用 pgx.Conn Type,是將 pgconn.PgConn 再進行一層封裝,並且是有加上 cache parsed statement 的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Conn is a PostgreSQL connection handle. It is not safe for concurrent usage. Use a connection pool to manage access
// to multiple database connections from multiple goroutines.
type Conn struct {
pgConn *pgconn.PgConn
config *ConnConfig // config used when establishing this connection
preparedStatements map[string]*pgconn.StatementDescription
stmtcache stmtcache.Cache
logger Logger
logLevel LogLevel

notifications []*pgconn.Notification

doneChan chan struct{}
closedChan chan error

connInfo *pgtype.ConnInfo

wbuf []byte
preallocatedRows []connRows
eqb extendedQueryBuilder
}

stmtcache 這個就是了。

Benchmark 比較

最後來看一下,Insert 與 Batch Insert 與 Copy 的 benchmark 比較:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package main

import (
"context"
"fmt"
"math/rand"
"strings"
"time"

"github.com/google/uuid"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
)

const pgURL = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable"

type KV struct {
Key pgtype.UUID
Value pgtype.Bytea
Timestamp pgtype.Timestamptz
}

func main() {
ctx := context.Background()
conn, err := pgx.Connect(ctx, pgURL)
if err != nil {
panic(err)
}
defer conn.Close(ctx)

kvs := genKVs(30)

err = Insert(ctx, conn, kvs)
if err != nil {
panic(err)
}

conn.Exec(ctx, "TRUNCATE kvs")

err = BatchInsert(ctx, conn, kvs)
if err != nil {
panic(err)
}

conn.Exec(ctx, "TRUNCATE kvs")

err = BatchInsert(ctx, conn, kvs)
if err != nil {
panic(err)
}

conn.Exec(ctx, "TRUNCATE kvs")

err = Copy(ctx, conn, kvs)
if err != nil {
panic(err)
}
}

func Insert(ctx context.Context, conn *pgx.Conn, kvs []KV) error {
start := time.Now()
query := "INSERT INTO kvs VALUES ($1::uuid, $2::bytea, $3::timestamptz)"
for _, kv := range kvs {
_, err := conn.Exec(ctx, query, kv.Key, kv.Value, kv.Timestamp)
if err != nil {
return err
}
}
fmt.Println("Insert timing:", time.Since(start))
return nil
}

func BatchInsert(ctx context.Context, conn *pgx.Conn, kvs []KV) error {
query, arguments := buildInsertQuery(kvs)
start := time.Now()
_, err := conn.Exec(ctx, query, arguments...)
if err != nil {
return err
}
fmt.Println("BatchInsert timing:", time.Since(start))
return nil
}

func Copy(ctx context.Context, conn *pgx.Conn, kvs []KV) error {
start := time.Now()
rows := make([][]interface{}, len(kvs))
for i := range kvs {
rows[i] = []interface{}{
kvs[i].Key,
kvs[i].Value,
kvs[i].Timestamp,
}
}
_, err := conn.CopyFrom(ctx, pgx.Identifier{"kvs"}, []string{"key", "value", "timestamp"}, pgx.CopyFromRows(rows))
if err != nil {
return err
}
fmt.Println("Copy timing:", time.Since(start))
return nil
}

func buildInsertQuery(kvs []KV) (string, []interface{}) {
batchInsertQuery := `INSERT INTO kvs (key,value,timestamp) VALUES %s`
arguments := make([]interface{}, 3 * len(kvs))
builder := strings.Builder{}
placeHolderIndex := 0
for _, kv := range kvs {
builder.WriteString(fmt.Sprintf("($%d::uuid,$%d::bytea,$%d::timestamptz), ", placeHolderIndex+1, placeHolderIndex+2, placeHolderIndex+3))
arguments[placeHolderIndex] = kv.Key
arguments[placeHolderIndex+1] = kv.Value
arguments[placeHolderIndex+2] = kv.Timestamp
placeHolderIndex += 3
}
values := strings.TrimSuffix(builder.String(), ", ")
query := fmt.Sprintf(batchInsertQuery, values)
return query, arguments
}

func genKVs(length int) []KV {
kvs := make([]KV, 0, length)
for i := 0; i < length; i++ {
key := pgtype.UUID{}
key.Set(uuid.NewString())
value := pgtype.Bytea{Bytes: make([]byte, 11), Status: pgtype.Present}
rand.Read(value.Bytes)
timestamp := pgtype.Timestamptz{}
timestamp.Set(time.Now())
kvs = append(kvs, KV{
Key: key,
Value: value,
Timestamp: timestamp,
})
}
return kvs
}

這邊 BatchInsert 去除了要建立 sql string 的時間,可以直接將不同 size 的 sql string 直接存在 memory,這樣會更快。實務上這樣用也是很不錯的。此外,這邊我多加測試了 Batch Insert 一次,為的就是 demo 當 prepareStatement 被 cache 起來的速度是否會有提升的效果。

而像 copy 這邊比較麻煩的是就還是要先經過 for loop 來將 row value 轉成 slice。

我們分別來測試 30,100,500,1000 的結果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 30 kvs
Insert timing: 15.35309ms
BatchInsert timing: 1.940585ms
BatchInsert timing: 1.816376ms
Copy timing: 1.090684ms
// 100 kvs
Insert timing: 17.309962ms
BatchInsert timing: 4.157046ms
BatchInsert timing: 2.472445ms
Copy timing: 2.953885ms
// 500 kvs
Insert timing: 82.910251ms
BatchInsert timing: 9.58655ms
BatchInsert timing: 7.784343ms
Copy timing: 5.083384ms
// 1000 kvs
Insert timing: 141.031883ms
BatchInsert timing: 16.636126ms
BatchInsert timing: 13.481752ms
Copy timing: 9.691968ms

由此可知,cache 過後的 prepareStatement 確實有提升速度,而其實在數量小的情況下 Batch Insert 與 Copy 可以說是非常接近,這樣的情況下是 Copy 多了 Parse 的 RTT,本身 Copy Operation 會需要 2 RTT,所以這樣總計是 3 RTT,而 Batch Insert 如果使用 cache prepareStatement 只需要 1 RTT 就可以完成,這也是為什麼在數量小的情況下 Batch Insert 與 Copy 是差不多快的。

但是在數量大的時候,Copy 的優勢還是展現出來了,而 Postgres 官方也是建議如果有需要大量的 data 批次輸入或是輸出,還是推薦使用 Copy Operation 的,所以我個人是認為如果情境上始終會是小數量,那麼也許 Batch Insert 就足夠用了,但如果未來可能會有擴充並會有大量的資料 insert 的情境,可以使用 Copy 的操作。

最後,其實我有嘗試 fork pgx repo,並且把 Copy 需要用到 Parse 的那段註解掉,順便提一下用 mod replace 可以很方便地將第三方的 lib 轉成是你自己 forked repo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
module github.com/KennyChenFight/postgres-copy-operation

go 1.17

require (
github.com/google/uuid v1.3.0
github.com/jackc/pgtype v1.8.1
github.com/jackc/pgx/v4 v4.13.0
)

require (
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.10.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.1.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
golang.org/x/text v0.3.6 // indirect
)

replace github.com/jackc/pgx/v4 v4.13.0 => github.com/KennyChenFight/pgx/v4 v4.13.1

我們再來看一下 benchmark:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 30 kvs
Insert timing: 8.49292ms
BatchInsert timing: 1.425321ms
BatchInsert timing: 851.699µs
Copy timing: 799.978µs
// 100 kvs
Insert timing: 22.671558ms
BatchInsert timing: 2.274748ms
BatchInsert timing: 1.656872ms
Copy timing: 1.109263ms
// 500 kvs
Insert timing: 81.632275ms
BatchInsert timing: 8.566272ms
BatchInsert timing: 7.255335ms
Copy timing: 4.766602ms
// 1000 kvs
Insert timing: 205.473422ms
BatchInsert timing: 16.566703ms
BatchInsert timing: 12.840035ms
Copy timing: 10.130884ms

畢竟少了一個 RTT,Copy 的速度還是有提升的,所以我自己決定如果很講求效能的話,是可以 fork 下來再自己改的,或者發個 PR 讓 pgx 作者採用吧~

詳情可以看我的 fork repo:https://github.com/KennyChenFight/pgx

總結

這次是在工作中學到了如何使用 Copy 指令,並且學習了 pgx library 的 source code。在大量資料匯入的時候,有 Insert,Batch Insert,Copy 的選擇,端看想要的情境來做使用,而效能上由於 Batch Insert 與 Copy 有 RTT 次數的差別,但是在底層設計上還是 Copy 的速度會比較快,因此在小數量的情況下,兩者都 ok,如果是大數量的情況下推薦還是使用 Copy 會比較好。

參考資料: