blob: bb683a3b0f9cce5c4f5462e13776c148d10224d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbon.flink;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.serialization.BulkWriter;
/**
 * Base class for bulk-writer factories that are looked up by a string type key.
 *
 * <p>Concrete factory classes are associated with a key through
 * {@link #register(String, Class)} and instantiated on demand through
 * {@link #newInstance(String)}. Every instance can carry a {@link Configuration}
 * that is attached after construction via {@link #setConfiguration(Configuration)}.
 *
 * @param <OUT> element type of the writers produced by this factory
 */
public abstract class ProxyFileWriterFactory<OUT> implements BulkWriter.Factory<OUT> {

  private static final long serialVersionUID = -1449889091046572219L;

  /** Registered factory classes, keyed by factory type. */
  private static final Map<String, Class<? extends ProxyFileWriterFactory>> FACTORY_MAP =
      new ConcurrentHashMap<>();

  /**
   * Creates a new factory of the class registered under {@code factoryType}.
   *
   * <p>The registered class is instantiated through its no-argument constructor.
   *
   * @param factoryType key the factory class was registered under; must not be {@code null}
   * @return a fresh factory instance, or {@code null} when nothing is registered for the key
   * @throws IllegalArgumentException if {@code factoryType} is {@code null}
   * @throws RuntimeException if the class cannot be instantiated reflectively, or if its
   *     constructor throws a checked exception (the original failure is kept as the cause);
   *     unchecked exceptions and errors thrown by the constructor are rethrown unchanged
   */
  public static ProxyFileWriterFactory newInstance(final String factoryType) {
    if (factoryType == null) {
      throw new IllegalArgumentException("Argument [factoryType] is null.");
    }
    final Class<? extends ProxyFileWriterFactory> factoryClass = FACTORY_MAP.get(factoryType);
    if (factoryClass == null) {
      return null;
    }
    try {
      // Class.newInstance() is deprecated: it rethrows checked constructor exceptions
      // without compile-time checking. Going through the Constructor object reports
      // them as InvocationTargetException instead.
      return factoryClass.getDeclaredConstructor().newInstance();
    } catch (InvocationTargetException exception) {
      // Keep unchecked constructor failures visible to callers exactly as thrown.
      final Throwable cause = exception.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      throw new RuntimeException(cause);
    } catch (ReflectiveOperationException exception) {
      // Missing/inaccessible no-arg constructor, or a class that cannot be instantiated.
      throw new RuntimeException(exception);
    }
  }

  /**
   * Associates a factory class with a factory type key.
   *
   * @param factoryType key to register under; must not be {@code null}
   * @param factoryClass class to instantiate for that key; must not be {@code null}
   * @throws IllegalArgumentException if either argument is {@code null}
   */
  public static void register(
      final String factoryType,
      final Class<? extends ProxyFileWriterFactory> factoryClass
  ) {
    if (factoryType == null) {
      throw new IllegalArgumentException("Argument [factoryType] is null.");
    }
    if (factoryClass == null) {
      throw new IllegalArgumentException("Argument [factoryClass] is null.");
    }
    // TODO: validate the arguments further.
    // TODO: detect a type that is already registered and ignore the duplicate registration.
    //       Until then, a later registration replaces the earlier one.
    FACTORY_MAP.put(factoryType, factoryClass);
  }

  /** Configuration attached to this factory; {@code null} until it is set. */
  private Configuration configuration;

  /**
   * Returns the type identifier of this factory.
   *
   * <p>NOTE(review): presumably the same key the class is registered under via
   * {@link #register(String, Class)} - confirm against the subclasses.
   *
   * @return the factory type
   */
  public abstract String getType();

  /**
   * Returns the configuration previously passed to {@link #setConfiguration(Configuration)}.
   *
   * @return the current configuration, or {@code null} if none has been set
   */
  public Configuration getConfiguration() {
    return this.configuration;
  }

  /**
   * Replaces the configuration of this factory. No validation is performed.
   *
   * @param configuration the configuration to store
   */
  public void setConfiguration(final Configuration configuration) {
    this.configuration = configuration;
  }

  /**
   * Creates a writer for the given partition.
   *
   * @param partition partition identifier; its exact meaning is defined by the subclass
   * @return the writer created for that partition
   * @throws IOException if the writer cannot be created
   */
  public abstract ProxyFileWriter<OUT> create(String partition) throws IOException;

  /**
   * Serializable bundle of the table coordinates and property sets handed to a factory.
   *
   * <p>The {@link Properties} objects are stored and returned by reference; no copies are made.
   */
  public static class Configuration implements Serializable {

    private static final long serialVersionUID = 8615149992583690295L;

    private final String databaseName;

    private final String tableName;

    private final String tablePath;

    private final Properties tableProperties;

    private final Properties writerProperties;

    private final Properties carbonProperties;

    /**
     * Creates a configuration.
     *
     * @param databaseName database name; the only argument that may be {@code null}
     * @param tableName table name; must not be {@code null}
     * @param tablePath table path; must not be {@code null}
     * @param tableProperties table properties; must not be {@code null}
     * @param writerProperties writer properties; must not be {@code null}
     * @param carbonProperties carbon properties; must not be {@code null}
     * @throws IllegalArgumentException if any argument other than {@code databaseName}
     *     is {@code null}
     */
    public Configuration(
        final String databaseName,
        final String tableName,
        final String tablePath,
        final Properties tableProperties,
        final Properties writerProperties,
        final Properties carbonProperties
    ) {
      if (tableName == null) {
        throw new IllegalArgumentException("Argument [tableName] is null.");
      }
      if (tablePath == null) {
        throw new IllegalArgumentException("Argument [tablePath] is null.");
      }
      if (tableProperties == null) {
        throw new IllegalArgumentException("Argument [tableProperties] is null.");
      }
      if (writerProperties == null) {
        throw new IllegalArgumentException("Argument [writerProperties] is null.");
      }
      if (carbonProperties == null) {
        throw new IllegalArgumentException("Argument [carbonProperties] is null.");
      }
      this.databaseName = databaseName;
      this.tableName = tableName;
      this.tablePath = tablePath;
      this.tableProperties = tableProperties;
      this.writerProperties = writerProperties;
      this.carbonProperties = carbonProperties;
    }

    /** @return the database name; may be {@code null} */
    public String getDatabaseName() {
      return this.databaseName;
    }

    /** @return the table name; never {@code null} */
    public String getTableName() {
      return this.tableName;
    }

    /** @return the table path; never {@code null} */
    public String getTablePath() {
      return this.tablePath;
    }

    /** @return the table properties instance given to the constructor */
    public Properties getTableProperties() {
      return this.tableProperties;
    }

    /** @return the writer properties instance given to the constructor */
    public Properties getWriterProperties() {
      return this.writerProperties;
    }

    /** @return the carbon properties instance given to the constructor */
    public Properties getCarbonProperties() {
      return this.carbonProperties;
    }
  }
}