blob: 172bdb1799347abe2ec47b1b757761b601130720 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.text;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import java.io.File;
import java.nio.file.Files;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.NumberedShardedFile;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Charsets;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
/** Tests for {@link TextTableProvider}. */
public class TextTableProviderTest {
  @Rule public TestPipeline pipeline = TestPipeline.create();

  // Default TemporaryFolder behaviour: everything the test writes under the folder is deleted once
  // the test finishes, so repeated runs do not accumulate stale files.
  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();

  private static final String SQL_CSV_SCHEMA = "(f_string VARCHAR, f_int INT)";
  private static final Schema CSV_SCHEMA =
      Schema.builder()
          .addNullableField("f_string", Schema.FieldType.STRING)
          .addNullableField("f_int", Schema.FieldType.INT32)
          .build();

  private static final Schema LINES_SCHEMA = Schema.builder().addStringField("f_string").build();
  private static final String SQL_LINES_SCHEMA = "(f_string VARCHAR)";

  // Even though these have the same schema as LINES_SCHEMA, that is accidental; they exist for a
  // different purpose, to test the Excel CSV format, which does not ignore empty lines.
  private static final Schema SINGLE_STRING_CSV_SCHEMA =
      Schema.builder().addStringField("f_string").build();
  private static final String SINGLE_STRING_SQL_SCHEMA = "(f_string VARCHAR)";

  /**
   * Writes {@code contents} to {@code test.csv} in the temporary folder, registers the folder as an
   * external {@code text} table named {@code test}, and returns the rows of {@code SELECT * FROM
   * test}.
   *
   * @param contents the file contents, written as UTF-8
   * @param sqlSchema the SQL column list of the table, e.g. {@code "(f_string VARCHAR)"}
   * @param ddlSuffix text appended verbatim right after the {@code LOCATION '...'} clause (for
   *     example {@code " TBLPROPERTIES '{...}'"} including its leading space), or {@code ""} for
   *     none
   * @return the query result, attached to {@link #pipeline} but not yet run
   */
  private PCollection<Row> readTextTable(String contents, String sqlSchema, String ddlSuffix)
      throws Exception {
    Files.write(tempFolder.newFile("test.csv").toPath(), contents.getBytes(Charsets.UTF_8));
    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
    env.executeDdl(
        String.format(
            "CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*'%s",
            sqlSchema, tempFolder.getRoot(), ddlSuffix));
    return BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM test"));
  }

  /**
   * Tests {@code CREATE EXTERNAL TABLE TYPE text} with no format reads a default CSV.
   *
   * <p>The default format ignores empty lines, so that is an important part of this test.
   */
  @Test
  public void testLegacyDefaultCsv() throws Exception {
    PCollection<Row> rows = readTextTable("hello,13\n\ngoodbye,42\n", SQL_CSV_SCHEMA, "");
    PAssert.that(rows)
        .containsInAnyOrder(
            Row.withSchema(CSV_SCHEMA).addValues("hello", 13).build(),
            Row.withSchema(CSV_SCHEMA).addValues("goodbye", 42).build());
    pipeline.run();
  }

  /**
   * Tests {@code CREATE EXTERNAL TABLE TYPE text} with a format other than "csv" or "lines" results
   * in a CSV read of that format.
   */
  @Test
  public void testLegacyTdfCsv() throws Exception {
    PCollection<Row> rows =
        readTextTable(
            "hello\t13\n\ngoodbye\t42\n",
            SQL_CSV_SCHEMA,
            " TBLPROPERTIES '{\"format\":\"TDF\"}'");
    PAssert.that(rows)
        .containsInAnyOrder(
            Row.withSchema(CSV_SCHEMA).addValues("hello", 13).build(),
            Row.withSchema(CSV_SCHEMA).addValues("goodbye", 42).build());
    pipeline.run();
  }

