blob: 318c122dc206574b521bf5beb14f3d869b86aeeb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.factories;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Base class for {@link TableFormatFactory}s.
*
* @param <T> record type that the format produces or consumes.
*/
@PublicEvolving
public abstract class TableFormatFactoryBase<T> implements TableFormatFactory<T> {
// Constants for schema derivation
// TODO drop constants once SchemaValidator has been ported to flink-table-common
private static final String SCHEMA = "schema";
private static final String SCHEMA_NAME = "name";
private static final String SCHEMA_TYPE = "type";
private static final String SCHEMA_PROCTIME = "proctime";
private static final String SCHEMA_FROM = "from";
private static final String ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type";
private static final String ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field";
private static final String ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from";
private static final String ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class";
private static final String ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized";
private static final String ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type";
private static final String ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class";
private static final String ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized";
private static final String ROWTIME_WATERMARKS_DELAY = "rowtime.watermarks.delay";
private String type;
private String version;
private boolean supportsSchemaDerivation;
public TableFormatFactoryBase(String type, int version, boolean supportsSchemaDerivation) {
this.type = type;
this.version = Integer.toString(version);
this.supportsSchemaDerivation = supportsSchemaDerivation;
}
@Override
public final Map<String, String> requiredContext() {
final Map<String, String> context = new HashMap<>();
context.put(FormatDescriptorValidator.FORMAT_TYPE, type);
context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, version);
context.putAll(requiredFormatContext());
return context;
}
@Override
public final boolean supportsSchemaDerivation() {
return supportsSchemaDerivation;
}
@Override
public final List<String> supportedProperties() {
final List<String> properties = new ArrayList<>();
if (supportsSchemaDerivation) {
properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA);
// schema
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
properties.add(SCHEMA + ".#." + SCHEMA_FROM);
// time attributes
properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
}
properties.addAll(supportedFormatProperties());
return properties;
}
/**
* Format specific context.
*
* <p>This method can be used if format type and a property version is not enough.
*/
protected Map<String, String> requiredFormatContext() {
return Collections.emptyMap();
}
/**
* Format specific supported properties.
*
* <p>This method can be used if schema derivation is not enough.
*/
protected List<String> supportedFormatProperties() {
return Collections.emptyList();
}
// --------------------------------------------------------------------------------------------
/**
* Finds the table schema that can be used for a format schema (without time attributes).
*/
public static TableSchema deriveSchema(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = new DescriptorProperties();
descriptorProperties.putProperties(properties);
final TableSchema.Builder builder = TableSchema.builder();
final TableSchema baseSchema = descriptorProperties.getTableSchema(SCHEMA);
for (int i = 0; i < baseSchema.getFieldCount(); i++) {
final String fieldName = baseSchema.getFieldNames()[i];
final InternalType fieldType = baseSchema.getFieldTypes()[i];
final boolean isProctime = descriptorProperties
.getOptionalBoolean(SCHEMA + '.' + i + '.' + SCHEMA_PROCTIME)
.orElse(false);
final String timestampKey = SCHEMA + '.' + i + '.' + ROWTIME_TIMESTAMPS_TYPE;
final boolean isRowtime = descriptorProperties.containsKey(timestampKey);
if (!isProctime && !isRowtime) {
// check for aliasing
final String aliasName = descriptorProperties
.getOptionalString(SCHEMA + '.' + i + '.' + SCHEMA_FROM)
.orElse(fieldName);
builder.field(aliasName, fieldType);
}
// only use the rowtime attribute if it references a field
else if (isRowtime &&
descriptorProperties.isValue(timestampKey, ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD)) {
final String aliasName = descriptorProperties
.getString(SCHEMA + '.' + i + '.' + ROWTIME_TIMESTAMPS_FROM);
builder.field(aliasName, fieldType);
}
}
return builder.build();
}
}