/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.services.assets.data;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobBuilder;
import org.jclouds.blobstore.options.GetOptions;
import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
import org.jclouds.logging.log4j.config.Log4JLoggingModule;
import org.jclouds.netty.config.NettyPayloadModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Module;
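/**
 * A {@link BinaryStore} implementation that keeps asset data in Amazon S3 via the
 * jclouds BlobStore API. Payloads under 5 MB are uploaded inline; anything larger is
 * spooled to a temp file and uploaded on a background thread, so checksum, ETag, and
 * content-length metadata for large assets are only persisted once that upload finishes.
 *
 * A minimal usage sketch (the credential and bucket values below are illustrative,
 * not part of this class):
 *
 * <pre>
 * S3BinaryStore store = new S3BinaryStore( awsAccessId, awsSecretKey, "my-assets-bucket" );
 * store.write( appId, assetEntity, inputStream );    // uploads and fills in file metadata
 * InputStream in = store.read( appId, assetEntity ); // streams the blob back
 * store.destroy();                                   // releases the jclouds context
 * </pre>
 */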
public class S3BinaryStore implements BinaryStore {
private static final Iterable<? extends Module> MODULES = ImmutableSet
.of( new JavaUrlHttpCommandExecutorServiceModule(), new Log4JLoggingModule(), new NettyPayloadModule() );
private static final Logger LOG = LoggerFactory.getLogger( S3BinaryStore.class );
private static final long FIVE_MB = ( FileUtils.ONE_MB * 5 );
private BlobStoreContext context;
private String accessId;
private String secretKey;
private String bucketName;
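    // pool for background uploads of large payloads; see write() below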
private ExecutorService executor = Executors.newFixedThreadPool( 10 );
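    // injected so the background upload can persist updated file metadata when it completes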
@Autowired
private EntityManagerFactory emf;
public S3BinaryStore( String accessId, String secretKey, String bucketName ) {
this.accessId = accessId;
this.secretKey = secretKey;
this.bucketName = bucketName;
}
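    /**
     * Lazily builds the jclouds context on first use and makes sure the target
     * bucket exists before any blob operations run against it.
     */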
private BlobStoreContext getContext() {
if ( context == null ) {
context = ContextBuilder.newBuilder( "aws-s3" ).credentials( accessId, secretKey ).modules( MODULES )
.buildView( BlobStoreContext.class );
BlobStore blobStore = context.getBlobStore();
blobStore.createContainerInLocation( null, bucketName );
}
return context;
}
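    /** Shuts down the upload pool and releases the jclouds context. */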
    public void destroy() {
        executor.shutdown();
        if ( context != null ) {
            context.close();
        }
    }
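    /**
     * Writes the asset payload for the given entity. The first 5 MB of the stream are
     * buffered in memory to decide between a direct upload and the temp-file path used
     * for larger payloads (capped at 5 GB).
     */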
@Override
public void write( final UUID appId, final Entity entity, InputStream inputStream ) throws IOException {
final String uploadFileName = AssetUtils.buildAssetKey( appId, entity );
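        // buffer up to 5 MB so we can tell whether the payload fits in a single
        // in-memory upload; copyLarge returns the number of bytes actually read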
ByteArrayOutputStream baos = new ByteArrayOutputStream();
long written = IOUtils.copyLarge( inputStream, baos, 0, FIVE_MB );
byte[] data = baos.toByteArray();
final Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( entity );
fileMetadata.put( AssetUtils.LAST_MODIFIED, System.currentTimeMillis() );
final String mimeType = AssetMimeHandler.get().getMimeType( entity, data );
        if ( written < FIVE_MB ) { // entire payload fit in the buffer, upload directly from memory
BlobStore blobStore = getContext().getBlobStore();
BlobBuilder.PayloadBlobBuilder bb = blobStore.blobBuilder( uploadFileName )
.payload( data ).calculateMD5().contentType( mimeType );
fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );
if ( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ) != null ) {
bb.contentDisposition( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ).toString() );
}
final Blob blob = bb.build();
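            // store the hex MD5 so the checksum can be verified on later reads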
String md5sum = Hex.encodeHexString( blob.getMetadata().getContentMetadata().getContentMD5() );
fileMetadata.put( AssetUtils.CHECKSUM, md5sum );
String eTag = blobStore.putBlob( bucketName, blob );
fileMetadata.put( AssetUtils.E_TAG, eTag );
}
        else { // larger than 5 MB: spool the whole payload to a temp file and upload in the background
// create temp file and copy entire file to that temp file
LOG.debug( "Writing temp file for S3 upload" );
            final File tempFile = File.createTempFile( entity.getUuid().toString(), ".tmp" );
tempFile.deleteOnExit();
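            // write the already-buffered bytes, then the remainder of the stream, to disk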
OutputStream os = null;
try {
                os = new BufferedOutputStream( new FileOutputStream( tempFile ) );
os.write( data );
written += IOUtils.copyLarge( inputStream, os, 0, ( FileUtils.ONE_GB * 5 ) );
}
finally {
IOUtils.closeQuietly( os );
}
fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );
            // jclouds no longer provides an AsyncBlobStore, so hand the upload off to
            // this store's thread pool and return immediately
            LOG.debug( "Submitting S3 upload task" );
            executor.submit( new Runnable() {
@Override
public void run() {
try {
LOG.debug( "S3 upload thread started" );
BlobStore blobStore = getContext().getBlobStore();
BlobBuilder.PayloadBlobBuilder bb = blobStore.blobBuilder( uploadFileName )
.payload( tempFile ).calculateMD5().contentType( mimeType );
if ( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ) != null ) {
bb.contentDisposition( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ).toString() );
}
final Blob blob = bb.build();
String md5sum = Hex.encodeHexString( blob.getMetadata().getContentMetadata().getContentMD5() );
fileMetadata.put( AssetUtils.CHECKSUM, md5sum );
LOG.debug( "S3 upload starting" );
String eTag = blobStore.putBlob( bucketName, blob );
fileMetadata.put( AssetUtils.E_TAG, eTag );
LOG.debug( "S3 upload complete eTag=" + eTag);
EntityManager em = emf.getEntityManager( appId );
em.update( entity );
                    }
                    catch ( Exception e ) {
                        LOG.error( "Error uploading asset to S3", e );
                    }
                    finally {
                        // delete the temp file whether or not the upload succeeded
                        if ( tempFile.exists() ) {
                            tempFile.delete();
                        }
                    }
}
});
}
}
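    /**
     * Streams the asset payload back from S3. A call matching the default offset and
     * length is treated as a whole-blob read; anything else is issued as a ranged GET.
     */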
@Override
public InputStream read( UUID appId, Entity entity, long offset, long length ) throws IOException {
BlobStore blobStore = getContext().getBlobStore();
Blob blob;
if ( offset == 0 && length == FIVE_MB ) {
blob = blobStore.getBlob( bucketName, AssetUtils.buildAssetKey( appId, entity ) );
}
else {
            // jclouds GetOptions.range takes inclusive start/end byte offsets, not a length
            GetOptions options = GetOptions.Builder.range( offset, offset + length - 1 );
blob = blobStore.getBlob( bucketName, AssetUtils.buildAssetKey( appId, entity ), options );
}
if ( blob == null || blob.getPayload() == null ) {
return null;
}
return blob.getPayload().getInput();
}
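    /** Convenience read of the whole blob, expressed as the default range. */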
@Override
public InputStream read( UUID appId, Entity entity ) throws IOException {
return read( appId, entity, 0, FIVE_MB );
}
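    /** Removes the asset's blob from the bucket. */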
@Override
public void delete( UUID appId, Entity entity ) {
BlobStore blobStore = getContext().getBlobStore();
blobStore.removeBlob( bucketName, AssetUtils.buildAssetKey( appId, entity ) );
}
}