blob: 9dacbe8b546f4e426a0d1b2af8ea37f1d4823335 [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.utils.CompressionUtils;
import com.metamx.emitter.EmittingLogger;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.acl.gs.GSAccessControlList;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import java.io.File;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
public class S3DataSegmentPusher implements DataSegmentPusher
{
private static final EmittingLogger log = new EmittingLogger(S3DataSegmentPusher.class);
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final RestS3Service s3Client;
private final S3DataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
public S3DataSegmentPusher(
RestS3Service s3Client,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
{
this.s3Client = s3Client;
this.config = config;
this.jsonMapper = jsonMapper;
}
public S3DataSegmentPusherConfig getConfig()
{
return config;
}
@Override
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
{
log.info("Uploading [%s] to S3", indexFilesDir);
String outputKey = JOINER.join(
config.getBaseKey().isEmpty() ? null : config.getBaseKey(),
DataSegmentPusherUtil.getStorageDir(segment)
);
final File zipOutFile = File.createTempFile("druid", "index.zip");
long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
try {
S3Object toPush = new S3Object(zipOutFile);
final String outputBucket = config.getBucket();
toPush.setBucketName(outputBucket);
toPush.setKey(outputKey + "/index.zip");
toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
log.info("Pushing %s.", toPush);
s3Client.putObject(outputBucket, toPush);
segment = segment.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object>of("type", "s3_zip", "bucket", outputBucket, "key", toPush.getKey())
)
.withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
File descriptorFile = File.createTempFile("druid", "descriptor.json");
Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
S3Object descriptorObject = new S3Object(descriptorFile);
descriptorObject.setBucketName(outputBucket);
descriptorObject.setKey(outputKey + "/descriptor.json");
descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
log.info("Pushing %s", descriptorObject);
s3Client.putObject(outputBucket, descriptorObject);
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
return segment;
}
catch (NoSuchAlgorithmException e) {
throw new IOException(e);
}
catch (S3ServiceException e) {
throw new IOException(e);
}
}
}