blob: 0d323c84f861faed26314483633c9ab38a50d17a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.examples.blobstore.hdfs;
import static org.jclouds.Constants.PROPERTY_ENDPOINT;
import static org.jclouds.blobstore.options.PutOptions.Builder.multipart;
import static org.jclouds.location.reference.LocationConstants.ENDPOINT;
import static org.jclouds.location.reference.LocationConstants.PROPERTY_REGION;
import java.io.IOException;
import java.util.Properties;
import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.jclouds.aws.domain.Region;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.BlobStoreContextFactory;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.util.BlobStoreUtils;
import org.jclouds.examples.blobstore.hdfs.config.HdfsModule;
import org.jclouds.examples.blobstore.hdfs.io.payloads.HdfsPayload;
import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
import org.jclouds.logging.log4j.config.Log4JLoggingModule;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.inject.Module;
/**
* Demonstrates the use of {@link BlobStore} to upload from HDFS to a blob container
*
* Usage is: java MainApp \"provider\" \"identity\" \"credential\" \"hdfsUrl\"
* \"containerName\" \"objectName\" plainhttp threadcount
*
* \"plainhttp\" and \"threadcound\" is optional if all the rest of parameters are omitted
*
* @author Tibor Kiss
*/
public class MainApp extends Configured {
public static int PARAMETERS = 6;
public static String INVALID_SYNTAX = "Invalid number of parameters. Syntax is: \"provider\" \"identity\" \"credential\" \"localFileName\" \"containerName\" \"objectName\" plainhttp threadcount";
public final static Properties PLAIN_HTTP_ENDPOINTS = new Properties();
static {
PLAIN_HTTP_ENDPOINTS.setProperty(PROPERTY_ENDPOINT, "http://s3.amazonaws.com");
PLAIN_HTTP_ENDPOINTS.setProperty(PROPERTY_REGION + "." + Region.US_STANDARD + "." + ENDPOINT,
"http://s3.amazonaws.com");
PLAIN_HTTP_ENDPOINTS.setProperty(PROPERTY_REGION + "." + Region.US_WEST_1 + "." + ENDPOINT,
"http://s3-us-west-1.amazonaws.com");
PLAIN_HTTP_ENDPOINTS.setProperty(PROPERTY_REGION + "." + "EU" + "." + ENDPOINT,
"http://s3-eu-west-1.amazonaws.com");
PLAIN_HTTP_ENDPOINTS.setProperty(PROPERTY_REGION + "." + Region.AP_SOUTHEAST_1 + "." + ENDPOINT,
"http://s3-ap-southeast-1.amazonaws.com");
}
final static Iterable<? extends Module> HDFS_MODULES =
ImmutableSet.of(new JavaUrlHttpCommandExecutorServiceModule(), new Log4JLoggingModule(), new HdfsModule());
static String getSpeed(long speed) {
if (speed < 1024) {
return "" + speed + " bytes/s";
} else if (speed < 1048576) {
return "" + (speed / 1024) + " kbytes/s";
} else {
return "" + (speed / 1048576) + " Mbytes/s";
}
}
static void printSpeed(String message, long start, long length) {
long sec = (System.currentTimeMillis() - start) / 1000;
if (sec == 0)
return;
long speed = length / sec;
System.out.print(message);
if (speed < 1024) {
System.out.print(" " + length + " bytes");
} else if (speed < 1048576) {
System.out.print(" " + (length / 1024) + " kB");
} else if (speed < 1073741824) {
System.out.print(" " + (length / 1048576) + " MB");
} else {
System.out.print(" " + (length / 1073741824) + " GB");
}
System.out.println(" with " + getSpeed(speed) + " (" + length + " bytes)");
}
/**
* @param provider
* @param identity
* @param credential
* @param hdfsUrl
* @param containerName
* @param objectName
* @param plainhttp
* @param threadcount
* @throws IOException
*/
private void upload(String provider, String identity,
String credential, String hdfsUrl, String containerName,
String objectName, boolean plainhttp, String threadcount)
throws IOException {
// Init
Properties overrides = new Properties();
if (plainhttp)
overrides.putAll(PLAIN_HTTP_ENDPOINTS); // default is https
if (threadcount != null)
overrides.setProperty("jclouds.mpu.parallel.degree", threadcount); // without setting,
// default is 4 threads
overrides.setProperty(provider + ".identity", identity);
overrides.setProperty(provider + ".credential", credential);
BlobStoreContext context = new BlobStoreContextFactory().createContext(provider, HDFS_MODULES, overrides);
try {
long start = System.currentTimeMillis();
Configuration conf = getConf();
if (conf == null) {
conf = new Configuration();
setConf(conf);
}
// Create Container
BlobStore blobStore = context.getBlobStore(); // it can be changed to sync
// BlobStore
blobStore.createContainerInLocation(null, containerName);
Blob blob = blobStore.blobBuilder(objectName).payload(
new HdfsPayload(new Path(hdfsUrl), conf))
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.contentDisposition(objectName).build();
long length = blob.getPayload().getContentMetadata().getContentLength();
blobStore.putBlob(containerName, blob, multipart());
printSpeed("Sucessfully uploaded", start, length);
} finally {
// Close connection
context.close();
}
}
public static void main(String[] args) throws IOException {
if (args.length < PARAMETERS)
throw new IllegalArgumentException(INVALID_SYNTAX);
// Args
String provider = args[0];
if (!Iterables.contains(BlobStoreUtils.getSupportedProviders(), provider))
throw new IllegalArgumentException("provider " + provider + " not in supported list: "
+ BlobStoreUtils.getSupportedProviders());
String identity = args[1];
String credential = args[2];
String hdfsUrl = args[3];
String containerName = args[4];
String objectName = args[5];
boolean plainhttp = args.length >= 7 && "plainhttp".equals(args[6]);
String threadcount = args.length >= 8 ? args[7] : null;
MainApp app = new MainApp();
app.upload(provider, identity, credential, hdfsUrl, containerName,
objectName, plainhttp, threadcount);
System.exit(0);
}
}