| /* |
| * Druid - a distributed column store. |
| * Copyright (C) 2012 Metamarkets Group Inc. |
| * |
| * This program is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU General Public License |
| * as published by the Free Software Foundation; either version 2 |
| * of the License, or (at your option) any later version. |
| * |
| * This program is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| * GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License |
| * along with this program; if not, write to the Free Software |
| * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
| */ |
| |
| package com.metamx.druid.loading; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Joiner; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.io.ByteStreams; |
| import com.google.common.io.Files; |
| import com.metamx.druid.client.DataSegment; |
| import com.metamx.druid.index.v1.IndexIO; |
| import com.metamx.druid.utils.CompressionUtils; |
| import com.metamx.emitter.EmittingLogger; |
| import org.jets3t.service.S3ServiceException; |
| import org.jets3t.service.acl.gs.GSAccessControlList; |
| import org.jets3t.service.impl.rest.httpclient.RestS3Service; |
| import org.jets3t.service.model.S3Object; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.security.NoSuchAlgorithmException; |
| |
| public class S3DataSegmentPusher implements DataSegmentPusher |
| { |
| private static final EmittingLogger log = new EmittingLogger(S3DataSegmentPusher.class); |
| private static final Joiner JOINER = Joiner.on("/").skipNulls(); |
| |
| private final RestS3Service s3Client; |
| private final S3DataSegmentPusherConfig config; |
| private final ObjectMapper jsonMapper; |
| |
| public S3DataSegmentPusher( |
| RestS3Service s3Client, |
| S3DataSegmentPusherConfig config, |
| ObjectMapper jsonMapper |
| ) |
| { |
| this.s3Client = s3Client; |
| this.config = config; |
| this.jsonMapper = jsonMapper; |
| } |
| |
| public S3DataSegmentPusherConfig getConfig() |
| { |
| return config; |
| } |
| |
| @Override |
| public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException |
| { |
| log.info("Uploading [%s] to S3", indexFilesDir); |
| String outputKey = JOINER.join( |
| config.getBaseKey().isEmpty() ? null : config.getBaseKey(), |
| DataSegmentPusherUtil.getStorageDir(segment) |
| ); |
| |
| final File zipOutFile = File.createTempFile("druid", "index.zip"); |
| long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile); |
| |
| try { |
| S3Object toPush = new S3Object(zipOutFile); |
| |
| final String outputBucket = config.getBucket(); |
| toPush.setBucketName(outputBucket); |
| toPush.setKey(outputKey + "/index.zip"); |
| toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); |
| |
| log.info("Pushing %s.", toPush); |
| s3Client.putObject(outputBucket, toPush); |
| |
| segment = segment.withSize(indexSize) |
| .withLoadSpec( |
| ImmutableMap.<String, Object>of("type", "s3_zip", "bucket", outputBucket, "key", toPush.getKey()) |
| ) |
| .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir)); |
| |
| File descriptorFile = File.createTempFile("druid", "descriptor.json"); |
| Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile); |
| S3Object descriptorObject = new S3Object(descriptorFile); |
| descriptorObject.setBucketName(outputBucket); |
| descriptorObject.setKey(outputKey + "/descriptor.json"); |
| descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); |
| |
| log.info("Pushing %s", descriptorObject); |
| s3Client.putObject(outputBucket, descriptorObject); |
| |
| log.info("Deleting zipped index File[%s]", zipOutFile); |
| zipOutFile.delete(); |
| |
| log.info("Deleting descriptor file[%s]", descriptorFile); |
| descriptorFile.delete(); |
| |
| return segment; |
| } |
| catch (NoSuchAlgorithmException e) { |
| throw new IOException(e); |
| } |
| catch (S3ServiceException e) { |
| throw new IOException(e); |
| } |
| } |
| } |