/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
import com.google.common.base.Preconditions;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.io.IOUtils;
/**
* Compress/uncompress KeyValueContainer data to a tar.gz archive.
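 * <p>
 * The archive layout written by {@link #pack} and read back by
 * {@link #unpackContainerData} is:
 * <pre>
 *   container.yaml   (container descriptor)
 *   db/...           (container metadata database files)
 *   chunks/...       (chunk data files)
 * </pre>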
*/
public class TarContainerPacker
implements ContainerPacker<KeyValueContainerData> {
private static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS;
private static final String DB_DIR_NAME = "db";
private static final String CONTAINER_FILE_NAME = "container.yaml";
/**
 * Given an input stream (a gzipped tar archive), extract the data into
 * the directories defined by the container.
*
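   * <p>
   * A minimal usage sketch (the archive path and {@code container} instance
   * here are hypothetical):
   * <pre>
   *   ContainerPacker&lt;KeyValueContainerData&gt; packer =
   *       new TarContainerPacker();
   *   try (InputStream input =
   *       new FileInputStream("/tmp/container-1.tar.gz")) {
   *     byte[] descriptor = packer.unpackContainerData(container, input);
   *   }
   * </pre>
   *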
* @param container container which defines the destination structure.
* @param inputStream the input stream.
 * @return the content of the container descriptor (container.yaml), or
 *         null if the archive did not contain one.
 * @throws IOException if the archive cannot be read or extracted.
*/
@Override
public byte[] unpackContainerData(Container<KeyValueContainerData> container,
InputStream inputStream)
throws IOException {
byte[] descriptorFileContent = null;
try {
KeyValueContainerData containerData = container.getContainerData();
CompressorInputStream compressorInputStream =
new CompressorStreamFactory()
.createCompressorInputStream(CompressorStreamFactory.GZIP,
inputStream);
TarArchiveInputStream tarInput =
new TarArchiveInputStream(compressorInputStream);
TarArchiveEntry entry = tarInput.getNextTarEntry();
while (entry != null) {
String name = entry.getName();
if (name.startsWith(DB_DIR_NAME + "/")) {
Path destinationPath = containerData.getDbFile().toPath()
.resolve(name.substring(DB_DIR_NAME.length() + 1));
extractEntry(tarInput, entry.getSize(), destinationPath);
} else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
Path destinationPath = Paths.get(containerData.getChunksPath())
.resolve(name.substring(CHUNKS_DIR_NAME.length() + 1));
extractEntry(tarInput, entry.getSize(), destinationPath);
} else if (name.equals(CONTAINER_FILE_NAME)) {
//Don't do anything. Container file should be unpacked in a
//separated step by unpackContainerDescriptor call.
descriptorFileContent = readEntry(tarInput, entry);
} else {
throw new IllegalArgumentException(
"Unknown entry in the tar file: " + "" + name);
}
entry = tarInput.getNextTarEntry();
}
return descriptorFileContent;
} catch (CompressorException e) {
throw new IOException(
"Can't uncompress the given container: " + container
.getContainerData().getContainerID(),
e);
}
}
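  /**
   * Extract the content of a single archive entry to the given path,
   * creating any missing parent directories first.
   */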
private void extractEntry(TarArchiveInputStream tarInput, long size,
Path path) throws IOException {
Preconditions.checkNotNull(path, "Path element should not be null");
Path parent = Preconditions.checkNotNull(path.getParent(),
"Path element should have a parent directory");
Files.createDirectories(parent);
try (BufferedOutputStream bos = new BufferedOutputStream(
new FileOutputStream(path.toAbsolutePath().toString()))) {
int bufferSize = 1024;
      byte[] buffer = new byte[bufferSize];
long remaining = size;
while (remaining > 0) {
int read =
tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize));
if (read >= 0) {
remaining -= read;
bos.write(buffer, 0, read);
} else {
remaining = 0;
}
}
}
}
/**
 * Given a container, write all of its required data and metadata into a
 * gzipped tar archive.
*
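   * <p>
   * A minimal usage sketch (the destination path and {@code container}
   * instance here are hypothetical):
   * <pre>
   *   try (OutputStream output =
   *       new FileOutputStream("/tmp/container-1.tar.gz")) {
   *     new TarContainerPacker().pack(container, output);
   *   }
   * </pre>
   *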
* @param container Container to archive (data + metadata).
* @param destination Destination tar file/stream.
 * @throws IOException if the archive cannot be written.
*/
@Override
public void pack(Container<KeyValueContainerData> container,
OutputStream destination)
throws IOException {
KeyValueContainerData containerData = container.getContainerData();
try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
.createCompressorOutputStream(CompressorStreamFactory.GZIP,
destination)) {
try (ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(
gzippedOut)) {
includePath(containerData.getDbFile().toString(), DB_DIR_NAME,
archiveOutputStream);
includePath(containerData.getChunksPath(), CHUNKS_DIR_NAME,
archiveOutputStream);
includeFile(container.getContainerFile(),
CONTAINER_FILE_NAME,
archiveOutputStream);
}
} catch (CompressorException e) {
throw new IOException(
"Can't compress the container: " + containerData.getContainerID(),
e);
}
}
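  /**
   * Read only the container descriptor (container.yaml) from the archive,
   * without extracting any data to disk.
   */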
@Override
public byte[] unpackContainerDescriptor(InputStream inputStream)
throws IOException {
try {
CompressorInputStream compressorInputStream =
new CompressorStreamFactory()
.createCompressorInputStream(CompressorStreamFactory.GZIP,
inputStream);
TarArchiveInputStream tarInput =
new TarArchiveInputStream(compressorInputStream);
TarArchiveEntry entry = tarInput.getNextTarEntry();
while (entry != null) {
String name = entry.getName();
if (name.equals(CONTAINER_FILE_NAME)) {
return readEntry(tarInput, entry);
}
entry = tarInput.getNextTarEntry();
}
} catch (CompressorException e) {
throw new IOException(
"Can't read the container descriptor from the container archive",
e);
}
throw new IOException(
"Container descriptor is missing from the container archive.");
}
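  /**
   * Read the content of the current archive entry into memory.
   */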
private byte[] readEntry(TarArchiveInputStream tarInput,
TarArchiveEntry entry) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int bufferSize = 1024;
    byte[] buffer = new byte[bufferSize];
    long remaining = entry.getSize();
    while (remaining > 0) {
      int read =
          tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize));
      if (read < 0) {
        //Guard against a truncated archive: stop reading instead of
        //looping forever and writing with a negative length.
        break;
      }
      remaining -= read;
      bos.write(buffer, 0, read);
    }
return bos.toByteArray();
}
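  /**
   * Add every file of the given directory to the archive under the given
   * subdirectory name.
   */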
private void includePath(String containerPath, String subdir,
ArchiveOutputStream archiveOutputStream) throws IOException {
    //Close the stream returned by Files.list to release the underlying
    //directory handle.
    try (Stream<Path> dirEntries = Files.list(Paths.get(containerPath))) {
      for (Path path : dirEntries.collect(Collectors.toList())) {
        includeFile(path.toFile(), subdir + "/" + path.getFileName(),
            archiveOutputStream);
      }
    }
}
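  /**
   * Add a single file to the archive under the given entry name.
   */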
private void includeFile(File file, String entryName,
ArchiveOutputStream archiveOutputStream) throws IOException {
ArchiveEntry archiveEntry =
archiveOutputStream.createArchiveEntry(file, entryName);
archiveOutputStream.putArchiveEntry(archiveEntry);
try (FileInputStream fis = new FileInputStream(file)) {
IOUtils.copy(fis, archiveOutputStream);
}
archiveOutputStream.closeArchiveEntry();
}
}