blob: cbc0272b3617ef5d33efbedabb8f1b67d2fa7abd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.database.mysql;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.enricher.stock.Enrichers;
import org.apache.brooklyn.entity.group.DynamicClusterImpl;
import org.apache.brooklyn.feed.function.FunctionFeed;
import org.apache.brooklyn.feed.function.FunctionPollConfig;
import org.apache.brooklyn.util.collections.CollectionFunctionals;
import org.apache.brooklyn.util.core.ResourceUtils;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.TaskBuilder;
import org.apache.brooklyn.util.guava.Functionals;
import org.apache.brooklyn.util.guava.IfFunctions;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.text.StringPredicates;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;
// https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html
// TODO SSL connection between master and slave
// TODO Promote slave to master
public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster {
/** Set to true on a member once its replication-setup task has been submitted, so later process-running events do not submit it again. */
private static final AttributeSensor<Boolean> NODE_REPLICATION_INITIALIZED = Sensors.newBooleanSensor("mysql.replication_initialized");
/** Default configuration template applied to the first member (the master) when its spec does not configure one. */
private static final String MASTER_CONFIG_URL = "classpath:///org/apache/brooklyn/entity/database/mysql/mysql_master.conf";
/** Default configuration template applied to the remaining members (the slaves) when their spec does not configure one. */
private static final String SLAVE_CONFIG_URL = "classpath:///org/apache/brooklyn/entity/database/mysql/mysql_slave.conf";
/** Server ID given to the first member; slave server IDs are handed out starting from the next value. */
protected static final int MASTER_SERVER_ID = 1;
/** Holds the supplier that allocates slave server IDs; it is stored in a sensor so that it is serialized (see init()). */
@SuppressWarnings("serial")
private static final AttributeSensor<Supplier<Integer>> SLAVE_NEXT_SERVER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {},
"mysql.slave.next_server_id", "Returns the ID of the next slave server");
/** Slave entity ID to address map; MemberRemovedListener removes the entry of a departed member and uses the address to drop its replication user. */
@SuppressWarnings("serial")
protected static final AttributeSensor<Map<String, String>> SLAVE_ID_ADDRESS_MAPPING = Sensors.newSensor(new TypeToken<Map<String, String>>() {},
"mysql.slave.id_address_mapping", "Maps slave entity IDs to SUBNET_ADDRESS, so the address is known at member remove time.");
/**
 * Initialises cluster-level state: the slave server-ID supplier, the
 * slave-address map, the slave password sensor, and the event subscriptions.
 */
@Override
public void init() {
    super.init();

    // The ID supplier lives in a sensor so that it is serialized with the entity.
    sensors().set(SLAVE_NEXT_SERVER_ID, new NextServerIdSupplier());
    sensors().set(SLAVE_ID_ADDRESS_MAPPING, new ConcurrentHashMap<String, String>());

    // Publish the configured slave password, generating a random one when none was given.
    final String configuredPassword = getConfig(SLAVE_PASSWORD);
    final String slavePassword = (configuredPassword != null)
            ? configuredPassword
            : Identifiers.makeRandomId(8);
    sensors().set(SLAVE_PASSWORD, slavePassword);

    initSubscriptions();
}
/**
 * Re-registers the cluster's event subscriptions after the superclass
 * rebind has run, mirroring what {@link #init()} does on first creation.
 */
@Override
public void rebind() {
    super.rebind();
    // Same subscriptions as on initial creation.
    initSubscriptions();
}
/**
 * Registers the two listeners this cluster relies on: one watching
 * SERVICE_PROCESS_IS_RUNNING on members, one watching MEMBER_REMOVED on the
 * cluster itself.
 */
