blob: f93d1a8e320cec9831de9994071d46816684e7bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.cache.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.iterable.S3Objects;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import java.io.IOException;
import java.util.Comparator;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.minifi.c2.api.ConfigurationProviderException;
import org.apache.nifi.minifi.c2.api.cache.ConfigurationCacheFileInfo;
import org.apache.nifi.minifi.c2.api.cache.WriteableConfiguration;
import org.apache.nifi.minifi.c2.api.util.Pair;
public class S3CacheFileInfoImpl implements ConfigurationCacheFileInfo {
private final AmazonS3 s3;
private final String bucket;
private final String prefix;
private final String expectedFilename;
/**
* Creates a new S3 cache file info.
* @param s3 The {@link AmazonS3 client}.
* @param bucket The S3 bucket.
* @param prefix The S3 object prefix.
*/
public S3CacheFileInfoImpl(AmazonS3 s3, String bucket, String prefix,
String expectedFilename) {
this.s3 = s3;
this.bucket = bucket;
this.prefix = prefix;
this.expectedFilename = expectedFilename;
}
@Override
public Integer getVersionIfMatch(String objectKey) {
String filename = objectKey.substring(prefix.length());
int expectedFilenameLength = expectedFilename.length();
if (!filename.startsWith(expectedFilename) || filename.length() == expectedFilenameLength) {
return null;
}
try {
return Integer.parseInt(filename.substring(expectedFilenameLength));
} catch (NumberFormatException e) {
return null;
}
}
@Override
public WriteableConfiguration getConfiguration(Integer version)
throws ConfigurationProviderException {
if (version == null) {
try {
return getCachedConfigurations().findFirst()
.orElseThrow(() -> new ConfigurationProviderException("No configurations found."));
} catch (IOException e) {
throw new ConfigurationProviderException("Unable to get cached configurations.", e);
}
} else {
final S3Object s3Object;
if (StringUtils.isEmpty(prefix) || StringUtils.equals(prefix, "/")) {
s3Object = s3.getObject(new GetObjectRequest(bucket,
expectedFilename + version.toString()));
} else {
s3Object = s3.getObject(new GetObjectRequest(bucket,
prefix + expectedFilename + version.toString()));
}
if (s3Object == null) {
throw new ConfigurationProviderException("No configurations found for object key.");
}
return new S3WritableConfiguration(s3, s3Object, Integer.toString(version));
}
}
@Override
public Stream<WriteableConfiguration> getCachedConfigurations() throws IOException {
Iterable<S3ObjectSummary> objectSummaries = S3Objects.withPrefix(s3, bucket, prefix);
Stream<S3ObjectSummary> objectStream = StreamSupport.stream(objectSummaries.spliterator(), false);
return objectStream.map(p -> {
Integer version = getVersionIfMatch(p.getKey());
if (version == null) {
return null;
}
return new Pair<>(version, p);
}).filter(Objects::nonNull)
.sorted(Comparator.comparing(pair -> ((Pair<Integer, S3ObjectSummary>) pair).getFirst())
.reversed()).map(pair -> new S3WritableConfiguration(s3, pair.getSecond(), Integer.toString(pair.getFirst())));
}
}