最近 Golang でジョブを非同期にスケジューリング実行するような仕組みを実装していて,要件に合っていた JobRunner を使った.特徴としては,様々なスケジューリングを定義できる点と,API を起動してプロセスを常駐させる点にある.
基本実装
最初に動く実装を載せておく.ザッと書いてみると,このようになる.5秒間隔で MyJob
を実行していて,標準出力にログを吐いている.また Gin を起動してプロセスを常駐させている.さらに JobRunner のステータスを API から取得できるようにもしている.
package main import ( "fmt" "net/http" "github.com/bamzi/jobrunner" "github.com/gin-gonic/gin" ) // MyJob ... type MyJob struct { } func main() { jobrunner.Start() jobrunner.Schedule("@every 5s", MyJob{}) gin.SetMode(gin.ReleaseMode) r := gin.Default() r.GET("/jobrunner/status", JobJSON) r.Run(":8080") } // JobJSON ... func JobJSON(c *gin.Context) { c.JSON(http.StatusOK, jobrunner.StatusJson()) } // Run ... func (e MyJob) Run() { fmt.Println("Run MyJob!") }
実行すると,以下のように5秒間隔でログが出力できる.
$ go run main.go [JobRunner] 2017/08/13 - 11:10:00 Started... Run MyJob! Run MyJob! Run MyJob! Run MyJob! Run MyJob! Run MyJob!
ステータスは以下のように取得できる.
$ curl -s http://localhost:8080/jobrunner/status | jq . { "jobrunner": [ { "Id": 1, "JobRunner": { "Name": "MyJob", "Status": "IDLE", "Latency": "77.204µs" }, "Next": "2017-08-13T11:15:14+09:00", "Prev": "2017-08-13T11:15:09+09:00" } ] }
スケジューリング
JobRunner では,様々なスケジューリングを定義できる.内部的には gopkg.in/robfig/cron.v2
を利用しているため,cron 形式でスケジューリングを定義することができたり,シンプルに一定間隔で実行することもできる.例えば,以下のように定義することができる.
// Schedule : 5秒間隔で実行する jobrunner.Schedule("@every 5s", MyJob{}) // Schedule : 1時間間隔で実行する jobrunner.Schedule("@hourly", MyJob{}) // Schedule : 2分間隔で実行する (cron 形式) jobrunner.Schedule("* */2 * * * *", MyJob{}) // Every : 10分間隔で実行する jobrunner.Every(10*time.Minute, MyJob{}) // In : 10秒後に実行する jobrunner.In(10*time.Second, MyJob{}) // Now : 即時実行する jobrunner.Now(MyJob{})
詳しくは cron.v2 のドキュメントに載っている.
API
今回は Gin で実装をしたが,JobRunner では README に書かれている通り,特定の API に依存することはなく,好きなものを選択することができる.
- Gin
- Echo
- Martini
- Beego
などなど.
並列実行
README にあまり詳しく書かれていないが,JobRunner は並列実行をサポートしている.init.go
を読んで,実際に動作確認をして,やっと理解できた.
func Start(v ...int) { MainCron = cron.New() if len(v) > 0 { if v[0] > 0 { workPermits = make(chan struct{}, v[0]) } else { workPermits = make(chan struct{}, DEFAULT_JOB_POOL_SIZE) } } if len(v) > 1 { if v[1] > 0 { selfConcurrent = true } else { selfConcurrent = false } } MainCron.Start() fmt.Printf("%s[JobRunner] %v Started... %s \n", magenta, time.Now().Format("2006/01/02 - 15:04:05"), reset) }
まず jobrunner.Start()
の引数は int の可変長引数になっている.結局のところ,第二引数を指定しない限りは selfConcurrent = false
となるため,並列実行はされないようになっていた.
- 第一引数 : プールサイズ(デフォルト : 10)
- 第二引数 : 並列実行数(デフォルト : 並列実行なし)
検証 : jobrunner.Start(10)
まず,ログ出力を拡張して,30秒待機するようにしてみた.
package main import ( "fmt" "net/http" "time" "github.com/bamzi/jobrunner" "github.com/gin-gonic/gin" ) // MyJob ... type MyJob struct { } func main() { jobrunner.Start(10) jobrunner.Schedule("@every 5s", MyJob{}) gin.SetMode(gin.ReleaseMode) r := gin.Default() r.GET("/jobrunner/status", JobJSON) r.Run(":8080") } // JobJSON ... func JobJSON(c *gin.Context) { c.JSON(http.StatusOK, jobrunner.StatusJson()) } // Run ... func (e MyJob) Run() { fmt.Println("[Start] Run MyJob!") time.Sleep(30 * time.Second) fmt.Println("[End] Run MyJob!") }
この状態で jobrunner.Start(10)
を実行すると,以下のようになった.30秒の待機が優先されて,並列実行はされなかった.
$ go run main.go [Start] Run MyJob! [End] Run MyJob! [Start] Run MyJob! [End] Run MyJob! [Start] Run MyJob! [End] Run MyJob! [Start] Run MyJob! [End] Run MyJob!
検証 : jobrunner.Start(10, 5)
次に jobrunner.Start(10, 5)
と書いて,並列実行数を指定してみた.
func main() { jobrunner.Start(10, 5) }
すると,並列実行となった.JobRunner を使うときには,理解しておくべきポイントだと思う.
$ go run main.go [Start] Run MyJob! [Start] Run MyJob! [Start] Run MyJob! [Start] Run MyJob! [Start] Run MyJob! [Start] Run MyJob! [End] Run MyJob! [Start] Run MyJob! [Start] Run MyJob! [End] Run MyJob! [Start] Run MyJob! [End] Run MyJob!
まとめ
- JobRunner を使うと柔軟なスケジューリング定義でジョブを実行することができる
- Gin や Echo など,任意の API を起動してプロセスを常駐させる仕組みになっている
- 並列実行をする場合は
jobrunner.Start()
の引数を意識する