  /**
   * Tests {@code CREATE EXTERNAL TABLE TYPE text TBLPROPERTIES '{"format":"csv"}'} works as
   * expected.
   */
  @Test
  public void testExplicitCsv() throws Exception {
    PCollection<Row> rows =
        readTextTable(
            "hello,13\n\ngoodbye,42\n", SQL_CSV_SCHEMA, " TBLPROPERTIES '{\"format\":\"csv\"}'");
    PAssert.that(rows)
        .containsInAnyOrder(
            Row.withSchema(CSV_SCHEMA).addValues("hello", 13).build(),
            Row.withSchema(CSV_SCHEMA).addValues("goodbye", 42).build());
    pipeline.run();
  }

  /**
   * Tests {@code CREATE EXTERNAL TABLE TYPE text TBLPROPERTIES '{"format":"csv", "csvFormat":
   * "Excel"}'} works as expected.
   *
   * <p>Note that the difference with the "Excel" format is that blank lines are not ignored but
   * have a single string field.
   *
   * <p>NOTE(review): the input below contains a blank line, yet the expected output has no row for
   * it, which does not obviously match the sentence above. Confirm the intended behaviour.
   */
  @Test
  public void testExplicitCsvExcel() throws Exception {
    PCollection<Row> rows =
        readTextTable(
            "hello\n\ngoodbye\n",
            SINGLE_STRING_SQL_SCHEMA,
            " TBLPROPERTIES '{\"format\":\"csv\", \"csvFormat\":\"Excel\"}'");
    PAssert.that(rows)
        .containsInAnyOrder(
            Row.withSchema(SINGLE_STRING_CSV_SCHEMA).addValues("hello").build(),
            Row.withSchema(SINGLE_STRING_CSV_SCHEMA).addValues("goodbye").build());
    pipeline.run();
  }

  /**
   * Tests {@code CREATE EXTERNAL TABLE TYPE text TBLPROPERTIES '{"format":"lines"}'} works as
   * expected.
   */
  @Test
  public void testLines() throws Exception {
    // Data that looks like CSV but isn't parsed as it
    PCollection<Row> rows =
        readTextTable(
            "hello,13\ngoodbye,42\n", SQL_LINES_SCHEMA, " TBLPROPERTIES '{\"format\":\"lines\"}'");
    PAssert.that(rows)
        .containsInAnyOrder(
            Row.withSchema(LINES_SCHEMA).addValues("hello,13").build(),
            Row.withSchema(LINES_SCHEMA).addValues("goodbye,42").build());
    pipeline.run();
  }

  /**
   * Tests that {@code INSERT INTO} a {@code "lines"}-format text table writes one output line per
   * inserted value to files whose names start with the table location.
   */
  @Test
  public void testWriteLines() throws Exception {
    File destinationFile = new File(tempFolder.getRoot(), "lines-outputs");
    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
    env.executeDdl(
        String.format(
            "CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"lines\"}'",
            SQL_LINES_SCHEMA, destinationFile.getAbsolutePath()));
    BeamSqlRelUtils.toPCollection(
        pipeline, env.parseQuery("INSERT INTO test VALUES ('hello'), ('goodbye')"));
    pipeline.run();

    assertThat(
        new NumberedShardedFile(destinationFile.getAbsolutePath() + "*")
            .readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
        containsInAnyOrder("hello", "goodbye"));
  }

  /**
   * Tests that {@code INSERT INTO} a {@code "csv"}-format text table writes one comma-separated line
   * per inserted row to files whose names start with the table location.
   */
  @Test
  public void testWriteCsv() throws Exception {
    File destinationFile = new File(tempFolder.getRoot(), "csv-outputs");
    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
    env.executeDdl(
        String.format(
            "CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\"}'",
            SQL_CSV_SCHEMA, destinationFile.getAbsolutePath()));
    BeamSqlRelUtils.toPCollection(
        pipeline, env.parseQuery("INSERT INTO test VALUES ('hello', 42), ('goodbye', 13)"));
    pipeline.run();

    assertThat(
        new NumberedShardedFile(destinationFile.getAbsolutePath() + "*")
            .readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
        containsInAnyOrder("hello,42", "goodbye,13"));
  }
}