/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.s3.common;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.net.URI;

/**
* Base class for file system factories that create S3 file systems.
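 *
 * <p>Concrete subclasses supply the Hadoop file system implementation via
 * {@link #createHadoopFileSystem()} and the URI used to initialize it via
 * {@link #getInitURI(URI, org.apache.hadoop.conf.Configuration)}.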
*/
public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {

    public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE = ConfigOptions
            .key("s3.upload.min.part.size")
            .defaultValue(FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE)
            .withDescription(
                    "This option is relevant to the Recoverable Writer and sets the minimum size of data " +
                    "buffered locally before it is sent to S3. Flink also takes care of checkpointing locally " +
                    "buffered data. This value cannot be less than 5MB or greater than 5GB (limits set by Amazon)."
            );
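
    // Example (hypothetical value): in flink-conf.yaml, the option above could be raised to
    //   s3.upload.min.part.size: 10485760
    // to buffer 10 MiB locally before each part upload.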

    public static final ConfigOption<Integer> MAX_CONCURRENT_UPLOADS = ConfigOptions
            .key("s3.upload.max.concurrent.uploads")
            .defaultValue(Runtime.getRuntime().availableProcessors())
            .withDescription(
                    "This option is relevant to the Recoverable Writer and limits the number of " +
                    "parts that can be concurrently in-flight. By default, this is set to " +
                    Runtime.getRuntime().availableProcessors() + "."
            );
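
    // Example (hypothetical value): the option above could cap in-flight part uploads via
    //   s3.upload.max.concurrent.uploads: 4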

    /**
     * The substring to be replaced by random entropy in checkpoint paths.
     */
    public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = ConfigOptions
            .key("s3.entropy.key")
            .noDefaultValue()
            .withDescription(
                    "This option can be used to improve performance in the presence of sharding issues on Amazon S3. " +
                    "For file creations with entropy injection, this key will be replaced by random " +
                    "alphanumeric characters. For other file creations, the key will be filtered out.");

    /**
     * The number of entropy characters, in case entropy injection is configured.
     */
    public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION = ConfigOptions
            .key("s3.entropy.length")
            .defaultValue(4)
            .withDescription(
                    "When '" + ENTROPY_INJECT_KEY_OPTION.key() + "' is set, this option defines the number of " +
                    "random characters to replace the entropy key with.");

    // ------------------------------------------------------------------------

    private static final String INVALID_ENTROPY_KEY_CHARS = "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";

    private static final Logger LOG = LoggerFactory.getLogger(AbstractS3FileSystemFactory.class);

    /** Name of this factory for logging. */
    private final String name;

    private final HadoopConfigLoader hadoopConfigLoader;

    private Configuration flinkConfig;

    protected AbstractS3FileSystemFactory(String name, HadoopConfigLoader hadoopConfigLoader) {
        this.name = name;
        this.hadoopConfigLoader = hadoopConfigLoader;
    }

    // ------------------------------------------------------------------------

    @Override
    public void configure(Configuration config) {
        flinkConfig = config;
        hadoopConfigLoader.setFlinkConfig(config);
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        Configuration flinkConfig = this.flinkConfig;
        if (flinkConfig == null) {
            LOG.warn("Creating S3 FileSystem without configuring the factory. All behavior will be default.");
            flinkConfig = new Configuration();
        }

        LOG.debug("Creating S3 file system backed by {}", name);
        LOG.debug("Loading Hadoop configuration for {}", name);

        try {
            // create the Hadoop FileSystem
            org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
            org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
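            // initialize the file system with the URI derived by the concrete factory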
            fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);

            // load the entropy injection settings
            String entropyInjectionKey = flinkConfig.getString(ENTROPY_INJECT_KEY_OPTION);
            int numEntropyChars = -1;
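            // stays at -1 (entropy injection disabled) unless an entropy key is configured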
            if (entropyInjectionKey != null) {
                if (entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) {
                    throw new IllegalConfigurationException("Invalid character in value for " +
                            ENTROPY_INJECT_KEY_OPTION.key() + ": " + entropyInjectionKey);
                }
                numEntropyChars = flinkConfig.getInteger(ENTROPY_INJECT_LENGTH_OPTION);
                if (numEntropyChars <= 0) {
                    throw new IllegalConfigurationException(
                            ENTROPY_INJECT_LENGTH_OPTION.key() + " must configure a value > 0");
                }
            }

            final String[] localTmpDirectories = ConfigurationUtils.parseTempDirectories(flinkConfig);
            Preconditions.checkArgument(localTmpDirectories.length > 0);
            final String localTmpDirectory = localTmpDirectories[0];

            final long s3minPartSize = flinkConfig.getLong(PART_UPLOAD_MIN_SIZE);
            final int maxConcurrentUploads = flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS);
            final S3AccessHelper s3AccessHelper = getS3AccessHelper(fs);

            return new FlinkS3FileSystem(
                    fs,
                    localTmpDirectory,
                    entropyInjectionKey,
                    numEntropyChars,
                    s3AccessHelper,
                    s3minPartSize,
                    maxConcurrentUploads);
        }
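        // rethrow IOExceptions unchanged; wrap all other exceptions in an IOException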
        catch (IOException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException(e.getMessage(), e);
        }
    }

    protected abstract org.apache.hadoop.fs.FileSystem createHadoopFileSystem();

    protected abstract URI getInitURI(
            URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig);

    @Nullable
    protected abstract S3AccessHelper getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
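
    // A minimal sketch of a concrete subclass (hypothetical names, for illustration only;
    // assumes the Hadoop s3a file system is on the classpath):
    //
    //   public class MyS3FileSystemFactory extends AbstractS3FileSystemFactory {
    //
    //       public MyS3FileSystemFactory(HadoopConfigLoader loader) {
    //           super("my-s3", loader);
    //       }
    //
    //       @Override
    //       public String getScheme() {
    //           return "s3";
    //       }
    //
    //       @Override
    //       protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
    //           return new org.apache.hadoop.fs.s3a.S3AFileSystem();
    //       }
    //
    //       @Override
    //       protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
    //           return fsUri;
    //       }
    //
    //       @Override
    //       @Nullable
    //       protected S3AccessHelper getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs) {
    //           return null; // no recoverable-writer support in this sketch
    //       }
    //   }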
}