blob: 3afa2470157c01de6828b4f2b5b66ae45a280ed1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"fmt"
"testing"
"time"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
"github.com/apache/beam/sdks/v2/go/test/integration/internal/jars"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)
type testProcess struct {
killed bool
jar string
}
func (p *testProcess) Kill() error {
p.killed = true
return nil
}
func failRun(_ time.Duration, _ string, _ ...string) (jars.Process, error) {
return nil, fmt.Errorf("unexpectedly running a jar, failing")
}
func succeedRun(_ time.Duration, jar string, _ ...string) (jars.Process, error) {
return &testProcess{jar: jar}, nil
}
// TestExpansionServices_GetAddr_Addresses tests calling GetAddr on provided addresses.
func TestExpansionServices_GetAddr_Addresses(t *testing.T) {
addrsMap := map[string]string{
"label1": "testAddr1",
"label2": "testAddr2",
"label3": "testAddr3",
}
jarsMap := map[string]string{
"label2": "jarFilepath2",
}
es := &ExpansionServices{
addrs: addrsMap,
jars: jarsMap,
procs: make([]jars.Process, 0),
run: failRun,
waitTime: 0,
testMode: true,
}
// Ensure we get the same map we put in, and that addresses take priority over jars if
// both are given for the same label.
for label, wantAddr := range addrsMap {
gotAddr, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
if gotAddr != wantAddr {
t.Errorf("incorrect address for \"%v\", want %v, got %v", label, wantAddr, gotAddr)
}
}
// Check that nonexistent labels fail.
if _, err := es.GetAddr("nonexistent_label"); err == nil {
t.Errorf("did not receive error when calling GetAddr with nonexistent label")
}
}
// TestExpansionServices_GetAddr_Jars tests calling GetAddr on provided jars.
func TestExpansionServices_GetAddr_Jars(t *testing.T) {
addrsMap := map[string]string{}
jarsMap := map[string]string{
"label1": "jarFilepath1",
"label2": "jarFilepath2",
"label3": "jarFilepath3",
}
es := &ExpansionServices{
addrs: addrsMap,
jars: jarsMap,
procs: make([]jars.Process, 0),
run: succeedRun,
waitTime: 0,
testMode: true,
}
// Call GetAddr on each jar twice, checking that the addresses remain consistent.
gotMap := make(map[string]string)
for label := range jarsMap {
gotAddr, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
gotMap[label] = gotAddr
}
for label, gotAddr := range gotMap {
secondAddr, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
if secondAddr != gotAddr {
t.Errorf("getAddr returned different address when called twice for \"%v\", "+
"attempt 1: %v, attempt 2: %v", label, gotAddr, secondAddr)
}
}
// Check that all jars were run.
gotJars := make([]string, 0)
for _, proc := range es.procs {
testProc := proc.(*testProcess)
gotJars = append(gotJars, testProc.jar)
}
wantJars := make([]string, 0)
for _, jar := range jarsMap {
wantJars = append(wantJars, jar)
}
lessFunc := func(a, b string) bool { return a < b }
if diff := cmp.Diff(wantJars, gotJars, cmpopts.SortSlices(lessFunc)); diff != "" {
t.Errorf("processes in ExpansionServices does not match jars that should be running: diff(-want,+got):\n%v", diff)
}
}
// TestExpansionServices_Shutdown tests that a shutdown correctly kills all jars started by an
// ExpansionServices.
func TestExpansionServices_Shutdown(t *testing.T) {
addrsMap := map[string]string{}
jarsMap := map[string]string{
"label1": "jarFilepath1",
"label2": "jarFilepath2",
"label3": "jarFilepath3",
}
es := &ExpansionServices{
addrs: addrsMap,
jars: jarsMap,
procs: make([]jars.Process, 0),
run: succeedRun,
waitTime: 0,
testMode: true,
}
// Call getAddr on each label to run jars.
for label := range addrsMap {
_, err := es.GetAddr(label)
if err != nil {
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
continue
}
}
// Shutdown and confirm that jars are killed and addresses can no longer be retrieved.
procs := es.procs
es.Shutdown()
for _, proc := range procs {
testProc := proc.(*testProcess)
if !testProc.killed {
t.Errorf("process for jar %v was not killed on Shutdown()", testProc.jar)
}
}
for label := range addrsMap {
_, err := es.GetAddr(label)
if err == nil {
t.Errorf("calling GetAddr after Shutdown did not return an error for \"%v\"", label)
}
}
}