diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..686d208 --- /dev/null +++ b/.gitignore @@ -0,0 +1,76 @@ +# Created by .ignore support plugin (hsz.mobi) +### Go template +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/modules.xml +# .idea/*.iml +# .idea/modules + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + diff --git a/.travis.yml b/.travis.yml index ecd88e9..54c903a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,4 +2,14 @@ language: go go: - 1.9.x - - 1.10.x \ No newline at end of file + - 1.10.x + - 1.11.x + +before_install: + - go get -t -v ./... 
+ +script: + - go test -coverprofile=coverage.txt -covermode=atomic + +after_success: + - bash <(curl -s https://codecov.io/bash) \ No newline at end of file diff --git a/examples/basic/main.go b/examples/basic/main.go index 262cf50..c9c530f 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -1,6 +1,7 @@ // On this example: -// - 10 workers will be running forever +// - 10 workers will be started up // - 30 jobs will be enqueued to be processed by the workers +// - all workers will be killed after the 30 enqueued jobs get processed package main diff --git a/examples/killed_workers_notifications/main.go b/examples/killed_workers_notifications/main.go new file mode 100644 index 0000000..ba8188d --- /dev/null +++ b/examples/killed_workers_notifications/main.go @@ -0,0 +1,71 @@ +// In this example: +// - 10 workers will be started up +// - the execution will wait until all 10 workers are alive +// - 30 jobs will be enqueued to be processed by the workers +// - all workers will be killed after the 30 enqueued jobs get processed +// - a notification will be sent once a worker is killed (10 notifications will be received) + +package main + +import ( + "fmt" + "log" + "time" + + "github.com/enriquebris/goworkerpool" +) + +func main() { + // total workers + totalInitialWorkers := 10 + // max number of pending jobs + maxNumberPendingJobs := 15 + // do not log messages about the pool processing + verbose := false + + pool := goworkerpool.NewPool(totalInitialWorkers, maxNumberPendingJobs, verbose) + + // add the worker handler function + pool.SetWorkerFunc(func(data interface{}) bool { + log.Printf("processing %v\n", data) + // add a 1 second delay (to make it look as if it were processing the job) + time.Sleep(time.Second) + log.Printf("processing finished for: %v\n", data) + + // let the pool know that the worker was able to complete the task + return true + }) + + // set the channel to receive notifications every time a worker is killed + killedWorkerNotificationChannel := make(chan int) + pool.SetKilledWorkerChan(killedWorkerNotificationChannel) + + // start up the workers + pool.StartWorkers() + + // enqueue jobs in a separate goroutine + go func() { + for i := 0; i < 30; i++ { + pool.AddTask(i) + } + + // kill all workers after the currently enqueued jobs get processed + pool.LateKillAllWorkers() + }() + + // Instead of using pool.Wait() to wait until all workers are down, the following loop listens for the signals + // sent each time a worker is killed. The loop will exit after all initial workers have been killed.
+ totalKilledWorkers := 0 + // wait until all initial workers have been killed + for notification := range killedWorkerNotificationChannel { + totalKilledWorkers = totalKilledWorkers + notification + fmt.Printf("total killed workers: %v\n", totalKilledWorkers) + + if totalKilledWorkers == totalInitialWorkers { + // break the loop once all initial workers are down + break + } + } + + fmt.Printf("All %v workers are down\n", totalInitialWorkers) +} diff --git a/examples/new_workers_notifications/main.go b/examples/new_workers_notifications/main.go new file mode 100644 index 0000000..a18af42 --- /dev/null +++ b/examples/new_workers_notifications/main.go @@ -0,0 +1,72 @@ +// In this example: +// - 10 workers will be started up +// - the execution will wait until all 10 workers are alive +// - 30 jobs will be enqueued to be processed by the workers +// - all workers will be killed after the 30 enqueued jobs get processed + +package main + +import ( + "log" + "time" + + "github.com/enriquebris/goworkerpool" +) + +func main() { + // total workers + totalInitialWorkers := 10 + // max number of pending jobs + maxNumberPendingJobs := 15 + // do not log messages about the pool processing + verbose := false + + pool := goworkerpool.NewPool(totalInitialWorkers, maxNumberPendingJobs, verbose) + + // add the worker handler function + pool.SetWorkerFunc(func(data interface{}) bool { + log.Printf("processing %v\n", data) + // add a 1 second delay (to make it look as if it were processing the job) + time.Sleep(time.Second) + log.Printf("processing finished for: %v\n", data) + + // let the pool know that the worker was able to complete the task + return true + }) + + // set the channel to receive notifications every time a new worker is started up + newWorkerNotificationChannel := make(chan int) + pool.SetNewWorkerChan(newWorkerNotificationChannel) + + + // Note that the following block could be replaced by pool.StartWorkersAndWait() to achieve the + // same goal: wait until all workers are up. This code is intended as an example.
+ + // start up the workers + pool.StartWorkers() + + totalWorkersUp := 0 + // wait until all initial workers are alive + for notification := range newWorkerNotificationChannel { + totalWorkersUp = totalWorkersUp + notification + if totalWorkersUp == totalInitialWorkers { + // break the loop once all initial workers are already up + break + } + } + + log.Printf("%v workers are up\n", totalWorkersUp) + + // enqueue jobs in a separate goroutine + go func() { + for i := 0; i < 30; i++ { + pool.AddTask(i) + } + + // kill all workers after the currently enqueued jobs get processed + pool.LateKillAllWorkers() + }() + + // wait while at least one worker is alive + pool.Wait() +} diff --git a/history.md b/history.md index 27462a9..568580c 100644 --- a/history.md +++ b/history.md @@ -1,5 +1,9 @@ ## History +### v0.9.0 + +- Added an optional channel to get notified every time a new worker is started + ### v0.8.0 - Enqueue jobs plus callback functions diff --git a/pool.go b/pool.go index 9e12ead..bf10e36 100644 --- a/pool.go +++ b/pool.go @@ -1,6 +1,6 @@ // ** goworkerpool.com ********************************************************************************************** // ** github.com/enriquebris/goworkerpool ** -// ** v0.8.0 ******************************************************************************************************** +// ** v0.9.0 ******************************************************************************************************** // Package goworkerpool provides a pool of concurrent workers with the ability to increment / decrement / pause / resume workers on demand. package goworkerpool @@ -87,6 +87,8 @@ type Pool struct { initialWorkers int // total live workers totalWorkers int + // mutex to control totalWorkers access + totalWorkersRWMutex sync.RWMutex // tells whether the initialWorkers were started workersStarted bool // tells the workers: do not accept / process new jobs @@ -107,6 +109,10 @@ type Pool struct { fnSuccessChan chan bool // channel to send "immediate" action's signals to workers immediateChan chan byte + // channel to send signals once new workers are up + newWorkerChan chan int + // channel to send signals once a worker is killed + killedWorkerChannel chan int // flag to know whether a Wait() function was called waitFor string @@ -173,6 +179,23 @@ func (st *Pool) initialize(initialWorkers int, maxJobsInQueue int, verbose bool) st.broadMessages.Store(broadMessageKillAllWorkers, false) } +// SetNewWorkerChan sets a channel to receive signals after each new worker is started +func (st *Pool) SetNewWorkerChan(ch chan int) { + st.newWorkerChan = ch +} + +// SetKilledWorkerChan sets a channel to receive signals after one or more workers are killed +func (st *Pool) SetKilledWorkerChan(ch chan int) { + st.killedWorkerChannel = ch +} + +// sendKilledWorkerSignal sends an integer signal over a channel (the number of killed workers) +func (st *Pool) sendKilledWorkerSignal(value int) { + if st.killedWorkerChannel != nil { + st.killedWorkerChannel <- value + } +} + // workerListener handles all up/down worker operations && keeps workers stats updated (st.totalWorkers) func (st *Pool) workerListener() { keepListening := true @@ -195,6 +218,7 @@ func (st *Pool) workerListener() { // this case is handled by st.fnSuccessListener() default: + // no waitFor signal } // ************************************************************************************ @@ -214,13 +238,23 @@ func (st *Pool) workerListener() { for i := 0; i < message.Value; i++ { // execute the worker function go
st.workerFunc(st.totalWorkers) + + // critical section + st.totalWorkersRWMutex.Lock() st.totalWorkers += 1 + st.totalWorkersRWMutex.Unlock() + // critical section END + // check whether all workers were started if !st.workersStarted && st.totalWorkers == st.initialWorkers { // the workers were started st.workersStarted = true } + + // send back a signal to let the client know that a new worker was started + if st.newWorkerChan != nil { + st.newWorkerChan <- 1 + } } // kill all workers @@ -247,6 +281,9 @@ func (st *Pool) workerListener() { } } + // send a signal to let the client know that a worker was killed + st.sendKilledWorkerSignal(message.Value) + // kill worker(s) case workerActionKill: totalWorkers := st.GetTotalWorkers() @@ -262,6 +299,8 @@ func (st *Pool) workerListener() { // the worker was killed because a "immediate kill" signal case workerActionKillConfirmation: st.totalWorkers -= message.Value + // send a signal to let the client know that a worker was killed + st.sendKilledWorkerSignal(message.Value) // late kill worker(s) case workerActionLateKill: @@ -282,6 +321,8 @@ func (st *Pool) workerListener() { // the worker was killed because a "late kill" signal case workerActionLateKillConfirmation: st.totalWorkers -= message.Value + // send a signal to let the client know that a worker was killed + st.sendKilledWorkerSignal(message.Value) // late kill all workers case workerActionLateKillAllWorkers: @@ -299,11 +340,15 @@ func (st *Pool) workerListener() { // the worker was killed because the immediate channel is closed case workerActionImmediateChanelClosedConfirmation: st.totalWorkers -= message.Value + // send a signal to let the client know that a worker was killed + st.sendKilledWorkerSignal(message.Value) // "panic kill worker" confirmation from the worker // the worker was killed because an unhandled panic case workerActionPanicKillConfirmation: st.totalWorkers -= message.Value + // send a signal to let the client know that a worker was killed + st.sendKilledWorkerSignal(message.Value) // SetTotalWorkers(n) case workerActionSetTotalWorkers: @@ -359,6 +404,10 @@ func (st *Pool) Wait() error { return errors.Errorf(errorNoWorkerFuncMsg, "Wait") } + if st.GetTotalWorkers() == 0 { + return nil + } + // set the waitFor flag for Wait() st.waitFor = waitForWait @@ -509,8 +558,12 @@ func (st *Pool) AddCallback(callbackFn PoolCallback) error { // ** Workers operations ************************************************************************************** // ************************************************************************************************************* -// StartWorkers start all workers. The number of workers was set at the Pool instantiation (NewPool(...) function). -// It will return an error if the worker function was not previously set. +// StartWorkers starts all workers. The number of workers was set at the Pool instantiation (NewPool(...) function). +// +// This is an asynchronous operation, but you can be notified through a channel every time a new worker is started. +// The channel (optional) can be defined at SetNewWorkerChan(chan). +// +// StartWorkers will return an error if the worker function was not previously set. func (st *Pool) StartWorkers() error { var err error for i := 0; i < st.initialWorkers; i++ { @@ -522,6 +575,52 @@ func (st *Pool) StartWorkers() error { return nil } +// StartWorkersAndWait starts all workers and waits until all of them are alive. +// +// This is a synchronous operation, but you can still be notified through a channel every time a new worker is started.
+// In order to receive the signals in real time, the listener should be running on a different goroutine. +// +// StartWorkersAndWait will return an error if the worker function was not previously set. +func (st *Pool) StartWorkersAndWait() error { + sendSignalToClient := st.newWorkerChan != nil + tmpClientChan := st.newWorkerChan + + // switch back to the client channel + defer func() { + if sendSignalToClient { + st.SetNewWorkerChan(tmpClientChan) + } + }() + + // custom channel to receive signals for each started up worker + tmpCustomChan := make(chan int, st.initialWorkers) + // temporarily switch to our temp channel + st.SetNewWorkerChan(tmpCustomChan) + + // start up the workers + if err := st.StartWorkers(); err != nil { + return err + } + + totalWorkersUp := 0 + // start listening to "new worker" signals + for message := range tmpCustomChan { + totalWorkersUp += message + + // send "new worker" signals to client + if sendSignalToClient { + tmpClientChan <- message + } + + // exit once the number of started workers == st.initialWorkers + if totalWorkersUp == st.initialWorkers { + break + } + } + + return nil +} + // startWorker starts a worker in a separate goroutine // It will return an error if the worker function was not previously set. func (st *Pool) startWorker() error { @@ -759,7 +858,10 @@ func (st *Pool) workerFunc(n int) { // In case it needs to kill some workers (in order to adjust the total based on the given parameter), it will wait until // their current jobs get processed (in case they are processing jobs). // -// It returns an error in the following scenarios: +// This is an asynchronous operation, but you can be notified through a channel every time a new worker is started. +// The channel (optional) can be defined at SetNewWorkerChan(chan). +// +// SetTotalWorkers returns an error in the following scenarios: // - The workers were not started yet by StartWorkers. // - There is a "in course" KillAllWorkers operation. func (st *Pool) SetTotalWorkers(n int) error { @@ -783,7 +885,11 @@ func (st *Pool) SetTotalWorkers(n int) error { } // AddWorker adds a new worker to the pool. -// It returns an error if at least one of the following statements is true: +// +// This is an asynchronous operation, but you can be notified through a channel every time a new worker is started. +// The channel (optional) can be defined at SetNewWorkerChan(chan). +// +// AddWorker returns an error if at least one of the following statements is true: // - the worker could not be started // - there is a "in course" KillAllWorkers operation func (st *Pool) AddWorker() error { @@ -796,7 +902,11 @@ func (st *Pool) AddWorker() error { } // AddWorkers adds n extra workers to the pool. -// It returns an error if at least one of the following statements are true: +// +// This is an asynchronous operation, but you can be notified through a channel every time a new worker is started. +// The channel (optional) can be defined at SetNewWorkerChan(chan).
+// +// AddWorkers returns an error if at least one of the following statements are true: // - the worker could not be started // - there is a "in course" KillAllWorkers operation func (st *Pool) AddWorkers(n int) error { @@ -889,6 +999,17 @@ func (st *Pool) KillAllWorkers() { // - AddWorkers // - SetTotalWorkers func (st *Pool) KillAllWorkersAndWait() { + // critical section + st.totalWorkersRWMutex.RLock() + shouldReturn := st.totalWorkers == 0 + st.totalWorkersRWMutex.RUnlock() + // critical section END + + // exit immediately if total workers == 0 + if shouldReturn { + return + } + // the channel acts both as a channel and as a flag (as a flag to let know the pool that it has to send a signal through the channel once all workers are down) st.waitForActionKillAllWorkersAndWait = make(chan bool) @@ -968,5 +1089,8 @@ func (st *Pool) ResumeAllWorkers() { // GetTotalWorkers returns the number of active/live workers. func (st *Pool) GetTotalWorkers() int { + st.totalWorkersRWMutex.RLock() + defer st.totalWorkersRWMutex.RUnlock() + return st.totalWorkers } diff --git a/pool_test.go b/pool_test.go new file mode 100644 index 0000000..7b7c536 --- /dev/null +++ b/pool_test.go @@ -0,0 +1,465 @@ +package goworkerpool + +import ( + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +const ( + initialWorkers = 10 +) + +type PoolTestSuite struct { + suite.Suite + pool *Pool +} + +func (suite *PoolTestSuite) SetupTest() { + suite.pool = NewPool(initialWorkers, 10, false) +} + +// *************************************************************************************** +// ** initialization +// *************************************************************************************** + +// no worker's func is expected just after the pool is created +func (suite *PoolTestSuite) TestInitializationNoWorkerFunc() { + suite.Nil(suite.pool.fn, "Worker's func must be nil at initialization") +} + +// no workers are expected to be running just after the pool is created +func (suite *PoolTestSuite) TestInitializationNoWorkersUp() { + suite.Equal(suite.pool.GetTotalWorkers(), 0, "No workers should be up at initialization.") +} + +// success / fail counters are expected to be zero just after the pool is created +func (suite *PoolTestSuite) TestInitializationCountersEqualToZero() { + suite.Equal(suite.pool.fnSuccessCounter, 0, "Success counter should be zero at initialization") + suite.Equal(suite.pool.fnFailCounter, 0, "Fail counter should be zero at initialization") +} + +// workers to start up by StartWorkers() must be the same amount passed to initialize(initialWorkers int, ...) 
+func (suite *PoolTestSuite) TestInitializationNumberOfWorkers() { + suite.Equalf(suite.pool.initialWorkers, initialWorkers, "Number of workers to start up must be: %v", initialWorkers) +} + +// newWorkerChan (channel to send signals on each new worker spin up) must be nil at initialization +func (suite *PoolTestSuite) TestSetNewWorkerChanNoChannel() { + suite.Nil(suite.pool.newWorkerChan, "newWorkerChan must be nil at initialization") +} + +// newWorkerChan (channel to send signals on each new worker spin up) == chan (from SetNewWorkerChan(chan) ) ==> no error +func (suite *PoolTestSuite) TestSetNewWorkerChan() { + tmpChan := make(chan int) + suite.pool.SetNewWorkerChan(tmpChan) + suite.Equal(tmpChan, suite.pool.newWorkerChan, "newWorkerChan must be equal to the chan passed as param in SetNewWorkerChan(chan)") +} + +// *************************************************************************************** +// ** StartWorkers() +// *************************************************************************************** + +// StartWorkers() with a worker's func at first invoke ==> no error +func (suite *PoolTestSuite) TestStartWorkersNoErrors() { + // add a dummy worker's func + suite.pool.SetWorkerFunc(func(data interface{}) bool { return true }) + suite.NoError(suite.pool.StartWorkers(), "StartWorkers() must return no error the first time it is invoked.") +} + +// verifies that the exact amount of workers (initialWorkers) are started up, not more, not less +func (suite *PoolTestSuite) TestStartWorkersExactAmountOfWorkers() { + // add a dummy worker's func + suite.pool.SetWorkerFunc(func(data interface{}) bool { return true }) + // set newWorkerChan to receive new workers' signals + newWorkerChan := make(chan int, 10) + suite.pool.SetNewWorkerChan(newWorkerChan) + // start up the workers + suite.pool.StartWorkers() + + totalWorkersUp := 0 + keepWorking := true + for keepWorking { + select { + case newWorker := <-newWorkerChan: + totalWorkersUp += newWorker + if totalWorkersUp == initialWorkers { + // OK !!!! + keepWorking = false + } + + // This test will fail if the following amount of time is exceeded after the last worker was started up. + // The idea is: the amount of time between two workers initialization should be less than 3 seconds. + // Keep in mind that this amount of time will depend on the host speed. + case <-time.After(3 * time.Second): + suite.Failf("StartWorkers() started up less workers than expected", "Expected new workers: %v. Total workers started up: %v.", initialWorkers, totalWorkersUp) + keepWorking = false + } + } + + // TODO ::: Check for a better way to verify that no extra workers are started up + // verify that no more workers than initialWorkers are started up + if totalWorkersUp == initialWorkers { + select { + case newWorker := <-newWorkerChan: + suite.Failf("StartWorkers() started up more workers than expected", "Expected workers: %v. 
Total workers started up: %v.", initialWorkers, initialWorkers+newWorker) + + // wait 3 seconds to verify that no extra workers are started up + case <-time.After(3 * time.Second): + break + } + } +} + +// *************************************************************************************** +// ** StartWorkersAndWait() +// *************************************************************************************** + +// StartWorkersAndWait() with a worker's func at first invoke ==> no error +func (suite *PoolTestSuite) TestStartWorkersAndWaitWithWorkerHandlerFunction() { + // add a dummy worker's func + suite.pool.SetWorkerFunc(func(data interface{}) bool { return true }) + suite.NoError(suite.pool.StartWorkersAndWait(), "StartWorkersAndWait() must return no error having a defined worker's handler function.") +} + +// StartWorkersAndWait() without a worker's func at first invoke ==> error +func (suite *PoolTestSuite) TestStartWorkersAndWaitNoWorkerHandlerFunction() { + suite.Error(suite.pool.StartWorkersAndWait(), "StartWorkersAndWait() must return error if no worker's handler func is defined.") +} + +// verifies that the exact number of workers (initialWorkers) is started up, not more, not less && the elapsed time between each worker initialization +func (suite *PoolTestSuite) TestStartWorkersAndWait() { + // add a dummy worker's func + suite.pool.SetWorkerFunc(func(data interface{}) bool { return true }) + + newWorkerChan := make(chan int, initialWorkers*2) + suite.pool.SetNewWorkerChan(newWorkerChan) + + // start listening for the "new worker" signals + go func() { + totalWorkersUp := 0 + for totalWorkersUp < initialWorkers { + select { + case message := <-newWorkerChan: + totalWorkersUp += message + case <-time.After(2 * time.Second): + suite.Fail("Too much time waiting for the workers") + return + } + } + }() + + // start up all workers and wait until all of them are 100% alive + suite.NoError(suite.pool.StartWorkersAndWait(), "Unexpected error from StartWorkersAndWait()") + + totalWorkers := suite.pool.GetTotalWorkers() + suite.Equalf(totalWorkers, initialWorkers, "Expected workers up: %v, Total workers up: %v", initialWorkers, totalWorkers) +} + +// *************************************************************************************** +// ** Wait() +// *************************************************************************************** + +// Wait() without worker's func (worker's handler function) ==> error +func (suite *PoolTestSuite) TestWaitWithoutWorkerFunc() { + suite.Error(suite.pool.Wait(), "Wait() must return error if no worker's func is defined.") +} + +// Wait() with worker's func but before starting up workers ==> no error +func (suite *PoolTestSuite) TestWaitBeforeStartWorkers() { + // add a dummy worker's func + suite.pool.SetWorkerFunc(func(data interface{}) bool { return true }) + suite.NoError(suite.pool.Wait(), "Wait() must return nil if no workers are running.") +} + +// Wait() with worker's func and zero workers up ==> nil +func (suite *PoolTestSuite) TestWaitWithZeroWorkers() { + // add a dummy worker's func + suite.pool.SetWorkerFunc(func(data interface{}) bool { return true }) + // mimic total workers == 0 + suite.pool.totalWorkers = 0 + + suite.Nil(suite.pool.Wait(), "Wait() must return nil if no workers are running.") +} + +// *************************************************************************************** +// ** WaitUntilNSuccesses(n) +// *************************************************************************************** + +// WaitUntilNSuccesses(n) without
worker's func (worker's handler function) ==> error +func (suite *PoolTestSuite) TestWaitUntilNSuccessesWithoutWorkerFunc() { + suite.Error(suite.pool.WaitUntilNSuccesses(5), "WaitUntilNSuccesses(n) must return error if no worker's func is defined.") +} + +// WaitUntilNSuccesses(n) ==> successes (workers returning true) must be >= n +func (suite *PoolTestSuite) TestWaitUntilNSuccesses() { + // channel to receive successes + successChan := make(chan bool, 5) + successCounter := 0 + minSuccesses := 3 + + // the worker's handler func will send always a signal over the given channel + suite.pool.SetWorkerFunc(func(data interface{}) bool { + successChan <- true + return true + }) + + // goroutine to catch the signals sent by the workers + go func(ch chan bool, counter *int) { + for { + select { + case <-ch: + *counter++ + } + } + }(successChan, &successCounter) + + // start the workers + suite.pool.StartWorkers() + // enqueue minSuccesses + 1 jobs + for i := 0; i < minSuccesses+1; i++ { + suite.pool.AddTask(nil) + } + // wait until minSuccesses jobs were successfully processed + suite.pool.WaitUntilNSuccesses(minSuccesses) + + suite.Truef(successCounter >= minSuccesses, "At least %v jobs have to be successfully processed by the workers, there are only %v", minSuccesses, successCounter) +} + +// *************************************************************************************** +// ** KillWorker() +// *************************************************************************************** + +// KillWorker() ==> error if KillAllWorkersInCourse() in course +func (suite *PoolTestSuite) TestKillWorkerWhileKillAllWorkersInCourse() { + suite.pool.broadMessages.Store(broadMessageKillAllWorkers, true) + suite.Error(suite.pool.KillWorker(), "KillWorker() must return error while KillAllWorkers() in course.") +} + +// KillWorker() ==> nil if KillAllWorkersInCourse() is not in course +func (suite *PoolTestSuite) TestKillWorkerWhileNotKillAllWorkersInCourse() { + suite.pool.broadMessages.Delete(broadMessageKillAllWorkers) + suite.NoError(suite.pool.KillWorker(), "KillWorker() must not return an error while KillAllWorkers() is not in course.") +} + +// only one "killed worker" confirmation must be received after KillWorker() +func (suite *PoolTestSuite) TestKillWorker() { + // dummy worker's handler func + suite.pool.SetWorkerFunc(func(data interface{}) bool { + return true + }) + + // channel to receive "new worker" signals + ch := make(chan int, initialWorkers) + suite.pool.SetNewWorkerChan(ch) + + // start the workers + suite.pool.StartWorkers() + + // wait for the first worker up + select { + case <-ch: + break + case <-time.After(3 * time.Second): + suite.Fail("Too long to spin up workers") + } + + // channel to receive "killed worker" signals + ch = make(chan int, initialWorkers) + suite.pool.SetKilledWorkerChan(ch) + + // kill a worker + err := suite.pool.KillWorker() + suite.NoError(err, "Error trying to kill a worker: '%v'", err) + + // wait for the "worker killed" signal + select { + case total := <-ch: + suite.Truef(total == 1, "%v workers killed, expected: 1", total) + case <-time.After(3 * time.Second): + suite.Fail("Too long time waiting for the \"killed worker\" signal") + } + + // no more "worker killed" signal should be received + select { + case total := <-ch: + suite.Failf("Extra workers killed", "Extra confirmations received with value: %v", total) + case <-time.After(3 * time.Second): + // it's ok, no more signals received + break + } +} + +// 
*************************************************************************************** +// ** KillWorkers(n) +// *************************************************************************************** + +// KillWorkers(n) ==> error if KillAllWorkersInCourse() in course +func (suite *PoolTestSuite) TestKillWorkersWhileKillAllWorkersInCourse() { + suite.pool.broadMessages.Store(broadMessageKillAllWorkers, true) + suite.Error(suite.pool.KillWorkers(5), "KillWorkers(n) must return error while KillAllWorkers() in course.") +} + +// KillWorkers(n) ==> nil if KillAllWorkersInCourse() is not in course +func (suite *PoolTestSuite) TestKillWorkersWhileNotKillAllWorkersInCourse() { + suite.pool.broadMessages.Delete(broadMessageKillAllWorkers) + suite.NoError(suite.pool.KillWorkers(5), "KillWorkers(n) must not return an error while KillAllWorkers() is not in course.") +} + +// only n "killed worker" confirmations must be received after KillWorkers(n) invoke +func (suite *PoolTestSuite) TestKillWorkers() { + n := initialWorkers - 1 + if n == 0 { + n++ + } + + suite.testKillWorkers(n) +} + +// helper ==> only n (n == workersToKill) "killed worker" confirmations must be received after KillWorkers(n) invoke +func (suite *PoolTestSuite) testKillWorkers(workersToKill int) { + // dummy worker's handler func + suite.pool.SetWorkerFunc(func(data interface{}) bool { + return true + }) + + suite.pool.StartWorkersAndWait() + + // channel to receive "killed worker" signals + ch := make(chan int, initialWorkers) + suite.pool.SetKilledWorkerChan(ch) + + // kill n workers + err := suite.pool.KillWorkers(workersToKill) + suite.NoErrorf(err, "Error trying to kill %v workers: '%v'", workersToKill, err) + + // wait for n "worker killed" signals + keepWaiting := true + totalKilledWorkers := 0 + for keepWaiting { + select { + case total := <-ch: + totalKilledWorkers += total + if totalKilledWorkers == workersToKill { + keepWaiting = false + break + } + + case <-time.After(3 * time.Second): + suite.Failf("Too long time waiting for the \"killed worker\" signals", "Total killed workers: %v, Expected killed workers: %v", totalKilledWorkers, workersToKill) + } + } + + // no more "worker killed" signal should be received + select { + case total := <-ch: + suite.Failf("Extra workers killed", "Extra confirmation received with value: %v", total) + case <-time.After(3 * time.Second): + // it's ok, no more signals received + break + } +} + +// *************************************************************************************** +// ** KillAllWorkers(n) +// *************************************************************************************** + +// only n (n == initialWorkers) "killed worker" confirmations must be received after KillWorkers(n) invoke +// and total workers up must be zero +func (suite *PoolTestSuite) TestKillAllWorkers() { + suite.testKillWorkers(initialWorkers) + + totalWorkers := suite.pool.GetTotalWorkers() + suite.Truef(totalWorkers == 0, "Total workers must be equal to zero, not %v", totalWorkers) +} + +// *************************************************************************************** +// ** KillAllWorkersAndWait() +// *************************************************************************************** + +// worker must be 0 after KillAllWorkersAndWait() (total workers == initialWorkers before invoke KillAllWorkersAndWait()) +func (suite *PoolTestSuite) TestKillAllWorkersAndWait() { + // dummy worker's handler func + suite.pool.SetWorkerFunc(func(data interface{}) bool { + return true + }) + + // 
start the workers + suite.pool.StartWorkers() + // send the "kill all workers" signal and wait until it takes effect + suite.pool.KillAllWorkersAndWait() + // workers must be 0 + suite.Truef(suite.pool.GetTotalWorkers() == 0, "No workers should be up after a KillAllWorkersAndWait(). There are %v workers up.", suite.pool.GetTotalWorkers()) +} + +// worker must be 0 after KillAllWorkersAndWait() (total workers == 0 before invoke KillAllWorkersAndWait()) +func (suite *PoolTestSuite) TestKillAllWorkersAndWaitWithZeroWorkers() { + // send the "kill all workers" signal and wait until it takes effect + suite.pool.KillAllWorkersAndWait() + + // workers must be 0 + suite.Truef(suite.pool.GetTotalWorkers() == 0, "No workers should be up after a KillAllWorkersAndWait(). There are %v workers up.", suite.pool.GetTotalWorkers()) +} + +// *************************************************************************************** +// ** SetTotalWorkers(n) +// *************************************************************************************** + +// SetTotalWorkers(n) before start workers ==> error +func (suite *PoolTestSuite) TestSetTotalWorkersBeforeStartWorkers() { + suite.Error(suite.pool.SetTotalWorkers(10), "SetTotalWorkers(n) must return error if no workers were started.") +} + +// SetTotalWorkers(n) while StartWorkers is still running => error +func (suite *PoolTestSuite) TestSetTotalWorkersWhileStartWorkersIsStillRunning() { + suite.pool.StartWorkers() + // ensure that StartWorkers() is still running while SetTotalWorkers() is invoked + // suite.pool.workersStarted == false ==> StartWorkers() is still running or it hasn't invoked yet + suite.Equal(suite.pool.workersStarted == false && suite.pool.SetTotalWorkers(10) != nil, true, "SetTotalWorkers(n) must return error if StartWorkers() is in progress.") +} + +// SetTotalWorkers() after StartWorkers() and while KillAllWorkers() is in progress ==> error +func (suite *PoolTestSuite) TestSetTotalWorkersDuringKillAllWorkers() { + // mimic pool.StartWorkers() + suite.pool.workersStarted = true + // internal message to "kill all workers", it is triggered by KillAllWorkers() + suite.pool.broadMessages.Store(broadMessageKillAllWorkers, true) + suite.Error(suite.pool.SetTotalWorkers(10), "SetTotalWorkers(n) must return error while KillAllWorkers() is in progress.") +} + +// SetTotalWorkers() after StartWorkers() and while KillAllWorkers() is NOT in progress ==> not error +func (suite *PoolTestSuite) TestSetTotalWorkers() { + // mimic pool.StartWorkers() + suite.pool.workersStarted = true + // ensure that KillAllWorkers() is not in progress + suite.pool.broadMessages.Store(broadMessageKillAllWorkers, false) + + suite.Nil(suite.pool.SetTotalWorkers(10), "SetTotalWorkers(n) must return nil if workers were started and KillAllWorkers() is not in progress.") +} + +// *************************************************************************************** +// ** AddTaskCallback +// *************************************************************************************** + +// AddTaskCallback ==> error if no new tasks could be enqueued +func (suite *PoolTestSuite) TestAddTaskCallbackDoNotProcess() { + suite.pool.doNotProcess = true + suite.Error(suite.pool.AddTaskCallback(nil, func(data interface{}) {}), "AddTaskCallback must return error if it is invoked when no new tasks could be enqueued.") +} + +// *************************************************************************************** +// ** Helpers +// 
*************************************************************************************** + +func (suite *PoolTestSuite) waitUntilStartWorkers() { + //suite.NoError(suite.pool.StartWorkers(), "StartWorkers() must return") +} + +// *************************************************************************************** +// ** Run suite +// *************************************************************************************** + +func TestRunSuite(t *testing.T) { + suite.Run(t, new(PoolTestSuite)) +} diff --git a/readme.md b/readme.md index c2048fd..829a450 100644 --- a/readme.md +++ b/readme.md @@ -1,7 +1,7 @@ -[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goworkerpool) ![version](https://img.shields.io/badge/version-v0.8.0-yellowgreen.svg?style=flat "goworkerpool v0.8.0") [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goworkerpool)](https://goreportcard.com/report/github.com/enriquebris/goworkerpool) [![Build Status](https://travis-ci.org/enriquebris/goworkerpool.svg?branch=master)](https://travis-ci.org/enriquebris/goworkerpool) +[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goworkerpool) ![version](https://img.shields.io/badge/version-v0.8.0-yellowgreen.svg?style=flat "goworkerpool v0.8.0") [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goworkerpool)](https://goreportcard.com/report/github.com/enriquebris/goworkerpool) [![Build Status](https://travis-ci.org/enriquebris/goworkerpool.svg?branch=master)](https://travis-ci.org/enriquebris/goworkerpool) [![codecov](https://codecov.io/gh/enriquebris/goworkerpool/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goworkerpool) # goworkerpool - Pool of workers -Pool of concurrent workers with the ability to increment / decrement / pause / resume workers on demand. +Pool of concurrent workers with the ability to increment / decrement / pause / resume workers on demand. ## Features @@ -10,6 +10,7 @@ Pool of concurrent workers with the ability to increment / decrement / pause / r - [Wait until at least a worker is alive](#wait-until-at-least-a-worker-is-alive) - [Wait until n jobs get successfully processed](#wait-until-n-jobs-get-successfully-processed) - [Add](#add-workers-on-demand) / [kill](#kill-workers-on-demand) workers on demand +- Be notified once a worker is [started up](#receive-a-notification-every-time-a-new-worker-is-started-up) / [killed](#receive-a-notification-every-time-a-worker-is-killed) - Multiple ways to kill workers: - [On demand](#kill-workers-on-demand) - [After currently enqueued jobs get processed](#kill-workers-after-currently-enqueued-jobs-get-processed) @@ -123,6 +124,37 @@ pool.SetWorkerFunc(func(data interface{}) bool { }) ``` +### Start up the workers + +#### Start up the workers asynchronously +[StartWorkers](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.StartWorkers) spins up the workers. The number of workers to be started up is defined at the Pool instantiation. + +```go +pool.StartWorkers() +``` + +This is an asynchronous operation, but there is a way to be notified each time a new worker is started up: through a channel. See [SetNewWorkerChan(chan)](#receive-a-notification-every-time-a-new-worker-is-started-up).
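+ +A minimal sketch of this pattern (the channel name, its buffer size and the listener goroutine are illustrative assumptions, not part of the API; it also assumes `pool` was already built with `NewPool` and given a worker function via `SetWorkerFunc`): + +```go +// channel used by the pool to report every started worker (it sends a 1 per worker) +newWorkerChan := make(chan int, 10) +pool.SetNewWorkerChan(newWorkerChan) + +// listen for the start-up signals on a separate goroutine +go func() { + totalWorkersUp := 0 + for n := range newWorkerChan { + totalWorkersUp += n + log.Printf("%v workers up", totalWorkersUp) + } +}() + +// spin up the workers (asynchronous) +pool.StartWorkers() +``` + +Since the pool sends a 1 through the channel for every worker it starts, summing the received values tells how many workers are up so far.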
+ +#### Start up the workers synchronously + +[StartWorkersAndWait](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.StartWorkersAndWait) spins up the workers and waits until all of them are 100% up. The number of workers to be started up is defined at the Pool instantiation. + +```go +pool.StartWorkersAndWait() +``` + +Although this is a synchronous operation, there is a way to be notified each time a new worker is started up: through a channel. See [SetNewWorkerChan(chan)](#receive-a-notification-every-time-a-new-worker-is-started-up). Keep in mind that the channel listener should be running on a different goroutine. + +### Receive a notification every time a new worker is started up + +[SetNewWorkerChan(chan)](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.SetNewWorkerChan) sets a channel to receive notifications every time a new worker is started up. + +```go +pool.SetNewWorkerChan(ch chan int) +``` + +This is optional; no channel is needed to start up new workers. It is basically just a way to get feedback about the worker start-up operation. + ### Enqueue jobs on demand #### Enqueue a simple job @@ -246,6 +278,8 @@ pool.WaitUntilNSuccesses(n) pool.AddWorker() ``` +This is an asynchronous operation, but there is a [way to be notified each time a new worker is started up: through a channel](#receive-a-notification-every-time-a-new-worker-is-started-up). + #### Add n workers on demand [AddWorkers](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.AddWorkers) adds n new workers to the pool. @@ -254,6 +288,8 @@ pool.AddWorker() pool.AddWorkers(n) ``` +This is an asynchronous operation, but there is a [way to be notified each time a new worker is started up: through a channel](#receive-a-notification-every-time-a-new-worker-is-started-up). + ### Multiple ways to kill workers #### Kill workers on demand @@ -265,6 +301,8 @@ pool.AddWorkers(n) pool.KillWorker() ``` +This is an asynchronous operation, but there is a way to be notified each time a worker is killed: through a channel. See [SetKilledWorkerChan(chan)](#receive-a-notification-every-time-a-worker-is-killed). + ##### Kill n workers [KillWorkers](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.KillWorkers) kills all live workers. For those currently processing jobs, it will wait until the work is done. @@ -273,6 +311,8 @@ pool.KillWorker() pool.KillWorkers(n) ``` +This is an asynchronous operation, but there is a way to be notified each time a worker is killed: through a channel. See [SetKilledWorkerChan(chan)](#receive-a-notification-every-time-a-worker-is-killed). + ##### Kill all workers [KillAllWorkers](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.KillAllWorkers) kills all live workers once they are idle or after they finish processing their current jobs. @@ -281,6 +321,8 @@ pool.KillWorkers(n) pool.KillAllWorkers() ``` +This is an asynchronous operation, but there is a way to be notified each time a worker is killed: through a channel. See [SetKilledWorkerChan(chan)](#receive-a-notification-every-time-a-worker-is-killed). + ##### Kill all workers and wait [KillAllWorkersAndWait](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.KillAllWorkersAndWait) triggers an action to kill all live workers and blocks until the action is done (meaning that all live workers are down).
@@ -289,6 +331,8 @@ pool.KillAllWorkers() pool.KillAllWorkersAndWait() ``` +Even though this operation blocks until all workers are down, there is also a way to be notified each time a worker is killed: through a channel. See [SetKilledWorkerChan(chan)](#receive-a-notification-every-time-a-worker-is-killed). + #### Kill workers after currently enqueued jobs get processed ##### Kill a worker after currently enqueued jobs get processed @@ -301,6 +345,8 @@ By "*currently enqueued jobs*" I mean: the jobs enqueued at the moment this func pool.LateKillWorker() ``` +This is an asynchronous operation, but there is a way to be notified each time a worker is killed: through a channel. See [SetKilledWorkerChan(chan)](#receive-a-notification-every-time-a-worker-is-killed). + ##### Kill n workers after currently enqueued jobs get processed [LateKillWorkers](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.LateKillWorkers) kills n workers after currently enqueued jobs get processed. @@ -312,6 +358,8 @@ By "*currently enqueued jobs*" I mean: the jobs enqueued at the moment this func pool.LateKillWorkers(n) ``` +This is an asynchronous operation, but there is a way to be notified each time a worker is killed: through a channel. See [SetKilledWorkerChan(chan)](#receive-a-notification-every-time-a-worker-is-killed). + ##### Kill all workers after currently enqueued jobs get processed [LateKillAllWorkers](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.LateKillAllWorkers) kills all workers after currently enqueued jobs get processed. @@ -323,7 +371,20 @@ By "*currently enqueued jobs*" I mean: the jobs enqueued at the moment this func pool.LateKillAllWorkers() ``` -### Change the number of workers on demand +This is an asynchronous operation, but there is a way to be notified each time a worker is killed: through a channel. See [SetKilledWorkerChan(chan)](#receive-a-notification-every-time-a-worker-is-killed). + +### Receive a notification every time a worker is killed + +[SetKilledWorkerChan(chan)](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.SetKilledWorkerChan) sets a channel to receive notifications every time a worker is killed. + +```go +pool.SetKilledWorkerChan(ch chan int) +``` + +This is 100% optional. + + +### Update the number of workers on demand [SetTotalWorkers](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.SetTotalWorkers) adjusts the number of live workers. @@ -333,6 +394,8 @@ pool.SetTotalWorkers(n) In case it needs to kill some workers (in order to adjust the total based on the given parameter), it will wait until their current jobs get processed (if they are processing jobs). +This is an asynchronous operation, but there is a [way to be notified each time a new worker is started up: through a channel](#receive-a-notification-every-time-a-new-worker-is-started-up). + It returns an error in the following scenarios: - The workers were not started yet by StartWorkers. - There is a "in course" KillAllWorkers operation. @@ -356,6 +419,12 @@ pool.ResumeAllWorkers() ## History +### v0.9.0 + +- Added a way to know when new workers are started (using an optional channel) +- Added a way to know when a worker is killed (using an optional channel) +- StartWorkersAndWait() to start workers (for the first time) and wait until all of them are alive + ### v0.8.0 - Enqueue jobs plus callback functions