| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.entity.nosql.cassandra; |
| |
| import java.math.BigInteger; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.brooklyn.api.catalog.Catalog; |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.entity.ImplementedBy; |
| import org.apache.brooklyn.api.sensor.AttributeSensor; |
| import org.apache.brooklyn.config.ConfigKey; |
| import org.apache.brooklyn.core.annotation.Effector; |
| import org.apache.brooklyn.core.config.ConfigKeys; |
| import org.apache.brooklyn.core.effector.Effectors; |
| import org.apache.brooklyn.core.effector.MethodEffector; |
| import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey; |
| import org.apache.brooklyn.core.sensor.Sensors; |
| import org.apache.brooklyn.entity.nosql.cassandra.TokenGenerators.PosNeg63TokenGenerator; |
| import org.apache.brooklyn.entity.database.DatastoreMixins; |
| import org.apache.brooklyn.entity.group.DynamicCluster; |
| import org.apache.brooklyn.util.core.flags.SetFromFlag; |
| import org.apache.brooklyn.util.time.Duration; |
| |
| import com.google.common.base.Supplier; |
| import com.google.common.collect.Multimap; |
| import com.google.common.reflect.TypeToken; |
| |
| /** |
| * @deprecated since 1.0.0; use {@link 'https://github.com/brooklyncentral/brooklyn-cassandra'} which is a pure YAML template |
| * for a database cluster. |
| */ |
| @Deprecated |
| @Catalog(name="Apache Cassandra Datacenter Cluster", description="Cassandra is a highly scalable, eventually " + |
| "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " + |
| "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg") |
| @ImplementedBy(CassandraDatacenterImpl.class) |
| public interface CassandraDatacenter extends DynamicCluster, DatastoreMixins.HasDatastoreUrl, DatastoreMixins.CanExecuteScript { |
| |
| // FIXME datacenter name -- also CASS_CLUSTER_NODES should be CASS_DC_NODES |
| @SetFromFlag("clusterName") |
| BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, "cassandra.cluster.name", "Name of the Cassandra cluster", "BrooklynCluster"); |
| |
| @SetFromFlag("snitchName") |
| ConfigKey<String> ENDPOINT_SNITCH_NAME = ConfigKeys.newStringConfigKey("cassandra.cluster.snitchName", "Type of the Cassandra snitch", "SimpleSnitch"); |
| |
| @SetFromFlag("seedSupplier") |
| @SuppressWarnings("serial") |
| ConfigKey<Supplier<Set<Entity>>> SEED_SUPPLIER = ConfigKeys.newConfigKey(new TypeToken<Supplier<Set<Entity>>>() { }, "cassandra.cluster.seedSupplier", "For determining the seed nodes", null); |
| |
| @SuppressWarnings("serial") |
| @SetFromFlag("tokenGeneratorClass") |
| ConfigKey<Class<? extends TokenGenerator>> TOKEN_GENERATOR_CLASS = ConfigKeys.newConfigKey( |
| new TypeToken<Class<? extends TokenGenerator>>() {}, "cassandra.cluster.tokenGenerator.class", "For determining the tokens of nodes", |
| PosNeg63TokenGenerator.class); |
| |
| @SetFromFlag("tokenShift") |
| ConfigKey<BigInteger> TOKEN_SHIFT = ConfigKeys.newConfigKey(BigInteger.class, "cassandra.cluster.tokenShift", |
| "Delta applied to all tokens generated for this Cassandra datacenter, " |
| + "useful when configuring multiple datacenters which should be shifted; " |
| + "if not set, a random shift is applied. (Pass 0 to prevent any shift.)", null); |
| |
| ConfigKey<Boolean> USE_VNODES = ConfigKeys.newBooleanConfigKey( |
| "cassandra.cluster.useVnodes", |
| "Determines whether to use vnodes; if doing so, tokens will not be explicitly assigned to nodes in the cluster", |
| false); |
| |
| /** |
| * num_tokens will automatically be reset to 1 for each node if {@link #USE_VNODES} is false. |
| */ |
| ConfigKey<Integer> NUM_TOKENS_PER_NODE = ConfigKeys.newIntegerConfigKey("cassandra.numTokensPerNode", |
| "Number of tokens per node; if using vnodes, should set this to a value like 256; will be overridden to 1 if USE_VNODES==false", |
| 256); |
| |
| /** |
| * Additional time after the nodes in the cluster are up when starting |
| * before announcing the cluster as up. |
| * <p> |
| * Useful to ensure nodes have synchronized. |
| * <p> |
| * On 1.2.2 this could be as much as 120s when using 2 seed nodes, |
| * or just a few seconds with 1 seed node. On 1.2.9 it seems a few |
| * seconds is sufficient even with 2 seed nodes |
| */ |
| @SetFromFlag("delayBeforeAdvertisingCluster") |
| ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "cassandra.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.TEN_SECONDS); |
| |
| @SuppressWarnings("serial") |
| AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = Sensors.newSensor(new TypeToken<Multimap<String,Entity>>() { }, "cassandra.cluster.datacenterUsages", "Current set of datacenters in use, with nodes in each"); |
| |
| @SuppressWarnings("serial") |
| AttributeSensor<Set<String>> DATACENTERS = Sensors.newSensor(new TypeToken<Set<String>>() { }, "cassandra.cluster.datacenters", "Current set of datacenters in use"); |
| |
| AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = Sensors.newBooleanSensor("cassandra.cluster.seeds.hasPublished", "Whether we have published any seeds"); |
| |
| @SuppressWarnings("serial") |
| AttributeSensor<Set<Entity>> CURRENT_SEEDS = Sensors.newSensor(new TypeToken<Set<Entity>>() { }, "cassandra.cluster.seeds.current", "Current set of seeds to use to bootstrap the cluster"); |
| |
| AttributeSensor<String> HOSTNAME = Sensors.newStringSensor("cassandra.cluster.hostname", "Hostname to connect to cluster with"); |
| |
| @SuppressWarnings("serial") |
| AttributeSensor<List<String>> CASSANDRA_CLUSTER_NODES = Sensors.newSensor(new TypeToken<List<String>>() {}, |
| "cassandra.cluster.nodes", "List of host:port of all active nodes in the cluster (thrift port, and public hostname/IP)"); |
| |
| AttributeSensor<Integer> THRIFT_PORT = Sensors.newIntegerSensor("cassandra.cluster.thrift.port", "Cassandra Thrift RPC port to connect to cluster with"); |
| |
| AttributeSensor<Long> FIRST_NODE_STARTED_TIME_UTC = Sensors.newLongSensor("cassandra.cluster.first.node.started.utc", "Time (UTC) when the first node was started"); |
| @SuppressWarnings("serial") |
| AttributeSensor<List<Entity>> QUEUED_START_NODES = Sensors.newSensor(new TypeToken<List<Entity>>() {}, "cassandra.cluster.start.nodes.queued", |
| "Nodes queued for starting (for sequential start)"); |
| |
| AttributeSensor<Integer> SCHEMA_VERSION_COUNT = Sensors.newIntegerSensor("cassandra.cluster.schema.versions.count", |
| "Number of different schema versions in the cluster; should be 1 for a healthy cluster, 0 when off; " + |
| "2 and above indicats a Schema Disagreement Error (and keyspace access may fail)"); |
| |
| AttributeSensor<Long> READ_PENDING = Sensors.newLongSensor("cassandra.cluster.read.pending", "Current pending ReadStage tasks"); |
| AttributeSensor<Integer> READ_ACTIVE = Sensors.newIntegerSensor("cassandra.cluster.read.active", "Current active ReadStage tasks"); |
| AttributeSensor<Long> WRITE_PENDING = Sensors.newLongSensor("cassandra.cluster.write.pending", "Current pending MutationStage tasks"); |
| AttributeSensor<Integer> WRITE_ACTIVE = Sensors.newIntegerSensor("cassandra.cluster.write.active", "Current active MutationStage tasks"); |
| |
| AttributeSensor<Long> THRIFT_PORT_LATENCY_PER_NODE = Sensors.newLongSensor("cassandra.cluster.thrift.latency.perNode", "Latency for thrift port connection averaged over all nodes (ms)"); |
| AttributeSensor<Double> READS_PER_SECOND_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.reads.perSec.last.perNode", "Reads/sec (last datapoint) averaged over all nodes"); |
| AttributeSensor<Double> WRITES_PER_SECOND_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.write.perSec.last.perNode", "Writes/sec (last datapoint) averaged over all nodes"); |
| AttributeSensor<Double> PROCESS_CPU_TIME_FRACTION_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.cluster.metrics.processCpuTime.fraction.perNode", "Fraction of CPU time used (percentage reported by JMX), averaged over all nodes"); |
| |
| AttributeSensor<Double> READS_PER_SECOND_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.reads.perSec.windowed.perNode", "Reads/sec (over time window) averaged over all nodes"); |
| AttributeSensor<Double> WRITES_PER_SECOND_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.writes.perSec.windowed.perNode", "Writes/sec (over time window) averaged over all nodes"); |
| AttributeSensor<Double> THRIFT_PORT_LATENCY_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.thrift.latency.windowed.perNode", "Latency for thrift port (ms, over time window) averaged over all nodes"); |
| AttributeSensor<Double> PROCESS_CPU_TIME_FRACTION_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.cluster.metrics.processCpuTime.fraction.windowed", "Fraction of CPU time used (percentage, over time window), averaged over all nodes"); |
| |
| MethodEffector<Void> UPDATE = new MethodEffector<Void>(CassandraDatacenter.class, "update"); |
| |
| org.apache.brooklyn.api.effector.Effector<String> EXECUTE_SCRIPT = Effectors.effector(DatastoreMixins.EXECUTE_SCRIPT) |
| .description("executes the given script contents using cassandra-cli") |
| .buildAbstract(); |
| |
| /** |
| * Sets the number of nodes used to seed the cluster. |
| * <p> |
| * Version 1.2.2 is buggy and requires a big delay for 2 nodes both seeds to reconcile, |
| * with 1.2.9 this seems fine, with just a few seconds' delay after starting. |
| * |
| * @see <a href="http://stackoverflow.com/questions/6770894/schemadisagreementexception/18639005" /> |
| */ |
| int DEFAULT_SEED_QUORUM = 2; |
| |
| /** |
| * Can insert a delay after the first node comes up. |
| * <p> |
| * Reportedly not needed with 1.2.9, but we are still seeing some seed failures so re-introducing it. |
| * (This does not seem to help with the bug in 1.2.2.) |
| */ |
| Duration DELAY_AFTER_FIRST = Duration.ONE_MINUTE; |
| |
| /** |
| * If set (ie non-null), this waits the indicated time after a successful launch of one node |
| * before starting the next. (If it is null, all nodes start simultaneously, |
| * possibly after the DELAY_AFTER_FIRST.) |
| * <p> |
| * When subsequent nodes start simultaneously, we occasionally see schema disagreement problems; |
| * if nodes start sequentially, we occasionally get "no sources for (tokenRange]" problems. |
| * Either way the node stops. Ideally this can be solved at the Cassandra level, |
| * but if not, we will have to introduce some restarts at the Cassandra nodes (which does seem |
| * to resolve the problems.) |
| */ |
| Duration DELAY_BETWEEN_STARTS = null; |
| |
| /** |
| * Whether to wait for the first node to start up |
| * <p> |
| * not sure whether this is needed or not. Need to test in env where not all nodes are seed nodes, |
| * what happens if non-seed nodes start before the seed nodes? |
| */ |
| boolean WAIT_FOR_FIRST = true; |
| |
| @Effector(description="Updates the cluster members") |
| void update(); |
| |
| /** |
| * The name of the cluster. |
| */ |
| String getClusterName(); |
| |
| Set<Entity> gatherPotentialSeeds(); |
| |
| Set<Entity> gatherPotentialRunningSeeds(); |
| |
| String executeScript(String commands); |
| |
| } |