| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.entity.nosql.cassandra; |
| |
| import java.util.Collection; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.entity.EntitySpec; |
| import org.apache.brooklyn.api.location.Location; |
| import org.apache.brooklyn.api.policy.PolicySpec; |
| import org.apache.brooklyn.api.sensor.SensorEvent; |
| import org.apache.brooklyn.api.sensor.SensorEventListener; |
| import org.apache.brooklyn.core.entity.Attributes; |
| import org.apache.brooklyn.core.entity.Entities; |
| import org.apache.brooklyn.core.entity.EntityInternal; |
| import org.apache.brooklyn.core.entity.EntityPredicates; |
| import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; |
| import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy; |
| import org.apache.brooklyn.entity.group.DynamicFabricImpl; |
| import org.apache.brooklyn.entity.group.DynamicGroup; |
| import org.apache.brooklyn.util.collections.CollectionFunctionals; |
| import org.apache.brooklyn.util.collections.MutableMap; |
| import org.apache.brooklyn.util.collections.MutableSet; |
| import org.apache.brooklyn.util.time.Time; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Function; |
| import com.google.common.base.Objects; |
| import com.google.common.base.Optional; |
| import com.google.common.base.Supplier; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.LinkedHashMultimap; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Multimap; |
| import com.google.common.collect.Sets; |
| |
| /** |
| * @deprecated since 1.0.0; use {@link 'https://github.com/brooklyncentral/brooklyn-cassandra'} which is a pure YAML template |
| * for a database cluster. |
| */ |
| @Deprecated |
| public class CassandraFabricImpl extends DynamicFabricImpl implements CassandraFabric { |
| |
| private static final Logger log = LoggerFactory.getLogger(CassandraFabricImpl.class); |
| |
| // Mutex for synchronizing during re-size operations |
| private final Object mutex = new Object[0]; |
| |
| private final Supplier<Set<Entity>> defaultSeedSupplier = new Supplier<Set<Entity>>() { |
| @Override public Set<Entity> get() { |
| // TODO Remove duplication from CassandraClusterImpl.defaultSeedSupplier |
| Set<Entity> seeds = getAttribute(CURRENT_SEEDS); |
| boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS)); |
| int quorumSize = getSeedQuorumSize(); |
| |
| // update seeds if we're not quorate; note this may not work for dynamically adding new datacenters |
| // as we do not take a new seed from the new datacenter |
| if (seeds == null || seeds.size() < quorumSize || containsDownEntity(seeds)) { |
| Set<Entity> newseeds; |
| Map<CassandraDatacenter,Set<Entity>> potentialSeeds = MutableMap.of(); |
| int potentialSeedCount = 0; |
| for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { |
| Set<Entity> dcPotentialSeeds = member.gatherPotentialSeeds(); |
| potentialSeeds.put(member, dcPotentialSeeds); |
| potentialSeedCount += dcPotentialSeeds.size(); |
| } |
| |
| if (hasPublishedSeeds) { |
| Set<Entity> currentSeeds = getAttribute(CURRENT_SEEDS); |
| Lifecycle serviceState = getAttribute(SERVICE_STATE_ACTUAL); |
| if (serviceState == Lifecycle.STARTING) { |
| if (Sets.intersection(currentSeeds, ImmutableSet.copyOf(Iterables.concat(potentialSeeds.values()))).isEmpty()) { |
| log.warn("Fabric {} lost all its seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraFabricImpl.this, currentSeeds}); |
| } |
| newseeds = currentSeeds; |
| } else if (serviceState == Lifecycle.STOPPING || serviceState == Lifecycle.STOPPED) { |
| if (log.isTraceEnabled()) log.trace("Fabric {} ignoring any potential seed-changes, because {}: seeds={}", new Object[] {CassandraFabricImpl.this, serviceState, currentSeeds}); |
| newseeds = currentSeeds; |
| } else if (potentialSeedCount == 0) { |
| // TODO Could be race where nodes have only just returned from start() and are about to |
| // transition to serviceUp; so don't just abandon all our seeds! |
| log.warn("Fabric {} has no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this}); |
| newseeds = currentSeeds; |
| } else if (!allNonEmpty(potentialSeeds.values())) { |
| log.warn("Fabric {} has datacenter with no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this}); |
| newseeds = currentSeeds; |
| } else { |
| Set<Entity> result = selectSeeds(quorumSize, potentialSeeds); |
| if (log.isDebugEnabled() && !Objects.equal(seeds, result)) { |
| log.debug("Fabric {} updating seeds: chosen={}; potential={}", new Object[] {CassandraFabricImpl.this, result, potentialSeeds}); |
| } |
| newseeds = result; |
| } |
| } else if (potentialSeedCount < quorumSize) { |
| if (log.isDebugEnabled()) log.debug("Not setting seeds of fabric {} yet, because still waiting for quorum (need {}; have {} potentials from {} members)", new Object[] {CassandraFabricImpl.this, quorumSize, potentialSeedCount, getMembers()}); |
| newseeds = ImmutableSet.of(); |
| } else if (!allNonEmpty(potentialSeeds.values())) { |
| if (log.isDebugEnabled()) { |
| Map<CassandraDatacenter, Integer> datacenterCounts = Maps.transformValues(potentialSeeds, CollectionFunctionals.sizeFunction()); |
| log.debug("Not setting seeds of fabric {} yet, because not all datacenters have seeds (sizes are {})", new Object[] {CassandraFabricImpl.this, datacenterCounts}); |
| } |
| newseeds = ImmutableSet.of(); |
| } else { |
| // yay, we're quorate |
| Set<Entity> result = selectSeeds(quorumSize, potentialSeeds); |
| log.info("Fabric {} has reached seed quorum: seeds={}", new Object[] {CassandraFabricImpl.this, result}); |
| newseeds = result; |
| } |
| |
| if (!Objects.equal(seeds, newseeds)) { |
| sensors().set(CURRENT_SEEDS, newseeds); |
| |
| if (newseeds != null && newseeds.size() > 0) { |
| sensors().set(HAS_PUBLISHED_SEEDS, true); |
| |
| // Need to tell every datacenter that seeds are ready. |
| // Otherwise a datacenter might get no more changes (e.g. to nodes' hostnames etc), |
| // and not call seedSupplier.get() again. |
| for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { |
| member.update(); |
| } |
| } |
| return newseeds; |
| } else { |
| return seeds; |
| } |
| } else { |
| if (log.isTraceEnabled()) log.trace("Not refresheed seeds of fabric {}, because have quorum {} (of {} members), and none are down: seeds={}", |
| new Object[] {CassandraFabricImpl.class, quorumSize, getMembers().size(), seeds}); |
| return seeds; |
| } |
| } |
| private boolean allNonEmpty(Collection<? extends Collection<Entity>> contenders) { |
| for (Collection<Entity> contender: contenders) |
| if (contender.isEmpty()) return false; |
| return true; |
| } |
| private Set<Entity> selectSeeds(int num, Map<CassandraDatacenter,? extends Collection<Entity>> contenders) { |
| // Prefer existing seeds wherever possible; |
| // otherwise prefer a seed from each sub-cluster; |
| // otherwise accept any other contenders |
| Set<Entity> currentSeeds = (getAttribute(CURRENT_SEEDS) != null) ? getAttribute(CURRENT_SEEDS) : ImmutableSet.<Entity>of(); |
| MutableSet<Entity> result = MutableSet.of(); |
| result.addAll(Sets.intersection(currentSeeds, ImmutableSet.copyOf(contenders.values()))); |
| for (CassandraDatacenter cluster : contenders.keySet()) { |
| Set<Entity> contendersInCluster = Sets.newLinkedHashSet(contenders.get(cluster)); |
| if (contendersInCluster.size() > 0 && Sets.intersection(result, contendersInCluster).isEmpty()) { |
| result.add(Iterables.getFirst(contendersInCluster, null)); |
| } |
| } |
| result.addAll(Iterables.concat(contenders.values())); |
| return ImmutableSet.copyOf(Iterables.limit(result, num)); |
| } |
| private boolean containsDownEntity(Set<Entity> seeds) { |
| for (Entity seed : seeds) { |
| if (!isViableSeed(seed)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| public boolean isViableSeed(Entity member) { |
| // TODO remove duplication from CassandraClusterImpl.SeedTracker.isViableSeed |
| boolean managed = Entities.isManaged(member); |
| String hostname = member.getAttribute(Attributes.HOSTNAME); |
| boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP)); |
| Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL); |
| boolean hasFailed = !managed || (serviceState == Lifecycle.ON_FIRE) || (serviceState == Lifecycle.RUNNING && !serviceUp) || (serviceState == Lifecycle.STOPPED); |
| boolean result = (hostname != null && !hasFailed); |
| if (log.isTraceEnabled()) log.trace("Node {} in Fabric {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, CassandraFabricImpl.this, result, hostname, serviceUp, serviceState, hasFailed}); |
| return result; |
| } |
| }; |
| |
| public CassandraFabricImpl() { |
| } |
| |
| @Override |
| public void init() { |
| super.init(); |
| |
| if (!config().getRaw(CassandraDatacenter.SEED_SUPPLIER).isPresentAndNonNull()) |
| config().set(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier()); |
| |
| // track members |
| policies().add(PolicySpec.create(MemberTrackingPolicy.class) |
| .displayName("Cassandra Fabric Tracker") |
| .configure("group", this)); |
| |
| // Track first node's startup |
| subscriptions().subscribeToMembers(this, CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, new SensorEventListener<Long>() { |
| @Override |
| public void onEvent(SensorEvent<Long> event) { |
| Long oldval = getAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC); |
| Long newval = event.getValue(); |
| if (oldval == null && newval != null) { |
| sensors().set(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval); |
| for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { |
| ((EntityInternal)member).sensors().set(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval); |
| } |
| } |
| } |
| }); |
| |
| // Track the datacenters for this cluster |
| subscriptions().subscribeToMembers(this, CassandraDatacenter.DATACENTER_USAGE, new SensorEventListener<Multimap<String,Entity>>() { |
| @Override |
| public void onEvent(SensorEvent<Multimap<String,Entity>> event) { |
| Multimap<String, Entity> usage = calculateDatacenterUsage(); |
| sensors().set(DATACENTER_USAGE, usage); |
| sensors().set(DATACENTERS, usage.keySet()); |
| } |
| }); |
| subscriptions().subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() { |
| @Override public void onEvent(SensorEvent<Entity> event) { |
| Multimap<String, Entity> usage = calculateDatacenterUsage(); |
| sensors().set(DATACENTER_USAGE, usage); |
| sensors().set(DATACENTERS, usage.keySet()); |
| } |
| }); |
| } |
| |
| public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { |
| @Override |
| protected void onEntityChange(Entity member) { |
| if (log.isDebugEnabled()) log.debug("Location {} updated in Fabric {}", member, entity); |
| ((CassandraFabricImpl)entity).update(); |
| } |
| @Override |
| protected void onEntityAdded(Entity member) { |
| if (log.isDebugEnabled()) log.debug("Location {} added to Fabric {}", member, entity); |
| ((CassandraFabricImpl)entity).update(); |
| } |
| @Override |
| protected void onEntityRemoved(Entity member) { |
| if (log.isDebugEnabled()) log.debug("Location {} removed from Fabric {}", member, entity); |
| ((CassandraFabricImpl)entity).update(); |
| } |
| }; |
| |
| protected int getSeedQuorumSize() { |
| Integer quorumSize = getConfig(INITIAL_QUORUM_SIZE); |
| if (quorumSize!=null && quorumSize>0) |
| return quorumSize; |
| |
| int initialSizeSum = 0; |
| for (CassandraDatacenter cluster : Iterables.filter(getMembers(), CassandraDatacenter.class)) { |
| initialSizeSum += cluster.getConfig(CassandraDatacenter.INITIAL_SIZE); |
| } |
| if (initialSizeSum>5) initialSizeSum /= 2; |
| else if (initialSizeSum>3) initialSizeSum -= 2; |
| else if (initialSizeSum>2) initialSizeSum -= 1; |
| |
| return Math.min(Math.max(initialSizeSum, 1), CassandraFabric.DEFAULT_SEED_QUORUM); |
| } |
| |
| /** |
| * Sets the default {@link #MEMBER_SPEC} to describe the Cassandra sub-clusters. |
| */ |
| @Override |
| protected EntitySpec<?> getMemberSpec() { |
| // Need to set the seedSupplier, even if the caller has overridden the CassandraCluster config |
| // (unless they've explicitly overridden the seedSupplier as well!) |
| // TODO probably don't need to anymore, as it is set on the Fabric here -- just make sure there is a default! |
| EntitySpec<?> custom = getConfig(MEMBER_SPEC); |
| if (custom == null) { |
| return EntitySpec.create(CassandraDatacenter.class) |
| .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier()); |
| } else if (custom.getConfig().containsKey(CassandraDatacenter.SEED_SUPPLIER) || custom.getFlags().containsKey("seedSupplier")) { |
| return custom; |
| } else { |
| return EntitySpec.create(custom) |
| .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier()); |
| } |
| } |
| |
| @Override |
| protected Entity createCluster(Location location, Map flags) { |
| Function<Location, String> dataCenterNamer = getConfig(DATA_CENTER_NAMER); |
| if (dataCenterNamer != null) { |
| flags = ImmutableMap.builder() |
| .putAll(flags) |
| .put(CassandraNode.DATACENTER_NAME, dataCenterNamer.apply(location)) |
| .build(); |
| } |
| return super.createCluster(location, flags); |
| } |
| |
| /** |
| * Prefers one node per location, and then others from anywhere. |
| * Then trims result down to the "quorumSize". |
| */ |
| public Supplier<Set<Entity>> getSeedSupplier() { |
| return defaultSeedSupplier; |
| } |
| |
| @Override |
| public void start(Collection<? extends Location> locations) { |
| super.start(locations); |
| |
| connectSensors(); |
| |
| // TODO wait until all nodes which we think are up are consistent |
| // i.e. all known nodes use the same schema, as reported by |
| // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli"); |
| // once we've done that we can revert to using 2 seed nodes. |
| // see CassandraCluster.DEFAULT_SEED_QUORUM |
| Time.sleep(getConfig(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER)); |
| |
| update(); |
| } |
| |
| protected void connectSensors() { |
| connectEnrichers(); |
| } |
| |
| protected void connectEnrichers() { |
| // TODO Aggregate across sub-clusters |
| |
| subscriptions().subscribeToMembers(this, SERVICE_UP, new SensorEventListener<Boolean>() { |
| @Override public void onEvent(SensorEvent<Boolean> event) { |
| sensors().set(SERVICE_UP, calculateServiceUp()); |
| } |
| }); |
| } |
| |
| @Override |
| public void stop() { |
| disconnectSensors(); |
| |
| super.stop(); |
| } |
| |
| protected void disconnectSensors() { |
| } |
| |
| protected boolean calculateServiceUp() { |
| Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE)); |
| return upNode.isPresent(); |
| } |
| |
| protected Multimap<String, Entity> calculateDatacenterUsage() { |
| Multimap<String, Entity> result = LinkedHashMultimap.<String, Entity>create(); |
| for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { |
| Multimap<String, Entity> memberUsage = member.getAttribute(CassandraDatacenter.DATACENTER_USAGE); |
| if (memberUsage != null) result.putAll(memberUsage); |
| } |
| return result; |
| } |
| |
| @Override |
| public void update() { |
| synchronized (mutex) { |
| for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { |
| member.update(); |
| } |
| |
| calculateServiceUp(); |
| |
| // Choose the first available location to set host and port (and compute one-up) |
| Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE)); |
| |
| if (upNode.isPresent()) { |
| sensors().set(HOSTNAME, upNode.get().getAttribute(Attributes.HOSTNAME)); |
| sensors().set(THRIFT_PORT, upNode.get().getAttribute(CassandraNode.THRIFT_PORT)); |
| } |
| } |
| } |
| } |