今天要來介紹的是 Copy Operation in Postgres 的用法,此外也比較與 Insert ,Batch Insert 之間的速度比較。之所以想講這個是因為最近在工作上有需要大量 insert record 的需求,那因為是使用 Postgres 的關係,所以開始研究 Copy 的使用方式,並且深入探討了一下 Copy 使用 simple query 及 extended query protocol 請求方式的差別。這邊會透過 wireshark 來看封包分析一下~
Simple Query V.S Extended Query Protocol
再深入講 Copy 之前,必須先知道 Postgres 相關的 message flow 的設計。
Message Flow
Postgres 有所謂的 Message Flow ,簡單來說這個是指 client 如何對 Postgres Server 進行傳輸,而 Postgres 本身會有定義好的 message type 來做溝通,相關介紹可以看官方文件會更清楚:
https://www.postgresql.org/docs/13/protocol-flow.html
Simple Query
那麼在 message flow 就有幾種 message type 是 for Simple Query 來定義的,Simple Query 可以想成 client 端要對 Postgres 進行 SQL 語句查詢,因此會透過發出 Query message,Postgres 收到 Query Message 就會 response DataRow message type 告訴 client 端有哪些資料。
可以看這張圖會很清楚:
Client 端發出 Query Message 給 Postgres Server,特別注意這邊的 Query Message 都會是 Text Format 的形式呈現,所以可以理解 simple query 的方式,資料的尺寸會比較高。
Postgres Server 會回傳 RowDescription 及 DataRow 給 Client 端,Row Description 指的是回傳的 DataRaw 的相關資訊,例如每個欄位的名稱,Type OID,長度等的一些 metadata 資料。
當每一次的 SQL command 被 Postgres Server 完成後也會回傳 CommandComplete and ReadyForQuery
只有當 Postgres Server 確實的發出 ReadyForQuery 後,Postgres Server 才會當作這次的 Simple Query 結束了。所以 client 端要自行判斷這次的 simple query 操作在收到 ReadyForQuery 就結束了,下一次發的新 query 會是新的 simple query。所以通常每個語言的 SQL Driver 會需要針對不同的 database 處理這段的。
而通常 RowDescription and DataRow and CommandComplete and ReadyForQuery (如果沒有 client 端沒有其他 query 指令要傳過來的話) 會一起回傳,所以只會算一次 RTT。
如果中間 SQL 指令有錯誤怎麼辦?Postgres Server 如果這邊處理的時候有檢測到 Error,就會發出 ErrorResponse,接著再發出 ReadyForQuery,代表 Postgres Server 結束了這次的 Simple Query。
Extended Query Protocol
Extended Query 的方式會將 simple query 的步驟拆幾個步驟出來,最主要是為了達到 prepareStatement 的效果:
為什麼會需要 parpareStatement 的效果呢?對於 client 端而言可以避免 SQL Injection,此外 client 端這邊也可以將被 Postgres Server Parse 成功後的 statement cache 在 client 端上,下一次要使用相同語句的 query 就不必再發出 Parse 等等的操作。對於 Postgres Server 而言也可以重複使用相同的 statement,而不需要對每一次過來的 query 再進行 parse。
所以根據以上這張圖可以知道:
client 端會需要先發出 Parse 並讓 Postgres Server 回傳 ParseOK,代表成功 Parse statement。
client 端接著會發出 Bind 將參數值與 Parsed Statement 進行綁定,並回傳 BindOK,而通常這時候 Postgres 就會計算該 Query 的 execute plan 並 cache 起來以待之後 client 端發出 Execute Message 就可以直接執行。
client 端可以選擇要不要發出 Describe,主要是可以跟 Postgres Server 拿到前面所說的 Row Description 等資訊。
client 端會發出 Execute,要求 Postgres Server 執行 Bind 過後的 statement,並回傳 DataRow,接著回傳 CommandComplete。
client 端會發出 Sync,來當做 Extended Query 的操作結束,當 Postgres Server 收到後才會回傳 ReadyForQuery,才真的代表該次 Query 正式結束。Sync 的目的是為了當 Postgres Server 這邊遇到錯誤的時候可回復正常狀態,例如當在處理 Extended Query Protocol 時遇到錯誤時,Postrgres Server 會發出 ErrorResponse,接著這時 client 發出的任何 message Postgres Server 這邊都會讀取並忽略,直到有收到 Sync,才會發出 ReadyForQuery 並返回到正常的消息處理。
最後過程中的參數 bind 與 return 的 dataRow 都可以是 Binary Format 而不需要是 Text Format,詳細的 message format 可以參考:https://www.postgresql.org/docs/13/protocol-message-formats.html
最後要知道你所使用程式語言 client library 是使用 simple query 還是 extended query protocol,除了看它們的官方文件也是可以透過 wireshark 的方式去看封包就可以知道了,如果對於 Golang 的一些 library 的比較,可以看大神我同事的文章
而今天的 library 測試 Copy 是 simple query 還是 extended query protocol 就是使用 pgx ,用這個 library 好處是會自動選用 extended query protocol 的方式來傳輸以及提供多達 70 多種的 type 都是可以 encode 成 binary format 的,所以可以大幅的節省頻寬。
Copy Operation
在看 Copy Message 傳輸方式,先來講講 Copy 的介紹,Copy 本身指令來由就是 Postgres 為了快速的 input/output data 而用的,所以它的 operation 比起 insert 底層實作是不太一樣的,有進行特別的優化。
Copy 操作本身分為三種方式:
Copy-In:用來 import data
Copy-Out:用來 export data
Copy-Both:用來雙向傳輸資料之用
相關細節可以參考官方文件:https://www.postgresql.org/docs/13/sql-copy.html
優點
速度比 Batch Insert 更快
client 不需要組 sql string
用法上更簡單
copy 只當作一個 operation
缺點
擷取官方文件說明:COPY
stops operation at the first error. This should not lead to problems in the event of a COPY TO
, but the target table will already have received earlier rows in a COPY FROM
. These rows will not be visible or accessible, but they still occupy disk space. This might amount to a considerable amount of wasted disk space if the failure happened well into a large copy operation. You might wish to invoke VACUUM
to recover the wasted space.
Copy 如果遇到錯誤,例如 conflict (constraint error),之前 insert 進去的 record 不會存在,但是還是會佔據 disk space,如果在大量的 copy 下遇到錯誤,可能要使用 VACUUM 來清理
Copy 無法做到 upsert 的操作
那麼 Copy 的 message flow 又長怎樣呢?來看一下:
client 端會需要發出 Copy table FROM STDIN 告訴 Postgres Server 現在要走 copy-in,並回傳 CopyInResponse。
client 端接著就會發出多筆的 CopyData 將資料傳輸過去,如果資料傳輸完畢就會再發出 CopyComplete 給 Postgres Server,Postgres Server 收到 CopyData 後就會開始 import data 進去 target table,當收到 CopyComplete 就會發出 CommandComplete 及 ReadyForQuery,來結束這次的 copy operation。
那 copy 在 simple query 與 extended query protocol 差別在哪?
擷取官方文件說明:In the event of a backend-detected error during copy-in mode (including receipt of a CopyFail message), the backend will issue an ErrorResponse message. If the COPY
command was issued via an extended-query message, the backend will now discard frontend messages until a Sync message is received, then it will issue ReadyForQuery and return to normal processing. If the COPY
command was issued in a simple Query message, the rest of that message is discarded and ReadyForQuery is issued. In either case, any subsequent CopyData, CopyDone, or CopyFail messages issued by the frontend will simply be dropped.
如果是 simple query 遇到錯誤 Postgres Server 會忽略之後過來的 message 並發出 ReadyForQuery 給 client 端之後才可以當作結束 Copy Operation。extended query 則是遇到錯誤之後,除非有收到 client 端的 sync message ,Postgres Server 才會發 ReadyForQuery 給 client 端,否則就會一直忽略過來的 message,就無法結束 Copy Operation。
而 Copy 在 simple query 或是 extended query protocol 下都可以使用 binary format 的方式,與前面我們說的一般的 query 的方式是不同的,主要原因我個人覺得是因為在 Copy command 本身就可以指定 format 的格式,主要是三種:
Text format
CSV format
Binary format
所以其實只要你所使用的 client library 有指定 binary format 並且有幫你 encode 成 binary 基本上不管使用 simple query 還是 extended query protocol 都可以達到節省頻寬的效果。
來看 Copy Operation In pgx library 的效果
我們先透過一段 code 來做 Copy 指令操作,並且開啟 wireshark 來看一下封包的傳輸 message flow 是否符合前面所說的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 const pgURL = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable" type KV struct { Key pgtype.UUID Value pgtype.Bytea Timestamp pgtype.Timestamptz } func main () { ctx := context.Background() conn, err := pgx.Connect(ctx, pgURL) if err != nil { panic (err) } defer conn.Close(ctx) kvs := genKVs(1 ) err = Copy(ctx, conn, kvs) if err != nil { panic (err) } kvs = genKVs(1 ) err = Copy(ctx, conn, kvs) if err != nil { panic (err) } } func Copy (ctx context.Context, conn *pgx.Conn, kvs []KV) error { start := time.Now() rows := make ([][]interface {}, len (kvs)) for i := range kvs { rows[i] = []interface {}{ kvs[i].Key, kvs[i].Value, kvs[i].Timestamp, } } _, err := conn.CopyFrom(ctx, pgx.Identifier{"kvs" }, []string {"key" , "value" , "timestamp" }, pgx.CopyFromRows(rows)) if err != nil { return err } fmt.Println("Copy timing:" , time.Since(start)) return nil } func genKVs (length int ) []KV { kvs := make ([]KV, 0 , length) for i := 0 ; i < length; i++ { key := pgtype.UUID{} key.Set(uuid.NewString()) value := pgtype.Bytea{Bytes: make ([]byte , 10 ), Status: pgtype.Present} rand.Read(value.Bytes) timestamp := pgtype.Timestamptz{} timestamp.Set(time.Now()) kvs = append (kvs, KV{ Key: key, Value: value, Timestamp: timestamp, }) } return kvs }
KV struct 代表的是 kvs table 所包含哪些欄位,這邊可以注意我使用的是 pgtype,這個是由 pgx 所提供的 library,之所以要用這樣是因為它提供的 type 有 encode binary format 的方式,這樣傳輸的過程中才能使用正確的 binary format。
Copy operation 這邊封裝在 Copy func 裡面,這樣需要先 for loop 封裝 row data 來餵給 CopyFrom func。
在 main func 這邊可以注意到我執行兩次 Copy function,這是因為等等要給大家看在 wireshark 擷取到的封包兩次的 copy operation 並且使用同一個 conn 有沒有做 cache 的操作,來加速下一次 copy operation。
執行之後來看一下 wireshark 的封包:
前面的 No. 47 ~ 56 分別是一開始連接 Postgres 的 message type,真正的 copyFrom func 所帶來的封包從 58 ~ 84,總計兩次 copy operation。
但是 No. 58 的封包卻是:
這邊 Copy operation,竟然也發出 Parse Message,事實上這個是不必要的,但是 pgx library 卻先發出這個 message。
No. 60 回的封包是:
因為有 Parse 及 Describe 所以這邊 Postgres 會回傳 Parse completion and Row description。
No. 62 回的封包是:
從這邊可以知道開始回 Copy Message Flow,並且這邊可以看出來是使用 Simple Query 的方式,且有指定 Copy From 的 data 要是 binary format 的形式。
接著 Postgres Server 回傳 CopyIn response,並且 Format 指定為 Binary。
再來是 Copy data,將資料傳輸過去,這邊要注意的是 Length 為 71,但是仔細看下面的資料表示是由 hex 表示,這是 wireshark 為了方便 user 看而轉的,實際上 hex size 會是 71 的兩倍左右,代表 71 是已經轉成 binary format 的長度。
Copy Data 完成後就會發出 Copy completion
接著 Postgres 就會發出 Command completion and Ready for query
這樣就結束一整個 copy operation 了。
而第二次的 copy operation,根據前面的圖你會知道又發出相同的 Parse message,由此可知 parse 過後的 statement,pgx library 並沒有進行 cache 的操作,也就是說即使是相同的 statement 也會多花一次 RTT 的時間來進行 Parse。
追蹤 pgx source code 來找原因
那追蹤 pgx source code,就可以知道為什麼需要先發 Parse message:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (ct *copyFrom) run(ctx context.Context) (int64 , error ) { quotedTableName := ct.tableName.Sanitize() cbuf := &bytes.Buffer{} for i, cn := range ct.columnNames { if i != 0 { cbuf.WriteString(", " ) } cbuf.WriteString(quoteIdentifier(cn)) } quotedColumnNames := cbuf.String() sd, err := ct.conn.Prepare(ctx, "" , fmt.Sprintf("select %s from %s" , quotedColumnNames, quotedTableName)) if err != nil { return 0 , err } ... }
這邊是再跑 copy 指令前,先跑了一個 Prepare statement 的操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (c *Conn) Prepare(ctx context.Context, name, sql string ) (sd *pgconn.StatementDescription, err error ) { if name != "" { var ok bool if sd, ok = c.preparedStatements[name]; ok && sd.SQL == sql { return sd, nil } } if c.shouldLog(LogLevelError) { defer func () { if err != nil { c.log(ctx, LogLevelError, "Prepare failed" , map [string ]interface {}{"err" : err, "name" : name, "sql" : sql}) } }() } sd, err = c.pgConn.Prepare(ctx, name, sql, nil ) if err != nil { return nil , err } if name != "" { c.preparedStatements[name] = sd } return sd, nil }
這邊進來之後,發現 prepareStatement 是 unnamed 所以也無法從 map cache 起來,要到 c.pgConn.Prepare 這邊會發出真正的 Parse message 出去:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string , paramOIDs []uint32 ) (*StatementDescription, error ) { if err := pgConn.lock(); err != nil { return nil , err } defer pgConn.unlock() if ctx != context.Background() { select { case <-ctx.Done(): return nil , newContextAlreadyDoneError(ctx) default : } pgConn.contextWatcher.Watch(ctx) defer pgConn.contextWatcher.Unwatch() } buf := pgConn.wbuf buf = (&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}).Encode(buf) buf = (&pgproto3.Describe{ObjectType: 'S' , Name: name}).Encode(buf) buf = (&pgproto3.Sync{}).Encode(buf) ... }
可以發現有發出 Parse,Describe,Sync,如同在 wireshark 看到的一樣。
那為什麼 pgx 要先發出 Parse Message 呢?原因在於 Library 想做貼心的設計:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 moreRows := true for moreRows { var err error moreRows, buf, err = ct.buildCopyBuf(buf, sd) if err != nil { w.CloseWithError(err) return } if ct.rowSrc.Err() != nil { w.CloseWithError(ct.rowSrc.Err()) return } if len (buf) > 0 { _, err = w.Write(buf) if err != nil { w.Close() return } } buf = buf[:0 ] }
在 buildCopyBuf func 裡面需要將所有的 values 進行 encoding 的動作,也就是前面所說轉成 binary format 的操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (ct *copyFrom) buildCopyBuf(buf []byte , sd *pgconn.StatementDescription) (bool , []byte , error ) { for ct.rowSrc.Next() { values, err := ct.rowSrc.Values() if err != nil { return false , nil , err } if len (values) != len (ct.columnNames) { return false , nil , fmt.Errorf("expected %d values, got %d values" , len (ct.columnNames), len (values)) } buf = pgio.AppendInt16(buf, int16 (len (ct.columnNames))) for i, val := range values { buf, err = encodePreparedStatementArgument(ct.conn.connInfo, buf, sd.Fields[i].DataTypeOID, val) if err != nil { return false , nil , err } } if len (buf) > 65536 { return true , buf, nil } } return false , buf, nil }
這邊可以看到透過我們給進去的 rows data,會進行 for loop 並呼叫 encodePreparedStatementArgument func:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 func encodePreparedStatementArgument (ci *pgtype.ConnInfo, buf []byte , oid uint32 , arg interface {}) ([]byte , error ) { if arg == nil { return pgio.AppendInt32(buf, -1 ), nil } switch arg := arg.(type ) { case pgtype.BinaryEncoder: sp := len (buf) buf = pgio.AppendInt32(buf, -1 ) argBuf, err := arg.EncodeBinary(ci, buf) if err != nil { return nil , err } if argBuf != nil { buf = argBuf pgio.SetInt32(buf[sp:], int32 (len (buf[sp:])-4 )) } return buf, nil case pgtype.TextEncoder: sp := len (buf) buf = pgio.AppendInt32(buf, -1 ) argBuf, err := arg.EncodeText(ci, buf) if err != nil { return nil , err } if argBuf != nil { buf = argBuf pgio.SetInt32(buf[sp:], int32 (len (buf[sp:])-4 )) } return buf, nil case string : buf = pgio.AppendInt32(buf, int32 (len (arg))) buf = append (buf, arg...) return buf, nil } refVal := reflect.ValueOf(arg) if refVal.Kind() == reflect.Ptr { if refVal.IsNil() { return pgio.AppendInt32(buf, -1 ), nil } arg = refVal.Elem().Interface() return encodePreparedStatementArgument(ci, buf, oid, arg) } if dt, ok := ci.DataTypeForOID(oid); ok { value := dt.Value err := value.Set(arg) if err != nil { { if arg, ok := arg.(driver.Valuer); ok { v, err := callValuerValue(arg) if err != nil { return nil , err } return encodePreparedStatementArgument(ci, buf, oid, v) } } return nil , err } sp := len (buf) buf = pgio.AppendInt32(buf, -1 ) argBuf, err := value.(pgtype.BinaryEncoder).EncodeBinary(ci, buf) if err != nil { return nil , err } if argBuf != nil { buf = argBuf pgio.SetInt32(buf[sp:], int32 (len (buf[sp:])-4 )) } return buf, nil } if strippedArg, ok := stripNamedType(&refVal); ok { return encodePreparedStatementArgument(ci, buf, oid, strippedArg) } return nil , SerializationError(fmt.Sprintf("Cannot encode %T into oid %v - %T must implement Encoder or be converted to a string" , arg, oid, arg)) }
這裡的 func 滿長的,但是可以看到第一個 switch case 就是 binary format type 比較,基本上只要用了 pgx 提供的 type,就有支援 binary format,所以其實在第一個 case 就會比對成功並 return 結束該 func。但是之所以需要用到 Parse Message,是因為在最下面的時候:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 if dt, ok := ci.DataTypeForOID(oid); ok { value := dt.Value err := value.Set(arg) if err != nil { { if arg, ok := arg.(driver.Valuer); ok { v, err := callValuerValue(arg) if err != nil { return nil , err } return encodePreparedStatementArgument(ci, buf, oid, v) } } return nil , err } sp := len (buf) buf = pgio.AppendInt32(buf, -1 ) argBuf, err := value.(pgtype.BinaryEncoder).EncodeBinary(ci, buf) if err != nil { return nil , err } if argBuf != nil { buf = argBuf pgio.SetInt32(buf[sp:], int32 (len (buf[sp:])-4 )) } return buf, nil }
如果前面的 switch case 都不符合,那麼 pgx 這邊就會幫你 setType,而要怎麼設定?就是看當初發 Describe Message 拿回來的 RowDescription 的 Type OID 的值來做判斷。
因為 pgx 本身就有把 Postgres 所定義好的 OID Type 存起來了:
1 2 3 4 5 6 7 8 9 10 11 func NewConnInfo () *ConnInfo { ci := newConnInfo() ci.RegisterDataType(DataType{Value: &ACLItemArray{}, Name: "_aclitem" , OID: ACLItemArrayOID}) ci.RegisterDataType(DataType{Value: &BoolArray{}, Name: "_bool" , OID: BoolArrayOID}) ci.RegisterDataType(DataType{Value: &BPCharArray{}, Name: "_bpchar" , OID: BPCharArrayOID}) ci.RegisterDataType(DataType{Value: &ByteaArray{}, Name: "_bytea" , OID: ByteaArrayOID}) ci.RegisterDataType(DataType{Value: &CIDRArray{}, Name: "_cidr" , OID: CIDRArrayOID}) ci.RegisterDataType(DataType{Value: &DateArray{}, Name: "_date" , OID: DateArrayOID}) ... }
所以如果去對照 wireshark 的對應每個欄位的 OID Type,就可以找對應的 type 了,因此 pgx 這樣做可以想成是貼心的設計。但如果對於講求效能而言,就顯得有點多餘了。因為我自己是決定這邊可以不用做 Parse Message 這段並且當 switch case 這邊沒比對到就跳出去就好,並 return error 給 User 看就可以了,這邊的步驟個人覺得不是很必要。
此外,由於 CopyFrom 的操作是透過 pgconn.PgConn type 去做操作的,pgconn.PgConn 是 pgx 封裝更 low-level 的 Postgres Connection 來用的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 type PgConn struct { conn net.Conn pid uint32 secretKey uint32 parameterStatuses map [string ]string txStatus byte frontend Frontend config *Config status byte bufferingReceive bool bufferingReceiveMux sync.Mutex bufferingReceiveMsg pgproto3.BackendMessage bufferingReceiveErr error peekedMsg pgproto3.BackendMessage wbuf []byte resultReader ResultReader multiResultReader MultiResultReader contextWatcher *ctxwatch.ContextWatcher cleanupDone chan struct {} }
所以,是沒有做 parsed statement cache 操作的,但是一般使用的話,都是使用 pgx.Conn Type,是將 pgconn.PgConn 再進行一層封裝,並且是有加上 cache parsed statement 的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 type Conn struct { pgConn *pgconn.PgConn config *ConnConfig preparedStatements map [string ]*pgconn.StatementDescription stmtcache stmtcache.Cache logger Logger logLevel LogLevel notifications []*pgconn.Notification doneChan chan struct {} closedChan chan error connInfo *pgtype.ConnInfo wbuf []byte preallocatedRows []connRows eqb extendedQueryBuilder }
stmtcache 這個就是了。
Benchmark 比較
最後來看一下,Insert 與 Batch Insert 與 Copy 的 benchmark 比較:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 package mainimport ( "context" "fmt" "math/rand" "strings" "time" "github.com/google/uuid" "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" ) const pgURL = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable" type KV struct { Key pgtype.UUID Value pgtype.Bytea Timestamp pgtype.Timestamptz } func main () { ctx := context.Background() conn, err := pgx.Connect(ctx, pgURL) if err != nil { panic (err) } defer conn.Close(ctx) kvs := genKVs(30 ) err = Insert(ctx, conn, kvs) if err != nil { panic (err) } conn.Exec(ctx, "TRUNCATE kvs" ) err = BatchInsert(ctx, conn, kvs) if err != nil { panic (err) } conn.Exec(ctx, "TRUNCATE kvs" ) err = BatchInsert(ctx, conn, kvs) if err != nil { panic (err) } conn.Exec(ctx, "TRUNCATE kvs" ) err = Copy(ctx, conn, kvs) if err != nil { panic (err) } } func Insert (ctx context.Context, conn *pgx.Conn, kvs []KV) error { start := time.Now() query := "INSERT INTO kvs VALUES ($1::uuid, $2::bytea, $3::timestamptz)" for _, kv := range kvs { _, err := conn.Exec(ctx, query, kv.Key, kv.Value, kv.Timestamp) if err != nil { return err } } fmt.Println("Insert timing:" , time.Since(start)) return nil } func BatchInsert (ctx context.Context, conn *pgx.Conn, kvs []KV) error { query, arguments := buildInsertQuery(kvs) start := time.Now() _, err := conn.Exec(ctx, query, arguments...) if err != nil { return err } fmt.Println("BatchInsert timing:" , time.Since(start)) return nil } func Copy (ctx context.Context, conn *pgx.Conn, kvs []KV) error { start := time.Now() rows := make ([][]interface {}, len (kvs)) for i := range kvs { rows[i] = []interface {}{ kvs[i].Key, kvs[i].Value, kvs[i].Timestamp, } } _, err := conn.CopyFrom(ctx, pgx.Identifier{"kvs" }, []string {"key" , "value" , "timestamp" }, pgx.CopyFromRows(rows)) if err != nil { return err } fmt.Println("Copy timing:" , time.Since(start)) return nil } func buildInsertQuery (kvs []KV) (string , []interface {}) { batchInsertQuery := `INSERT INTO kvs (key,value,timestamp) VALUES %s` arguments := make ([]interface {}, 3 * len (kvs)) builder := strings.Builder{} placeHolderIndex := 0 for _, kv := range kvs { builder.WriteString(fmt.Sprintf("($%d::uuid,$%d::bytea,$%d::timestamptz), " , placeHolderIndex+1 , placeHolderIndex+2 , placeHolderIndex+3 )) arguments[placeHolderIndex] = kv.Key arguments[placeHolderIndex+1 ] = kv.Value arguments[placeHolderIndex+2 ] = kv.Timestamp placeHolderIndex += 3 } values := strings.TrimSuffix(builder.String(), ", " ) query := fmt.Sprintf(batchInsertQuery, values) return query, arguments } func genKVs (length int ) []KV { kvs := make ([]KV, 0 , length) for i := 0 ; i < length; i++ { key := pgtype.UUID{} key.Set(uuid.NewString()) value := pgtype.Bytea{Bytes: make ([]byte , 11 ), Status: pgtype.Present} rand.Read(value.Bytes) timestamp := pgtype.Timestamptz{} timestamp.Set(time.Now()) kvs = append (kvs, KV{ Key: key, Value: value, Timestamp: timestamp, }) } return kvs }
這邊 BatchInsert 去除了要建立 sql string 的時間,可以直接將不同 size 的 sql string 直接存在 memory,這樣會更快。實務上這樣用也是很不錯的。此外,這邊我多加測試了 Batch Insert 一次,為的就是 demo 當 prepareStatement 被 cache 起來的速度是否會有提升的效果。
而像 copy 這邊比較麻煩的是就還是要先經過 for loop 來將 row value 轉成 slice。
我們分別來測試 30,100,500,1000 的結果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Insert timing: 15.35309 ms BatchInsert timing: 1.940585 ms BatchInsert timing: 1.816376 ms Copy timing: 1.090684 ms Insert timing: 17.309962 ms BatchInsert timing: 4.157046 ms BatchInsert timing: 2.472445 ms Copy timing: 2.953885 ms Insert timing: 82.910251 ms BatchInsert timing: 9.58655 ms BatchInsert timing: 7.784343 ms Copy timing: 5.083384 ms Insert timing: 141.031883 ms BatchInsert timing: 16.636126 ms BatchInsert timing: 13.481752 ms Copy timing: 9.691968 ms
由此可知,cache 過後的 prepareStatement 確實有提升速度,而其實在數量小的情況下 Batch Insert 與 Copy 可以說是非常接近,這樣的情況下是 Copy 多了 Parse 的 RTT,本身 Copy Operation 會需要 2 RTT,所以這樣總計是 3 RTT,而 Batch Insert 如果使用 cache prepareStatement 只需要 1 RTT 就可以完成,這也是為什麼在數量小的情況下 Batch Insert 與 Copy 是差不多快的。
但是在數量大的時候,Copy 的優勢還是展現出來了,而 Postgres 官方也是建議如果有需要大量的 data 批次輸入或是輸出,還是推薦使用 Copy Operation 的,所以我個人是認為如果情境上始終會是小數量,那麼也許 Batch Insert 就足夠用了,但如果未來可能會有擴充並會有大量的資料 insert 的情境,可以使用 Copy 的操作。
最後,其實我有嘗試 fork pgx repo,並且把 Copy 需要用到 Parse 的那段註解掉,順便提一下用 mod replace 可以很方便地將第三方的 lib 轉成是你自己 forked repo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 module github.com/KennyChenFight/postgres-copy -operation go 1.17 require ( github.com/google/uuid v1.3 .0 github.com/jackc/pgtype v1.8 .1 github.com/jackc/pgx/v4 v4.13 .0 ) require ( github.com/jackc/chunkreader/v2 v2.0 .1 github.com/jackc/pgconn v1.10 .0 github.com/jackc/pgio v1.0 .0 github.com/jackc/pgpassfile v1.0 .0 github.com/jackc/pgproto3/v2 v2.1 .1 github.com/jackc/pgservicefile v0.0 .0 -20200714003250 -2 b9c44734f2b golang.org/x/crypto v0.0 .0 -20210711020723 -a769d52b0f97 golang.org/x/text v0.3 .6 ) replace github.com/jackc/pgx/v4 v4.13 .0 => github.com/KennyChenFight/pgx/v4 v4.13 .1
我們再來看一下 benchmark:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Insert timing: 8.49292 ms BatchInsert timing: 1.425321 ms BatchInsert timing: 851.699 µs Copy timing: 799.978 µs Insert timing: 22.671558 ms BatchInsert timing: 2.274748 ms BatchInsert timing: 1.656872 ms Copy timing: 1.109263 ms Insert timing: 81.632275 ms BatchInsert timing: 8.566272 ms BatchInsert timing: 7.255335 ms Copy timing: 4.766602 ms Insert timing: 205.473422 ms BatchInsert timing: 16.566703 ms BatchInsert timing: 12.840035 ms Copy timing: 10.130884 ms
畢竟少了一個 RTT,Copy 的速度還是有提升的,所以我自己決定如果很講求效能的話,是可以 fork 下來再自己改的,或者發個 PR 讓 pgx 作者採用吧~
詳情可以看我的 fork repo:https://github.com/KennyChenFight/pgx
總結
這次是在工作中學到了如何使用 Copy 指令,並且學習了 pgx library 的 source code。在大量資料匯入的時候,有 Insert,Batch Insert,Copy 的選擇,端看想要的情境來做使用,而效能上由於 Batch Insert 與 Copy 有 RTT 次數的差別,但是在底層設計上還是 Copy 的速度會比較快,因此在小數量的情況下,兩者都 ok,如果是大數量的情況下推薦還是使用 Copy 會比較好。
參考資料: