/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;

/** Test utilities. */
public class TestUtils {
  /** A {@code DoFn} to convert a {@code Row} to a comparable {@code String}. */
public static class BeamSqlRow2StringDoFn extends DoFn<Row, String> {
@ProcessElement
public void processElement(ProcessContext ctx) {
ctx.output(ctx.element().toString());
}
}
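  // A minimal usage sketch (hypothetical test code; assumes ParDo and PAssert are imported in the
  // calling test):
  //
  //   PCollection<String> strings =
  //       rowsPCollection.apply(ParDo.of(new BeamSqlRow2StringDoFn()));
  //   PAssert.that(strings).containsInAnyOrder(expectedStrings);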
  /** Converts a list of {@code Row}s to a list of {@code String}s. */
public static List<String> beamSqlRows2Strings(List<Row> rows) {
List<String> strs = new ArrayList<>();
for (Row row : rows) {
strs.add(row.toString());
}
return strs;
}
public static RowsBuilder rowsBuilderOf(Schema type) {
return RowsBuilder.of(type);
}
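  // A minimal usage sketch (hypothetical schema and values; Schema.builder() is the standard Beam
  // schema API):
  //
  //   Schema schema =
  //       Schema.builder().addInt32Field("order_id").addStringField("buyer").build();
  //   List<Row> rows =
  //       TestUtils.rowsBuilderOf(schema).addRows(1, "james", 2, "bond").getRows();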
  /**
   * Convenient way to build a list of {@code Row}s.
   *
   * <p>You can use it like this:
   *
   * <pre>{@code
   * TestUtils.RowsBuilder.of(
   *   Types.INTEGER, "order_id",
   *   Types.INTEGER, "sum_site_id",
   *   Types.VARCHAR, "buyer"
   * ).addRows(
   *   1, 3, "james",
   *   2, 5, "bond"
   * ).getStringRows()
   * }</pre>
   */
public static class RowsBuilder {
private Schema type;
private List<Row> rows = new ArrayList<>();
    /**
     * Create a RowsBuilder with the specified row type info.
     *
     * <p>For example:
     *
     * <pre>{@code
     * TestUtils.RowsBuilder.of(
     *   Types.INTEGER, "order_id",
     *   Types.INTEGER, "sum_site_id",
     *   Types.VARCHAR, "buyer"
     * )
     * }</pre>
     *
     * @param args pairs of column types and column names.
     */
public static RowsBuilder of(final Object... args) {
Schema beamSQLSchema = TestTableUtils.buildBeamSqlSchema(args);
RowsBuilder builder = new RowsBuilder();
builder.type = beamSQLSchema;
return builder;
}
    /**
     * Create a RowsBuilder with the specified row type info.
     *
     * <p>For example:
     *
     * <pre>{@code
     * TestUtils.RowsBuilder.of(schema)
     * }</pre>
     */
public static RowsBuilder of(final Schema schema) {
RowsBuilder builder = new RowsBuilder();
builder.type = schema;
return builder;
}
    /**
     * Add rows to the builder.
     *
     * <p>Note: check the class javadoc for a detailed example.
     */
public RowsBuilder addRows(final Object... args) {
this.rows.addAll(TestTableUtils.buildRows(type, Arrays.asList(args)));
return this;
}
    /**
     * Add rows to the builder.
     *
     * <p>Note: check the class javadoc for a detailed example.
     */
public RowsBuilder addRows(final List args) {
this.rows.addAll(TestTableUtils.buildRows(type, args));
return this;
}
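    // A minimal usage sketch (hypothetical values): the list is flattened field by field, row by
    // row, in schema order, exactly like the varargs overload above:
    //
    //   builder.addRows(Arrays.asList(1, 3, "james", 2, 5, "bond"));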
public List<Row> getRows() {
return rows;
}
public List<String> getStringRows() {
return beamSqlRows2Strings(rows);
}
public PCollectionBuilder getPCollectionBuilder() {
return pCollectionBuilder().withSchema(type).withRows(rows);
}
}
public static PCollectionBuilder pCollectionBuilder() {
return new PCollectionBuilder();
}
static class PCollectionBuilder {
private Schema type;
private List<Row> rows;
private String timestampField;
private Pipeline pipeline;
public PCollectionBuilder withSchema(Schema type) {
this.type = type;
return this;
}
public PCollectionBuilder withRows(List<Row> rows) {
this.rows = rows;
return this;
}
    /** Event time field, used to advance the watermark. */
public PCollectionBuilder withTimestampField(String timestampField) {
this.timestampField = timestampField;
return this;
}
public PCollectionBuilder inPipeline(Pipeline pipeline) {
this.pipeline = pipeline;
return this;
}
    /**
     * Builds an unbounded {@link PCollection} in the {@link Pipeline} set by {@link
     * #inPipeline(Pipeline)}.
     *
     * <p>If a timestamp field was set with {@link #withTimestampField(String)}, the watermark is
     * advanced to the value of that field before each row is added.
     */
    public PCollection<Row> buildUnbounded() {
      checkArgument(pipeline != null, "Pipeline must be set via inPipeline(Pipeline)");
      checkArgument(!rows.isEmpty(), "At least one row is required");
      if (type == null) {
        type = rows.get(0).getSchema();
      }
TestStream.Builder<Row> values =
TestStream.create(
type, SerializableFunctions.identity(), SerializableFunctions.identity());
for (Row row : rows) {
if (timestampField != null) {
values = values.advanceWatermarkTo(new Instant(row.getDateTime(timestampField)));
}
values = values.addElements(row);
}
return PBegin.in(pipeline).apply("unboundedPCollection", values.advanceWatermarkToInfinity());
}
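    // A minimal usage sketch (hypothetical schema and values; assumes a TestPipeline named
    // "pipeline" in the calling test):
    //
    //   Schema schema =
    //       Schema.builder().addInt32Field("order_id").addDateTimeField("event_time").build();
    //   PCollection<Row> unbounded =
    //       TestUtils.rowsBuilderOf(schema)
    //           .addRows(
    //               1, new DateTime("2017-01-01T00:00:00Z"),
    //               2, new DateTime("2017-01-01T00:00:01Z"))
    //           .getPCollectionBuilder()
    //           .inPipeline(pipeline)
    //           .withTimestampField("event_time")
    //           .buildUnbounded();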
}
public static <T> PCollectionTuple tuple(String tag, PCollection<T> pCollection) {
return PCollectionTuple.of(new TupleTag<>(tag), pCollection);
}
public static <T, V> PCollectionTuple tuple(
String tag1, PCollection<T> pCollection1, String tag2, PCollection<V> pCollection2) {
return tuple(tag1, pCollection1).and(new TupleTag<>(tag2), pCollection2);
}
public static <T, V, W> PCollectionTuple tuple(
String tag1,
PCollection<T> pCollection1,
String tag2,
PCollection<V> pCollection2,
String tag3,
PCollection<W> pCollection3) {
return tuple(
tag1, pCollection1,
tag2, pCollection2)
.and(new TupleTag<>(tag3), pCollection3);
}
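  // A minimal usage sketch (hypothetical PCollections "ordersA" and "ordersB" defined elsewhere in
  // a test). The tuple tags become the table names visible to the SQL query under test:
  //
  //   PCollectionTuple inputs = TestUtils.tuple("TABLE_A", ordersA, "TABLE_B", ordersB);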
}