blob: ac0af04298e9527bd39a472b7a3d20934e153c9b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.snowflake.test.unit.write;
import static org.junit.Assert.assertTrue;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
import org.apache.beam.sdk.io.snowflake.data.SnowflakeColumn;
import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema;
import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeDate;
import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeDateTime;
import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeTime;
import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeArray;
import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeObject;
import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeVariant;
import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeText;
import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServiceImpl;
import org.apache.beam.sdk.io.snowflake.test.TestUtils;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Tests for {@code SnowflakeIO.write()} configured with {@link CreateDisposition#CREATE_IF_NEEDED}
 * and an explicit {@link SnowflakeTableSchema}.
 *
 * <p>Every test writes {@link #ROW_COUNT} identical rows to {@link #CREATED_TABLE} through a {@link
 * FakeSnowflakeServiceImpl} and then compares the content reported by {@link
 * FakeSnowflakeDatabase} with the rows that were written.
 */
@RunWith(JUnit4.class)
public class SchemaDispositionTest {
  /** Staging bucket handed to the write; its temp directory is removed in {@link #tearDown()}. */
  private static final String BUCKET_NAME = "BUCKET/";

  /**
   * Destination table. No test creates it up front; each write is configured with {@link
   * CreateDisposition#CREATE_IF_NEEDED}.
   */
  private static final String CREATED_TABLE = "NO_EXIST_TABLE";

  /** Number of identical rows each test writes. */
  private static final int ROW_COUNT = 100;

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  // No test in this class currently registers an expectation on this rule.
  @Rule public ExpectedException exceptionRule = ExpectedException.none();

  private static SnowflakePipelineOptions options;
  private static SnowflakeIO.DataSourceConfiguration dc;
  private static String stagingBucketName;
  private static String storageIntegrationName;
  private static SnowflakeService snowflakeService;

  /** Builds the pipeline options, fake service and data source configuration shared by all tests. */
  @BeforeClass
  public static void setupAll() {
    PipelineOptionsFactory.register(SnowflakePipelineOptions.class);
    options = TestPipeline.testingPipelineOptions().as(SnowflakePipelineOptions.class);
    options.setStagingBucketName(BUCKET_NAME);
    options.setServerName("NULL.snowflakecomputing.com");

    // Read the values back from the options so the tests use exactly what the options report.
    stagingBucketName = options.getStagingBucketName();
    storageIntegrationName = options.getStorageIntegrationName();

    snowflakeService = new FakeSnowflakeServiceImpl();

    dc =
        SnowflakeIO.DataSourceConfiguration.create(new FakeSnowflakeBasicDataSource())
            .withServerName(options.getServerName());
  }

  /** Intentionally empty: all shared fixtures are created once in {@link #setupAll()}. */
  @Before
  public void setup() {}

  /** Removes the staging directory and resets the fake database after each test. */
  @After
  public void tearDown() {
    TestUtils.removeTempDir(BUCKET_NAME);
    FakeSnowflakeDatabase.clean();
  }

  /**
   * Returns a mapper that passes each {@code String[]} record through unchanged.
   *
   * @return identity mapper for {@code String[]} records
   */
  public static SnowflakeIO.UserDataMapper<String[]> getCsvMapper() {
    return recordLine -> recordLine;
  }

  /** Writes date, datetime and time values into a table created from a matching schema. */
  @Test
  public void writeWithCreatedTableWithDatetimeSchemaSuccess() throws SQLException {
    SnowflakeTableSchema tableSchema =
        new SnowflakeTableSchema(
            SnowflakeColumn.of("date", SnowflakeDate.of()),
            SnowflakeColumn.of("datetime", SnowflakeDateTime.of()),
            SnowflakeColumn.of("time", SnowflakeTime.of()));

    writeAndVerify(
        repeatedRows(new String[] {"2020-08-25", "2014-01-01 16:00:00", "00:02:03"}),
        tableSchema,
        TestUtils.getLStringCsvMapper());
  }

  /** Writes rows consisting only of {@code null} values into columns declared with {@code true}. */
  @Test
  public void writeWithCreatedTableWithNullValuesInSchemaSuccess() throws SnowflakeSQLException {
    SnowflakeTableSchema tableSchema =
        new SnowflakeTableSchema(
            SnowflakeColumn.of("date", SnowflakeDate.of(), true),
            // NOTE(review): built through the constructor instead of the factory; presumably
            // equivalent to SnowflakeColumn.of(...) - kept as in the original, confirm if intended.
            new SnowflakeColumn("datetime", SnowflakeDateTime.of(), true),
            SnowflakeColumn.of("text", SnowflakeText.of(), true));

    writeAndVerify(repeatedRows(new String[] {null, null, null}), tableSchema, getCsvMapper());
  }

  /** Writes JSON/array payloads into a table with VARIANT, ARRAY and OBJECT columns. */
  @Test
  public void writeWithCreatedTableWithStructuredDataSchemaSuccess() throws SQLException {
    String json = "{ \"key1\": 1, \"key2\": {\"inner_key\": \"value2\", \"inner_key2\":18} }";
    String array = "[1,2,3]";

    // Column name, column type and row value line up position by position:
    // variant <- json, array <- array, object <- json.
    SnowflakeTableSchema tableSchema =
        new SnowflakeTableSchema(
            SnowflakeColumn.of("variant", SnowflakeVariant.of()),
            SnowflakeColumn.of("array", SnowflakeArray.of()),
            SnowflakeColumn.of("object", SnowflakeObject.of()));

    writeAndVerify(repeatedRows(new String[] {json, array, json}), tableSchema, getCsvMapper());
  }

  /**
   * Builds {@link #ROW_COUNT} rows, each an independent copy of {@code row}.
   *
   * @param row template row; it is cloned for every element so no array instance is shared
   * @return list of {@link #ROW_COUNT} rows with the same content as {@code row}
   */
  private static List<String[]> repeatedRows(String[] row) {
    return LongStream.range(0, ROW_COUNT)
        .mapToObj(index -> row.clone())
        .collect(Collectors.toList());
  }

  /**
   * Writes {@code rows} to {@link #CREATED_TABLE} using {@code tableSchema} and {@link
   * CreateDisposition#CREATE_IF_NEEDED}, then asserts via {@code TestUtils.areListsEqual} that the
   * fake database reports the rows in their {@code TestUtils.toSnowflakeRow} form.
   *
   * @param rows records to write
   * @param tableSchema schema passed to the write for table creation
   * @param mapper mapper converting each record into the values handed to the write
   * @throws SnowflakeSQLException if reading the table content from the fake database fails
   */
  private void writeAndVerify(
      List<String[]> rows,
      SnowflakeTableSchema tableSchema,
      SnowflakeIO.UserDataMapper<String[]> mapper)
      throws SnowflakeSQLException {
    List<String> expectedRows =
        rows.stream().map(TestUtils::toSnowflakeRow).collect(Collectors.toList());

    pipeline
        .apply(Create.of(rows))
        .apply(
            "Copy IO",
            SnowflakeIO.<String[]>write()
                .withDataSourceConfiguration(dc)
                .to(CREATED_TABLE)
                .withTableSchema(tableSchema)
                .withStagingBucketName(stagingBucketName)
                .withStorageIntegrationName(storageIntegrationName)
                .withFileNameTemplate("output")
                .withUserDataMapper(mapper)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withSnowflakeService(snowflakeService));

    pipeline.run(options).waitUntilFinish();

    List<String> actualRows = FakeSnowflakeDatabase.getElements(CREATED_TABLE);
    assertTrue(TestUtils.areListsEqual(expectedRows, actualRows));
  }
}