Skip to content

Commit

Permalink
Merge pull request #22 from enriquebris/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
enriquebris authored Oct 16, 2018
2 parents d34748a + 75c315e commit ea28fae
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 84 deletions.
10 changes: 5 additions & 5 deletions examples/adjust_total_workers_on_demand/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
func main() {
// total workers
totalWorkers := 10
// max number of tasks waiting in the channel
maxNumberJobsInChannel := 15
// max number of pending jobs
maxNumberPendingJobs := 15
// do not show messages about the pool processing
verbose := false

pool := goworkerpool.NewPool(totalWorkers, maxNumberJobsInChannel, verbose)
pool := goworkerpool.NewPool(totalWorkers, maxNumberPendingJobs, verbose)

// add the worker function
// add the worker handler function
pool.SetWorkerFunc(func(data interface{}) bool {
log.Printf("processing %v\n", data)
// add a 1 second delay (to make it look as if it were processing the job)
Expand All @@ -36,7 +36,7 @@ func main() {
// start up the workers (10 workers)
pool.StartWorkers()

// add tasks in a separate goroutine
// enqueue jobs in a separate goroutine
go func() {
for i := 0; i < 30; i++ {
pool.AddTask(i)
Expand Down
8 changes: 4 additions & 4 deletions examples/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
func main() {
// total workers
totalWorkers := 10
// max number of tasks waiting in the channel
maxNumberJobsInChannel := 15
// max number of pending jobs
maxNumberPendingJobs := 15
// do not log messages about the pool processing
verbose := false

pool := goworkerpool.NewPool(totalWorkers, maxNumberJobsInChannel, verbose)
pool := goworkerpool.NewPool(totalWorkers, maxNumberPendingJobs, verbose)

// add the worker function handler
// add the worker handler function
pool.SetWorkerFunc(func(data interface{}) bool {
log.Printf("processing %v\n", data)
// add a 1 second delay (to make it look as if it were processing the job)
Expand Down
12 changes: 6 additions & 6 deletions examples/complex_jobs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ type JobData struct {
func main() {
// total workers
totalWorkers := 10
// max number of tasks waiting in the channel
maxNumberJobsInChannel := 15
// max number of pending jobs
maxNumberPendingJobs := 15
// do not show messages about the pool processing
verbose := false

pool := goworkerpool.NewPool(totalWorkers, maxNumberJobsInChannel, verbose)
pool := goworkerpool.NewPool(totalWorkers, maxNumberPendingJobs, verbose)

// add the worker function
// add the worker handler function
pool.SetWorkerFunc(worker)

// start up the workers
pool.StartWorkers()

// add tasks in a separate goroutine
// enqueue jobs in a separate goroutine
go func() {
for i := 0; i < 30; i++ {
// the jobs contain a custom struct: JobData
Expand All @@ -52,7 +52,7 @@ func main() {
pool.Wait()
}

// worker is the function each pool's worker will invoke to process jobs
// worker is the function each worker will invoke to process jobs
func worker(data interface{}) bool {
// cast the interface{} back to our custom JobData
if job, ok := data.(JobData); ok {
Expand Down
14 changes: 7 additions & 7 deletions examples/exit_after_n_jobs_get_successfully_processed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// - 100 jobs will be enqueued
// - all workers will exit after 30 jobs are successfully processed

package exit_after_n_jobs_get_successfully_processed
package main

import (
"log"
Expand All @@ -15,14 +15,14 @@ import (
func main() {
// total workers
totalWorkers := 10
// max number of tasks waiting in the channel
maxNumberJobsInChannel := 15
// max number of pending jobs
maxNumberPendingJobs := 15
// do not show messages about the pool processing
verbose := false

pool := goworkerpool.NewPool(totalWorkers, maxNumberJobsInChannel, verbose)
pool := goworkerpool.NewPool(totalWorkers, maxNumberPendingJobs, verbose)

// add the worker function
// add the worker handler function
pool.SetWorkerFunc(func(data interface{}) bool {
log.Printf("processing %v\n", data)
// add a 1 second delay (to make it look as if it were processing the job)
Expand All @@ -36,13 +36,13 @@ func main() {
// start up the workers
pool.StartWorkers()

// add tasks in a separate goroutine
// enqueue jobs in a separate goroutine
go func() {
for i := 0; i < 100; i++ {
pool.AddTask(i)
}
}()

// wait while at least one worker is alive
// wait until 30 jobs get successfully processed
pool.WaitUntilNSuccesses(30)
}
10 changes: 5 additions & 5 deletions examples/exit_after_process_enqueued_jobs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
func main() {
// total workers
totalWorkers := 10
// max number of tasks waiting in the channel
maxNumberJobsInChannel := 15
// max number of pending jobs
maxNumberPendingJobs := 15
// do not show messages about the pool processing
verbose := false

pool := goworkerpool.NewPool(totalWorkers, maxNumberJobsInChannel, verbose)
pool := goworkerpool.NewPool(totalWorkers, maxNumberPendingJobs, verbose)

// add the worker function
// add the worker handler function
pool.SetWorkerFunc(func(data interface{}) bool {
log.Printf("processing %v\n", data)
// add a 1 second delay (to make it look as if it were processing the job)
Expand All @@ -36,7 +36,7 @@ func main() {
// start up the workers
pool.StartWorkers()

// add tasks in a separate goroutine
// enqueue jobs in a separate goroutine
go func() {
for i := 0; i < 30; i++ {
pool.AddTask(i)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,24 @@ import (
func main() {
// total workers
totalWorkers := 10
// max number of tasks waiting in the channel
maxNumberJobsInChannel := 15
// max number of pending jobs
maxNumberPendingJobs := 15
// do not show messages about the pool processing
verbose := false

// create the pool
pool := goworkerpool.NewPool(totalWorkers, maxNumberJobsInChannel, verbose)
pool := goworkerpool.NewPool(totalWorkers, maxNumberPendingJobs, verbose)

// CustomWorker
customWorker := NewCustomWorker()

// add the worker function
// add the worker handler function
pool.SetWorkerFunc(customWorker.Worker)

// start up the workers
pool.StartWorkers()

// add tasks in a separate goroutine
// enqueue jobs in a separate goroutine
go func() {
for i := 0; i < 30; i++ {
// enqueue a job
Expand Down
32 changes: 20 additions & 12 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,19 @@ type workerAction struct {
}

// NewPool creates, initializes and return a *Pool
func NewPool(initialWorkers int, maxJobsInChannel int, verbose bool) *Pool {
func NewPool(initialWorkers int, maxJobsInQueue int, verbose bool) *Pool {
ret := &Pool{}

ret.initialize(initialWorkers, maxJobsInChannel, verbose)
ret.initialize(initialWorkers, maxJobsInQueue, verbose)

return ret
}

func (st *Pool) initialize(initialWorkers int, maxJobsInChannel int, verbose bool) {
st.jobsChan = make(chan poolJobData, maxJobsInChannel)
func (st *Pool) initialize(initialWorkers int, maxJobsInQueue int, verbose bool) {
st.jobsChan = make(chan poolJobData, maxJobsInQueue)
st.totalWorkersChan = make(chan workerAction, 100)
// the package will cause deadlock if st.fnSuccessChan is full
st.fnSuccessChan = make(chan bool, maxJobsInChannel)
st.fnSuccessChan = make(chan bool, maxJobsInQueue)

// the workers were not started at this point
st.workersStarted = false
Expand Down Expand Up @@ -400,16 +400,21 @@ func (st *Pool) WaitUntilNSuccesses(n int) error {
return nil
}

// SetWorkerFunc sets the worker's function.
// This function will be invoked each time a worker receives a new job, and should return true to let know that the job
// SetWorkerFunc sets the worker's function handler.
// This function will be invoked each time a worker pulls a new job, and should return true to signal that the job
// was successfully completed, or false otherwise.
func (st *Pool) SetWorkerFunc(fn PoolFunc) {
st.fn = fn
}

// SetTotalWorkers sets the number of live workers.
// It adjusts the current number of live workers based on the given number. In case that it have to kill some workers, it will wait until the current jobs get processed.
// It returns an error in case there is a "in course" KillAllWorkers operation.
// SetTotalWorkers adjusts the number of live workers.
//
// In case it needs to kill some workers (in order to adjust the total based on the given parameter), it will wait until
// their current jobs get processed (in case they are processing jobs).
//
// It returns an error in the following scenarios:
// - The workers were not started yet by StartWorkers.
// - There is an in-progress KillAllWorkers operation.
func (st *Pool) SetTotalWorkers(n int) error {
// verify that workers were started by StartWorkers()
if !st.workersStarted {
Expand Down Expand Up @@ -649,14 +654,17 @@ func (st *Pool) workerFunc(n int) {
}
}

// AddTask will enqueue a job (in a FIFO way).
// AddTask will enqueue a job (into a FIFO queue: a channel).
//
// The parameter for the job's data accepts any kind of value (interface{}).
//
// Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive or idle,
// the job will stay in the queue until a worker is ready to pick it up and start processing it.
//
// The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddTask will wait
// for a free queue slot to enqueue a new job in case the queue is at full capacity.
//
// It will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during
// AddTask will return an error if no new tasks can be enqueued at the time of execution. New tasks cannot be enqueued
// for a certain amount of time after WaitUntilNSuccesses meets its stop condition.
func (st *Pool) AddTask(data interface{}) error {
if !st.doNotProcess {
Expand Down
Loading

0 comments on commit ea28fae

Please sign in to comment.