215 lines
4.8 KiB
Go
215 lines
4.8 KiB
Go
|
package internal
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"io/ioutil"
|
||
|
"net"
|
||
|
"net/http"
|
||
|
"net/url"
|
||
|
"os"
|
||
|
"os/exec"
|
||
|
"path"
|
||
|
"strconv"
|
||
|
"time"
|
||
|
|
||
|
"github.com/onsi/gomega/gbytes"
|
||
|
"github.com/onsi/gomega/gexec"
|
||
|
|
||
|
"sigs.k8s.io/testing_frameworks/integration/addr"
|
||
|
)
|
||
|
|
||
|
type ProcessState struct {
|
||
|
DefaultedProcessInput
|
||
|
Session *gexec.Session
|
||
|
// Healthcheck Endpoint. If we get http.StatusOK from this endpoint, we
|
||
|
// assume the process is ready to operate. E.g. "/healthz". If this is set,
|
||
|
// we ignore StartMessage.
|
||
|
HealthCheckEndpoint string
|
||
|
// HealthCheckPollInterval is the interval which will be used for polling the
|
||
|
// HealthCheckEndpoint.
|
||
|
// If left empty it will default to 100 Milliseconds.
|
||
|
HealthCheckPollInterval time.Duration
|
||
|
// StartMessage is the message to wait for on stderr. If we recieve this
|
||
|
// message, we assume the process is ready to operate. Ignored if
|
||
|
// HealthCheckEndpoint is specified.
|
||
|
//
|
||
|
// The usage of StartMessage is discouraged, favour HealthCheckEndpoint
|
||
|
// instead!
|
||
|
//
|
||
|
// Deprecated: Use HealthCheckEndpoint in favour of StartMessage
|
||
|
StartMessage string
|
||
|
Args []string
|
||
|
|
||
|
// ready holds wether the process is currently in ready state (hit the ready condition) or not.
|
||
|
// It will be set to true on a successful `Start()` and set to false on a successful `Stop()`
|
||
|
ready bool
|
||
|
}
|
||
|
|
||
|
type DefaultedProcessInput struct {
|
||
|
URL url.URL
|
||
|
Dir string
|
||
|
DirNeedsCleaning bool
|
||
|
Path string
|
||
|
StopTimeout time.Duration
|
||
|
StartTimeout time.Duration
|
||
|
}
|
||
|
|
||
|
func DoDefaulting(
|
||
|
name string,
|
||
|
listenUrl *url.URL,
|
||
|
dir string,
|
||
|
path string,
|
||
|
startTimeout time.Duration,
|
||
|
stopTimeout time.Duration,
|
||
|
) (DefaultedProcessInput, error) {
|
||
|
defaults := DefaultedProcessInput{
|
||
|
Dir: dir,
|
||
|
Path: path,
|
||
|
StartTimeout: startTimeout,
|
||
|
StopTimeout: stopTimeout,
|
||
|
}
|
||
|
|
||
|
if listenUrl == nil {
|
||
|
port, host, err := addr.Suggest()
|
||
|
if err != nil {
|
||
|
return DefaultedProcessInput{}, err
|
||
|
}
|
||
|
defaults.URL = url.URL{
|
||
|
Scheme: "http",
|
||
|
Host: net.JoinHostPort(host, strconv.Itoa(port)),
|
||
|
}
|
||
|
} else {
|
||
|
defaults.URL = *listenUrl
|
||
|
}
|
||
|
|
||
|
if dir == "" {
|
||
|
newDir, err := ioutil.TempDir("", "k8s_test_framework_")
|
||
|
if err != nil {
|
||
|
return DefaultedProcessInput{}, err
|
||
|
}
|
||
|
defaults.Dir = newDir
|
||
|
defaults.DirNeedsCleaning = true
|
||
|
}
|
||
|
|
||
|
if path == "" {
|
||
|
if name == "" {
|
||
|
return DefaultedProcessInput{}, fmt.Errorf("must have at least one of name or path")
|
||
|
}
|
||
|
defaults.Path = BinPathFinder(name)
|
||
|
}
|
||
|
|
||
|
if startTimeout == 0 {
|
||
|
defaults.StartTimeout = 20 * time.Second
|
||
|
}
|
||
|
|
||
|
if stopTimeout == 0 {
|
||
|
defaults.StopTimeout = 20 * time.Second
|
||
|
}
|
||
|
|
||
|
return defaults, nil
|
||
|
}
|
||
|
|
||
|
type stopChannel chan struct{}
|
||
|
|
||
|
func (ps *ProcessState) Start(stdout, stderr io.Writer) (err error) {
|
||
|
if ps.ready {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
command := exec.Command(ps.Path, ps.Args...)
|
||
|
|
||
|
ready := make(chan bool)
|
||
|
timedOut := time.After(ps.StartTimeout)
|
||
|
var pollerStopCh stopChannel
|
||
|
|
||
|
if ps.HealthCheckEndpoint != "" {
|
||
|
healthCheckURL := ps.URL
|
||
|
healthCheckURL.Path = ps.HealthCheckEndpoint
|
||
|
pollerStopCh = make(stopChannel)
|
||
|
go pollURLUntilOK(healthCheckURL, ps.HealthCheckPollInterval, ready, pollerStopCh)
|
||
|
} else {
|
||
|
startDetectStream := gbytes.NewBuffer()
|
||
|
ready = startDetectStream.Detect(ps.StartMessage)
|
||
|
stderr = safeMultiWriter(stderr, startDetectStream)
|
||
|
}
|
||
|
|
||
|
ps.Session, err = gexec.Start(command, stdout, stderr)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case <-ready:
|
||
|
ps.ready = true
|
||
|
return nil
|
||
|
case <-timedOut:
|
||
|
if pollerStopCh != nil {
|
||
|
close(pollerStopCh)
|
||
|
}
|
||
|
if ps.Session != nil {
|
||
|
ps.Session.Terminate()
|
||
|
}
|
||
|
return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func safeMultiWriter(writers ...io.Writer) io.Writer {
|
||
|
safeWriters := []io.Writer{}
|
||
|
for _, w := range writers {
|
||
|
if w != nil {
|
||
|
safeWriters = append(safeWriters, w)
|
||
|
}
|
||
|
}
|
||
|
return io.MultiWriter(safeWriters...)
|
||
|
}
|
||
|
|
||
|
func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) {
|
||
|
if interval <= 0 {
|
||
|
interval = 100 * time.Millisecond
|
||
|
}
|
||
|
for {
|
||
|
res, err := http.Get(url.String())
|
||
|
if err == nil && res.StatusCode == http.StatusOK {
|
||
|
ready <- true
|
||
|
return
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case <-stopCh:
|
||
|
return
|
||
|
default:
|
||
|
time.Sleep(interval)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (ps *ProcessState) Stop() error {
|
||
|
if ps.Session == nil {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// gexec's Session methods (Signal, Kill, ...) do not check if the Process is
|
||
|
// nil, so we are doing this here for now.
|
||
|
// This should probably be fixed in gexec.
|
||
|
if ps.Session.Command.Process == nil {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
detectedStop := ps.Session.Terminate().Exited
|
||
|
timedOut := time.After(ps.StopTimeout)
|
||
|
|
||
|
select {
|
||
|
case <-detectedStop:
|
||
|
break
|
||
|
case <-timedOut:
|
||
|
return fmt.Errorf("timeout waiting for process %s to stop", path.Base(ps.Path))
|
||
|
}
|
||
|
ps.ready = false
|
||
|
if ps.DirNeedsCleaning {
|
||
|
return os.RemoveAll(ps.Dir)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|