/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package com.metamx.druid.realtime;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closeables;
import com.metamx.common.ISE;
import com.metamx.common.StreamUtils;
import com.metamx.druid.client.DataSegment;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
 * A {@link SegmentPusher} that zips up a segment's index files, uploads the zip and a
 * descriptor.json to S3 via jets3t, and returns the segment annotated with an "s3_zip" load spec.
 */
public class S3SegmentPusher implements SegmentPusher
{
private static final EmittingLogger log = new EmittingLogger(S3SegmentPusher.class);
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final RestS3Service s3Client;
private final S3SegmentPusherConfig config;
private final ObjectMapper jsonMapper;
public S3SegmentPusher(
RestS3Service s3Client,
S3SegmentPusherConfig config,
ObjectMapper jsonMapper
)
{
this.s3Client = s3Client;
this.config = config;
this.jsonMapper = jsonMapper;
}
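
/**
 * Zips the contents of {@code file} into a temporary index.zip, uploads the zip and a
 * descriptor.json to S3 under a key derived from the segment's dataSource, interval, version
 * and partition number, deletes the local working files, and returns the segment with its
 * size and S3 load spec filled in.
 */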
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
log.info("Uploading [%s] to S3", file);
String outputKey = JOINER.join(
config.getBaseKey().isEmpty() ? null : config.getBaseKey(),
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart(),
segment.getInterval().getEnd()
),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
);
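
// Zip every file in the segment directory into a temporary index.zip, tracking the total
// uncompressed size for the segment metadata.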
File indexFilesDir = file;
long indexSize = 0;
final File zipOutFile = File.createTempFile("druid", "index.zip");
ZipOutputStream zipOut = null;
try {
zipOut = new ZipOutputStream(new FileOutputStream(zipOutFile));
File[] indexFiles = indexFilesDir.listFiles();
if (indexFiles == null) {
  throw new ISE("Unable to list files in directory[%s]", indexFilesDir);
}
for (File indexFile : indexFiles) {
log.info("Adding indexFile[%s] with size[%,d]. Total size[%,d]", indexFile, indexFile.length(), indexSize);
if (indexFile.length() >= Integer.MAX_VALUE) {
throw new ISE("indexFile[%s] too large [%,d]", indexFile, indexFile.length());
}
zipOut.putNextEntry(new ZipEntry(indexFile.getName()));
final FileInputStream indexFileIn = new FileInputStream(indexFile);
try {
  IOUtils.copy(indexFileIn, zipOut);
}
finally {
  Closeables.closeQuietly(indexFileIn);
}
indexSize += indexFile.length();
}
}
finally {
Closeables.closeQuietly(zipOut);
}
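
// Upload the zipped index to S3 and build the segment metadata that points at it.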
try {
S3Object toPush = new S3Object(zipOutFile);
final String outputBucket = config.getBucket();
toPush.setBucketName(outputBucket);
toPush.setKey(outputKey + "/index.zip");
log.info("Pushing %s.", toPush);
s3Client.putObject(outputBucket, toPush);
DataSegment outputSegment = segment.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type", "s3_zip",
"bucket", outputBucket,
"key", toPush.getKey()
)
);
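
// Upload the segment metadata (including the S3 load spec) as descriptor.json alongside index.zip.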
File descriptorFile = File.createTempFile("druid", "descriptor.json");
StreamUtils.copyToFileAndClose(new ByteArrayInputStream(jsonMapper.writeValueAsBytes(outputSegment)), descriptorFile);
S3Object descriptorObject = new S3Object(descriptorFile);
descriptorObject.setBucketName(outputBucket);
descriptorObject.setKey(outputKey + "/descriptor.json");
log.info("Pushing %s", descriptorObject);
s3Client.putObject(outputBucket, descriptorObject);
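
// Everything is in S3 now; clean up the local index directory, zip, and descriptor.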
log.info("Deleting Index File[%s]", indexFilesDir);
FileUtils.deleteDirectory(indexFilesDir);
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
return outputSegment;
}
catch (NoSuchAlgorithmException e) {
throw new IOException(e);
}
catch (S3ServiceException e) {
throw new IOException(e);
}
}
}