Skip to content

Commit

Permalink
Merge pull request #30 from enriquebris/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
enriquebris authored Oct 29, 2018
2 parents 13467b0 + 2adde45 commit 8d0aadd
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 12 deletions.
70 changes: 70 additions & 0 deletions examples/callback/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// On this example:
// - 30 jobs will be enqueued to be processed by the workers
// - 10 workers will be running until the 30 enqueued jobs get processed
// - 1 of the jobs will be enqueued plus a callback, which will be invoked just after the job get processed
// - there is a callback to enqueue without job's data

package main

import (
"log"
"time"

"github.com/enriquebris/goworkerpool"
)

func main() {
// total workers
totalWorkers := 10
// max number of pending jobs
maxNumberPendingJobs := 15
// do not log messages about the pool processing
verbose := false

pool := goworkerpool.NewPool(totalWorkers, maxNumberPendingJobs, verbose)

// add the worker handler function
pool.SetWorkerFunc(func(data interface{}) bool {
log.Printf("processing %v\n", data)
// add a 1 second delay (to makes it look as it were processing the job)
time.Sleep(time.Second)
log.Printf("processing finished for: %v\n", data)

// let the pool knows that the worker was able to complete the task
return true
})

// start up the workers
pool.StartWorkers()

// enqueue jobs in a separate goroutine
go func() {
for i := 0; i < 30; i++ {
if i == 15 {
// enqueue a job + callback
// the job will be processed first and later the callback will be invoked
pool.AddTaskCallback(i, func(data interface{}) {
log.Printf("Callback for: '%v' *******************************", data)
})
continue
}

if i == 20 {
// enqueue a callback
// there is no job to process, so the only thing the worker will do is to invoke the callback function
pool.AddCallback(func(data interface{}) {
log.Println("simple callback function")
})
continue
}

pool.AddTask(i)
}

// kill all workers after the currently enqueued jobs get processed
pool.LateKillAllWorkers()
}()

// wait while at least one worker is alive
pool.Wait()
}
5 changes: 5 additions & 0 deletions history.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## History

### v0.8.0

- Enqueue jobs plus callback functions
- Enqueue callback functions without jobs' data

### v0.7.4

- Fixed bug that caused randomly worker initialization error
Expand Down
98 changes: 90 additions & 8 deletions pool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// ** goworkerpool.com **********************************************************************************************
// ** github.com/enriquebris/goworkerpool **
// ** v0.7.4 ********************************************************************************************************
// ** v0.8.0 ********************************************************************************************************

// Package goworkerpool provides a pool of concurrent workers with the ability to increment / decrement / pause / resume workers on demand.
package goworkerpool
Expand Down Expand Up @@ -61,17 +61,23 @@ const (

// poolJobData codes
poolJobDataCodeRegular = "regular"
poolJobDataCodeCallback = "callback"
poolJobDataCodeLateKillWorker = "lateKillWorker"
poolJobDataCodeLateKillAllWorkers = "lateKillAllWorkers"
)

// PoolFunc defines the function signature to be implemented by the worker's func
type PoolFunc func(interface{}) bool

// PoolCallback defines the callback function signature
type PoolCallback func(interface{})

// poolJobData contains the job data && internal pool data
type poolJobData struct {
Code string
JobData interface{}
Code string
JobData interface{}
Category string
CallbackFunc PoolCallback
}

