| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.clickhouse; |
| |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| |
| import java.sql.ResultSet; |
| import java.util.Arrays; |
| import java.util.Objects; |
| import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType; |
| import org.apache.beam.sdk.schemas.JavaFieldSchema; |
| import org.apache.beam.sdk.schemas.Schema; |
| import org.apache.beam.sdk.schemas.Schema.FieldType; |
| import org.apache.beam.sdk.schemas.annotations.DefaultSchema; |
| import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.values.Row; |
| import org.joda.time.DateTime; |
| import org.joda.time.DateTimeZone; |
| import org.junit.Ignore; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** Tests for {@link ClickHouseIO}. */ |
| @RunWith(JUnit4.class) |
| public class ClickHouseIOTest extends BaseClickHouseTest { |
| |
| @Rule public TestPipeline pipeline = TestPipeline.create(); |
| |
| @Test |
| public void testInt64() throws Exception { |
| Schema schema = |
| Schema.of(Schema.Field.of("f0", FieldType.INT64), Schema.Field.of("f1", FieldType.INT64)); |
| Row row1 = Row.withSchema(schema).addValue(1L).addValue(2L).build(); |
| Row row2 = Row.withSchema(schema).addValue(2L).addValue(4L).build(); |
| Row row3 = Row.withSchema(schema).addValue(3L).addValue(6L).build(); |
| |
| executeSql("CREATE TABLE test_int64 (f0 Int64, f1 Int64) ENGINE=Log"); |
| |
| pipeline.apply(Create.of(row1, row2, row3).withRowSchema(schema)).apply(write("test_int64")); |
| |
| pipeline.run().waitUntilFinish(); |
| |
| long sum0 = executeQueryAsLong("SELECT SUM(f0) FROM test_int64"); |
| long sum1 = executeQueryAsLong("SELECT SUM(f1) FROM test_int64"); |
| |
| assertEquals(6L, sum0); |
| assertEquals(12L, sum1); |
| } |
| |
| @Test |
| public void testNullableInt64() throws Exception { |
| Schema schema = Schema.of(Schema.Field.nullable("f0", FieldType.INT64)); |
| Row row1 = Row.withSchema(schema).addValue(1L).build(); |
| Row row2 = Row.withSchema(schema).addValue(null).build(); |
| Row row3 = Row.withSchema(schema).addValue(3L).build(); |
| |
| executeSql("CREATE TABLE test_nullable_int64 (f0 Nullable(Int64)) ENGINE=Log"); |
| |
| pipeline |
| .apply(Create.of(row1, row2, row3).withRowSchema(schema)) |
| .apply(write("test_nullable_int64")); |
| |
| pipeline.run().waitUntilFinish(); |
| |
| long sum = executeQueryAsLong("SELECT SUM(f0) FROM test_nullable_int64"); |
| long count0 = executeQueryAsLong("SELECT COUNT(*) FROM test_nullable_int64"); |
| long count1 = executeQueryAsLong("SELECT COUNT(f0) FROM test_nullable_int64"); |
| |
| assertEquals(4L, sum); |
| assertEquals(3L, count0); |
| assertEquals(2L, count1); |
| } |
| |
| @Test |
| public void testInt64WithDefault() throws Exception { |
| Schema schema = Schema.of(Schema.Field.nullable("f0", FieldType.INT64)); |
| Row row1 = Row.withSchema(schema).addValue(1L).build(); |
| Row row2 = Row.withSchema(schema).addValue(null).build(); |
| Row row3 = Row.withSchema(schema).addValue(3L).build(); |
| |
| executeSql("CREATE TABLE test_int64_with_default (f0 Int64 DEFAULT -1) ENGINE=Log"); |
| |
| pipeline |
| .apply(Create.of(row1, row2, row3).withRowSchema(schema)) |
| .apply(write("test_int64_with_default")); |
| |
| pipeline.run().waitUntilFinish(); |
| |
| long sum = executeQueryAsLong("SELECT SUM(f0) FROM test_int64_with_default"); |
| |
| assertEquals(3L, sum); |
| } |
| |
| @Test |
| public void testArrayOfArrayOfInt64() throws Exception { |
| Schema schema = |
| Schema.of(Schema.Field.of("f0", FieldType.array(FieldType.array(FieldType.INT64)))); |
| Row row1 = |
| Row.withSchema(schema) |
| .addValue( |
| Arrays.asList(Arrays.asList(1L, 2L), Arrays.asList(2L, 3L), Arrays.asList(3L, 4L))) |
| .build(); |
| |
| executeSql("CREATE TABLE test_array_of_array_of_int64 (f0 Array(Array(Int64))) ENGINE=Log"); |
| |
| pipeline |
| .apply(Create.of(row1).withRowSchema(schema)) |
| .apply(write("test_array_of_array_of_int64")); |
| |
| pipeline.run().waitUntilFinish(); |
| |
| long sum0 = |
| executeQueryAsLong( |
| "SELECT SUM(arraySum(arrayMap(x -> arraySum(x), f0))) " |
| + "FROM test_array_of_array_of_int64"); |
| |
| assertEquals(15L, sum0); |
| } |
| |
| @Test |
| public void testPrimitiveTypes() throws Exception { |
| Schema schema = |
| Schema.of( |
| Schema.Field.of("f0", FieldType.DATETIME), |
| Schema.Field.of("f1", FieldType.DATETIME), |
| Schema.Field.of("f2", FieldType.FLOAT), |
| Schema.Field.of("f3", FieldType.DOUBLE), |
| Schema.Field.of("f4", FieldType.BYTE), |
| Schema.Field.of("f5", FieldType.INT16), |
| Schema.Field.of("f6", FieldType.INT32), |
| Schema.Field.of("f7", FieldType.INT64), |
| Schema.Field.of("f8", FieldType.STRING), |
| Schema.Field.of("f9", FieldType.INT16), |
| Schema.Field.of("f10", FieldType.INT32), |
| Schema.Field.of("f11", FieldType.INT64), |
| Schema.Field.of("f12", FieldType.INT64), |
| Schema.Field.of("f13", FieldType.STRING), |
| Schema.Field.of("f14", FieldType.STRING), |
| Schema.Field.of("f15", FieldType.STRING), |
| Schema.Field.of("f16", FieldType.BYTES), |
| Schema.Field.of("f17", FieldType.logicalType(FixedBytes.of(3)))); |
| Row row1 = |
| Row.withSchema(schema) |
| .addValue(new DateTime(2030, 10, 1, 0, 0, 0, DateTimeZone.UTC)) |
| .addValue(new DateTime(2030, 10, 9, 8, 7, 6, DateTimeZone.UTC)) |
| .addValue(2.2f) |
| .addValue(3.3) |
| .addValue((byte) 4) |
| .addValue((short) 5) |
| .addValue(6) |
| .addValue(7L) |
| .addValue("eight") |
| .addValue((short) 9) |
| .addValue(10) |
| .addValue(11L) |
| .addValue(12L) |
| .addValue("abc") |
| .addValue("cde") |
| .addValue("qwe") |
| .addValue(new byte[] {'a', 's', 'd'}) |
| .addValue(new byte[] {'z', 'x', 'c'}) |
| .build(); |
| |
| executeSql( |
| "CREATE TABLE test_primitive_types (" |
| + "f0 Date," |
| + "f1 DateTime," |
| + "f2 Float32," |
| + "f3 Float64," |
| + "f4 Int8," |
| + "f5 Int16," |
| + "f6 Int32," |
| + "f7 Int64," |
| + "f8 String," |
| + "f9 UInt8," |
| + "f10 UInt16," |
| + "f11 UInt32," |
| + "f12 UInt64," |
| + "f13 Enum8('abc' = 1, 'cde' = 2)," |
| + "f14 Enum16('abc' = -1, 'cde' = -2)," |
| + "f15 FixedString(3)," |
| + "f16 FixedString(3)," |
| + "f17 FixedString(3)" |
| + ") ENGINE=Log"); |
| |
| pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_primitive_types")); |
| |
| pipeline.run().waitUntilFinish(); |
| |
| try (ResultSet rs = executeQuery("SELECT * FROM test_primitive_types")) { |
| rs.next(); |
| |
| assertEquals("2030-10-01", rs.getString("f0")); |
| assertEquals("2030-10-09 08:07:06", rs.getString("f1")); |
| assertEquals("2.2", rs.getString("f2")); |
| assertEquals("3.3", rs.getString("f3")); |
| assertEquals("4", rs.getString("f4")); |
| assertEquals("5", rs.getString("f5")); |
| assertEquals("6", rs.getString("f6")); |
| assertEquals("7", rs.getString("f7")); |
| assertEquals("eight", rs.getString("f8")); |
| assertEquals("9", rs.getString("f9")); |
| assertEquals("10", rs.getString("f10")); |
| assertEquals("11", rs.getString("f11")); |
| assertEquals("12", rs.getString("f12")); |
| assertEquals("abc", rs.getString("f13")); |
| assertEquals("cde", rs.getString("f14")); |
| assertArrayEquals(new byte[] {'q', 'w', 'e'}, rs.getBytes("f15")); |
| assertArrayEquals(new byte[] {'a', 's', 'd'}, rs.getBytes("f16")); |
| assertArrayEquals(new byte[] {'z', 'x', 'c'}, rs.getBytes("f17")); |
| } |
| } |
| |
| @Test |
| public void testArrayOfPrimitiveTypes() throws Exception { |
| Schema schema = |
| Schema.of( |
| Schema.Field.of("f0", FieldType.array(FieldType.DATETIME)), |
| Schema.Field.of("f1", FieldType.array(FieldType.DATETIME)), |
| Schema.Field.of("f2", FieldType.array(FieldType.FLOAT)), |
| Schema.Field.of("f3", FieldType.array(FieldType.DOUBLE)), |
| Schema.Field.of("f4", FieldType.array(FieldType.BYTE)), |
| Schema.Field.of("f5", FieldType.array(FieldType.INT16)), |
| Schema.Field.of("f6", FieldType.array(FieldType.INT32)), |
| Schema.Field.of("f7", FieldType.array(FieldType.INT64)), |
| Schema.Field.of("f8", FieldType.array(FieldType.STRING)), |
| Schema.Field.of("f9", FieldType.array(FieldType.INT16)), |
| Schema.Field.of("f10", FieldType.array(FieldType.INT32)), |
| Schema.Field.of("f11", FieldType.array(FieldType.INT64)), |
| Schema.Field.of("f12", FieldType.array(FieldType.INT64)), |
| Schema.Field.of("f13", FieldType.array(FieldType.STRING)), |
| Schema.Field.of("f14", FieldType.array(FieldType.STRING))); |
| Row row1 = |
| Row.withSchema(schema) |
| .addArray( |
| new DateTime(2030, 10, 1, 0, 0, 0, DateTimeZone.UTC), |
| new DateTime(2031, 10, 1, 0, 0, 0, DateTimeZone.UTC)) |
| .addArray( |
| new DateTime(2030, 10, 9, 8, 7, 6, DateTimeZone.UTC), |
| new DateTime(2031, 10, 9, 8, 7, 6, DateTimeZone.UTC)) |
| .addArray(2.2f, 3.3f) |
| .addArray(3.3, 4.4) |
| .addArray((byte) 4, (byte) 5) |
| .addArray((short) 5, (short) 6) |
| .addArray(6, 7) |
| .addArray(7L, 8L) |
| .addArray("eight", "nine") |
| .addArray((short) 9, (short) 10) |
| .addArray(10, 11) |
| .addArray(11L, 12L) |
| .addArray(12L, 13L) |
| .addArray("abc", "cde") |
| .addArray("cde", "abc") |
| .build(); |
| |
| executeSql( |
| "CREATE TABLE test_array_of_primitive_types (" |
| + "f0 Array(Date)," |
| + "f1 Array(DateTime)," |
| + "f2 Array(Float32)," |
| + "f3 Array(Float64)," |
| + "f4 Array(Int8)," |
| + "f5 Array(Int16)," |
| + "f6 Array(Int32)," |
| + "f7 Array(Int64)," |
| + "f8 Array(String)," |
| + "f9 Array(UInt8)," |
| + "f10 Array(UInt16)," |
| + "f11 Array(UInt32)," |
| + "f12 Array(UInt64)," |
| + "f13 Array(Enum8('abc' = 1, 'cde' = 2))," |
| + "f14 Array(Enum16('abc' = -1, 'cde' = -2))" |
| + ") ENGINE=Log"); |
| |
| pipeline |
| .apply(Create.of(row1).withRowSchema(schema)) |
| .apply(write("test_array_of_primitive_types")); |
| |
| pipeline.run().waitUntilFinish(); |
| |
| try (ResultSet rs = executeQuery("SELECT * FROM test_array_of_primitive_types")) { |
| rs.next(); |
| |
| assertEquals("['2030-10-01','2031-10-01']", rs.getString("f0")); |
| assertEquals("['2030-10-09 08:07:06','2031-10-09 08:07:06']", rs.getString("f1")); |
| assertEquals("[2.2,3.3]", rs.getString("f2")); |
| assertEquals("[3.3,4.4]", rs.getString("f3")); |
| assertEquals("[4,5]", rs.getString("f4")); |
| assertEquals("[5,6]", rs.getString("f5")); |
| assertEquals("[6,7]", rs.getString("f6")); |
| assertEquals("[7,8]", rs.getString("f7")); |
| assertEquals("['eight','nine']", rs.getString("f8")); |
| assertEquals("[9,10]", rs.getString("f9")); |
| assertEquals("[10,11]", rs.getString("f10")); |
| assertEquals("[11,12]", rs.getString("f11")); |
| assertEquals("[12,13]", rs.getString("f12")); |
| assertEquals("['abc','cde']", rs.getString("f13")); |
| assertEquals("['cde','abc']", rs.getString("f14")); |
| } |
| } |
| |
| @Test |
| public void testInsertSql() { |
| TableSchema tableSchema = |
| TableSchema.of( |
| TableSchema.Column.of("f0", ColumnType.INT64), |
| TableSchema.Column.of("f1", ColumnType.INT64)); |
| |
| String expected = "INSERT INTO \"test_table\" (\"f0\", \"f1\")"; |
| |
| assertEquals(expected, ClickHouseIO.WriteFn.insertSql(tableSchema, "test_table")); |
| } |
| |
| /** POJO used to test . */ |
| @DefaultSchema(JavaFieldSchema.class) |
| public static final class POJO { |
| public int f0; |
| public long f1; |
| |
| public POJO(int f0, long f1) { |
| this.f0 = f0; |
| this.f1 = f1; |
| } |
| |
| public POJO() {} |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| final POJO pojo = (POJO) o; |
| return f0 == pojo.f0 && f1 == pojo.f1; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(f0, f1); |
| } |
| } |
| |
| @Ignore |
| // FIXME java.lang.ClassNotFoundException: javax.annotation.Nullable |
| public void testPojo() throws Exception { |
| POJO pojo1 = new POJO(1, 2L); |
| POJO pojo2 = new POJO(2, 4L); |
| POJO pojo3 = new POJO(3, 6L); |
| |
| executeSql("CREATE TABLE test_pojo(f0 Int32, f1 Int64) ENGINE=Log"); |
| |
| pipeline.apply(Create.of(pojo1, pojo2, pojo3)).apply(write("test_pojo")); |
| |
| pipeline.run().waitUntilFinish(); |
| |
| long sum0 = executeQueryAsLong("SELECT SUM(f0) FROM test_pojo"); |
| long sum1 = executeQueryAsLong("SELECT SUM(f1) FROM test_pojo"); |
| |
| assertEquals(6L, sum0); |
| assertEquals(12L, sum1); |
| } |
| |
| private <T> ClickHouseIO.Write<T> write(String table) { |
| return ClickHouseIO.<T>write(clickHouse.getJdbcUrl(), table).withMaxRetries(0); |
| } |
| } |