Golang - delayed task 設計思路
今天來講講 delayed task (延遲任務) 的 system design,有哪些方法以及每個方法的適用場景。延遲任務我想應該是常見的需求,舉例來說 CronJob 其實也是延遲任務的一種,只是可能 Cronjob 就是在固定的時間點做同一件事情這樣。那麼本文章的延遲任務指的是,今天有許多 task 是可以設定不同的時間點去執行的,就像是月曆上面可以設定你之後每天的行程是一樣的概念。再延伸想,當你排定好的行程快到了,像是 Google Calendar 就會在十分鐘前提醒你該行程快到了,這也是延遲任務的一種概念。
應用場景
這邊先列出會用到延遲任務的場景:
- 當消費者下了訂單,等了多久都沒有支付,系統自動取消
- 在 calendar 上排好了行程,當行程快到或是到的時候要提醒使用者
- 排程發文,像是 facebook 就有提供類似的功能,可以設定在未來的某個時間點可以自動發文
這些應用場景的共通點就是在未來的某個時間點要做某件事情,關鍵在於在那個時間點如何主動 / 被動觸發?並且當然如果你的場景要求的是任務需要很準確的執行的話,那 latency 就要想辦法降低。
設計思路
我自己覺得所有的 system design 的東西最先搞清楚的就是應用場景的量到底多高,因為量的高低會決定你要採用哪種設計方案。有些方案的 demo 實作有放在 github 可以參考。
以下來談談四種設計方案:
方案一:為每一個 delayed task 設定 timer
這個方案應該是最直覺,且理論上任務較能準確地執行。以 Golang 來說你可以為每一個設定 time.NewTimer(2 * time.Second)
或是透過 time.After(2 * time.Second)
搭配 select
來使用。或者是可以用 TimeWheel 演算法 來設計 timer。
優點
- 時間較準確
- 設計簡單
缺點
- 不好 trace 及 debug 每個 task 的 timer
- 存在 memory 的 timer 重啟後會不見,底層需要有外部的資料庫去存 task
在量很少及有做 trace 的機制這樣的方案可以考慮。
方案二:Postgres 存 delayed task
前面方案二我們知道需要外部的資料庫去存 delayed task,因此方案二也是最常使用的方法,例如透過 Postgres 去存所有的 delayed task,並且每 5 seconds 去掃一次資料庫,挑出已經過期的 task,並且拿出來執行。這邊要多久掃一次就看你的延遲任務要多精確地執行,當然也可以 for loop 無限循環去掃。
優點
- 設計簡單
缺點
-
當每一次掃出來 delayed task 的量很多且又要求延遲任務要精確執行,那勢必 Postgres 的 loading 會變重。
這時就需要加上 index ,假設 task 上面有 status & started_at 這兩個欄位,status = queued 代表要排程的狀態,started_at 代表任務開始時間。這時候 index 要怎麼設計才好?
- index for (status, started_at):如果常常會將有撈取 status = xxx 的條件,建議將 status 放在前面,而搭配 started_at 的效果則是可以加速找某段時間內的 task。通常 queued task 會比 done queued 少,再加上 started_at 的篩選則可以有效降低每次拉取的量。
-
這種方案理論上 scale 會比較困難,因為你一定要用上 Postgres,要提升撈取的速度不外乎就是要降低撈取的數量,那也許可以採用 partition,例如建立每一天的 partition for task,這也是一種方式。
在量屬於中等的話我覺得可以採用這樣的方案。
方案三:在方案二的基底下加上 redis 作為 delayed queue
向 Postgres 撈取畢竟還是會比向 redis 撈取還要慢,那不如 redis 做成 delayed queue,Postgres 則是負責持久化我們 task 的資料,必要的時就是更新 task 的 status 的作用。
如果 redis 要做成 delayed queue,最常用的方法就是使用 ZSET 來做到期時間的排序,例如:
1 | for _, task := range tasks { |
ZSET 本身是一個 sorted set,底層是 hash table 實現的,所以新增,删除,搜尋的時間複雜度都是 O (1),我們將 delay 作為 Score 來達到有序的效果,Member 則是 task object。
那如果要取得 ready task 呢?
1 | local message = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'WITHSCORES'); |
因為如果取得 ready task 之後就需要刪除 task,那麼包在 lua 可以保證這兩個操作可以 atomic。那麼 ZRANGEBYSCORE 指令主要是需要給上兩個參數,也就是想取得 score 的區間,以我們情境應該會是:-inf ~ NOW (),NOW () 這個參數就會是上面的 ARGV [1]。也就是在過去無限小的時間到現在的時間內,所有的 member 都應該是要執行的 task。
藉由這樣的方式撈取出來並且使用 ZREM 進行移除。
這樣的 ZRANGEBYSCORE 的時間複雜度為:O (log (N)+M),因此可以想辦法來減少 sorted set 的量:
通常延遲任務我們只需要排今天的就可以了,未來的 task 不需要現在就塞到 redis,因此可以固定一個 cronjob 在半夜的時候將今天的延遲任務塞進去。減少 sorted set 的量。
優點
- 取得 task 的速度會更快
- 未來 scale 的彈性很大,redis 也可以改用 cluster mode 來提升 redis 可用性,甚至 sorted set 可以根據 task id 來做分散,必且開多個 consumer 去 get ready task,來提升效率。
缺點
- 架構複雜
- redis 仍然有掉資料的風險,並且資料存在 memory 上,需要做一些 recovery 的機制來讓 task 重新排程上
方案四:在方案二的基底下,使用 RabbitMQ 作為 delayed queue
那麼如果使用 RabbitMQ 來實現 delayed queue 有沒有機會?有機會。實現原理是透過 dead letter 來達到,因為我們知道 queue 的 message 會有所謂的 TTL 或是 expiration 等參數可以設定,一但過期的 message 就會被丟到 dead letter exchange,那麼如果 task 的 delay task 就是 TTL 呢?這樣是不是我們的 consumer 只要去讀 dead letter exchange 就可以實現 delayed queue 的效果了。
雖然這樣很完美,但是有個致命的缺點是,每個 message 雖然可以設定不同的 expiration,但是 RabbitMQ 的設計是只有當最前面的 message 被消費時才會判斷是否過期,也才會丟到 dead letter。也就是說今天如果是:
A (3 second) -> B (5 second) -> C (10 second) 那麼只有當過期 10 second 後,才會被判斷過期,這時候 B 跟 A 才會跟著出來。所以即便 A 跟 B 應該要優先被丟到 dead letter,在這種設計下無法達到。
但是社群中總有大神去發明相關的 plugin 來達到我們前面說的效果:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
但是實現原理其實是來自於前面的方案一 + 方案二的組合:
-
底層是將 message 存到 mnesia 的分散式資料庫,所以會需要考量 message 是否會成功存在 disk,不然重啟後就會不見。這邊可以參考:
Delayed messages are stored in a Mnesia table (also see Limitations below) with a single disk replica on the current node. They will survive a node restart. While timer(s) that triggered scheduled delivery are not persisted, it will be re-initialised during plugin activation on node start. Obviously, only having one copy of a scheduled message in a cluster means that losing that node or disabling the plugin on it will lose the messages residing on that node.
-
而之所以根據時間去觸發 message 則是透過 Erlang 本身自帶的 timers 去實現的,因此:
After a certain number of long lived timers in the system they begin to compete for scheduler resources and time drift accumulates.
相關討論可以參考該 issue:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72
但是總結來說如果你的量是少於數十萬或是數百萬以內,我想應該是沒問題。
優點
- 設計簡單
- scale 也是方便,可以輕易擴充 publisher & consumer
缺點
- plugin 本身有其限制:在數量不能太多的情況下,是不是可以採用只丟當天的 task 到 queue 就可以了呢?可以的。所以相對來說是可以解決 plugin 本身的限制問題
總結
整體來說,我還是很推薦方案四的,因為我覺得他能應付的量級應該符合大多數的需求,且使用上也方便。但如果要極致的優化我自己覺得方案二會是量級很大的最終解。再來其實 task 到底要不要很即時的觸發也是一個關鍵的點,如果大可以延遲好幾分鐘那麼如果有做 partition 或是 index 的優化,那麼簡單方案二不失為一個好方法。