Skip to content

Commit

Permalink
feat(transformer): Concurrent dispatch event to dispatchers
Browse files Browse the repository at this point in the history
  • Loading branch information
akshaynshah committed Mar 6, 2020
1 parent 3928471 commit ec21751
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 16 deletions.
14 changes: 5 additions & 9 deletions audit/auditLinux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
package audit

import (
"encoding/json"
"fmt"
"github.com/livingstonetech/providence/transformer"
"github.com/mozilla/libaudit-go"
log "github.com/sirupsen/logrus"
Expand All @@ -20,6 +18,8 @@ type Auditor struct {
netlinkSocket *libaudit.NetlinkConnection
}

var transChan chan *libaudit.AuditEvent

func CreateAudit(config *viper.Viper) *Auditor {
log.Info("Create audit called")
a := Auditor{Config:config}
Expand Down Expand Up @@ -68,17 +68,12 @@ func auditProc(e *libaudit.AuditEvent, err error) {
}
return
}
// Marshal the event to JSON and print
buf, err := json.Marshal(e)
if err != nil {
fmt.Printf("callback was unable to marshal event: %v", err)
return
}
fmt.Printf("%v\n", string(buf))
transChan <- e
}

//StartAudit : Starts Audit
func (au Auditor) StartAudit() {
go au.Transformer.Listen(transChan)
doneCh := make(chan bool, 1)
libaudit.GetAuditMessages(au.netlinkSocket, auditProc, &doneCh)
}
Expand Down Expand Up @@ -107,6 +102,7 @@ func (au Auditor) ConfigureAudit() {
}
rules := au.GetRules()
setRules(au.netlinkSocket, rules)
transChan = make(chan *libaudit.AuditEvent)
}

//StopAudit : Stops audit?
Expand Down
41 changes: 34 additions & 7 deletions transformer/transformerLinux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package transformer
import (
"fmt"
"github.com/livingstonetech/providence/dispatcher"
"github.com/mozilla/libaudit-go"
"reflect"

"github.com/spf13/viper"
)

Expand All @@ -19,21 +22,45 @@ func CreateTransformer(config *viper.Viper) *Transformer {
Dispatchers: make([]*dispatcher.Dispatcher, 0),
}
dispatchers := make([]*dispatcher.Dispatcher, 0)
for k, _ := range config.GetStringMap("dispatch") {
for k := range config.GetStringMap("dispatch") {
d := dispatcher.CreateDispatcher(config.GetStringMap(fmt.Sprintf("dispatch.%v", k)))
dispatchers = append(dispatchers, d)
}
t.Dispatchers = dispatchers
return &t
}

func (t Transformer) filter(event interface{}) bool {
return true
//Listen causes transformer to listen for events to transform
func (t Transformer) Listen(event chan *libaudit.AuditEvent) {
for {
msg := <- event
go t.Transform(msg)
}

}

func (t Transformer) Transform(event interface{}) {
errChan := make(chan error)
for _, d := range t.Dispatchers {
go d.Dispatch(event, errChan)
func (t Transformer) Transform(event *libaudit.AuditEvent) {
dispatchersLength := len(t.Dispatchers)
var chans []chan error
for i := 0; i < dispatchersLength; i++ {
ch := make(chan error)
chans = append(chans, ch)
go t.Dispatchers[i].Dispatch(event, ch)
}

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}

remaining := len(cases)
for remaining > 0 {
chosen, _, ok := reflect.Select(cases)
if !ok {
// The chosen channel has been closed, so zero out the channel to disable the case
cases[chosen].Chan = reflect.ValueOf(nil)
remaining--
continue
}
}
}

0 comments on commit ec21751

Please sign in to comment.