blob: 70cb55854c573f66546a420061a3b616784ee1a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.tests.util.s3;
import org.apache.flink.api.java.utils.ParameterTool;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.transfer.KeyFilter;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* S3 utilities.
*
* <p>Usage: java -jar S3UtilProgram.jar args.
*
* <p>Note: {@code S3UtilProgram.Action.lineNumber*} actions are applicable only
* to valid non-compressed CSV comma separated files.
*
* <p>Program parameters:
* <ul>
* <li>action (string, required): Action to perform, see {@link S3UtilProgram.Action}.</li>
* <li>bucket (string, required): Bucket where s3 objects reside.</li>
* <li>s3file (string, required for single object actions): s3 object key.</li>
* <li>s3prefix (string, required for actions over objects grouped by key prefix): s3 key prefix.</li>
* <li>s3filePrefix (string, optional for downloadByFullPathAndFileNamePrefix or numberOfLinesInFilesWithFullAndNamePrefix):
* s3 file name prefix w/o directory to filter files by name.</li>
* <li>localFile (string, required for single file actions): local file path.</li>
* <li>localFolder (string, required for actions over folders): local folder path.</li>
* <li>parallelism (int, default 10): parallelism for parallelizable actions
* (e.g. {@code numberOfLinesInFilesWithFullAndNamePrefix}).</li>
* </ul>
*/
class S3UtilProgram {
enum Action {
listByFullPathPrefix,
downloadFile,
downloadByFullPathAndFileNamePrefix,
deleteFile,
deleteByFullPathPrefix,
numberOfLinesInFile,
numberOfLinesInFilesWithFullAndNamePrefix
}
private static final Map<Action, Consumer<ParameterTool>> handlers;
static {
Map<Action, Consumer<ParameterTool>> handlersMutable = new HashMap<>();
handlersMutable.put(Action.listByFullPathPrefix, S3UtilProgram::listByFullPathPrefix);
handlersMutable.put(Action.downloadFile, S3UtilProgram::downloadFile);
handlersMutable.put(Action.downloadByFullPathAndFileNamePrefix,
S3UtilProgram::downloadByFullPathAndFileNamePrefix);
handlersMutable.put(Action.deleteFile, S3UtilProgram::deleteFile);
handlersMutable.put(Action.deleteByFullPathPrefix, S3UtilProgram::deleteByFullPathPrefix);
handlersMutable.put(Action.numberOfLinesInFile, S3UtilProgram::numberOfLinesInFile);
handlersMutable.put(Action.numberOfLinesInFilesWithFullAndNamePrefix,
S3UtilProgram::numberOfLinesInFilesWithFullAndNamePrefix);
handlers = Collections.unmodifiableMap(handlersMutable);
}
private static final String countQuery = "select count(*) from s3object";
public static void main(String[] args) {
final ParameterTool params = ParameterTool.fromArgs(args);
final Action action = Action.valueOf(params.getRequired("action"));
handlers.get(action).accept(params);
}
private static void listByFullPathPrefix(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3prefix = params.getRequired("s3prefix");
listByFullPathPrefix(bucket, s3prefix).forEach(System.out::println);
}
private static List<String> listByFullPathPrefix(final String bucket, final String s3prefix) {
return AmazonS3ClientBuilder.defaultClient().listObjects(bucket, s3prefix).getObjectSummaries()
.stream().map(S3ObjectSummary::getKey).collect(Collectors.toList());
}
private static void downloadFile(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3file = params.getRequired("s3file");
final String localFile = params.getRequired("localFile");
TransferManager tx = TransferManagerBuilder.defaultTransferManager();
try {
tx.download(bucket, s3file, new File(localFile)).waitForCompletion();
} catch (InterruptedException e) {
System.out.println("Transfer interrupted");
} finally {
tx.shutdownNow();
}
}
private static void downloadByFullPathAndFileNamePrefix(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3prefix = params.getRequired("s3prefix");
final String localFolder = params.getRequired("localFolder");
final String s3filePrefix = params.get("s3filePrefix", "");
TransferManager tx = TransferManagerBuilder.defaultTransferManager();
Predicate<String> keyPredicate = getKeyFilterByFileNamePrefix(s3filePrefix);
KeyFilter keyFilter = s3filePrefix.isEmpty() ? KeyFilter.INCLUDE_ALL :
objectSummary -> keyPredicate.test(objectSummary.getKey());
try {
tx.downloadDirectory(bucket, s3prefix, new File(localFolder), keyFilter).waitForCompletion();
} catch (InterruptedException e) {
System.out.println("Transfer interrupted");
} finally {
tx.shutdownNow();
}
}
private static Predicate<String> getKeyFilterByFileNamePrefix(String s3filePrefix) {
if (s3filePrefix.isEmpty()) {
return key -> true;
} else {
return key -> {
String[] parts = key.split("/");
String fileName = parts[parts.length - 1];
return fileName.startsWith(s3filePrefix);
};
}
}
private static void deleteFile(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3file = params.getRequired("s3file");
AmazonS3ClientBuilder.defaultClient().deleteObject(bucket, s3file);
}
private static void deleteByFullPathPrefix(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3prefix = params.getRequired("s3prefix");
String[] keys = listByFullPathPrefix(bucket, s3prefix).toArray(new String[] {});
if (keys.length > 0) {
DeleteObjectsRequest request = new DeleteObjectsRequest(bucket).withKeys(keys);
AmazonS3ClientBuilder.defaultClient().deleteObjects(request);
}
}
private static void numberOfLinesInFile(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3file = params.getRequired("s3file");
AmazonS3 s3client = AmazonS3ClientBuilder.defaultClient();
System.out.print(S3QueryUtil.queryFile(s3client, bucket, s3file, countQuery));
s3client.shutdown();
}
private static void numberOfLinesInFilesWithFullAndNamePrefix(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3prefix = params.getRequired("s3prefix");
final String s3filePrefix = params.get("s3filePrefix", "");
int parallelism = params.getInt("parallelism", 10);
List<String> files = listByFullPathPrefix(bucket, s3prefix);
ExecutorService executor = Executors.newFixedThreadPool(parallelism);
AmazonS3 s3client = AmazonS3ClientBuilder.defaultClient();
List<CompletableFuture<Integer>> requests =
submitLineCountingRequestsForFilesAsync(executor, s3client, bucket, files, s3filePrefix);
int count = waitAndComputeTotalLineCountResult(requests);
executor.shutdownNow();
s3client.shutdown();
System.out.print(count);
}
private static List<CompletableFuture<Integer>> submitLineCountingRequestsForFilesAsync(
ExecutorService executor, AmazonS3 s3client, String bucket, List<String> files, String s3filePrefix) {
List<CompletableFuture<Integer>> requests = new ArrayList<>();
Predicate<String> keyPredicate = getKeyFilterByFileNamePrefix(s3filePrefix);
files.forEach(file -> {
if (keyPredicate.test(file)) {
CompletableFuture<Integer> result = new CompletableFuture<>();
executor.execute(() ->
result.complete(Integer.parseInt(S3QueryUtil.queryFile(s3client, bucket, file, countQuery))));
requests.add(result);
}
});
return requests;
}
private static int waitAndComputeTotalLineCountResult(List<CompletableFuture<Integer>> requests) {
int count = 0;
for (CompletableFuture<Integer> result : requests) {
try {
count += result.get();
} catch (Throwable e) {
System.out.println("Failed count lines");
e.printStackTrace();
}
}
return count;
}
}