blob: 9c1ca113de97197ef6e2855705bfc623456ff15c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.rocketmq.sink.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.apache.flink.connector.rocketmq.common.config.RocketMQOptions.ENDPOINTS;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.OPTIONAL_ACCESS_KEY;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.OPTIONAL_ENCODING;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.OPTIONAL_FIELD_DELIMITER;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.OPTIONAL_SECRET_KEY;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.OPTIONAL_WRITE_IS_DYNAMIC_TAG;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.OPTIONAL_WRITE_KEYS_TO_BODY;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.OPTIONAL_WRITE_KEY_COLUMNS;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.OPTIONAL_WRITE_RETRY_TIMES;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.OPTIONAL_WRITE_SLEEP_TIME_MS;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.PRODUCER_GROUP;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.TAG;
import static org.apache.flink.connector.rocketmq.sink.RocketMQSinkOptions.TOPIC;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
/**
* Defines the {@link DynamicTableSinkFactory} implementation to create {@link
* RocketMQDynamicTableSink}.
*/
public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory {
@Override
public String factoryIdentifier() {
return "rocketmq";
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> requiredOptions = new HashSet<>();
requiredOptions.add(TOPIC);
requiredOptions.add(PRODUCER_GROUP);
requiredOptions.add(ENDPOINTS);
// requiredOptions.add(PERSIST_OFFSET_INTERVAL);
return requiredOptions;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> optionalOptions = new HashSet<>();
optionalOptions.add(TAG);
optionalOptions.add(OPTIONAL_ENCODING);
optionalOptions.add(OPTIONAL_FIELD_DELIMITER);
optionalOptions.add(OPTIONAL_ACCESS_KEY);
optionalOptions.add(OPTIONAL_SECRET_KEY);
optionalOptions.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN);
optionalOptions.add(OPTIONAL_WRITE_RETRY_TIMES);
optionalOptions.add(OPTIONAL_WRITE_SLEEP_TIME_MS);
optionalOptions.add(OPTIONAL_WRITE_IS_DYNAMIC_TAG);
optionalOptions.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED);
optionalOptions.add(OPTIONAL_WRITE_KEYS_TO_BODY);
optionalOptions.add(OPTIONAL_WRITE_KEY_COLUMNS);
return optionalOptions;
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
helper.validate();
Map<String, String> rawProperties = context.getCatalogTable().getOptions();
Configuration properties = Configuration.fromMap(rawProperties);
String topicName = properties.getString(TOPIC);
String producerGroup = properties.getString(PRODUCER_GROUP);
String nameServerAddress = properties.getString(ENDPOINTS);
String tag = properties.getString(TAG);
String accessKey = properties.getString(OPTIONAL_ACCESS_KEY);
String secretKey = properties.getString(OPTIONAL_SECRET_KEY);
String dynamicColumn = properties.getString(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN);
String encoding = properties.getString(OPTIONAL_ENCODING);
String fieldDelimiter = properties.getString(OPTIONAL_FIELD_DELIMITER);
int retryTimes = properties.getInteger(OPTIONAL_WRITE_RETRY_TIMES);
long sleepTimeMs = properties.getLong(OPTIONAL_WRITE_SLEEP_TIME_MS);
boolean isDynamicTag = properties.getBoolean(OPTIONAL_WRITE_IS_DYNAMIC_TAG);
boolean isDynamicTagIncluded =
properties.getBoolean(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED);
boolean writeKeysToBody = properties.getBoolean(OPTIONAL_WRITE_KEYS_TO_BODY);
String keyColumnsConfig = properties.getString(OPTIONAL_WRITE_KEY_COLUMNS);
String[] keyColumns = new String[0];
if (keyColumnsConfig != null && keyColumnsConfig.length() > 0) {
keyColumns = keyColumnsConfig.split(",");
}
DescriptorProperties descriptorProperties = new DescriptorProperties();
descriptorProperties.putProperties(rawProperties);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
return new RocketMQDynamicTableSink(
descriptorProperties,
physicalSchema,
topicName,
producerGroup,
nameServerAddress,
accessKey,
secretKey,
tag,
dynamicColumn,
fieldDelimiter,
encoding,
retryTimes,
sleepTimeMs,
isDynamicTag,
isDynamicTagIncluded,
writeKeysToBody,
keyColumns);
}
}