blob: 6a4b032b9c897bf490f7a09e430e836df09caf9a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.whirr.service;
import static com.google.common.base.Preconditions.checkArgument;
import static org.jclouds.aws.ec2.reference.AWSEC2Constants.PROPERTY_EC2_AMI_QUERY;
import static org.jclouds.aws.ec2.reference.AWSEC2Constants.PROPERTY_EC2_CC_AMI_QUERY;
import static org.jclouds.location.reference.LocationConstants.PROPERTY_REGION;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils;
import org.apache.whirr.ClusterSpec;
import org.jclouds.Context;
import org.jclouds.ContextBuilder;
import org.jclouds.apis.ApiMetadata;
import org.jclouds.apis.Apis;
import org.jclouds.byon.BYONApiMetadata;
import org.jclouds.byon.Node;
import org.jclouds.byon.config.BYONComputeServiceContextModule;
import org.jclouds.byon.config.CacheNodeStoreModule;
import org.jclouds.compute.ComputeService;
import org.jclouds.compute.ComputeServiceContext;
import org.jclouds.compute.Utils;
import org.jclouds.compute.domain.ExecResponse;
import org.jclouds.compute.events.StatementOnNodeCompletion;
import org.jclouds.compute.events.StatementOnNodeFailure;
import org.jclouds.compute.events.StatementOnNodeSubmission;
import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
import org.jclouds.providers.ProviderMetadata;
import org.jclouds.providers.Providers;
import org.jclouds.scriptbuilder.domain.OsFamily;
import org.jclouds.sshj.config.SshjSshClientModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ForwardingObject;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.common.reflect.TypeToken;
import com.google.inject.Module;
/**
* A convenience class for building jclouds {@link ComputeServiceContext} objects.
*/
// singleton enum pattern
public enum ComputeCache implements Function<ClusterSpec, ComputeServiceContext> {
INSTANCE;
private static final Logger LOG = LoggerFactory.getLogger(ComputeCache.class);
@Override
public ComputeServiceContext apply(ClusterSpec arg0) {
return cache.getUnchecked(new Key(arg0));
}
public void invalidate(ClusterSpec arg0) {
cache.invalidate(new Key(arg0));
}
// this should prevent recreating the same compute context twice
@VisibleForTesting
final LoadingCache<Key, ComputeServiceContext> cache = CacheBuilder.newBuilder().build(
new CacheLoader<Key, ComputeServiceContext>(){
@Override
public ComputeServiceContext load(Key arg0) {
LOG.debug("creating new ComputeServiceContext {}", arg0);
ContextBuilder builder;
if (arg0.overrideApiMetadata != null) {
builder = ContextBuilder.newBuilder(arg0.overrideApiMetadata)
.credentials(arg0.identity, arg0.credential)
.overrides(arg0.overrides)
.modules(arg0.modules);
} else {
builder = ContextBuilder.newBuilder(arg0.provider)
.credentials(arg0.identity, arg0.credential)
.overrides(arg0.overrides)
.modules(arg0.modules);
}
if (arg0.endpoint != null)
builder.endpoint(arg0.endpoint);
ComputeServiceContext context = new IgnoreCloseComputeServiceContext(
builder.buildView(ComputeServiceContext.class));
LOG.debug("created new ComputeServiceContext {}", context);
context.utils().eventBus().register(ComputeCache.this);
return context;
}
}
);
@Subscribe
@AllowConcurrentEvents
public void onStart(StatementOnNodeSubmission event) {
LOG.info(">> running {} on node({})", event.getStatement(), event.getNode().getId());
if (LOG.isDebugEnabled()) {
LOG.debug(">> script for {} on node({})\n{}", new Object[] { event.getStatement(), event.getNode().getId(),
event.getStatement().render(OsFamily.UNIX) });
}
}
@Subscribe
@AllowConcurrentEvents
public void onFailure(StatementOnNodeFailure event) {
LOG.error("<< error running {} on node({}): {}", new Object[] { event.getStatement(), event.getNode().getId(),
event.getCause().getMessage() }, event.getCause());
}
@Subscribe
@AllowConcurrentEvents
public void onSuccess(StatementOnNodeCompletion event) {
ExecResponse arg0 = event.getResponse();
if (arg0.getExitStatus() != 0) {
LOG.error("<< error running {} on node({}): {}", new Object[] { event.getStatement(), event.getNode().getId(),
arg0 });
} else {
LOG.info("<< success executing {} on node({}): {}", new Object[] { event.getStatement(),
event.getNode().getId(), arg0 });
}
}
private static class IgnoreCloseComputeServiceContext
extends ForwardingObject implements ComputeServiceContext {
private final ComputeServiceContext context;
public IgnoreCloseComputeServiceContext(final ComputeServiceContext context) {
this.context = context;
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
LOG.debug("closing ComputeServiceContext {}", context);
context.close();
}
});
}
@Override
protected ComputeServiceContext delegate() {
return context;
}
@Override
public ComputeService getComputeService() {
return delegate().getComputeService();
}
@Override
public Utils getUtils() {
return delegate().getUtils();
}
@Override
public Utils utils() {
return delegate().utils();
}
@Override
public void close() {
/* Do nothing. The instance is closed by the builder */
}
@Override
public TypeToken<?> getBackendType() {
return delegate().getBackendType();
}
@Override
public <C extends Context> C unwrap(TypeToken<C> type) {
return delegate().<C>unwrap(type);
}
@Override
public <C extends Context> C unwrap() {
return delegate().<C>unwrap();
}
}
public static final Map<String, ApiMetadata> COMPUTE_APIS = Maps.uniqueIndex(Apis.viewableAs(ComputeServiceContext.class),
Apis.idFunction());
public static final Map<String, ProviderMetadata> COMPUTE_PROVIDERS = Maps.uniqueIndex(Providers.viewableAs(ComputeServiceContext.class),
Providers.idFunction());
public static final Set<String> COMPUTE_KEYS = ImmutableSet.copyOf(Iterables.concat(COMPUTE_PROVIDERS.keySet(), COMPUTE_APIS.keySet()));
/**
* configurable properties, scoped to a provider.
*/
public static final Iterable<String> PROVIDER_PROPERTIES = ImmutableSet.of(
"endpoint", "api", "apiversion", "iso3166-codes","nodes");
/**
* Key class for the compute context cache
*/
private static class Key {
private String provider;
private String endpoint;
private String identity;
private String credential;
private String clusterName;
private ApiMetadata overrideApiMetadata;
private final String key;
private final Properties overrides;
private final Set<Module> modules;
public Key(ClusterSpec spec) {
provider = spec.getProvider();
endpoint = spec.getEndpoint();
identity = spec.getIdentity();
credential = spec.getCredential();
clusterName = spec.getClusterName();
key = Objects.toStringHelper("").omitNullValues()
.add("provider", provider)
.add("endpoint", endpoint)
.add("identity", identity)
.add("clusterName", clusterName).toString();
Configuration jcloudsConfig = spec.getConfigurationForKeysWithPrefix("jclouds");
// jclouds configuration for providers are not prefixed with jclouds.
for (String key : COMPUTE_KEYS) {
for (String property : PROVIDER_PROPERTIES) {
String prefixedProperty = "jclouds." + key + "." + property;
if (jcloudsConfig.containsKey(prefixedProperty)) {
jcloudsConfig.setProperty(key + "." + property,
jcloudsConfig.getProperty(prefixedProperty));
}
}
}
overrides = ConfigurationConverter.getProperties(jcloudsConfig);
if ("aws-ec2".equals(spec.getProvider()) && spec.getTemplate().getImageId() != null) {
enableAWSEC2LazyImageFetching(spec);
}
if ("stub".equals(spec.getProvider())) {
modules = ImmutableSet.<Module>of(new SLF4JLoggingModule(), new DryRunModule());
} else if ("byon".equals(spec.getProvider()) && !spec.getByonNodes().isEmpty()) {
overrideApiMetadata = new BYONApiMetadata()
.toBuilder()
.defaultModule(BYONComputeServiceContextModule.class)
.build();
modules = ImmutableSet.<Module>of(new SLF4JLoggingModule(),
new EnterpriseConfigurationModule(),
new SshjSshClientModule(),
new CacheNodeStoreModule(ImmutableMap.<String,Node>copyOf(spec.getByonNodes())));
} else {
modules = ImmutableSet.<Module>of(new SLF4JLoggingModule(),
new EnterpriseConfigurationModule(), new SshjSshClientModule());
}
}
/**
* AWS EC2 specific optimisation to avoid running queries across
* all regiosn when the image-id is know from the property file
*
* @param spec
*/
private void enableAWSEC2LazyImageFetching(ClusterSpec spec) {
overrides.setProperty(PROPERTY_EC2_AMI_QUERY, "");
overrides.setProperty(PROPERTY_EC2_CC_AMI_QUERY, "");
String[] parts = StringUtils.split(spec.getTemplate().getImageId(), '/');
checkArgument(parts.length == 2, "Expected to find image-id = region/ami-id");
overrides.setProperty(PROPERTY_REGION, parts[0]);
}
@Override
public boolean equals(Object that) {
if (that instanceof Key) {
return Objects.equal(this.key, ((Key)that).key)
&& Objects.equal(overrides, ((Key)that).overrides);
}
return false;
}
@Override
public int hashCode() {
return Objects.hashCode(key, overrides);
}
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("provider", provider)
.add("identity", identity)
.add("overrides", overrides)
.add("clusterName", clusterName)
.toString();
}
}
}