Golang - delayed task 設計思路

今天來講講 delayed task (延遲任務) 的 system design,有哪些方法以及每個方法的適用場景。延遲任務我想應該是常見的需求,舉例來說 CronJob 其實也是延遲任務的一種,只是可能 Cronjob 就是在固定的時間點做同一件事情這樣。那麼本文章的延遲任務指的是,今天有許多 task 是可以設定不同的時間點去執行的,就像是月曆上面可以設定你之後每天的行程是一樣的概念。再延伸想,當你排定好的行程快到了,像是 Google Calendar 就會在十分鐘前提醒你該行程快到了,這也是延遲任務的一種概念。

應用場景

這邊先列出會用到延遲任務的場景:

  1. 當消費者下了訂單,等了多久都沒有支付,系統自動取消
  2. 在 calendar 上排好了行程,當行程快到或是到的時候要提醒使用者
  3. 排程發文,像是 facebook 就有提供類似的功能,可以設定在未來的某個時間點可以自動發文

這些應用場景的共通點就是在未來的某個時間點要做某件事情,關鍵在於在那個時間點如何主動 / 被動觸發?並且當然如果你的場景要求的是任務需要很準確的執行的話,那 latency 就要想辦法降低。

設計思路

我自己覺得所有的 system design 的東西最先搞清楚的就是應用場景的到底多高,因為量的高低會決定你要採用哪種設計方案。有些方案的 demo 實作有放在 github 可以參考。

以下來談談四種設計方案:

方案一:為每一個 delayed task 設定 timer

這個方案應該是最直覺,且理論上任務較能準確地執行。以 Golang 來說你可以為每一個設定 time.NewTimer(2 * time.Second) 或是透過 time.After(2 * time.Second) 搭配 select 來使用。或者是可以用 TimeWheel 演算法 來設計 timer。

優點

  1. 時間較準確
  2. 設計簡單

缺點

  1. 不好 trace 及 debug 每個 task 的 timer
  2. 存在 memory 的 timer 重啟後會不見,底層需要有外部的資料庫去存 task

在量很少及有做 trace 的機制這樣的方案可以考慮。

方案二:Postgres 存 delayed task

前面方案二我們知道需要外部的資料庫去存 delayed task,因此方案二也是最常使用的方法,例如透過 Postgres 去存所有的 delayed task,並且每 5 seconds 去掃一次資料庫,挑出已經過期的 task,並且拿出來執行。這邊要多久掃一次就看你的延遲任務要多精確地執行,當然也可以 for loop 無限循環去掃。

優點

  1. 設計簡單

缺點

  1. 當每一次掃出來 delayed task 的量很多且又要求延遲任務要精確執行,那勢必 Postgres 的 loading 會變重。

    這時就需要加上 index ,假設 task 上面有 status & started_at 這兩個欄位,status = queued 代表要排程的狀態,started_at 代表任務開始時間。這時候 index 要怎麼設計才好?

    • index for (status, started_at):如果常常會將有撈取 status = xxx 的條件,建議將 status 放在前面,而搭配 started_at 的效果則是可以加速找某段時間內的 task。通常 queued task 會比 done queued 少,再加上 started_at 的篩選則可以有效降低每次拉取的量。
  2. 這種方案理論上 scale 會比較困難,因為你一定要用上 Postgres,要提升撈取的速度不外乎就是要降低撈取的數量,那也許可以採用 partition,例如建立每一天的 partition for task,這也是一種方式。

在量屬於中等的話我覺得可以採用這樣的方案。

方案三:在方案二的基底下加上 redis 作為 delayed queue

向 Postgres 撈取畢竟還是會比向 redis 撈取還要慢,那不如 redis 做成 delayed queue,Postgres 則是負責持久化我們 task 的資料,必要的時就是更新 task 的 status 的作用。

如果 redis 要做成 delayed queue,最常用的方法就是使用 ZSET 來做到期時間的排序,例如:

1
2
3
4
5
6
7
8
9
10
11
12
for _, task := range tasks {
member, err := json.Marshal(task)
if err != nil {
panic(err)
}
if _, err := redisClient.ZAdd(ctx, delayKey, &redis.Z{
Score: float64(time.Now().Add(delay).Unix()),
Member: member,
}).Result(); err != nil {
panic(err)
}
}

ZSET 本身是一個 sorted set,底層是 hash table 實現的,所以新增,删除,搜尋的時間複雜度都是 O (1),我們將 delay 作為 Score 來達到有序的效果,Member 則是 task object。

