blob: b57ff3dc3a6ff5d5788879e87202d27f2d69dac7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import com.google.common.base.Charsets;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
/**
* A MultipartUploader that uses the basic FileSystem commands.
* This is done in three stages:
* Init - create a temp _multipart directory.
* PutPart - copying the individual parts of the file to the temp directory.
* Complete - use {@link FileSystem#concat} to merge the files; and then delete
* the temp directory.
*/
public class FileSystemMultipartUploader extends MultipartUploader {
private final FileSystem fs;
public FileSystemMultipartUploader(FileSystem fs) {
this.fs = fs;
}
@Override
public UploadHandle initialize(Path filePath) throws IOException {
Path collectorPath = createCollectorPath(filePath);
fs.mkdirs(collectorPath, FsPermission.getDirDefault());
ByteBuffer byteBuffer = ByteBuffer.wrap(
collectorPath.toString().getBytes(Charsets.UTF_8));
return BBUploadHandle.from(byteBuffer);
}
@Override
public PartHandle putPart(Path filePath, InputStream inputStream,
int partNumber, UploadHandle uploadId, long lengthInBytes)
throws IOException {
byte[] uploadIdByteArray = uploadId.toByteArray();
Path collectorPath = new Path(new String(uploadIdByteArray, 0,
uploadIdByteArray.length, Charsets.UTF_8));
Path partPath =
Path.mergePaths(collectorPath, Path.mergePaths(new Path(Path.SEPARATOR),
new Path(Integer.toString(partNumber) + ".part")));
FSDataOutputStreamBuilder outputStream = fs.createFile(partPath);
FSDataOutputStream fsDataOutputStream = outputStream.build();
IOUtils.copy(inputStream, fsDataOutputStream, 4096);
fsDataOutputStream.close();
return BBPartHandle.from(ByteBuffer.wrap(
partPath.toString().getBytes(Charsets.UTF_8)));
}
private Path createCollectorPath(Path filePath) {
return Path.mergePaths(filePath.getParent(),
Path.mergePaths(new Path(filePath.getName().split("\\.")[0]),
Path.mergePaths(new Path("_multipart"),
new Path(Path.SEPARATOR))));
}
@Override
@SuppressWarnings("deprecation") // rename w/ OVERWRITE
public PathHandle complete(Path filePath,
List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
throws IOException {
handles.sort(Comparator.comparing(Pair::getKey));
List<Path> partHandles = handles
.stream()
.map(pair -> {
byte[] byteArray = pair.getValue().toByteArray();
return new Path(new String(byteArray, 0, byteArray.length,
Charsets.UTF_8));
})
.collect(Collectors.toList());
Path collectorPath = createCollectorPath(filePath);
Path filePathInsideCollector = Path.mergePaths(collectorPath,
new Path(Path.SEPARATOR + filePath.getName()));
fs.create(filePathInsideCollector).close();
fs.concat(filePathInsideCollector,
partHandles.toArray(new Path[handles.size()]));
fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE);
fs.delete(collectorPath, true);
FileStatus status = fs.getFileStatus(filePath);
return fs.getPathHandle(status);
}
@Override
public void abort(Path filePath, UploadHandle uploadId) throws IOException {
byte[] uploadIdByteArray = uploadId.toByteArray();
Path collectorPath = new Path(new String(uploadIdByteArray, 0,
uploadIdByteArray.length, Charsets.UTF_8));
fs.delete(collectorPath, true);
}
/**
* Factory for creating MultipartUploaderFactory objects for file://
* filesystems.
*/
public static class Factory extends MultipartUploaderFactory {
protected MultipartUploader createMultipartUploader(FileSystem fs,
Configuration conf) {
if (fs.getScheme().equals("file")) {
return new FileSystemMultipartUploader(fs);
}
return null;
}
}
}