private void initSubscriptions() {
    final NodeRunningListener onNodeRunning = new NodeRunningListener(this);
    subscriptions().subscribeToMembers(this, MySqlNode.SERVICE_PROCESS_IS_RUNNING, onNodeRunning);

    final MemberRemovedListener onMemberRemoved = new MemberRemovedListener();
    subscriptions().subscribe(this, MEMBER_REMOVED, onMemberRemoved);
}
/**
 * Adds the cluster-level enrichers: the master's connection sensors are
 * republished on the cluster, the non-master members' datastore URLs are
 * collected into SLAVE_DATASTORE_URL_LIST, and the members' queries-per-second
 * values are averaged into QUERIES_PER_SECOND_FROM_MYSQL_PER_NODE.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected void initEnrichers() {
    super.initEnrichers();

    // Sensors mirrored from the master onto the cluster, in registration order.
    final AttributeSensor<?>[] masterSensors = {
        MySqlNode.HOSTNAME,
        MySqlNode.ADDRESS,
        MySqlNode.SUBNET_HOSTNAME,
        MySqlNode.SUBNET_ADDRESS,
        MySqlNode.MYSQL_PORT,
        MySqlNode.DATASTORE_URL
    };
    for (AttributeSensor<?> masterSensor : masterSensors) {
        propagateMasterAttribute(masterSensor);
    }

    // The aggregated collection is published unchanged; the double cast only adapts the generic types.
    final Function<Collection<String>, List<String>> passThrough =
            (Function<Collection<String>, List<String>>) (Function) Functions.identity();
    enrichers().add(Enrichers.builder()
            .aggregating(MySqlNode.DATASTORE_URL)
            .publishing(SLAVE_DATASTORE_URL_LIST)
            .computing(passThrough)
            .entityFilter(Predicates.not(MySqlClusterUtils.IS_MASTER))
            .fromMembers()
            .build());

    // Members that have not reported a value yet count as 0 in the average.
    enrichers().add(Enrichers.builder()
            .aggregating(MySqlNode.QUERIES_PER_SECOND_FROM_MYSQL)
            .publishing(QUERIES_PER_SECOND_FROM_MYSQL_PER_NODE)
            .fromMembers()
            .computingAverage()
            .defaultValueForUnreportedSensors(0d)
            .build());
}
/**
 * Adds an enricher that republishes {@code att} on this cluster, taking the
 * value from the entity matching {@link MySqlClusterUtils#IS_MASTER}.
 *
 * @param att the sensor that is both read from the master and published on the cluster
 */
private void propagateMasterAttribute(AttributeSensor<?> att) {
enrichers().add(Enrichers.builder()
.aggregating(att)
.publishing(att)
// Reads as: when the aggregated collection is not empty publish its first element, otherwise publish null.
.computing(IfFunctions.ifPredicate(CollectionFunctionals.notEmpty())
.apply(CollectionFunctionals.firstElement())
.defaultValue(null))
// NOTE(review): unlike the aggregators in initEnrichers() this chain does not call fromMembers() - confirm the builder default is what is intended.
.entityFilter(MySqlClusterUtils.IS_MASTER)
.build());
}
/**
 * Returns the spec used for the first member, i.e. the master.
 * <p>
 * A user-supplied first-member spec wins, then the general member spec; in
 * both cases missing master defaults are filled in via applyDefaults. When
 * neither is available a plain "MySql Master" spec is built.
 */
@Override
protected EntitySpec<?> getFirstMemberSpec() {
    EntitySpec<?> userSpec = super.getFirstMemberSpec();
    if (userSpec == null) {
        // No dedicated first-member spec: fall back to the general member spec.
        userSpec = super.getMemberSpec();
    }

    if (userSpec == null) {
        return EntitySpec.create(MySqlNode.class)
                .displayName("MySql Master")
                .configure(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID)
                .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, MASTER_CONFIG_URL);
    }
    return applyDefaults(userSpec, Suppliers.ofInstance(MASTER_SERVER_ID), MASTER_CONFIG_URL);
}
/**
 * Returns the spec used for every member other than the first, i.e. the slaves.
 * <p>
 * A user-supplied member spec gets the slave defaults filled in via
 * applyDefaults; otherwise a plain "MySql Slave" spec is built.
 */