那如果要取得 ready task 呢?

1
2
3
4
5
6
7
local message = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'WITHSCORES');
if #message > 0 then
redis.call('ZREM', KEYS[1], unpack(message));
return message;
else
return nil;
end

因為如果取得 ready task 之後就需要刪除 task,那麼包在 lua 可以保證這兩個操作可以 atomic。那麼 ZRANGEBYSCORE 指令主要是需要給上兩個參數,也就是想取得 score 的區間,以我們情境應該會是:-inf ~ NOW (),NOW () 這個參數就會是上面的 ARGV [1]。也就是在過去無限小的時間到現在的時間內,所有的 member 都應該是要執行的 task。

藉由這樣的方式撈取出來並且使用 ZREM 進行移除。

這樣的 ZRANGEBYSCORE 的時間複雜度為:O (log (N)+M),因此可以想辦法來減少 sorted set 的量:

通常延遲任務我們只需要排今天的就可以了,未來的 task 不需要現在就塞到 redis,因此可以固定一個 cronjob 在半夜的時候將今天的延遲任務塞進去。減少 sorted set 的量。

優點

  1. 取得 task 的速度會更快
  2. 未來 scale 的彈性很大,redis 也可以改用 cluster mode 來提升 redis 可用性,甚至 sorted set 可以根據 task id 來做分散,必且開多個 consumer 去 get ready task,來提升效率。

缺點

  1. 架構複雜
  2. redis 仍然有掉資料的風險,並且資料存在 memory 上,需要做一些 recovery 的機制來讓 task 重新排程上

方案四:在方案二的基底下,使用 RabbitMQ 作為 delayed queue

那麼如果使用 RabbitMQ 來實現 delayed queue 有沒有機會?有機會。實現原理是透過 dead letter 來達到,因為我們知道 queue 的 message 會有所謂的 TTL 或是 expiration 等參數可以設定,一但過期的 message 就會被丟到 dead letter exchange,那麼如果 task 的 delay task 就是 TTL 呢?這樣是不是我們的 consumer 只要去讀 dead letter exchange 就可以實現 delayed queue 的效果了。

雖然這樣很完美,但是有個致命的缺點是,每個 message 雖然可以設定不同的 expiration,但是 RabbitMQ 的設計是只有當最前面的 message 被消費時才會判斷是否過期,也才會丟到 dead letter。也就是說今天如果是:

A (3 second) -> B (5 second) -> C (10 second) 那麼只有當過期 10 second 後,才會被判斷過期,這時候 B 跟 A 才會跟著出來。所以即便 A 跟 B 應該要優先被丟到 dead letter,在這種設計下無法達到。

但是社群中總有大神去發明相關的 plugin 來達到我們前面說的效果:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

但是實現原理其實是來自於前面的方案一 + 方案二的組合:

  1. 底層是將 message 存到 mnesia 的分散式資料庫,所以會需要考量 message 是否會成功存在 disk,不然重啟後就會不見。這邊可以參考:

    Delayed messages are stored in a Mnesia table (also see Limitations below) with a single disk replica on the current node. They will survive a node restart. While timer(s) that triggered scheduled delivery are not persisted, it will be re-initialised during plugin activation on node start. Obviously, only having one copy of a scheduled message in a cluster means that losing that node or disabling the plugin on it will lose the messages residing on that node.

  2. 而之所以根據時間去觸發 message 則是透過 Erlang 本身自帶的 timers 去實現的,因此:

    After a certain number of long lived timers in the system they begin to compete for scheduler resources and time drift accumulates.

    相關討論可以參考該 issue:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72

    但是總結來說如果你的量是少於數十萬或是數百萬以內,我想應該是沒問題。

優點

  1. 設計簡單
  2. scale 也是方便,可以輕易擴充 publisher & consumer

缺點

  1. plugin 本身有其限制:在數量不能太多的情況下,是不是可以採用只丟當天的 task 到 queue 就可以了呢?可以的。所以相對來說是可以解決 plugin 本身的限制問題

總結

整體來說,我還是很推薦方案四的,因為我覺得他能應付的量級應該符合大多數的需求,且使用上也方便。但如果要極致的優化我自己覺得方案二會是量級很大的最終解。再來其實 task 到底要不要很即時的觸發也是一個關鍵的點,如果大可以延遲好幾分鐘那麼如果有做 partition 或是 index 的優化,那麼簡單方案二不失為一個好方法。