blob: 85441df5ef44fc0b96a6c98a288e572cc87c00da [file] [log] [blame]
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package pf
import (
"context"
"fmt"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func testProcessSpawnerHealthCheckTimer(
tkr *time.Ticker, lastHealthCheckTs int64, expectedHealthCheckInterval int32, counter *int) {
fmt.Println("Starting processSpawnerHealthCheckTimer")
now := time.Now()
maxIdleTime := int64(time.Duration(expectedHealthCheckInterval) * 3 * time.Second)
fmt.Println("maxIdleTime is: " + strconv.FormatInt(maxIdleTime, 10))
timeSinceLastCheck := now.UnixNano() - lastHealthCheckTs
fmt.Println("timeSinceLastCheck is: " + strconv.FormatInt(timeSinceLastCheck, 10))
if (timeSinceLastCheck) > (maxIdleTime) {
fmt.Println("Haven't received health check from spawner in a while. Stopping instance...")
// os.Exit(1)
tkr.Stop()
} else {
fmt.Println("Continuing to check")
*counter++
}
}
func testStartScheduler(counter *int) {
now := time.Now()
lastHealthCheckTs := now.UnixNano()
var expectedHealthCheckInterval int32 = 1
if expectedHealthCheckInterval > 0 {
fmt.Println("Starting Scheduler")
go func() {
fmt.Println("Started Scheduler")
period := time.Second * time.Duration(expectedHealthCheckInterval)
fmt.Println("period is: " + period.String())
tkr := time.NewTicker(period)
for range tkr.C {
fmt.Println("Starting Timer")
testProcessSpawnerHealthCheckTimer(tkr, lastHealthCheckTs, expectedHealthCheckInterval, counter)
}
}()
}
}
func TestInstance_HeartbeatTimer(t *testing.T) {
counter := 0
testStartScheduler(&counter)
time.Sleep(time.Second * 10)
assert.Equal(t, 2, counter)
}
func TestTime_EqualsThreeSecondsFixed(t *testing.T) {
var expectedHealthCheckInterval int32 = 3
timeAmount := time.Millisecond * 1000 * time.Duration(expectedHealthCheckInterval)
assert.Equal(t, time.Second*3, timeAmount)
}
func TestTime_EqualsThreeSecondsTimed(t *testing.T) {
start := time.Now()
startTime := start.UnixNano()
time.Sleep(time.Second * 3)
end := time.Now()
endTime := end.UnixNano()
diff := endTime - startTime
assert.True(t, time.Duration(diff) > time.Second*3)
assert.True(t, time.Duration(diff) < time.Millisecond*3100)
}
type MockHandler struct{}
func (m *MockHandler) process(ctx context.Context, input []byte) ([]byte, error) {
return []byte(`output`), nil
}
func Test_goInstance_handlerMsg(t *testing.T) {
handler := &MockHandler{}
fc := NewFuncContext()
instance := &goInstance{
function: handler,
context: fc,
}
message := &MockMessage{payload: []byte(`{}`)}
output, err := instance.handlerMsg(message)
assert.Nil(t, err)
assert.Equal(t, "output", string(output))
assert.Equal(t, message, fc.record)
}