blob: 9deb0353d60e12df915fcb8c9ca91b06c59e612f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql;
import java.math.BigDecimal;
import java.text.ParseException;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
/**
* prepare input records to test {@link BeamSql}.
*
* <p>Note that, any change in these records would impact tests in this package.
*/
public class BeamSqlDslBase {
public static final DateTimeFormatter FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
@Rule public final TestPipeline pipeline = TestPipeline.create();
@Rule public ExpectedException exceptions = ExpectedException.none();
static Schema schemaInTableA;
static List<Row> rowsInTableA;
//bounded PCollections
PCollection<Row> boundedInput1;
PCollection<Row> boundedInput2;
//unbounded PCollections
PCollection<Row> unboundedInput1;
PCollection<Row> unboundedInput2;
@BeforeClass
public static void prepareClass() throws ParseException {
schemaInTableA =
Schema.builder()
.addInt32Field("f_int")
.addInt64Field("f_long")
.addInt16Field("f_short")
.addByteField("f_byte")
.addFloatField("f_float")
.addDoubleField("f_double")
.addStringField("f_string")
.addDateTimeField("f_timestamp")
.addInt32Field("f_int2")
.addDecimalField("f_decimal")
.build();
rowsInTableA =
TestUtils.RowsBuilder.of(schemaInTableA)
.addRows(
1,
1000L,
(short) 1,
(byte) 1,
1.0f,
1.0d,
"string_row1",
FORMAT.parseDateTime("2017-01-01 01:01:03"),
0,
new BigDecimal(1))
.addRows(
2,
2000L,
(short) 2,
(byte) 2,
2.0f,
2.0d,
"string_row2",
FORMAT.parseDateTime("2017-01-01 01:02:03"),
0,
new BigDecimal(2))
.addRows(
3,
3000L,
(short) 3,
(byte) 3,
3.0f,
3.0d,
"string_row3",
FORMAT.parseDateTime("2017-01-01 01:06:03"),
0,
new BigDecimal(3))
.addRows(
4,
4000L,
(short) 4,
(byte) 4,
4.0f,
4.0d,
"第四行",
FORMAT.parseDateTime("2017-01-01 02:04:03"),
0,
new BigDecimal(4))
.getRows();
}
@Before
public void preparePCollections() {
boundedInput1 =
pipeline.apply(
"boundedInput1",
Create.of(rowsInTableA)
.withSchema(
schemaInTableA,
SerializableFunctions.identity(),
SerializableFunctions.identity()));
boundedInput2 =
pipeline.apply(
"boundedInput2",
Create.of(rowsInTableA.get(0))
.withSchema(
schemaInTableA,
SerializableFunctions.identity(),
SerializableFunctions.identity()));
unboundedInput1 = prepareUnboundedPCollection1();
unboundedInput2 = prepareUnboundedPCollection2();
}
private PCollection<Row> prepareUnboundedPCollection1() {
TestStream.Builder<Row> values =
TestStream.create(
schemaInTableA, SerializableFunctions.identity(), SerializableFunctions.identity());
for (Row row : rowsInTableA) {
values = values.advanceWatermarkTo(new Instant(row.getDateTime("f_timestamp")));
values = values.addElements(row);
}
return PBegin.in(pipeline)
.apply("unboundedInput1", values.advanceWatermarkToInfinity())
.apply(
"unboundedInput1.fixedWindow1year",
Window.into(FixedWindows.of(Duration.standardDays(365))));
}
private PCollection<Row> prepareUnboundedPCollection2() {
TestStream.Builder<Row> values =
TestStream.create(
schemaInTableA, SerializableFunctions.identity(), SerializableFunctions.identity());
Row row = rowsInTableA.get(0);
values = values.advanceWatermarkTo(new Instant(row.getDateTime("f_timestamp")));
values = values.addElements(row);
return PBegin.in(pipeline)
.apply("unboundedInput2", values.advanceWatermarkToInfinity())
.apply(
"unboundedInput2.fixedWindow1year",
Window.into(FixedWindows.of(Duration.standardDays(365))));
}
}