Skip to content

Commit

Permalink
Merge pull request #40 from enriquebris/mod
Browse files Browse the repository at this point in the history
goconcurrentcounter && go.mod
  • Loading branch information
enriquebris authored Feb 24, 2022
2 parents 155c0f7 + 8d8bfb6 commit 4f64cfb
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 14 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ go:
- 1.12.x
- 1.13.x
- 1.14.x
- 1.15.x
- 1.16.x
- 1.17.x

before_install:
- go get -t -v ./...
Expand Down
15 changes: 15 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module github.com/enriquebris/goworkerpool

go 1.17

require (
github.com/enriquebris/goconcurrentcounter v0.0.0-20220112230718-926268af96e3
github.com/enriquebris/goconcurrentqueue v0.6.0
github.com/stretchr/testify v1.7.0
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
15 changes: 15 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/enriquebris/goconcurrentcounter v0.0.0-20220112230718-926268af96e3 h1:BWpJbx0Y0MpUdxkuUzqTOnnOfTW9eQJNQ/z+l+XGQnk=
github.com/enriquebris/goconcurrentcounter v0.0.0-20220112230718-926268af96e3/go.mod h1:6JD9VP3tKnQxDyYlU8aw4V+4E+kM3vqPVite1uazIjc=
github.com/enriquebris/goconcurrentqueue v0.6.0 h1:DJ97cgoPVoqlC4tTGBokn/omaB3o16yIs5QdAm6YEjc=
github.com/enriquebris/goconcurrentqueue v0.6.0/go.mod h1:wGJhQNFI4wLNHleZLo5ehk1puj8M6OIl0tOjs3kwJus=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
22 changes: 11 additions & 11 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ func (st *Pool) initialize(options PoolOptions) {
st.totalWorkers = goconcurrentcounter.NewIntChan(0)

// add a trigger function to send a signal once all initial workers are up
st.totalWorkers.SetTriggerOnValue(int(options.TotalInitialWorkers), "WaitUntilInitialWorkersAreUp", func() {
st.totalWorkers.SetTriggerOnValue(int(options.TotalInitialWorkers), "WaitUntilInitialWorkersAreUp", func(currentValue int, previousValue int) {
// remove the trigger function
st.totalWorkers.EnqueueToRunAfterCurrentTriggerFunctions(func() {
st.totalWorkers.EnqueueToRunAfterCurrentTriggerFunctions(func(currentValue int, previousValue int) {
st.totalWorkers.UnsetTriggerOnValue(int(options.TotalInitialWorkers), "WaitUntilInitialWorkersAreUp")
})

Expand Down Expand Up @@ -198,7 +198,7 @@ func (st *Pool) initialize(options PoolOptions) {
}

// send signal if totalWorkers == 0 (all workers are down)
st.totalWorkers.SetTriggerOnValue(0, triggerZeroTotalWorkers, func() {
st.totalWorkers.SetTriggerOnValue(0, triggerZeroTotalWorkers, func(currentValue int, previousValue int) {
// remove the "killAllWorkersInProgress" flag (if active)
if st.getStatus(killAllWorkersInProgress) {
st.setStatus(killAllWorkersInProgress, false)
Expand Down Expand Up @@ -900,9 +900,9 @@ func (st *Pool) KillAllWorkersAndWait() error {
// wait until all workers are down
waitForKillAllWorkersAndWait := make(chan struct{}, 1)
// 1 - trigger a named function on totalWorkers (0)
st.totalWorkers.SetTriggerOnValue(0, "KillAllWorkersAndWait", func() {
st.totalWorkers.SetTriggerOnValue(0, "KillAllWorkersAndWait", func(currentValue int, previousValue int) {
// enqueue to run after trigger functions
st.totalWorkers.EnqueueToRunAfterCurrentTriggerFunctions(func() {
st.totalWorkers.EnqueueToRunAfterCurrentTriggerFunctions(func(currentValue int, previousValue int) {
// remove the named trigger
st.totalWorkers.UnsetTriggerOnValue(0, "KillAllWorkersAndWait")
})
Expand Down Expand Up @@ -1019,12 +1019,12 @@ func (st *Pool) Wait() error {
// TODO ::: An error will be returned if the worker's function is not already set.
func (st *Pool) WaitUntilNSuccesses(n int) error {
wait := make(chan struct{})
st.taskSuccesses.SetTriggerOnValue(n, triggerWaitUntilNSuccesses, func() {
st.taskSuccesses.SetTriggerOnValue(n, triggerWaitUntilNSuccesses, func(currentValue int, previousValue int) {
// kill all workers
st.KillAllWorkers()

// execute just after the trigger functions
st.taskSuccesses.EnqueueToRunAfterCurrentTriggerFunctions(func() {
st.taskSuccesses.EnqueueToRunAfterCurrentTriggerFunctions(func(currentValue int, previousValue int) {
// this function should be executed only once, so let's remove it after it's first call
st.taskSuccesses.UnsetTriggerOnValue(n, triggerWaitUntilNSuccesses)
})
Expand All @@ -1050,9 +1050,9 @@ func (st *Pool) SafeWaitUntilNSuccesses(n int) error {
safeWait := make(chan struct{})

// to be executed once extra workers finished their processing
st.totalWorkersInProgress.SetTriggerOnValue(0, "safeSafeWaitUntilNSuccesses", func() {
st.totalWorkersInProgress.SetTriggerOnValue(0, "safeSafeWaitUntilNSuccesses", func(currentValue int, previousValue int) {
// execute just after the trigger functions
st.totalWorkersInProgress.EnqueueToRunAfterCurrentTriggerFunctions(func() {
st.totalWorkersInProgress.EnqueueToRunAfterCurrentTriggerFunctions(func(currentValue int, previousValue int) {
// this function should be executed only once, let's remove it after the first call
st.totalWorkersInProgress.UnsetTriggerOnValue(0, "safeSafeWaitUntilNSuccesses")
})
Expand All @@ -1062,12 +1062,12 @@ func (st *Pool) SafeWaitUntilNSuccesses(n int) error {
})

// to be executed once successes == n
st.taskSuccesses.SetTriggerOnValue(n, triggerWaitUntilNSuccesses, func() {
st.taskSuccesses.SetTriggerOnValue(n, triggerWaitUntilNSuccesses, func(currentValue int, previousValue int) {
// kill all workers
st.KillAllWorkers()

// execute just after the trigger functions
st.taskSuccesses.EnqueueToRunAfterCurrentTriggerFunctions(func() {
st.taskSuccesses.EnqueueToRunAfterCurrentTriggerFunctions(func(currentValue int, previousValue int) {
// this function should be executed only once, so let's remove it after the first call
st.taskSuccesses.UnsetTriggerOnValue(n, triggerWaitUntilNSuccesses)
})
Expand Down
2 changes: 1 addition & 1 deletion pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (suite *PoolTestSuite) TestDispatcherActionDoNotSentToWorker() {

notSupposedToHappenChan := make(chan struct{}, 2)
// add a trigger to totalWorkers - 1 ==> fail and wait some short time !!!
suite.pool.totalWorkers.SetTriggerOnValue(totalWorkers-1, "error", func() {
suite.pool.totalWorkers.SetTriggerOnValue(totalWorkers-1, "error", func(currentValue int, previousValue int) {
notSupposedToHappenChan <- struct{}{}
})

Expand Down
9 changes: 7 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goworkerpool) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goworkerpool) ![version](https://img.shields.io/badge/version-v0.10.0-yellowgreen.svg?style=flat "goworkerpool v0.10.0") [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goworkerpool)](https://goreportcard.com/report/github.com/enriquebris/goworkerpool) [![Build Status](https://travis-ci.org/enriquebris/goworkerpool.svg?branch=master)](https://travis-ci.org/enriquebris/goworkerpool) [![codecov](https://codecov.io/gh/enriquebris/goworkerpool/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goworkerpool)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goworkerpool) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goworkerpool) ![version](https://img.shields.io/badge/version-v0.10.1-yellowgreen.svg?style=flat "goworkerpool v0.10.1") [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goworkerpool)](https://goreportcard.com/report/github.com/enriquebris/goworkerpool) [![Build Status](https://api.travis-ci.com/enriquebris/goworkerpool.svg?branch=master&status=unknown)](https://app.travis-ci.com/github/enriquebris/goworkerpool) [![codecov](https://codecov.io/gh/enriquebris/goworkerpool/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goworkerpool)

# goworkerpool - Pool of workers
Pool of concurrent workers with the ability of increment / decrement / pause / resume workers on demand.
Expand All @@ -25,7 +25,7 @@ Golang version >= 1.9
## Installation
Execute:
```bash
go get -tags v0 github.com/enriquebris/goworkerpool
go get github.com/enriquebris/goworkerpool

```

Expand Down Expand Up @@ -424,6 +424,11 @@ pool.ResumeAllWorkers()

## History

### v0.10.1

- Adjusted [goconcurrentcounter](github.com/enriquebris/goconcurrentcounter) parameters
- Added go.mod && go.sum files

### v0.10.0

- Initial workers automatically start running on pool initialization
Expand Down

0 comments on commit 4f64cfb

Please sign in to comment.