@Override
protected EntitySpec<?> getMemberSpec() {
    final Supplier<Integer> nextSlaveId = getAttribute(SLAVE_NEXT_SERVER_ID);
    final EntitySpec<?> userSpec = super.getMemberSpec();

    if (userSpec == null) {
        return EntitySpec.create(MySqlNode.class)
                .displayName("MySql Slave")
                // An ID is consumed on every call, even when the resulting spec is discarded
                // without creating a node, so slave server IDs are not guaranteed to be contiguous.
                .configure(MySqlNode.MYSQL_SERVER_ID, nextSlaveId.get())
                .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, SLAVE_CONFIG_URL);
    }
    return applyDefaults(userSpec, nextSlaveId, SLAVE_CONFIG_URL);
}
/**
 * Fills in the server ID and configuration template on a spec that does not
 * already configure them.
 *
 * @param spec      the spec to inspect; it is never modified
 * @param serverId  supplies the server ID; only consulted when the spec has none
 * @param configUrl template URL to use when the spec has none
 * @return {@code spec} itself when both keys are already configured, otherwise
 *         a copy of it with the missing keys set
 */
private EntitySpec<?> applyDefaults(EntitySpec<?> spec, Supplier<Integer> serverId, String configUrl) {
    final boolean hasServerId = isKeyConfigured(spec, MySqlNode.MYSQL_SERVER_ID);
    final boolean hasConfigUrl = isKeyConfigured(spec, MySqlNode.TEMPLATE_CONFIGURATION_URL.getConfigKey());
    if (hasServerId && hasConfigUrl) {
        // Nothing to add, so no copy is needed.
        return spec;
    }

    final EntitySpec<?> copy = EntitySpec.create(spec);
    if (!hasServerId) {
        copy.configure(MySqlNode.MYSQL_SERVER_ID, serverId.get());
    }
    if (!hasConfigUrl) {
        copy.configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, configUrl);
    }
    return copy;
}
/**
 * Tells whether {@code spec} carries a value for {@code key}, either as a
 * config entry keyed by the key itself or as a flag keyed by the key's name.
 */
private boolean isKeyConfigured(EntitySpec<?> spec, ConfigKey<?> key) {
    if (spec.getConfig().containsKey(key)) {
        return true;
    }
    return spec.getFlags().containsKey(key.getName());
}
/**
 * Creates a member node and, for every node that is not the master, wires up
 * slave health monitoring: an initial not-up indicator, a feed polling the
 * slave status every five seconds, and an enricher that keeps the not-up
 * indicator in step with SLAVE_HEALTHY.
 */
@Override
protected Entity createNode(Location loc, Map<?, ?> flags) {
    final MySqlNode member = (MySqlNode) super.createNode(loc, flags);
    if (MySqlClusterUtils.IS_MASTER.apply(member)) {
        // The master needs no replication health checks.
        return member;
    }

    // Keep the slave marked as not up until replication is reported healthy.
    ServiceNotUpLogic.updateNotUpIndicator(member, MySqlSlave.SLAVE_HEALTHY, "Replication not started");

    final FunctionFeed slaveStatusFeed = FunctionFeed.builder()
            .entity(member)
            .period(Duration.FIVE_SECONDS)
            .poll(FunctionPollConfig.forSensor(MySqlSlave.SLAVE_HEALTHY)
                    .callable(new SlaveStateCallable(member))
                    .checkSuccess(StringPredicates.isNonBlank())
                    .onSuccess(new SlaveStateParser(member))
                    .setOnFailure(false)
                    .description("Polls SHOW SLAVE STATUS"))
            .build();
    addFeed(slaveStatusFeed);

    member.enrichers().add(Enrichers.builder()
            .updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
            .from(MySqlSlave.SLAVE_HEALTHY)
            .computing(Functionals.ifNotEquals(true).value("Slave replication status is not healthy"))
            .build());

    return member;
}
/**
 * Poll function that fetches the raw {@code SHOW SLAVE STATUS} output from a
 * slave node, or {@code null} while the node's process is not reported as running.
 */
