| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.entity.nosql.riak; |
| |
| import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth; |
| |
| import java.net.URI; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.entity.EntitySpec; |
| import org.apache.brooklyn.api.policy.PolicySpec; |
| import org.apache.brooklyn.api.sensor.AttributeSensor; |
| import org.apache.brooklyn.api.sensor.EnricherSpec; |
| import org.apache.brooklyn.core.entity.Attributes; |
| import org.apache.brooklyn.core.entity.Entities; |
| import org.apache.brooklyn.core.entity.EntityInternal; |
| import org.apache.brooklyn.core.entity.EntityPredicates; |
| import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; |
| import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; |
| import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic; |
| import org.apache.brooklyn.core.entity.trait.Startable; |
| import org.apache.brooklyn.core.sensor.DependentConfiguration; |
| import org.apache.brooklyn.enricher.stock.Enrichers; |
| import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy; |
| import org.apache.brooklyn.entity.group.DynamicClusterImpl; |
| import org.apache.brooklyn.util.core.task.Tasks; |
| import org.apache.brooklyn.util.time.Duration; |
| import org.apache.brooklyn.util.time.Time; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Function; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Optional; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Predicates; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| |
| public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { |
| |
| private static final Logger log = LoggerFactory.getLogger(RiakClusterImpl.class); |
| |
| private transient Object mutex = new Object[0]; |
| |
| @Override |
| public void init() { |
| super.init(); |
| log.info("Initializing the riak cluster..."); |
| sensors().set(IS_CLUSTER_INIT, false); |
| } |
| |
| @Override |
| protected void doStart() { |
| super.doStart(); |
| connectSensors(); |
| |
| try { |
| Duration delay = getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER); |
| Tasks.setBlockingDetails("Sleeping for "+delay+" before advertising cluster available"); |
| Time.sleep(delay); |
| } finally { |
| Tasks.resetBlockingDetails(); |
| } |
| |
| //FIXME: add a quorum to tolerate failed nodes before setting on fire. |
| @SuppressWarnings("unchecked") |
| Optional<Entity> anyNode = Iterables.tryFind(getMembers(), Predicates.and( |
| Predicates.instanceOf(RiakNode.class), |
| EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true), |
| EntityPredicates.attributeEqualTo(RiakNode.SERVICE_UP, true))); |
| if (anyNode.isPresent()) { |
| sensors().set(IS_CLUSTER_INIT, true); |
| } else { |
| log.warn("No Riak Nodes are found on the cluster: {}. Initialization Failed", getId()); |
| ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE); |
| } |
| } |
| |
| @Override |
| protected EntitySpec<?> getMemberSpec() { |
| EntitySpec<?> result = config().get(MEMBER_SPEC); |
| if (result!=null) return result; |
| return EntitySpec.create(RiakNode.class); |
| } |
| |
| protected void connectSensors() { |
| policies().add(PolicySpec.create(MemberTrackingPolicy.class) |
| .displayName("Controller targets tracker") |
| .configure("sensorsToTrack", ImmutableSet.of(RiakNode.SERVICE_UP)) |
| .configure("group", this)); |
| |
| EnricherSpec<?> first = Enrichers.builder() |
| .aggregating(Attributes.MAIN_URI) |
| .publishing(Attributes.MAIN_URI) |
| .computing(new Function<Collection<URI>,URI>() { |
| @Override |
| public URI apply(Collection<URI> input) { |
| return input.iterator().next(); |
| } }) |
| .fromMembers() |
| .build(); |
| enrichers().add(first); |
| |
| Map<? extends AttributeSensor<? extends Number>, ? extends AttributeSensor<? extends Number>> enricherSetup = |
| ImmutableMap.<AttributeSensor<? extends Number>, AttributeSensor<? extends Number>>builder() |
| .put(RiakNode.NODE_PUTS, RiakCluster.NODE_PUTS_1MIN_PER_NODE) |
| .put(RiakNode.NODE_GETS, RiakCluster.NODE_GETS_1MIN_PER_NODE) |
| .put(RiakNode.NODE_OPS, RiakCluster.NODE_OPS_1MIN_PER_NODE) |
| .build(); |
| // construct sum and average over cluster |
| for (AttributeSensor<? extends Number> nodeSensor : enricherSetup.keySet()) { |
| addSummingMemberEnricher(nodeSensor); |
| addAveragingMemberEnricher(nodeSensor, enricherSetup.get(nodeSensor)); |
| } |
| } |
| |
| private void addAveragingMemberEnricher(AttributeSensor<? extends Number> fromSensor, AttributeSensor<? extends Number> toSensor) { |
| enrichers().add(Enrichers.builder() |
| .aggregating(fromSensor) |
| .publishing(toSensor) |
| .fromMembers() |
| .computingAverage() |
| .build() |
| ); |
| } |
| |
| private void addSummingMemberEnricher(AttributeSensor<? extends Number> source) { |
| enrichers().add(Enrichers.builder() |
| .aggregating(source) |
| .publishing(source) |
| .fromMembers() |
| .computingSum() |
| .build() |
| ); |
| } |
| |
| protected void onServerPoolMemberChanged(final Entity member) { |
| synchronized (mutex) { |
| log.trace("For {}, considering membership of {} which is in locations {}", new Object[]{ this, member, member.getLocations() }); |
| |
| Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES); |
| if (belongsInServerPool(member)) { |
| // TODO can we discover the nodes by asking the riak cluster, rather than assuming what we add will be in there? |
| // TODO and can we do join as part of node starting? |
| |
| if (nodes == null) { |
| nodes = Maps.newLinkedHashMap(); |
| } |
| String riakName = getRiakName(member); |
| Preconditions.checkNotNull(riakName); |
| |
| // flag a first node to be the first node in the riak cluster. |
| Boolean firstNode = getAttribute(IS_FIRST_NODE_SET); |
| if (!Boolean.TRUE.equals(firstNode)) { |
| sensors().set(IS_FIRST_NODE_SET, Boolean.TRUE); |
| |
| nodes.put(member, riakName); |
| sensors().set(RIAK_CLUSTER_NODES, nodes); |
| |
| ((EntityInternal) member).sensors().set(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE); |
| |
| log.info("Added initial Riak node {}: {}; {} to new cluster", new Object[] { this, member, getRiakName(member) }); |
| } else { |
| // TODO: be wary of erroneous nodes but are still flagged 'in cluster' |
| // add the new node to be part of the riak cluster. |
| Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and( |
| Predicates.instanceOf(RiakNode.class), |
| EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true))); |
| if (anyNodeInCluster.isPresent()) { |
| if (!nodes.containsKey(member) && member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER) == null) { |
| String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME); |
| Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName).blockUntilEnded(); |
| nodes.put(member, riakName); |
| sensors().set(RIAK_CLUSTER_NODES, nodes); |
| log.info("Added Riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) }); |
| } |
| } else { |
| log.error("isFirstNodeSet, but no cluster members found to add {}", member.getId()); |
| } |
| } |
| } else { |
| if (nodes != null && nodes.containsKey(member)) { |
| DependentConfiguration.attributeWhenReady(member, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Predicates.equalTo(false)).blockUntilEnded(Duration.TWO_MINUTES); |
| @SuppressWarnings("unchecked") |
| Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and( |
| EntityPredicates.isManaged(), |
| Predicates.instanceOf(RiakNode.class), |
| EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true), |
| Predicates.not(Predicates.equalTo(member)))); |
| if (anyNodeInCluster.isPresent()) { |
| Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.REMOVE_FROM_CLUSTER, getRiakName(member)).blockUntilEnded(); |
| } |
| nodes.remove(member); |
| sensors().set(RIAK_CLUSTER_NODES, nodes); |
| log.info("Removed Riak node {}: {}; {} from cluster", new Object[]{ this, member, getRiakName(member) }); |
| } |
| } |
| |
| ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES); |
| |
| calculateClusterAddresses(); |
| } |
| } |
| |
| private void calculateClusterAddresses() { |
| List<String> addresses = Lists.newArrayList(); |
| List<String> addressesPbPort = Lists.newArrayList(); |
| for (Entity entity : this.getMembers()) { |
| if (entity instanceof RiakNode && entity.getAttribute(Attributes.SERVICE_UP)) { |
| RiakNode riakNode = (RiakNode) entity; |
| addresses.add(riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute(RiakNode.RIAK_WEB_PORT)); |
| addressesPbPort.add(riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute(RiakNode.RIAK_PB_PORT)); |
| } |
| } |
| sensors().set(RiakCluster.NODE_LIST, Joiner.on(",").join(addresses)); |
| sensors().set(RiakCluster.NODE_LIST_PB_PORT, Joiner.on(",").join(addressesPbPort)); |
| } |
| |
| protected boolean belongsInServerPool(Entity member) { |
| if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) { |
| log.trace("Members of {}, checking {}, eliminating because not up", this, member); |
| return false; |
| } |
| if (!getMembers().contains(member)) { |
| log.trace("Members of {}, checking {}, eliminating because not member", this, member); |
| return false; |
| } |
| log.trace("Members of {}, checking {}, approving", this, member); |
| |
| return true; |
| } |
| |
| private String getRiakName(Entity node) { |
| return node.getAttribute(RiakNode.RIAK_NODE_NAME); |
| } |
| |
| public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { |
| @Override |
| protected void onEntityEvent(EventType type, Entity entity) { |
| ((RiakClusterImpl) super.entity).onServerPoolMemberChanged(entity); |
| } |
| } |
| } |