blob: 82d5b667d00fa56a512119bfbb2d11fa14d3fba3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.clickhouse;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
/** A descriptor for ClickHouse table schema. */
@Experimental(Experimental.Kind.SOURCE_SINK)
@AutoValue
public abstract class TableSchema implements Serializable {
public abstract List<Column> columns();
public static TableSchema of(Column... columns) {
return new AutoValue_TableSchema(Arrays.asList(columns));
}
/**
* Returns Beam equivalent of ClickHouse schema.
*
* @param tableSchema schema of ClickHouse table
* @return Beam schema
*/
public static Schema getEquivalentSchema(TableSchema tableSchema) {
return tableSchema.columns().stream()
.map(
x -> {
if (x.columnType().nullable()) {
return Schema.Field.nullable(x.name(), getEquivalentFieldType(x.columnType()));
} else {
return Schema.Field.of(x.name(), getEquivalentFieldType(x.columnType()));
}
})
.collect(Schema.toSchema());
}
/**
* Returns Beam equivalent of ClickHouse column type.
*
* @param columnType type of ClickHouse column
* @return Beam field type
*/
public static Schema.FieldType getEquivalentFieldType(ColumnType columnType) {
switch (columnType.typeName()) {
case DATE:
case DATETIME:
return Schema.FieldType.DATETIME;
case STRING:
return Schema.FieldType.STRING;
case FIXEDSTRING:
int size = columnType.fixedStringSize(); // non-null for fixed strings
return Schema.FieldType.logicalType(FixedBytes.of(size));
case FLOAT32:
return Schema.FieldType.FLOAT;
case FLOAT64:
return Schema.FieldType.DOUBLE;
case INT8:
return Schema.FieldType.BYTE;
case INT16:
return Schema.FieldType.INT16;
case INT32:
return Schema.FieldType.INT32;
case INT64:
return Schema.FieldType.INT64;
case UINT8:
return Schema.FieldType.INT16;
case UINT16:
return Schema.FieldType.INT32;
case UINT32:
return Schema.FieldType.INT64;
case UINT64:
return Schema.FieldType.INT64;
case ARRAY:
return Schema.FieldType.array(getEquivalentFieldType(columnType.arrayElementType()));
case ENUM8:
case ENUM16:
return Schema.FieldType.STRING;
}
// not possible, errorprone checks for exhaustive switch
throw new AssertionError("Unexpected type: " + columnType.typeName());
}
/** A column in ClickHouse table. */
@AutoValue
public abstract static class Column implements Serializable {
public abstract String name();
public abstract ColumnType columnType();
@Nullable
public abstract DefaultType defaultType();
@Nullable
public abstract Object defaultValue();
public boolean materializedOrAlias() {
return DefaultType.MATERIALIZED.equals(defaultType())
|| DefaultType.ALIAS.equals(defaultType());
}
public static Column of(String name, ColumnType columnType) {
return of(name, columnType, null, null);
}
public static Column of(
String name,
ColumnType columnType,
@Nullable DefaultType defaultType,
@Nullable Object defaultValue) {
return new AutoValue_TableSchema_Column(name, columnType, defaultType, defaultValue);
}
}
/** An enumeration of possible types in ClickHouse. */
public enum TypeName {
// Primitive types
DATE,
DATETIME,
ENUM8,
ENUM16,
FIXEDSTRING,
FLOAT32,
FLOAT64,
INT8,
INT16,
INT32,
INT64,
STRING,
UINT8,
UINT16,
UINT32,
UINT64,
// Composite types
ARRAY
}
/**
* An enumeration of possible kinds of default values in ClickHouse.
*
* @see <a href="https://clickhouse.yandex/docs/en/single/#default-values">ClickHouse
* documentation</a>
*/
public enum DefaultType {
DEFAULT,
MATERIALIZED,
ALIAS;
public static Optional<DefaultType> parse(String str) {
if ("".equals(str)) {
return Optional.empty();
} else {
return Optional.of(valueOf(str));
}
}
}
/** A descriptor for a column type. */
@AutoValue
public abstract static class ColumnType implements Serializable {
public static final ColumnType DATE = ColumnType.of(TypeName.DATE);
public static final ColumnType DATETIME = ColumnType.of(TypeName.DATETIME);
public static final ColumnType FLOAT32 = ColumnType.of(TypeName.FLOAT32);
public static final ColumnType FLOAT64 = ColumnType.of(TypeName.FLOAT64);
public static final ColumnType INT8 = ColumnType.of(TypeName.INT8);
public static final ColumnType INT16 = ColumnType.of(TypeName.INT16);
public static final ColumnType INT32 = ColumnType.of(TypeName.INT32);
public static final ColumnType INT64 = ColumnType.of(TypeName.INT64);
public static final ColumnType STRING = ColumnType.of(TypeName.STRING);
public static final ColumnType UINT8 = ColumnType.of(TypeName.UINT8);
public static final ColumnType UINT16 = ColumnType.of(TypeName.UINT16);
public static final ColumnType UINT32 = ColumnType.of(TypeName.UINT32);
public static final ColumnType UINT64 = ColumnType.of(TypeName.UINT64);
// ClickHouse doesn't allow nested nullables, so boolean flag is enough
public abstract boolean nullable();
public abstract TypeName typeName();
@Nullable
public abstract Map<String, Integer> enumValues();
@Nullable
public abstract Integer fixedStringSize();
@Nullable
public abstract ColumnType arrayElementType();
public ColumnType withNullable(boolean nullable) {
return toBuilder().nullable(nullable).build();
}
public static ColumnType of(TypeName typeName) {
return ColumnType.builder().typeName(typeName).nullable(false).build();
}
public static ColumnType nullable(TypeName typeName) {
return ColumnType.builder().typeName(typeName).nullable(true).build();
}
public static ColumnType fixedString(int size) {
return ColumnType.builder()
.typeName(TypeName.FIXEDSTRING)
.nullable(false)
.fixedStringSize(size)
.build();
}
public static ColumnType enum8(Map<String, Integer> enumValues) {
return ColumnType.builder()
.typeName(TypeName.ENUM8)
.nullable(false)
.enumValues(enumValues)
.build();
}
public static ColumnType enum16(Map<String, Integer> enumValues) {
return ColumnType.builder()
.typeName(TypeName.ENUM16)
.nullable(false)
.enumValues(enumValues)
.build();
}
public static ColumnType array(ColumnType arrayElementType) {
return ColumnType.builder()
.typeName(TypeName.ARRAY)
// ClickHouse doesn't allow nullable arrays
.nullable(false)
.arrayElementType(arrayElementType)
.build();
}
/**
* Parse string with ClickHouse type to {@link ColumnType}.
*
* @param str string representation of ClickHouse type
* @return type of ClickHouse column
*/
public static ColumnType parse(String str) {
try {
return new org.apache.beam.sdk.io.clickhouse.impl.parser.ColumnTypeParser(
new StringReader(str))
.parse();
} catch (org.apache.beam.sdk.io.clickhouse.impl.parser.ParseException e) {
throw new IllegalArgumentException("failed to parse", e);
}
}
/**
* Get default value of a column based on expression.
*
* <p>E.g., "CREATE TABLE hits(id Int32, count Int32 DEFAULT &lt;str&gt;)"
*
* @param columnType type of ClickHouse expression
* @param str ClickHouse expression
* @return value of ClickHouse expression
*/
public static Object parseDefaultExpression(ColumnType columnType, String str) {
try {
String value =
new org.apache.beam.sdk.io.clickhouse.impl.parser.ColumnTypeParser(
new StringReader(str))
.parseDefaultExpression();
switch (columnType.typeName()) {
case INT8:
return Byte.valueOf(value);
case INT16:
return Short.valueOf(value);
case INT32:
return Integer.valueOf(value);
case INT64:
return Long.valueOf(value);
case ENUM16:
case ENUM8:
case FIXEDSTRING:
case STRING:
return value;
case UINT8:
return Short.valueOf(value);
case UINT16:
return Integer.valueOf(value);
case UINT32:
return Long.valueOf(value);
case UINT64:
return Long.valueOf(value);
default:
throw new UnsupportedOperationException("Unsupported type: " + columnType);
}
} catch (org.apache.beam.sdk.io.clickhouse.impl.parser.ParseException e) {
throw new IllegalArgumentException("failed to parse", e);
}
}
abstract Builder toBuilder();
public static Builder builder() {
return new AutoValue_TableSchema_ColumnType.Builder();
}
@AutoValue.Builder
abstract static class Builder {
public abstract Builder typeName(TypeName typeName);
public abstract Builder arrayElementType(ColumnType arrayElementType);
public abstract Builder nullable(boolean nullable);
public abstract Builder enumValues(Map<String, Integer> enumValues);
public abstract Builder fixedStringSize(Integer size);
public abstract ColumnType build();
}
}
}