blob: 5dc4f5616fdddbc6ee051c695f3e44edb8236bb5 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package test
import (
"context"
"sync"
"github.com/onsi/gomega"
"github.com/pkg/errors"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/pkg/run"
)
type (
StartFunc func() error
StopFunc func()
)
type Flow interface {
// PushErrorHandler only pushes a stopFunc
PushErrorHandler(StopFunc) Flow
// Run calls the startFunc and expects no error returned.
// If a non-nil error is returned or panic, shutdown must be called at once.
// The error will be stored and thus could be checked by the caller later.
Run(context.Context, StartFunc, StopFunc) Flow
// RunWithoutSideEffect calls the startFunc and does not expect any side effects from it.
// If a non-nil error is returned or panic, shutdown must be called at once.
// The error will be stored and thus could be checked by the caller later.
RunWithoutSideEffect(context.Context, StartFunc) Flow
// Shutdown does not actually shutdown the flow,
// but only returns a function containing all stopFunc(s) to be executed.
// The timing to do the real shutdown can be determined by users.
Shutdown() StopFunc
// Error returns all errors returned from startFunc(s)
// Nil error imply a successful flow.
Error() error
}
type testFlow struct {
err error
stopFuncs []StopFunc
}
// NewTestFlow creates a flow ready to prepare services/components to be used for testing.
func NewTestFlow() Flow {
return &testFlow{
stopFuncs: make([]StopFunc, 0),
}
}
// Shutdown does not actually call the shutdown functions but only return a composition of all
// stop functions given by the user thus it allows the user to determine the timing.
func (tf *testFlow) Shutdown() StopFunc {
return func() {
for idx := len(tf.stopFuncs) - 1; idx >= 0; idx-- {
tf.stopFuncs[idx]()
}
}
}
func (tf *testFlow) RunWithoutSideEffect(ctx context.Context, startFunc StartFunc) Flow {
return tf.Run(ctx, startFunc, nil)
}
func (tf *testFlow) Run(ctx context.Context, startFunc StartFunc, stopFunc StopFunc) Flow {
if tf.err != nil {
return tf
}
if stopFunc != nil {
tf.stopFuncs = append(tf.stopFuncs, stopFunc)
}
errCh := make(chan error)
defer func() {
close(errCh)
}()
donec := make(chan struct{})
// start a new goroutine in order to recover from panic
go func() {
defer func() {
if r := recover(); r != nil {
errCh <- errors.Errorf("panic found %v", r)
close(donec)
}
}()
err := startFunc()
if err != nil {
errCh <- err
}
close(donec)
}()
select {
case <-donec:
case err := <-errCh:
tf.err = multierr.Append(tf.err, err)
tf.Shutdown()()
return tf
}
return tf
}
func (tf *testFlow) PushErrorHandler(stopFunc StopFunc) Flow {
tf.stopFuncs = append(tf.stopFuncs, stopFunc)
return tf
}
func (tf *testFlow) Error() error {
return tf.err
}
func SetUpModules(flags []string, units ...run.Unit) func() {
closer := run.NewTester("closer")
g := run.NewGroup("standalone-test")
g.Register(append([]run.Unit{closer}, units...)...)
err := g.RegisterFlags().Parse(flags)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
errRun := g.Run()
gomega.Expect(errRun).ShouldNot(gomega.HaveOccurred())
}()
g.WaitTillReady()
return func() {
closer.GracefulStop()
wg.Wait()
}
}