blob: add2e25681fcc56545cb51d97ddc5dcda148fffe [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool
import (
"context"
"os/exec"
"testing"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
)
// TestProcess drives a worker pool server through a sequence of StartWorker
// and StopWorker requests and checks which of them report a failure.
//
// The cases in each table are order dependent: later cases rely on the
// workers started (or stopped) by earlier ones, so they run sequentially
// against the same server. A request-level failure is expected to be reported
// through the response's Error field; a non-nil Go error from either call is
// always treated as a test failure.
func TestProcess(t *testing.T) {
	// Use the no-op true binary, if available, skip this test otherwise.
	dummyExec, err := exec.LookPath("true")
	if err != nil {
		t.Skip("Binary `true` doesn't exist, skipping tests.")
	}

	// Plain endpoint with no authentication configured.
	endpoint := &pipepb.ApiServiceDescriptor{
		Url: "localhost:0",
	}
	// Same URL, but with an authentication spec attached; the cases below
	// expect requests using it to be rejected.
	secureEndpoint := &pipepb.ApiServiceDescriptor{
		Url: "localhost:0",
		Authentication: &pipepb.AuthenticationSpec{
			Urn: "beam:authentication:oauth2_client_credentials_grant:v1",
		},
	}

	ctx, cancelFn := context.WithCancel(context.Background())
	t.Cleanup(cancelFn)

	server, err := New(ctx, 0, dummyExec)
	if err != nil {
		t.Fatalf("Unable to create server: %v", err)
	}
	// Serve in the background; the server is shut down by the Stop call at
	// the end of the test.
	go server.ServeAndWait()

	startTests := []struct {
		req         *fnpb.StartWorkerRequest
		errExpected bool
	}{
		{
			req: &fnpb.StartWorkerRequest{
				WorkerId:        "Worker1",
				ControlEndpoint: endpoint,
				LoggingEndpoint: endpoint,
			},
		}, {
			req: &fnpb.StartWorkerRequest{
				WorkerId:        "Worker2",
				ControlEndpoint: endpoint,
				LoggingEndpoint: endpoint,
			},
		}, {
			req: &fnpb.StartWorkerRequest{
				WorkerId:        "Worker1",
				ControlEndpoint: endpoint,
				LoggingEndpoint: endpoint,
			},
			errExpected: true, // Repeated start
		}, {
			req: &fnpb.StartWorkerRequest{
				WorkerId:        "missingControl",
				LoggingEndpoint: endpoint,
			},
			errExpected: true,
		}, {
			req: &fnpb.StartWorkerRequest{
				WorkerId:        "missingLogging",
				ControlEndpoint: endpoint,
			},
			errExpected: true,
		}, {
			req: &fnpb.StartWorkerRequest{
				WorkerId:        "secureLogging",
				LoggingEndpoint: secureEndpoint,
				ControlEndpoint: endpoint,
			},
			errExpected: true,
		}, {
			req: &fnpb.StartWorkerRequest{
				WorkerId:        "secureControl",
				LoggingEndpoint: endpoint,
				ControlEndpoint: secureEndpoint,
			},
			errExpected: true,
		},
	}
	for _, test := range startTests {
		resp, err := server.StartWorker(ctx, test.req)
		// GetError is nil-safe, so a nil response cannot panic the test.
		respErr := resp.GetError()
		switch {
		case test.errExpected && (err != nil || respErr == ""):
			t.Errorf("Expected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
		case !test.errExpected && (err != nil || respErr != ""):
			t.Errorf("Unexpected error starting %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
		}
	}

	stopTests := []struct {
		req         *fnpb.StopWorkerRequest
		errExpected bool
	}{
		{
			req: &fnpb.StopWorkerRequest{
				WorkerId: "Worker1",
			},
		}, {
			req: &fnpb.StopWorkerRequest{
				WorkerId: "Worker1",
			},
			errExpected: true, // Repeated stop
		}, {
			req: &fnpb.StopWorkerRequest{
				WorkerId: "NonExistent",
			},
			errExpected: true,
		},
	}
	for _, test := range stopTests {
		resp, err := server.StopWorker(ctx, test.req)
		// GetError is nil-safe, so a nil response cannot panic the test.
		respErr := resp.GetError()
		switch {
		case test.errExpected && (err != nil || respErr == ""):
			t.Errorf("Expected error stopping %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
		case !test.errExpected && (err != nil || respErr != ""):
			t.Errorf("Unexpected error stopping %v: err: %v, resp: %v", test.req.GetWorkerId(), err, resp)
		}
	}

	// Worker2 is still registered at this point; Stop is expected to shut the
	// server down without error regardless.
	if err := server.Stop(ctx); err != nil {
		t.Fatalf("error stopping server: err: %v", err)
	}
}