blob: e02b8c5f5ecb3c09d02f30e7cc4ed6cfebcf3d2f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xlangx
import (
"strings"
"testing"
pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"
)
func expectPanic(t *testing.T, err string) {
if r := recover(); r == nil {
t.Errorf("expected panic; %v", err)
}
}
func TestAddNamespace(t *testing.T) {
tests := []struct {
name string
init *pipepb.Components
namespace string
transformID string
want *pipepb.Components
err string
}{
{
name: "[Correctness] Single Input Multi Output",
init: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
"t0": {
UniqueName: "t0",
Inputs: map[string]string{"t0i0": "p0"},
Outputs: map[string]string{"t0o0": "p1", "t0o1": "p2"},
EnvironmentId: "e0",
},
"t1": {
UniqueName: "t1",
Inputs: map[string]string{"t1i0": "p1"},
Outputs: map[string]string{"t1o0": "p3"},
EnvironmentId: "e1",
},
"t2": {
UniqueName: "t2",
Inputs: map[string]string{"t2i0": "p2"},
Outputs: map[string]string{"t2o0": "p4"},
EnvironmentId: "e0",
},
"t3": {
UniqueName: "t3",
Inputs: map[string]string{"t3i0": "p3", "t3i1": "p4"},
Outputs: map[string]string{"t3o0": "p5"},
EnvironmentId: "e1",
},
},
Pcollections: map[string]*pipepb.PCollection{
"p0": {CoderId: "c0", WindowingStrategyId: "w1"},
"p1": {CoderId: "c1", WindowingStrategyId: "w1"},
"p2": {CoderId: "c0", WindowingStrategyId: "w1"},
"p3": {CoderId: "c3", WindowingStrategyId: "w1"},
"p4": {CoderId: "c2", WindowingStrategyId: "w0"},
"p5": {CoderId: "c2", WindowingStrategyId: "w1"},
},
WindowingStrategies: map[string]*pipepb.WindowingStrategy{
"w0": {WindowCoderId: "c3", EnvironmentId: "e0"},
"w1": {WindowCoderId: "c4", EnvironmentId: "e1"},
},
Coders: map[string]*pipepb.Coder{
"c0": {Spec: &pipepb.FunctionSpec{Urn: "c0"}},
"c1": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
"c2": {Spec: &pipepb.FunctionSpec{Urn: "c2"}},
"c3": {Spec: &pipepb.FunctionSpec{Urn: "c3"}},
"c4": {Spec: &pipepb.FunctionSpec{Urn: "c4"}},
},
Environments: map[string]*pipepb.Environment{
"e0": {Urn: "e0"},
"e1": {Urn: "e1"},
},
},
namespace: "daASxQwenJ",
transformID: "t0",
want: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
"t0": {
UniqueName: "t0",
Inputs: map[string]string{"t0i0": "p0"},
Outputs: map[string]string{"t0o0": "p1", "t0o1": "p2"},
EnvironmentId: "e0",
},
"t1": {
UniqueName: "t1",
Inputs: map[string]string{"t1i0": "p1"},
Outputs: map[string]string{"t1o0": "p3"},
EnvironmentId: "e1",
},
"t2": {
UniqueName: "t2",
Inputs: map[string]string{"t2i0": "p2"},
Outputs: map[string]string{"t2o0": "p4"},
EnvironmentId: "e0",
},
"t3": {
UniqueName: "t3",
Inputs: map[string]string{"t3i0": "p3", "t3i1": "p4"},
Outputs: map[string]string{"t3o0": "p5"},
EnvironmentId: "e1",
},
},
Pcollections: map[string]*pipepb.PCollection{
"p0": {CoderId: "c0@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
"p1": {CoderId: "c1@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
"p2": {CoderId: "c0@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
"p3": {CoderId: "c3", WindowingStrategyId: "w1@daASxQwenJ"},
"p4": {CoderId: "c2", WindowingStrategyId: "w0"},
"p5": {CoderId: "c2", WindowingStrategyId: "w1@daASxQwenJ"},
},
WindowingStrategies: map[string]*pipepb.WindowingStrategy{
"w0": {WindowCoderId: "c3", EnvironmentId: "e0"},
"w1@daASxQwenJ": {WindowCoderId: "c4@daASxQwenJ", EnvironmentId: "e1"},
},
Coders: map[string]*pipepb.Coder{
"c0@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c0"}},
"c1@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
"c2": {Spec: &pipepb.FunctionSpec{Urn: "c2"}},
"c3": {Spec: &pipepb.FunctionSpec{Urn: "c3"}},
"c4@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c4"}},
},
Environments: map[string]*pipepb.Environment{
"e0": {Urn: "e0"},
"e1": {Urn: "e1"},
},
},
},
{
name: "[Correctness] Single Input Single Output",
init: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
"t0": {
UniqueName: "t0",
Inputs: map[string]string{"t0i0": "p0"},
Outputs: map[string]string{"t0o0": "p1", "t0o1": "p2"},
EnvironmentId: "e0",
},
"t1": {
UniqueName: "t1",
Inputs: map[string]string{"t1i0": "p1"},
Outputs: map[string]string{"t1o0": "p3"},
EnvironmentId: "e1",
},
"t2": {
UniqueName: "t2",
Inputs: map[string]string{"t2i0": "p2"},
Outputs: map[string]string{"t2o0": "p4"},
EnvironmentId: "e0",
},
"t3": {
UniqueName: "t3",
Inputs: map[string]string{"t3i0": "p3", "t3i1": "p4"},
Outputs: map[string]string{"t3o0": "p5"},
EnvironmentId: "e1",
},
},
Pcollections: map[string]*pipepb.PCollection{
"p0": {CoderId: "c0", WindowingStrategyId: "w1"},
"p1": {CoderId: "c1", WindowingStrategyId: "w1"},
"p2": {CoderId: "c0", WindowingStrategyId: "w1"},
"p3": {CoderId: "c3", WindowingStrategyId: "w1"},
"p4": {CoderId: "c2", WindowingStrategyId: "w0"},
"p5": {CoderId: "c2", WindowingStrategyId: "w1"},
},
WindowingStrategies: map[string]*pipepb.WindowingStrategy{
"w0": {WindowCoderId: "c3", EnvironmentId: "e0"},
"w1": {WindowCoderId: "c4", EnvironmentId: "e1"},
},
Coders: map[string]*pipepb.Coder{
"c0": {Spec: &pipepb.FunctionSpec{Urn: "c0"}},
"c1": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
"c2": {Spec: &pipepb.FunctionSpec{Urn: "c2"}},
"c3": {Spec: &pipepb.FunctionSpec{Urn: "c3"}},
"c4": {Spec: &pipepb.FunctionSpec{Urn: "c4"}},
},
Environments: map[string]*pipepb.Environment{
"e0": {Urn: "e0"},
"e1": {Urn: "e1"},
},
},
namespace: "daASxQwenJ",
transformID: "t1",
want: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
"t0": {
UniqueName: "t0",
Inputs: map[string]string{"t0i0": "p0"},
Outputs: map[string]string{"t0o0": "p1", "t0o1": "p2"},
EnvironmentId: "e0",
},
"t1": {
UniqueName: "t1",
Inputs: map[string]string{"t1i0": "p1"},
Outputs: map[string]string{"t1o0": "p3"},
EnvironmentId: "e1",
},
"t2": {
UniqueName: "t2",
Inputs: map[string]string{"t2i0": "p2"},
Outputs: map[string]string{"t2o0": "p4"},
EnvironmentId: "e0",
},
"t3": {
UniqueName: "t3",
Inputs: map[string]string{"t3i0": "p3", "t3i1": "p4"},
Outputs: map[string]string{"t3o0": "p5"},
EnvironmentId: "e1",
},
},
Pcollections: map[string]*pipepb.PCollection{
"p0": {CoderId: "c0", WindowingStrategyId: "w1@daASxQwenJ"},
"p1": {CoderId: "c1@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
"p2": {CoderId: "c0", WindowingStrategyId: "w1@daASxQwenJ"},
"p3": {CoderId: "c3@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
"p4": {CoderId: "c2", WindowingStrategyId: "w0"},
"p5": {CoderId: "c2", WindowingStrategyId: "w1@daASxQwenJ"},
},
WindowingStrategies: map[string]*pipepb.WindowingStrategy{
"w0": {WindowCoderId: "c3", EnvironmentId: "e0"},
"w1@daASxQwenJ": {WindowCoderId: "c4@daASxQwenJ", EnvironmentId: "e1"},
},
Coders: map[string]*pipepb.Coder{
"c0": {Spec: &pipepb.FunctionSpec{Urn: "c0"}},
"c1@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
"c2": {Spec: &pipepb.FunctionSpec{Urn: "c2"}},
"c3@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c3"}},
"c4@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c4"}},
},
Environments: map[string]*pipepb.Environment{
"e0": {Urn: "e0"},
"e1": {Urn: "e1"},
},
},
},
{
name: "[Correctness] Multi Input Single Output",
init: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
"t0": {
UniqueName: "t0",
Inputs: map[string]string{"t0i0": "p0"},
Outputs: map[string]string{"t0o0": "p1", "t0o1": "p2"},
EnvironmentId: "e0",
},
"t1": {
UniqueName: "t1",
Inputs: map[string]string{"t1i0": "p1"},
Outputs: map[string]string{"t1o0": "p3"},
EnvironmentId: "e1",
},
"t2": {
UniqueName: "t2",
Inputs: map[string]string{"t2i0": "p2"},
Outputs: map[string]string{"t2o0": "p4"},
EnvironmentId: "e0",
},
"t3": {
UniqueName: "t3",
Inputs: map[string]string{"t3i0": "p3", "t3i1": "p4"},
Outputs: map[string]string{"t3o0": "p5"},
EnvironmentId: "e1",
},
},
Pcollections: map[string]*pipepb.PCollection{
"p0": {CoderId: "c0", WindowingStrategyId: "w1"},
"p1": {CoderId: "c1", WindowingStrategyId: "w1"},
"p2": {CoderId: "c0", WindowingStrategyId: "w1"},
"p3": {CoderId: "c3", WindowingStrategyId: "w1"},
"p4": {CoderId: "c2", WindowingStrategyId: "w0"},
"p5": {CoderId: "c2", WindowingStrategyId: "w1"},
},
WindowingStrategies: map[string]*pipepb.WindowingStrategy{
"w0": {WindowCoderId: "c3", EnvironmentId: "e0"},
"w1": {WindowCoderId: "c4", EnvironmentId: "e1"},
},
Coders: map[string]*pipepb.Coder{
"c0": {Spec: &pipepb.FunctionSpec{Urn: "c0"}},
"c1": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
"c2": {Spec: &pipepb.FunctionSpec{Urn: "c2"}},
"c3": {Spec: &pipepb.FunctionSpec{Urn: "c3"}},
"c4": {Spec: &pipepb.FunctionSpec{Urn: "c4"}},
},
Environments: map[string]*pipepb.Environment{
"e0": {Urn: "e0"},
"e1": {Urn: "e1"},
},
},
namespace: "daASxQwenJ",
transformID: "t3",
want: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
"t0": {
UniqueName: "t0",
Inputs: map[string]string{"t0i0": "p0"},
Outputs: map[string]string{"t0o0": "p1", "t0o1": "p2"},
EnvironmentId: "e0",
},
"t1": {
UniqueName: "t1",
Inputs: map[string]string{"t1i0": "p1"},
Outputs: map[string]string{"t1o0": "p3"},
EnvironmentId: "e1",
},
"t2": {
UniqueName: "t2",
Inputs: map[string]string{"t2i0": "p2"},
Outputs: map[string]string{"t2o0": "p4"},
EnvironmentId: "e0",
},
"t3": {
UniqueName: "t3",
Inputs: map[string]string{"t3i0": "p3", "t3i1": "p4"},
Outputs: map[string]string{"t3o0": "p5"},
EnvironmentId: "e1",
},
},
Pcollections: map[string]*pipepb.PCollection{
"p0": {CoderId: "c0", WindowingStrategyId: "w1@daASxQwenJ"},
"p1": {CoderId: "c1", WindowingStrategyId: "w1@daASxQwenJ"},
"p2": {CoderId: "c0", WindowingStrategyId: "w1@daASxQwenJ"},
"p3": {CoderId: "c3@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
"p4": {CoderId: "c2@daASxQwenJ", WindowingStrategyId: "w0@daASxQwenJ"},
"p5": {CoderId: "c2@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
},
WindowingStrategies: map[string]*pipepb.WindowingStrategy{
"w0@daASxQwenJ": {WindowCoderId: "c3@daASxQwenJ", EnvironmentId: "e0"},
"w1@daASxQwenJ": {WindowCoderId: "c4@daASxQwenJ", EnvironmentId: "e1"},
},
Coders: map[string]*pipepb.Coder{
"c0": {Spec: &pipepb.FunctionSpec{Urn: "c0"}},
"c1": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
"c2@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c2"}},
"c3@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c3"}},
"c4@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c4"}},
},
Environments: map[string]*pipepb.Environment{
"e0": {Urn: "e0"},
"e1": {Urn: "e1"},
},
},
},
{
name: "[Correctness] Component Coders",
init: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
"t0": {
UniqueName: "t0",
Inputs: map[string]string{"t0i0": "p0"},
Outputs: map[string]string{"t0o0": "p1"},
EnvironmentId: "e0",
},
"t1": {
UniqueName: "t1",
Inputs: map[string]string{"t1i0": "p1"},
Outputs: map[string]string{"t1o0": "p2"},
EnvironmentId: "e1",
},
},
Pcollections: map[string]*pipepb.PCollection{
"p0": {CoderId: "c0", WindowingStrategyId: "w0"},
"p1": {CoderId: "c1", WindowingStrategyId: "w1"},
"p2": {CoderId: "c0", WindowingStrategyId: "w1"},
},
WindowingStrategies: map[string]*pipepb.WindowingStrategy{
"w0": {WindowCoderId: "c3", EnvironmentId: "e0"},
"w1": {WindowCoderId: "c4", EnvironmentId: "e1"},
},
Coders: map[string]*pipepb.Coder{
"c0": {Spec: &pipepb.FunctionSpec{Urn: "c0"}, ComponentCoderIds: []string{"c2"}},
"c1": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
"c2": {Spec: &pipepb.FunctionSpec{Urn: "c2"}},
"c3": {Spec: &pipepb.FunctionSpec{Urn: "c3"}},
"c4": {Spec: &pipepb.FunctionSpec{Urn: "c4"}},
},
Environments: map[string]*pipepb.Environment{
"e0": {Urn: "e0"},
"e1": {Urn: "e1"},
},
},
namespace: "daASxQwenJ",
transformID: "t0",
want: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
"t0": {
UniqueName: "t0",
Inputs: map[string]string{"t0i0": "p0"},
Outputs: map[string]string{"t0o0": "p1"},
EnvironmentId: "e0",
},
"t1": {
UniqueName: "t1",
Inputs: map[string]string{"t1i0": "p1"},
Outputs: map[string]string{"t1o0": "p2"},
EnvironmentId: "e1",
},
},
Pcollections: map[string]*pipepb.PCollection{
"p0": {CoderId: "c0@daASxQwenJ", WindowingStrategyId: "w0@daASxQwenJ"},
"p1": {CoderId: "c1@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
"p2": {CoderId: "c0@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
},
WindowingStrategies: map[string]*pipepb.WindowingStrategy{
"w0@daASxQwenJ": {WindowCoderId: "c3@daASxQwenJ", EnvironmentId: "e0"},
"w1@daASxQwenJ": {WindowCoderId: "c4@daASxQwenJ", EnvironmentId: "e1"},
},
Coders: map[string]*pipepb.Coder{
"c0@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c0"}, ComponentCoderIds: []string{"c2@daASxQwenJ"}},
"c1@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
"c2@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c2"}},
"c3@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c3"}},
"c4@daASxQwenJ": {Spec: &pipepb.FunctionSpec{Urn: "c4"}},
},
Environments: map[string]*pipepb.Environment{
"e0": {Urn: "e0"},
"e1": {Urn: "e1"},
},
},
},
{
name: "[Consistency] Missing EnvironmentID",
init: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
"t0": {
UniqueName: "t0",
Inputs: map[string]string{"t0i0": "p0"},
Outputs: map[string]string{"t0o0": "p1"},
EnvironmentId: "e0",
},
"t1": {
UniqueName: "t1",
Inputs: map[string]string{"t1i0": "p1"},
Outputs: map[string]string{"t1o0": "p2"},
EnvironmentId: "e1",
},
},
Pcollections: map[string]*pipepb.PCollection{
"p0": {CoderId: "c0", WindowingStrategyId: "w0"},
"p1": {CoderId: "c1", WindowingStrategyId: "w1"},
"p2": {CoderId: "c0", WindowingStrategyId: "w1"},
},
WindowingStrategies: map[string]*pipepb.WindowingStrategy{
"w0": {WindowCoderId: "c3", EnvironmentId: "e0"},
"w1": {WindowCoderId: "c4", EnvironmentId: "e1"},
},
Coders: map[string]*pipepb.Coder{
"c0": {Spec: &pipepb.FunctionSpec{Urn: "c0"}},
"c1": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
},
Environments: map[string]*pipepb.Environment{
// Missing "e0"
"e1": {Urn: "e1"},
},
},
namespace: "daASxQwenJ",
transformID: "t0",
err: "trying to add an Environment whose key is absent should panic",
},
{
name: "[Consistency] Missing WindowingStrategyID",
init: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
"t0": {
UniqueName: "t0",
Inputs: map[string]string{"t0i0": "p0"},
Outputs: map[string]string{"t0o0": "p1"},
EnvironmentId: "e0",
},
"t1": {
UniqueName: "t1",
Inputs: map[string]string{"t1i0": "p1"},
Outputs: map[string]string{"t1o0": "p2"},
EnvironmentId: "e1",
},
},
Pcollections: map[string]*pipepb.PCollection{
"p0": {CoderId: "c0", WindowingStrategyId: "w0"},
"p1": {CoderId: "c1", WindowingStrategyId: "w1"},
"p2": {CoderId: "c0", WindowingStrategyId: "w1"},
},
WindowingStrategies: map[string]*pipepb.WindowingStrategy{
// Missing w0
"w1": {WindowCoderId: "c4", EnvironmentId: "e1"},
},
Coders: map[string]*pipepb.Coder{
"c0": {Spec: &pipepb.FunctionSpec{Urn: "c0"}},
"c1": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
},
Environments: map[string]*pipepb.Environment{
"e0": {Urn: "e0"},
"e1": {Urn: "e1"},
},
},
namespace: "daASxQwenJ",
transformID: "t0",
err: "trying to add an WindowingStrategy whose key is absent should panic",
},
{
name: "[Consistency] Missing CoderID",
init: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
"t0": {
UniqueName: "t0",
Inputs: map[string]string{"t0i0": "p0"},
Outputs: map[string]string{"t0o0": "p1"},
EnvironmentId: "e0",
},
"t1": {
UniqueName: "t1",
Inputs: map[string]string{"t1i0": "p1"},
Outputs: map[string]string{"t1o0": "p2"},
EnvironmentId: "e1",
},
},
Pcollections: map[string]*pipepb.PCollection{
"p0": {CoderId: "c0", WindowingStrategyId: "w0"},
"p1": {CoderId: "c1", WindowingStrategyId: "w1"},
"p2": {CoderId: "c0", WindowingStrategyId: "w1"},
},
WindowingStrategies: map[string]*pipepb.WindowingStrategy{
"w0": {WindowCoderId: "c3", EnvironmentId: "e0"},
"w1": {WindowCoderId: "c4", EnvironmentId: "e1"},
},
Coders: map[string]*pipepb.Coder{
// Missing c0
"c1": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
},
Environments: map[string]*pipepb.Environment{
"e0": {Urn: "e0"},
"e1": {Urn: "e1"},
},
},
namespace: "daASxQwenJ",
transformID: "t0",
err: "trying to add an WindowingStrategy whose key is absent should panic",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if strings.Contains(tt.name, "Correctness") {
transform := tt.init.Transforms[tt.transformID]
addNamespace(transform, tt.init, tt.namespace)
if d := cmp.Diff(tt.want, tt.init, protocmp.Transform()); d != "" {
t.Errorf("diff (-want, +got): %v", d)
}
}
if strings.Contains(tt.name, "Consistency") {
defer expectPanic(t, tt.err)
transform := tt.init.Transforms[tt.transformID]
addNamespace(transform, tt.init, tt.namespace)
}
})
}
}