type Pool struct {
Expand Down Expand Up @@ -422,7 +428,7 @@ func (st *Pool) SetWorkerFunc(fn PoolFunc) {

// AddTask will enqueue a job (into a FIFO queue: a channel).
//
// The parameter for the job's data accepts any kind of value (interface{}).
// The parameter for the job's data accepts any type (interface{}).
//
// Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle,
// the job will stay in the queue until any worker will be ready to pick it up and start processing it.
Expand All @@ -431,7 +437,7 @@ func (st *Pool) SetWorkerFunc(fn PoolFunc) {
// for a free queue slot to enqueue a new job in case the queue is at full capacity.
//
// AddTask will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during
// a certain amount of time when WaitUntilNSuccesses meet the stop condition.
// a certain amount of time when WaitUntilNSuccesses meets the stop condition.
func (st *Pool) AddTask(data interface{}) error {
if !st.doNotProcess {
// enqueue a regular job
Expand All @@ -445,6 +451,60 @@ func (st *Pool) AddTask(data interface{}) error {
return errors.New("No new jobs are accepted at this moment")
}

// AddTaskCallback enqueues a job plus a callback function into a FIFO queue (a channel).
//
// The parameter for the job's data accepts any type (interface{}).
//
// Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle,
// the job will stay in the queue until any worker will be ready to pick it up and start processing it.
//
// The worker who picks up this job + callback will process the job first and later will invoke the callback function, passing the job's data as a parameter.
//
// The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddTaskCallback will wait
// for a free queue slot to enqueue a new job in case the queue is at full capacity.
//
// AddTaskCallback will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during
// a certain amount of time when WaitUntilNSuccesses meets the stop condition.
func (st *Pool) AddTaskCallback(data interface{}, callbackFn PoolCallback) error {
if !st.doNotProcess {
// enqueue a job + callback
st.jobsChan <- poolJobData{
Code: poolJobDataCodeCallback,
JobData: data,
CallbackFunc: callbackFn,
}
return nil
}

return errors.New("No new jobs are accepted at this moment")
}

// AddCallback enqueues a callback function into a FIFO queue (a channel).
//
// Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle,
// the job will stay in the queue until any worker will be ready to pick it up and start processing it.
//
// The worker who picks up this job will only invoke the callback function, passing nil as a parameter.
//
// The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddCallback will wait
// for a free queue slot to enqueue a new job in case the queue is at full capacity.
//
// AddCallback will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during
// a certain amount of time when WaitUntilNSuccesses meets the stop condition.
func (st *Pool) AddCallback(callbackFn PoolCallback) error {
if !st.doNotProcess {
// enqueue a callback-only job
st.jobsChan <- poolJobData{
Code: poolJobDataCodeCallback,
JobData: nil,
CallbackFunc: callbackFn,
}
return nil
}

return errors.New("No new jobs are accepted at this moment")
}

// *************************************************************************************************************
// ** Workers operations **************************************************************************************
// *************************************************************************************************************
Expand Down Expand Up @@ -601,7 +661,6 @@ func (st *Pool) workerFunc(n int) {
// TODO ::: re-enqueue in a different queue/channel/struct
// re-enqueue the job / task
st.AddTask(taskData.JobData)

} else {
// execute the job
fnSuccess := st.fn(taskData.JobData)
Expand All @@ -615,7 +674,30 @@ func (st *Pool) workerFunc(n int) {
}
}

// late kill signal
// job + callback || callback
case poolJobDataCodeCallback:
if st.doNotProcess {
st.AddTaskCallback(taskData.JobData, taskData.CallbackFunc)
}

// execute the job (in case there is some job data)
if taskData.JobData != nil {
// execute the job
fnSuccess := st.fn(taskData.JobData)

// avoid to cause deadlock
if !st.doNotProcess {
// keep track of the job's result
st.fnSuccessChan <- fnSuccess
} else {
// TODO ::: save the job result ...
}
}

// run the callback
taskData.CallbackFunc(taskData.JobData)

// late kill signal
case poolJobDataCodeLateKillWorker:
if st.verbose {
log.Printf("[pool] worker %v is going to be down", n)
Expand All @@ -628,7 +710,7 @@ func (st *Pool) workerFunc(n int) {
keepWorking = false
break

// late kill all workers
// late kill all workers
case poolJobDataCodeLateKillAllWorkers:
if st.verbose {
log.Printf("[pool] worker %v is going to be down :: LateKillAllWorkers()", n)
Expand Down
71 changes: 67 additions & 4 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goworkerpool) ![version](https://img.shields.io/badge/version-v0.7.4-yellowgreen.svg?style=flat "goworkerpool v0.7.3") [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goworkerpool)](https://goreportcard.com/report/github.com/enriquebris/goworkerpool) [![Build Status](https://travis-ci.org/enriquebris/goworkerpool.svg?branch=master)](https://travis-ci.org/enriquebris/goworkerpool)
[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goworkerpool) ![version](https://img.shields.io/badge/version-v0.8.0-yellowgreen.svg?style=flat "goworkerpool v0.8.0") [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goworkerpool)](https://goreportcard.com/report/github.com/enriquebris/goworkerpool) [![Build Status](https://travis-ci.org/enriquebris/goworkerpool.svg?branch=master)](https://travis-ci.org/enriquebris/goworkerpool)

# goworkerpool - Pool of workers
Pool of concurrent workers with the ability to increment / decrement / pause / resume workers on demand.

## Features

- [Enqueue jobs on demand](#enqueue-a-job)
- [Enqueue jobs on demand](#enqueue-jobs-on-demand)
- Multiple ways to wait / block
- [Wait until at least a worker is alive](#wait-until-at-least-a-worker-is-alive)
- [Wait until n jobs get successfully processed](#wait-until-n-jobs-get-successfully-processed)
Expand All @@ -31,6 +31,9 @@ go get -tags v0 github.com/enriquebris/goworkerpool
## Documentation
Visit [goworkerpool at godoc.org](https://godoc.org/github.com/enriquebris/goworkerpool)

## TODO
Visit the [TODO](./todo.md) page.

## Get started

#### Simple usage
Expand Down Expand Up @@ -120,7 +123,8 @@ pool.SetWorkerFunc(func(data interface{}) bool {
})
```

### Enqueue a job
### Enqueue jobs on demand
#### Enqueue a simple job

[AddTask](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.AddTask) will enqueue a job into a FIFO queue (a channel).

Expand All @@ -137,7 +141,61 @@ The queue in which this function enqueues the jobs has a limit (it was set up at
for a free queue slot to enqueue a new job in case the queue is at full capacity.

AddTask will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during
a certain amount of time when [WaitUntilNSuccesses](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.WaitUntilNSuccesses) meet the stop condition.
a certain amount of time when [WaitUntilNSuccesses](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.WaitUntilNSuccesses) meets the stop condition.

#### Enqueue a simple job plus callback function

[AddTaskCallback](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.AddTaskCallback) will enqueue a job plus a callback function into a FIFO queue (a channel).

```go
pool.AddTaskCallback(data, callback)
```

The parameter for the job's data accepts any type (interface{}).

Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle,
the job will stay in the queue until any worker will be ready to pick it up and start processing it.

The worker who picks up this job + callback will process the job first and later will invoke the callback function, passing the job's data as a parameter.

The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddTaskCallback will wait
for a free queue slot to enqueue a new job in case the queue is at full capacity.

AddTaskCallback will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during
a certain amount of time when [WaitUntilNSuccesses](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.WaitUntilNSuccesses) meets the stop condition.

#### Enqueue a callback function

[AddCallback](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.AddCallback) will enqueue a callback function into a FIFO queue (a channel).

```go
pool.AddCallback(callback)
```

Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle,
the job will stay in the queue until any worker will be ready to pick it up and start processing it.

The worker who picks up this task will only invoke the callback function, passing nil as a parameter.

The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddCallback will wait
for a free queue slot to enqueue a new job in case the queue is at full capacity.

AddCallback will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during
a certain amount of time when [WaitUntilNSuccesses](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.WaitUntilNSuccesses) meets the stop condition.

#### Enqueue a job with category and callback

[AddComplexTask](https://godoc.org/github.com/enriquebris/goworkerpool#Pool.AddComplexTask) will enqueue a job into a FIFO queue (a channel).

```go
pool.AddComplexTask(data, category, callback)
```

This function extends the scope of [AddTask](#enqueue-a-simple-job) adding category and callback.

The job will be grouped based on the given category (for stats purposes).

The callback function (if any) will be invoked just after the job gets processed.

#### Pass multiple data to be processed by a worker

Expand Down Expand Up @@ -298,6 +356,11 @@ pool.ResumeAllWorkers()

## History

### v0.8.0

- Enqueue jobs plus callback functions
- Enqueue callback functions without jobs' data

### v0.7.4

- Fixed bug that caused randomly worker initialization error
Expand Down
Loading

0 comments on commit 8d0aadd

Please sign in to comment.