blob: d30ec9fe11ba2613a4e472a7819b09acc96401dd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package primitives
import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
)
// TestStreamSequence tests the TestStream primitive by inserting string elements
// then advancing the watermark past the point where they were inserted.
func TestStreamStrings(s beam.Scope) {
con := teststream.NewConfig()
con.AddElements(100, "a", "b", "c")
con.AdvanceWatermarkToInfinity()
col := teststream.Create(s, con)
passert.Count(s, col, "teststream strings", 3)
}
// TestStreamByteSliceSequence tests the TestStream primitive by inserting byte slice elements
// then advancing the watermark to infinity and comparing the output..
func TestStreamByteSliceSequence(s beam.Scope) {
con := teststream.NewConfig()
b := []byte{91, 92, 93}
con.AddElements(1, b)
con.AdvanceWatermarkToInfinity()
col := teststream.Create(s, con)
passert.Count(s, col, "teststream byte", 1)
passert.Equals(s, col, append([]byte{3}, b...))
}
// TestStreamInt64Sequence tests the TestStream primitive by inserting int64 elements
// then advancing the watermark past the point where they were inserted.
func TestStreamInt64Sequence(s beam.Scope) {
con := teststream.NewConfig()
ele := []int64{91, 92, 93}
con.AddElementList(100, ele)
con.AdvanceWatermarkToInfinity()
col := teststream.Create(s, con)
passert.Count(s, col, "teststream int64", 3)
passert.EqualsList(s, col, ele)
}
// TestStreamTwoInt64Sequences tests the TestStream primitive by inserting two sets of
// int64 elements that arrive on-time into the TestStream
func TestStreamTwoInt64Sequences(s beam.Scope) {
con := teststream.NewConfig()
eo := []int64{91, 92, 93}
et := []int64{96, 97, 98}
con.AddElementList(100, eo)
con.AdvanceWatermark(110)
con.AddElementList(120, et)
con.AdvanceWatermark(130)
col := teststream.Create(s, con)
passert.Count(s, col, "teststream int64", 6)
passert.EqualsList(s, col, append(eo, et...))
}
// TestStreamFloat64Sequence tests the TestStream primitive by inserting float64 elements
// then advancing the watermark past the point where they were inserted.
func TestStreamFloat64Sequence(s beam.Scope) {
con := teststream.NewConfig()
ele := []float64{91.1, 92.2, 93.3}
con.AddElementList(100, ele)
con.AdvanceWatermarkToInfinity()
col := teststream.Create(s, con)
passert.Count(s, col, "teststream float64", 3)
passert.EqualsList(s, col, ele)
}
// TestStreamTwoFloat64Sequences tests the TestStream primitive by inserting two sets of
// float64 elements that arrive on-time into the TestStream
func TestStreamTwoFloat64Sequences(s beam.Scope) {
con := teststream.NewConfig()
eo := []float64{91.1, 92.2, 93.3}
et := []float64{96.4, 97.5, 98.6}
con.AddElementList(100, eo)
con.AdvanceWatermark(110)
con.AddElementList(120, et)
con.AdvanceWatermark(130)
col := teststream.Create(s, con)
passert.Count(s, col, "teststream float64", 6)
passert.EqualsList(s, col, append(eo, et...))
}
// TestStreamBoolSequence tests the TestStream primitive by inserting boolean elements
// then advancing the watermark past the point where they were inserted.
func TestStreamBoolSequence(s beam.Scope) {
con := teststream.NewConfig()
ele := []bool{true, false, true}
con.AddElementList(100, ele)
con.AdvanceWatermarkToInfinity()
col := teststream.Create(s, con)
passert.Count(s, col, "teststream bool", 3)
passert.EqualsList(s, col, ele)
}
// TestStreamTwoBoolSequences tests the TestStream primitive by inserting two sets of
// boolean elements that arrive on-time into the TestStream
func TestStreamTwoBoolSequences(s beam.Scope) {
con := teststream.NewConfig()
eo := []bool{true, false, true}
et := []bool{false, true, false}
con.AddElementList(100, eo)
con.AdvanceWatermark(110)
con.AddElementList(120, et)
con.AdvanceWatermark(130)
col := teststream.Create(s, con)
passert.Count(s, col, "teststream bool", 6)
passert.EqualsList(s, col, append(eo, et...))
}