public static class SlaveStateCallable implements Callable<String> {
    private MySqlNode slave;

    public SlaveStateCallable(MySqlNode slave) {
        this.slave = slave;
    }

    @Override
    public String call() throws Exception {
        final boolean processRunning =
                Boolean.TRUE.equals(slave.getAttribute(MySqlNode.SERVICE_PROCESS_IS_RUNNING));
        if (!processRunning) {
            // Nothing to query yet.
            return null;
        }
        return MySqlClusterUtils.executeSqlOnNode(slave, "SHOW SLAVE STATUS \\G");
    }
}
/**
 * Turns the raw {@code SHOW SLAVE STATUS} output into a health flag and, as a
 * side effect, publishes the replication lag on the slave entity.
 */
public static class SlaveStateParser implements Function<String, Boolean> {
    private Entity slave;

    public SlaveStateParser(Entity slave) {
        this.slave = slave;
    }

    /**
     * @param result raw status output, parsed with {@link MySqlRowParser#parseSingle}
     * @return {@code true} only when both {@code Slave_IO_Running} and
     *         {@code Slave_SQL_Running} are {@code "Yes"}
     * @throws NumberFormatException if {@code Seconds_Behind_Master} is present,
     *         is not the literal {@code "NULL"}, and is not an integer
     */
    @Override
    public Boolean apply(String result) {
        Map<String, String> status = MySqlRowParser.parseSingle(result);
        String secondsBehindMaster = status.get("Seconds_Behind_Master");
        // The lag sensor is only updated when a concrete value is reported; a missing or
        // "NULL" value leaves the previously published value untouched.
        if (secondsBehindMaster != null && !"NULL".equals(secondsBehindMaster)) {
            // Integer.valueOf replaces the deprecated Integer(String) constructor; parsing is identical.
            slave.sensors().set(MySqlSlave.SLAVE_SECONDS_BEHIND_MASTER, Integer.valueOf(secondsBehindMaster));
        }
        return "Yes".equals(status.get("Slave_IO_Running")) && "Yes".equals(status.get("Slave_SQL_Running"));
    }
}
/**
 * Hands out slave server IDs, starting with the value right after
 * MASTER_SERVER_ID and increasing by one on every call.
 */
private static class NextServerIdSupplier implements Supplier<Integer> {
    // First ID handed out is the one following the master's.
    private AtomicInteger nextId = new AtomicInteger(MASTER_SERVER_ID+1);

    @Override
    public Integer get() {
        final int allocated = nextId.getAndIncrement();
        return allocated;
    }
}
// ============= Member Init =============
// The task is executed separately from the start effector, so failing here
// will not fail the start effector as well, but it will eventually time out
// because replication is not started.
// Would be nice to be able to plug in to the entity lifecycle!
/**
 * Reacts to a member's process coming up by submitting, once per node, a
 * top-level task that sets up replication for that node.
 */
private static final class NodeRunningListener implements SensorEventListener<Boolean> {
    private MySqlCluster cluster;
    // Passed on to every slave init task body.
    private Semaphore lock = new Semaphore(1);

