blob: b8885e7c728dc0ca71540049dae377b9a1fb5cb9 [file]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mongodbio
import (
"context"
"flag"
"fmt"
"reflect"
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
"github.com/google/go-cmp/cmp"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
func init() {
beam.RegisterType(reflect.TypeOf((*docWithObjectID)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*docWithStringID)(nil)).Elem())
}
// docWithObjectID is a test document whose "_id" is a primitive.ObjectID.
type docWithObjectID struct {
// ID is stored under the BSON key "_id".
ID primitive.ObjectID `bson:"_id"`
// Field1 is stored under the BSON key "field1".
Field1 int32 `bson:"field1"`
}
// docWithStringID is a test document whose "_id" is a plain string.
type docWithStringID struct {
// ID is stored under the BSON key "_id".
ID string `bson:"_id"`
// Field1 is stored under the BSON key "field1".
Field1 int32 `bson:"field1"`
}
// TestMongoDBIO_Read exercises mongodbio.Read end to end.
//
// Every case seeds the "coll" collection of database "db" with documents,
// reads them back through a Beam pipeline as elements of the case's element
// type (optionally with read options), and asserts the resulting collection
// with passert.Equals. The collection is dropped when each subtest finishes.
func TestMongoDBIO_Read(t *testing.T) {
	integration.CheckFilters(t)

	const (
		database   = "db"
		collection = "coll"
	)

	ctx := context.Background()
	port := setUpTestContainer(ctx, t)
	uri := fmt.Sprintf("mongodb://%s:%s", "localhost", port)

	// The same three object IDs are shared by all ObjectID-based cases.
	id0 := objectIDFromHex(t, "61cf9980dd2d24dc5cf28620")
	id1 := objectIDFromHex(t, "61cf9980dd2d24dc5cf28621")
	id2 := objectIDFromHex(t, "61cf9980dd2d24dc5cf28622")

	cases := []struct {
		name     string
		seed     []any
		elemType reflect.Type
		opts     []mongodbio.ReadOptionFn
		expected []any
	}{
		{
			name: "Read documents from MongoDB with id of type primitive.ObjectID",
			seed: []any{
				bson.M{"_id": id0, "field1": int32(0)},
				bson.M{"_id": id1, "field1": int32(1)},
				bson.M{"_id": id2, "field1": int32(2)},
			},
			elemType: reflect.TypeOf(docWithObjectID{}),
			expected: []any{
				docWithObjectID{ID: id0, Field1: 0},
				docWithObjectID{ID: id1, Field1: 1},
				docWithObjectID{ID: id2, Field1: 2},
			},
		},
		{
			name: "Read documents from MongoDB with id of type string",
			seed: []any{
				bson.M{"_id": "id01", "field1": int32(0)},
				bson.M{"_id": "id02", "field1": int32(1)},
				bson.M{"_id": "id03", "field1": int32(2)},
			},
			elemType: reflect.TypeOf(docWithStringID{}),
			expected: []any{
				docWithStringID{ID: "id01", Field1: 0},
				docWithStringID{ID: "id02", Field1: 1},
				docWithStringID{ID: "id03", Field1: 2},
			},
		},
		{
			name: "Read documents from MongoDB where filter matches",
			seed: []any{
				bson.M{"_id": id0, "field1": int32(0)},
				bson.M{"_id": id1, "field1": int32(1)},
				bson.M{"_id": id2, "field1": int32(2)},
			},
			elemType: reflect.TypeOf(docWithObjectID{}),
			opts: []mongodbio.ReadOptionFn{
				mongodbio.WithReadFilter(bson.M{"field1": bson.M{"$gt": 0}}),
			},
			// Only documents with field1 > 0 are expected back.
			expected: []any{
				docWithObjectID{ID: id1, Field1: 1},
				docWithObjectID{ID: id2, Field1: 2},
			},
		},
		{
			name: "Read documents from MongoDB with bucketAuto aggregation",
			seed: []any{
				bson.M{"_id": id0, "field1": int32(0)},
				bson.M{"_id": id1, "field1": int32(1)},
				bson.M{"_id": id2, "field1": int32(2)},
			},
			elemType: reflect.TypeOf(docWithObjectID{}),
			opts: []mongodbio.ReadOptionFn{
				mongodbio.WithReadBucketAuto(true),
			},
			expected: []any{
				docWithObjectID{ID: id0, Field1: 0},
				docWithObjectID{ID: id1, Field1: 1},
				docWithObjectID{ID: id2, Field1: 2},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			coll := newClient(ctx, t, uri).Database(database).Collection(collection)
			t.Cleanup(func() {
				dropCollection(ctx, t, coll)
			})

			// Seed the collection before the pipeline reads from it.
			writeDocuments(ctx, t, coll, tc.seed)

			pipeline, scope := beam.NewPipelineWithRoot()
			docs := mongodbio.Read(scope, uri, database, collection, tc.elemType, tc.opts...)
			passert.Equals(scope, docs, tc.expected...)

			ptest.RunAndValidate(t, pipeline)
		})
	}
}
// TestMongoDBIO_Write exercises mongodbio.Write end to end.
//
// Every case writes its input elements to the "coll" collection of database
// "db" through a Beam pipeline, asserts the IDs emitted by the write with
// passert.Equals, and then compares the documents read back from the
// collection with the expected BSON documents. The collection is dropped when
// each subtest finishes.
func TestMongoDBIO_Write(t *testing.T) {
	integration.CheckFilters(t)

	const (
		database   = "db"
		collection = "coll"
	)

	ctx := context.Background()
	port := setUpTestContainer(ctx, t)
	uri := fmt.Sprintf("mongodb://%s:%s", "localhost", port)

	// Object IDs shared by the inputs and expectations of the ObjectID case.
	id0 := objectIDFromHex(t, "61cf9980dd2d24dc5cf28620")
	id1 := objectIDFromHex(t, "61cf9980dd2d24dc5cf28621")
	id2 := objectIDFromHex(t, "61cf9980dd2d24dc5cf28622")

	cases := []struct {
		name         string
		elements     []any
		opts         []mongodbio.WriteOptionFn
		expectedIDs  []any
		expectedDocs []bson.M
	}{
		{
			name: "Write documents to MongoDB with id of type primitive.ObjectID",
			elements: []any{
				docWithObjectID{ID: id0, Field1: 0},
				docWithObjectID{ID: id1, Field1: 1},
				docWithObjectID{ID: id2, Field1: 2},
			},
			expectedIDs: []any{id0, id1, id2},
			expectedDocs: []bson.M{
				{"_id": id0, "field1": int32(0)},
				{"_id": id1, "field1": int32(1)},
				{"_id": id2, "field1": int32(2)},
			},
		},
		{
			name: "Write documents to MongoDB with id of type string",
			elements: []any{
				docWithStringID{ID: "id01", Field1: 0},
				docWithStringID{ID: "id02", Field1: 1},
				docWithStringID{ID: "id03", Field1: 2},
			},
			expectedIDs: []any{"id01", "id02", "id03"},
			expectedDocs: []bson.M{
				{"_id": "id01", "field1": int32(0)},
				{"_id": "id02", "field1": int32(1)},
				{"_id": "id03", "field1": int32(2)},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			coll := newClient(ctx, t, uri).Database(database).Collection(collection)
			t.Cleanup(func() {
				dropCollection(ctx, t, coll)
			})

			pipeline, scope := beam.NewPipelineWithRoot()
			elements := beam.CreateList(scope, tc.elements)
			ids := mongodbio.Write(scope, uri, database, collection, elements, tc.opts...)
			passert.Equals(scope, ids, tc.expectedIDs...)

			ptest.RunAndValidate(t, pipeline)

			// After the pipeline has run, check what actually landed in the collection.
			gotDocs := readDocuments(ctx, t, coll)
			if !cmp.Equal(gotDocs, tc.expectedDocs) {
				t.Errorf("readDocuments() = %v, want %v", gotDocs, tc.expectedDocs)
			}
		})
	}
}
// TestMain is the entry point of this package's test binary. It parses the
// command-line flags and initializes Beam before handing the testing.M to
// ptest.MainRet.
func TestMain(m *testing.M) {
// Flags are parsed before beam.Init is called.
flag.Parse()
beam.Init()
// NOTE(review): any value returned by ptest.MainRet is not used here;
// presumably the testing package derives the exit status once TestMain
// returns — confirm.
ptest.MainRet(m)
}