kakakakakku blog

Weekly Tech Blog: Keep on Learning!

Golang でジョブのスケジューリング実行ができる JobRunner を試した

最近 Golang でジョブを非同期にスケジューリング実行するような仕組みを実装していて,要件に合っていた JobRunner を使った.特徴としては,様々なスケジューリングを定義できる点と,API を起動してプロセスを常駐させる点にある.

github.com

基本実装

最初に動く実装を載せておく.ザッと書いてみると,このようになる.5秒間隔で MyJob を実行していて,標準出力にログを吐いている.また Gin を起動してプロセスを常駐させている.さらに JobRunner のステータスを API から取得できるようにもしている.

package main

import (
    "fmt"
    "net/http"

    "github.com/bamzi/jobrunner"
    "github.com/gin-gonic/gin"
)

// MyJob ...
type MyJob struct {
}

func main() {
    jobrunner.Start()
    jobrunner.Schedule("@every 5s", MyJob{})

    gin.SetMode(gin.ReleaseMode)
    r := gin.Default()
    r.GET("/jobrunner/status", JobJSON)
    r.Run(":8080")
}

// JobJSON ...
func JobJSON(c *gin.Context) {
    c.JSON(http.StatusOK, jobrunner.StatusJson())
}

// Run ...
func (e MyJob) Run() {
    fmt.Println("Run MyJob!")
}

実行すると,以下のように5秒間隔でログが出力できる.

$ go run main.go
[JobRunner] 2017/08/13 - 11:10:00 Started...
Run MyJob!
Run MyJob!
Run MyJob!
Run MyJob!
Run MyJob!
Run MyJob!

ステータスは以下のように取得できる.

$ curl -s http://localhost:8080/jobrunner/status | jq .
{
  "jobrunner": [
    {
      "Id": 1,
      "JobRunner": {
        "Name": "MyJob",
        "Status": "IDLE",
        "Latency": "77.204µs"
      },
      "Next": "2017-08-13T11:15:14+09:00",
      "Prev": "2017-08-13T11:15:09+09:00"
    }
  ]
}

スケジューリング

JobRunner では,様々なスケジューリングを定義できる.内部的には gopkg.in/robfig/cron.v2 を利用しているため,cron 形式でスケジューリングを定義することができたり,シンプルに一定間隔で実行することもできる.例えば,以下のように定義することができる.

// Schedule : 5秒間隔で実行する
jobrunner.Schedule("@every 5s", MyJob{})

// Schedule : 1時間間隔で実行する
jobrunner.Schedule("@hourly", MyJob{})

// Schedule : 2分間隔で実行する (cron 形式)
jobrunner.Schedule("* */2 * * * *", MyJob{})

// Every : 10分間隔で実行する
jobrunner.Every(10*time.Minute, MyJob{})

// In : 10秒後に実行する
jobrunner.In(10*time.Second, MyJob{})

// Now : 即時実行する
jobrunner.Now(MyJob{})

詳しくは cron.v2 のドキュメントに載っている.

godoc.org

API

今回は Gin で実装をしたが,JobRunner では README に書かれている通り,特定の API に依存することはなく,好きなものを選択することができる.

  • Gin
  • Echo
  • Martini
  • Beego

などなど.

並列実行

README にあまり詳しく書かれていないが,JobRunner は並列実行をサポートしている.init.go を読んで,実際に動作確認をして,やっと理解できた.

func Start(v ...int) {
    MainCron = cron.New()

    if len(v) > 0 {
        if v[0] > 0 {
            workPermits = make(chan struct{}, v[0])
        } else {
            workPermits = make(chan struct{}, DEFAULT_JOB_POOL_SIZE)
        }
    }

    if len(v) > 1 {
        if v[1] > 0 {
            selfConcurrent = true
        } else {
            selfConcurrent = false
        }
    }

    MainCron.Start()

    fmt.Printf("%s[JobRunner] %v Started... %s \n",
        magenta, time.Now().Format("2006/01/02 - 15:04:05"), reset)

}

まず jobrunner.Start() の引数は int の可変長引数になっている.結局のところ,第二引数を指定しない限りは selfConcurrent = false となるため,並列実行はされないようになっていた.

  • 第一引数 : プールサイズ(デフォルト : 10)
  • 第二引数 : 並列実行数(デフォルト : 並列実行なし)

検証 : jobrunner.Start(10)

まず,ログ出力を拡張して,30秒待機するようにしてみた.

package main

import (
    "fmt"
    "net/http"
    "time"

    "github.com/bamzi/jobrunner"
    "github.com/gin-gonic/gin"
)

// MyJob ...
type MyJob struct {
}

func main() {
    jobrunner.Start(10)
    jobrunner.Schedule("@every 5s", MyJob{})

    gin.SetMode(gin.ReleaseMode)
    r := gin.Default()
    r.GET("/jobrunner/status", JobJSON)
    r.Run(":8080")
}

// JobJSON ...
func JobJSON(c *gin.Context) {
    c.JSON(http.StatusOK, jobrunner.StatusJson())
}

// Run ...
func (e MyJob) Run() {
    fmt.Println("[Start] Run MyJob!")
    time.Sleep(30 * time.Second)
    fmt.Println("[End] Run MyJob!")
}

この状態で jobrunner.Start(10) を実行すると,以下のようになった.30秒の待機が優先されて,並列実行はされなかった.

$ go run main.go
[Start] Run MyJob!
[End] Run MyJob!
[Start] Run MyJob!
[End] Run MyJob!
[Start] Run MyJob!
[End] Run MyJob!
[Start] Run MyJob!
[End] Run MyJob!

検証 : jobrunner.Start(10, 5)

次に jobrunner.Start(10, 5) と書いて,並列実行数を指定してみた.

func main() {
    jobrunner.Start(10, 5)
}

すると,並列実行となった.JobRunner を使うときには,理解しておくべきポイントだと思う.

$ go run main.go
[Start] Run MyJob!
[Start] Run MyJob!
[Start] Run MyJob!
[Start] Run MyJob!
[Start] Run MyJob!
[Start] Run MyJob!
[End] Run MyJob!
[Start] Run MyJob!
[Start] Run MyJob!
[End] Run MyJob!
[Start] Run MyJob!
[End] Run MyJob!

まとめ

  • JobRunner を使うと柔軟なスケジューリング定義でジョブを実行することができる
  • Gin や Echo など,任意の API を起動してプロセスを常駐させる仕組みになっている
  • 並列実行をする場合は jobrunner.Start() の引数を意識する