    public NodeRunningListener(MySqlCluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public void onEvent(SensorEvent<Boolean> event) {
        final MySqlNode node = (MySqlNode) event.getSource();

        if (!Boolean.TRUE.equals(event.getValue())) {
            return;
        }
        // SERVICE_PROCESS_IS_RUNNING is only of interest while the node has not come online yet.
        if (!Boolean.FALSE.equals(node.getAttribute(MySqlNode.SERVICE_UP))) {
            return;
        }
        // Several updates are likely to arrive while replication is being initialized,
        // so skip nodes that have already been handled.
        if (Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) {
            return;
        }

        // Events are executed sequentially, so no synchronization is needed here.
        node.sensors().set(NODE_REPLICATION_INITIALIZED, Boolean.TRUE);

        final Runnable nodeInitTaskBody = MySqlClusterUtils.IS_MASTER.apply(node)
                ? new InitMasterTaskBody(cluster, node)
                : new InitSlaveTaskBody(cluster, node, lock);

        DynamicTasks.submitTopLevelTask(
                TaskBuilder.builder()
                        .displayName("setup master-slave replication")
                        .body(nodeInitTaskBody)
                        .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
                        .build(),
                node);
    }
}
/**
 * Task body run for the master node: records the master's current binary-log
 * coordinates on the cluster, then runs the configured creation script, if any.
 */
private static class InitMasterTaskBody implements Runnable {
    private MySqlNode master;
    private MySqlCluster cluster;

    public InitMasterTaskBody(MySqlCluster cluster, MySqlNode master) {
        this.cluster = cluster;
        this.master = master;
    }

    @Override
    public void run() {
        publishBinLogCoordinates();
        runCreationScript();
    }

    /** Reads the master status and, when both file and position are reported, stores them as the last slave snapshot. */
    private void publishBinLogCoordinates() {
        final String masterStatusOutput = MySqlClusterUtils.executeSqlOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW MASTER STATUS \\G UNLOCK TABLES;");
        final Map<String, String> masterStatus = MySqlRowParser.parseSingle(masterStatusOutput);
        final String binLogFile = masterStatus.get("File");
        final String binLogPosition = masterStatus.get("Position");
        if (binLogFile == null || binLogPosition == null) {
            return;
        }
        cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, new ReplicationSnapshot(null, null, binLogFile, Integer.parseInt(binLogPosition)));
    }

    /** Runs the creation script on the master; this happens on each start, analogously to the standard CREATION_SCRIPT config. */
    private void runCreationScript() {
        final String script = getDatabaseCreationScriptAsString(master);
        if (script == null) {
            return;
        }
        master.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of("commands", script));
    }

    /**
     * Resolves the master creation script: the content behind the configured
     * URL takes precedence over inline contents.
     *
     * @return the script text, or {@code null} when neither is configured
     */
    @Nullable private static String getDatabaseCreationScriptAsString(Entity entity) {
        final String scriptUrl = entity.getConfig(MySqlMaster.MASTER_CREATION_SCRIPT_URL);
        if (!Strings.isBlank(scriptUrl)) {
            return new ResourceUtils(entity).getResourceAsString(scriptUrl);
        }
        final String inlineScript = entity.getConfig(MySqlMaster.MASTER_CREATION_SCRIPT_CONTENTS);
        return Strings.isBlank(inlineScript) ? null : inlineScript;
    }
}
// ============= Member Remove =============
/**
 * Cleans up after a removed member: if the member was recorded in the
 * slave-address map, its entry is removed and the replication user for that
 * address is dropped on the master.
 */
public class MemberRemovedListener implements SensorEventListener<Entity> {
    @Override
    public void onEvent(SensorEvent<Entity> event) {
        final MySqlCluster source = (MySqlCluster) event.getSource();
        final Entity removedMember = event.getValue();

        final Map<String, String> addressesBySlaveId = source.getAttribute(SLAVE_ID_ADDRESS_MAPPING);
        final String removedAddress = addressesBySlaveId.remove(removedMember.getId());
        if (removedAddress == null) {
            // Not a recorded slave, nothing to clean up.
            return;
        }

        // The master could already be gone if the entire app is stopping - let it throw an exception.
        final MySqlNode masterNode = (MySqlNode) Iterables.find(source.getMembers(), MySqlClusterUtils.IS_MASTER);
        final String slaveUser = MySqlClusterUtils.validateSqlParam(source.getConfig(SLAVE_USERNAME));
        final String dropUserSql = String.format("DROP USER '%s'@'%s';", slaveUser, removedAddress);
        MySqlClusterUtils.executeSqlOnNodeAsync(masterNode, dropUserSql);
    }
}
}