Skip to content

Commit

Permalink
Merge pull request #23 from pyrocat101/svcctl
Browse files Browse the repository at this point in the history
svcctl
  • Loading branch information
dzbarsky authored Aug 25, 2024
2 parents 5d20434 + 4e40cb1 commit 8ab79ae
Show file tree
Hide file tree
Showing 19 changed files with 553 additions and 94 deletions.
15 changes: 15 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"files.exclude": {
"**/.git": true,
"**/.svn": true,
"**/.hg": true,
"**/CVS": true,
"**/.DS_Store": true,
"**/Thumbs.db": true,
"**/bazel-bin": true,
"**/bazel-out": true,
"**/bazel-testlogs": true,
"bazel-rules_itest": true,
"tests/bazel-tests": true,
}
}
1 change: 0 additions & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ module(
bazel_dep(name = "aspect_bazel_lib", version = "1.42.0")
bazel_dep(name = "bazel_skylib", version = "1.7.1")
bazel_dep(name = "rules_go", version = "0.49.0")

bazel_dep(name = "gazelle", version = "0.37.0")

go_deps = use_extension("@gazelle//:extensions.bzl", "go_deps")
Expand Down
1 change: 1 addition & 0 deletions cmd/svcinit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
deps = [
"//logger",
"//runner",
"//svcctl",
"//svclib",
"@rules_go//go/runfiles:go_default_library",
] + select({
Expand Down
25 changes: 19 additions & 6 deletions cmd/svcinit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"rules_itest/logger"
"rules_itest/runner"
"rules_itest/svcctl"
"rules_itest/svclib"
)

Expand Down Expand Up @@ -97,16 +98,28 @@ func main() {
serviceSpecs, err := augmentServiceSpecs(unversionedSpecs, ports)
must(err)

/*if *allowSvcctl {
addr := net.Listen(network, address)
}*/

ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

r, err := runner.New(ctx, serviceSpecs)
must(err)

servicesErrCh := make(chan error, len(serviceSpecs))

listener, err := net.Listen("tcp", "127.0.0.1:0")
must(err)

go func() {
defer listener.Close()
err := svcctl.Serve(ctx, listener, r, servicesErrCh)
if err != nil {
log.Fatalf("svcctl.Serve: %v", err)
}
}()

port := listener.Addr().(*net.TCPAddr).Port
os.Setenv("SVCCTL_PORT", fmt.Sprintf("%d", port))

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
Expand All @@ -123,7 +136,7 @@ func main() {
}
}()

criticalPath, servicesErrCh, err := r.StartAll()
criticalPath, err := r.StartAll(servicesErrCh)
if errors.Is(err, context.Canceled) {
_, err := r.StopAll()
must(err)
Expand Down Expand Up @@ -257,7 +270,7 @@ func main() {
must(err)

// TODO(zbarsky): what is the right behavior here when services are crashing in ibazel mode?
criticalPath, _, err = r.UpdateSpecsAndRestart(serviceSpecs, []byte(ibazelCmd))
criticalPath, err = r.UpdateSpecsAndRestart(serviceSpecs, servicesErrCh, []byte(ibazelCmd))
must(err)
}
}
Expand Down
12 changes: 12 additions & 0 deletions docs/itest.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ query:enable-reload --@rules_itest//:enable_per_service_reload
In addition, if can set the `hot_reloadable` attribute on an `itest_service`, the service manager will
forward the ibazel hot-reload notification over stdin instead of restarting the service.

# Service control

The svcinit also exposes a HTTP server on `http://127.0.0.1:{SVCCTL_PORT}`. It is useful for tests
that need to start / stop services in the midst of the test run. There are currently 4 API endpoint
available. All of them are GET requests:

1. `/v0/healthcheck?service={label}`: Returns 200 if the service is healthy, 503 otherwise.
2. `/v0/start?service={label}`: Starts the service if it is not already running.
3. `/v0/kill?service={label}[&signal={signal}]`: Send kill signal to the service if it is running.
You can optionally specify the signal to send to the service (valid values: SIGTERM and SIGKILL).
4. `/v0/wait?service={label}`: Wait for the service to exit and returns the exit code in the body.


<a id="itest_service"></a>

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module rules_itest

go 1.21.1
go 1.22.0

require (
github.com/bazelbuild/rules_go v0.49.0
Expand Down
12 changes: 12 additions & 0 deletions private/itest.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ query:enable-reload --@rules_itest//:enable_per_service_reload
In addition, if the `hot_reloadable` attribute is set on an `itest_service`, the service manager will
forward the ibazel hot-reload notification over stdin instead of restarting the service.
# Service control
The svcinit also exposes a HTTP server on `http://127.0.0.1:{SVCCTL_PORT}`. It is useful for tests
that need to start / stop services in the midst of the test run. There are currently 4 API endpoint
available. All of them are GET requests:
1. `/v0/healthcheck?service={label}`: Returns 200 if the service is healthy, 503 otherwise.
2. `/v0/start?service={label}`: Starts the service if it is not already running.
3. `/v0/kill?service={label}[&signal={signal}]`: Send kill signal to the service if it is running.
You can optionally specify the signal to send to the service (valid values: SIGTERM and SIGKILL).
4. `/v0/wait?service={label}`: Wait for the service to exit and returns the exit code in the body.
"""

load("@aspect_bazel_lib//lib:paths.bzl", "to_rlocation_path")
Expand Down
4 changes: 2 additions & 2 deletions runner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go_library(
name = "runner",
srcs = [
"runner.go",
"runner_unix.go",
"runner_windows.go",
"pgroup_unix.go",
"pgroup_windows.go",
"service_instance.go",
"topo.go",
],
Expand Down
4 changes: 2 additions & 2 deletions runner/runner_unix.go → runner/pgroup_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ func setPgid(cmd *exec.Cmd) {
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
}

func killGroup(cmd *exec.Cmd) {
func killGroup(cmd *exec.Cmd, sig syscall.Signal) error {
pid := cmd.Process.Pid
if shouldUseProcessGroups {
pid = -pid
}
syscall.Kill(pid, syscall.SIGKILL)
return syscall.Kill(pid, sig)
}
9 changes: 6 additions & 3 deletions runner/runner_windows.go → runner/pgroup_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

package runner

import "os/exec"
import (
"os/exec"
"syscall"
)

func setPgid(cmd *exec.Cmd) {
panic("Pgid not implemented on windows!")
}

func killGroup(cmd *exec.Cmd) {
func killGroup(cmd *exec.Cmd, _ syscall.Signal) error {
// Windows doesn't have process groups, so just kill the process.
cmd.Process.Kill()
return cmd.Process.Kill()
}
90 changes: 48 additions & 42 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"reflect"
"runtime"
"sync"
"syscall"
"time"

"rules_itest/logger"
Expand All @@ -26,15 +27,15 @@ var shouldUseProcessGroups = runtime.GOOS != "windows" && os.Getenv("BAZEL_TEST"

type ServiceSpecs = map[string]svclib.VersionedServiceSpec

type runner struct {
type Runner struct {
ctx context.Context
serviceSpecs ServiceSpecs

serviceInstances map[string]*ServiceInstance
}

func New(ctx context.Context, serviceSpecs ServiceSpecs) (*runner, error) {
r := &runner{
func New(ctx context.Context, serviceSpecs ServiceSpecs) (*Runner, error) {
r := &Runner{
ctx: ctx,
serviceInstances: map[string]*ServiceInstance{},
}
Expand All @@ -49,13 +50,13 @@ func colorize(s svclib.VersionedServiceSpec) string {
return s.Colorize(s.Label)
}

func (r *runner) StartAll() ([]topological.Task, chan error, error) {
func (r *Runner) StartAll(serviceErrCh chan error) ([]topological.Task, error) {
tasks := allTasks(r.serviceInstances, func(ctx context.Context, service *ServiceInstance) error {
if service.Type == "group" {
return nil
}
log.Printf("Starting %s %v\n", colorize(service.VersionedServiceSpec), service.Args[1:])
startErr := service.Start()
log.Printf("Starting %s %v\n", colorize(service.VersionedServiceSpec), service.cmd.Args[1:])
startErr := service.Start(ctx)
if startErr != nil {
return startErr
}
Expand All @@ -73,7 +74,6 @@ func (r *runner) StartAll() ([]topological.Task, chan error, error) {
starter := topological.NewRunner(tasks)
err := starter.Run(r.ctx)

serviceErrorCh := make(chan error, len(r.serviceInstances))
for _, service := range r.serviceInstances {
if service.Type != "service" {
continue
Expand All @@ -82,22 +82,22 @@ func (r *runner) StartAll() ([]topological.Task, chan error, error) {
// TODO(zbarsky): Can remove the loop var once Go is sufficiently upgraded.
go func(service *ServiceInstance) {
err := service.Wait()
if err != nil {
serviceErrorCh <- fmt.Errorf(colorize(service.VersionedServiceSpec) + " exited with error: " + err.Error())
if err != nil && !service.Killed() {
serviceErrCh <- fmt.Errorf(colorize(service.VersionedServiceSpec) + " exited with error: " + err.Error())
}
}(service)
}

return starter.CriticalPath(), serviceErrorCh, err
return starter.CriticalPath(), err
}

func (r *runner) StopAll() (map[string]*os.ProcessState, error) {
func (r *Runner) StopAll() (map[string]*os.ProcessState, error) {
tasks := allTasks(r.serviceInstances, func(ctx context.Context, service *ServiceInstance) error {
if service.Type == "group" {
return nil
}
log.Printf("Stopping %s\n", colorize(service.VersionedServiceSpec))
stopInstance(service)
service.Stop(syscall.SIGKILL)
return nil
})
stopper := topological.NewReversedRunner(tasks)
Expand All @@ -109,13 +109,13 @@ func (r *runner) StopAll() (map[string]*os.ProcessState, error) {
if serviceInstance.Type == "group" {
continue
}
states[serviceInstance.Label] = serviceInstance.Cmd.ProcessState
states[serviceInstance.Label] = serviceInstance.ProcessState()
}

return states, err
}

func (r *runner) GetStartDurations() map[string]time.Duration {
func (r *Runner) GetStartDurations() map[string]time.Duration {
durations := make(map[string]time.Duration)

for _, serviceInstance := range r.serviceInstances {
Expand All @@ -125,6 +125,10 @@ func (r *runner) GetStartDurations() map[string]time.Duration {
return durations
}

func (r *Runner) GetInstance(label string) *ServiceInstance {
return r.serviceInstances[label]
}

type updateActions struct {
toStopLabels []string
toStartLabels []string
Expand Down Expand Up @@ -169,15 +173,15 @@ func computeUpdateActions(currentServices, newServices ServiceSpecs) updateActio
return actions
}

func (r *runner) UpdateSpecs(serviceSpecs ServiceSpecs, ibazelCmd []byte) error {
func (r *Runner) UpdateSpecs(serviceSpecs ServiceSpecs, ibazelCmd []byte) error {
updateActions := computeUpdateActions(r.serviceSpecs, serviceSpecs)

for _, label := range updateActions.toStopLabels {
serviceInstance := r.serviceInstances[label]
if serviceInstance.Type == "group" {
continue
}
stopInstance(serviceInstance)
serviceInstance.Stop(syscall.SIGKILL)
delete(r.serviceInstances, label)
}

Expand All @@ -190,7 +194,7 @@ func (r *runner) UpdateSpecs(serviceSpecs ServiceSpecs, ibazelCmd []byte) error
}

for _, label := range updateActions.toReloadLabels {
_, err := r.serviceInstances[label].Stdin.Write(ibazelCmd)
_, err := r.serviceInstances[label].stdin.Write(ibazelCmd)
if err != nil {
return err
}
Expand All @@ -200,17 +204,18 @@ func (r *runner) UpdateSpecs(serviceSpecs ServiceSpecs, ibazelCmd []byte) error
return nil
}

func (r *runner) UpdateSpecsAndRestart(
func (r *Runner) UpdateSpecsAndRestart(
serviceSpecs ServiceSpecs,
serviceErrCh chan error,
ibazelCmd []byte,
) (
[]topological.Task, chan error, error,
[]topological.Task, error,
) {
err := r.UpdateSpecs(serviceSpecs, ibazelCmd)
if err != nil {
return nil, nil, err
return nil, err
}
return r.StartAll()
return r.StartAll(serviceErrCh)
}

func prepareServiceInstance(ctx context.Context, s svclib.VersionedServiceSpec) (*ServiceInstance, error) {
Expand All @@ -221,6 +226,21 @@ func prepareServiceInstance(ctx context.Context, s svclib.VersionedServiceSpec)
}, nil
}

instance := &ServiceInstance{
VersionedServiceSpec: s,
}

err := initializeServiceCmd(ctx, instance)
if err != nil {
return nil, err
}

return instance, nil
}

func initializeServiceCmd(ctx context.Context, instance *ServiceInstance) error {
s := instance.VersionedServiceSpec

cmd := exec.CommandContext(ctx, s.Exe, s.Args...)
// Note, this leaks the caller's env into the service, so it's not hermetic.
// For `bazel test`, Bazel is already sanitizing the env, so it's fine.
Expand All @@ -242,32 +262,18 @@ func prepareServiceInstance(ctx context.Context, s svclib.VersionedServiceSpec)
cmd.WaitDelay = 1
}

instance := &ServiceInstance{
VersionedServiceSpec: s,
Cmd: cmd,

startErrFn: sync.OnceValue(cmd.Start),
waitErrFn: sync.OnceValue(cmd.Wait),
}
instance.cmd = cmd
instance.killed = false
instance.startErrFn = sync.OnceValue(cmd.Start)
instance.waitErrFn = sync.OnceValue(cmd.Wait)

if s.HotReloadable {
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
return err
}
instance.Stdin = stdin
}
return instance, nil
}

func stopInstance(serviceInstance *ServiceInstance) {
if serviceInstance.Cmd.Process == nil {
return
instance.stdin = stdin
}

killGroup(serviceInstance.Cmd)

for serviceInstance.Cmd.ProcessState == nil {
time.Sleep(5 * time.Millisecond)
}
return nil
}
Loading

0 comments on commit 8ab79ae

Please sign in to comment.