blob: 70e29d74e16963e7fc3f10b80a53f24248f0ed83 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.database.mysql;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.core.effector.EffectorTasks;
import org.apache.brooklyn.core.effector.Effectors;
import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
import org.apache.brooklyn.core.entity.EntityPredicates;
import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.entity.database.mysql.MySqlNode.ExportDumpEffector;
import org.apache.brooklyn.entity.software.base.SoftwareProcess;
import org.apache.brooklyn.location.ssh.SshMachineLocation;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.TaskTags;
import org.apache.brooklyn.util.core.task.ssh.SshTasks;
import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.os.Os;
import org.apache.brooklyn.util.ssh.BashCommands;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.text.StringEscapes.BashStringEscapes;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
/**
 * Task body that bootstraps replication on a newly added slave node of a {@link MySqlCluster}.
 * <p>
 * It ensures a valid replication snapshot exists (creating a fresh dump from the master or a
 * healthy slave if the previous one is stale or missing), copies and imports the dump on the new
 * slave, creates the replication user on the master, and issues {@code CHANGE MASTER TO} /
 * {@code START SLAVE} on the new slave. Snapshot creation is serialized cluster-wide via the
 * injected {@link Semaphore} so concurrent slave additions don't each trigger a dump.
 */
public class InitSlaveTaskBody implements Runnable {
    private static final String SNAPSHOT_DUMP_OPTIONS = "--skip-lock-tables --single-transaction --flush-logs --hex-blob";
    private static final Logger log = LoggerFactory.getLogger(InitSlaveTaskBody.class);
    // Matches the replication coordinates mysqldump embeds (via --master-data) in the dump header.
    // Compiled once: Pattern compilation is relatively expensive and the pattern is constant.
    private static final Pattern MASTER_INFO_PATTERN =
            Pattern.compile("CHANGE MASTER TO.*MASTER_LOG_FILE\\s*=\\s*'([^']+)'.*MASTER_LOG_POS\\s*=\\s*(\\d+)");

    private final MySqlCluster cluster;
    private final MySqlNode slave;
    // Cluster-wide mutex guarding replication-snapshot creation; shared across all init tasks.
    private final Semaphore lock;

    /**
     * @param cluster the cluster the slave belongs to
     * @param slave   the newly added node to initialize as a replication slave
     * @param lock    cluster-shared semaphore serializing snapshot creation
     */
    public InitSlaveTaskBody(MySqlCluster cluster, MySqlNode slave, Semaphore lock) {
        this.cluster = cluster;
        this.slave = slave;
        this.lock = lock;
    }

    @Override
    public void run() {
        // Replication init state consists of:
        //   * initial dump (optional)
        //   * location of initial dump (could be on any of the members, optional)
        //   * bin log file name
        //   * bin log position
        // 1. Check replication state:
        //   * Does the dump exist (and the machine where it is located)
        //   * Does the bin log exist on the master
        // 2. If the replication state is not valid create a new one
        //   * Select a slave to dump, master if no slaves
        //   * If it's a slave do 'STOP SLAVE SQL_THREAD;'
        //   * Call mysqldump to create the snapshot
        //   * When done if a slave do 'START SLAVE SQL_THREAD;'
        //   * Get master state from the dump - grep "MASTER_LOG_POS" dump.sql.
        //     If slave get state from 'SHOW SLAVE STATUS'
        //   * Save new init info in cluster - bin log name, position, dump
        // 3. Init Slave
        //   * transfer dump to new slave (if dump exists)
        //   * import - mysql < ~/dump.sql
        //   * change master to and start slave
        // !!! Caveat: if dumping from master and MyISAM tables are used the dump may be inconsistent.
        //   * Only way around it is to lock the database while dumping (or taking a snapshot through LVM which is quicker)
        bootstrapSlaveAsync(getValidReplicationInfo(), slave);
        cluster.getAttribute(MySqlClusterImpl.SLAVE_ID_ADDRESS_MAPPING).put(slave.getId(), slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
    }

    /** Returns the cluster member currently designated as replication master. */
    private MySqlNode getMaster() {
        return (MySqlNode) Iterables.find(cluster.getMembers(), MySqlClusterUtils.IS_MASTER);
    }

    /**
     * Queues the tasks that import the snapshot on {@code slave} (if one exists), create the
     * replication user on the master, and start replication on the slave.
     */
    private void bootstrapSlaveAsync(final Future<ReplicationSnapshot> replicationInfoFuture, final MySqlNode slave) {
        DynamicTasks.queue("bootstrap slave replication", new Runnable() {
            @Override
            public void run() {
                ReplicationSnapshot replicationSnapshot;
                try {
                    replicationSnapshot = replicationInfoFuture.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw Exceptions.propagate(e);
                }
                MySqlNode master = getMaster();
                String masterAddress = MySqlClusterUtils.validateSqlParam(master.getAttribute(MySqlNode.SUBNET_ADDRESS));
                Integer masterPort = master.getAttribute(MySqlNode.MYSQL_PORT);
                String slaveAddress = MySqlClusterUtils.validateSqlParam(slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
                String username = MySqlClusterUtils.validateSqlParam(cluster.getConfig(MySqlCluster.SLAVE_USERNAME));
                String password = MySqlClusterUtils.validateSqlParam(cluster.getAttribute(MySqlCluster.SLAVE_PASSWORD));
                // A null entity id means the snapshot carries only binlog coordinates (no dump to import).
                if (replicationSnapshot.getEntityId() != null) {
                    Entity sourceEntity = Iterables.find(cluster.getMembers(), EntityPredicates.idEqualTo(replicationSnapshot.getEntityId()));
                    String dumpId = FilenameUtils.removeExtension(replicationSnapshot.getSnapshotPath());
                    copyDumpAsync(sourceEntity, slave, replicationSnapshot.getSnapshotPath(), dumpId);
                    DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.IMPORT_DUMP, ImmutableMap.of("path", replicationSnapshot.getSnapshotPath())));
                    // The dump resets the password to whatever is on the source instance, reset it back.
                    // We are able to still login because privileges are not flushed, so we just set the password to the same value.
                    DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.CHANGE_PASSWORD, ImmutableMap.of("password", slave.getAttribute(MySqlNode.PASSWORD))));
                    // Flush privileges to load new users coming from the dump
                    MySqlClusterUtils.executeSqlOnNodeAsync(slave, "FLUSH PRIVILEGES;");
                }
                MySqlClusterUtils.executeSqlOnNodeAsync(master, String.format(
                        "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n" +
                        "GRANT REPLICATION SLAVE ON *.* TO '%s'@'%s';\n",
                        username, slaveAddress, password, username, slaveAddress));
                // Executing this will unblock SERVICE_UP wait in the start effector
                String slaveCmd = String.format(
                        "CHANGE MASTER TO " +
                            "MASTER_HOST='%s', " +
                            "MASTER_PORT=%d, " +
                            "MASTER_USER='%s', " +
                            "MASTER_PASSWORD='%s', " +
                            "MASTER_LOG_FILE='%s', " +
                            "MASTER_LOG_POS=%d;\n" +
                        "START SLAVE;\n",
                        masterAddress, masterPort,
                        username, password,
                        replicationSnapshot.getBinLogName(),
                        replicationSnapshot.getBinLogPosition());
                MySqlClusterUtils.executeSqlOnNodeAsync(slave, slaveCmd);
            }
        });
    }

    /**
     * Queues tasks copying {@code sourceDumpPath} from {@code source} to {@code dest} over scp,
     * using a throw-away ssh key (generated on the source, authorized on the destination, and
     * removed from both afterwards). The copy task is marked inessential so cleanup runs even if
     * the copy fails; a final task re-raises any copy failure after cleanup.
     */
    private void copyDumpAsync(Entity source, Entity dest, String sourceDumpPath, String dumpId) {
        final SshMachineLocation sourceMachine = EffectorTasks.getSshMachine(source);
        final SshMachineLocation destMachine = EffectorTasks.getSshMachine(dest);
        String sourceRunDir = source.getAttribute(MySqlNode.RUN_DIR);
        String privateKeyFile = dumpId + ".id_rsa";
        final Task<String> tempKeyTask = DynamicTasks.queue(SshEffectorTasks.ssh(
                "cd $RUN_DIR",
                "PRIVATE_KEY=" + privateKeyFile,
                "ssh-keygen -t rsa -N '' -f $PRIVATE_KEY -C " + dumpId + " > /dev/null",
                "cat $PRIVATE_KEY.pub")
            .environmentVariable("RUN_DIR", sourceRunDir)
            .machine(sourceMachine)
            .summary("generate private key for slave access")
            .requiringZeroAndReturningStdout())
            .asTask();
        DynamicTasks.queue("add key to authorized_keys", new Runnable() {
            @Override
            public void run() {
                String publicKey = tempKeyTask.getUnchecked();
                DynamicTasks.queue(SshEffectorTasks.ssh(String.format(
                        "cat >> ~/.ssh/authorized_keys <<EOF\n%s\nEOF",
                        publicKey))
                    .machine(destMachine)
                    .summary("Add key to authorized_keys")
                    .requiringExitCodeZero());
            }
        });
        final ProcessTaskWrapper<Integer> copyTask = SshEffectorTasks.ssh(
                "cd $RUN_DIR",
                String.format(
                        "scp -o 'BatchMode yes' -o 'StrictHostKeyChecking no' -i '%s' '%s' '%s@%s:%s/%s.sql'",
                        privateKeyFile,
                        sourceDumpPath,
                        destMachine.getUser(),
                        dest.getAttribute(MySqlNode.SUBNET_ADDRESS),
                        dest.getAttribute(MySqlNode.RUN_DIR),
                        dumpId))
            .environmentVariable("RUN_DIR", sourceRunDir)
            .machine(sourceMachine)
            .summary("copy database dump to slave")
            .newTask();
        // Let next couple of tasks complete even if this one fails so that we can clean up.
        TaskTags.markInessential(copyTask);
        DynamicTasks.queue(copyTask);
        // Delete private key
        DynamicTasks.queue(SshEffectorTasks.ssh(
                "cd $RUN_DIR",
                "rm " + privateKeyFile)
            .environmentVariable("RUN_DIR", sourceRunDir)
            .machine(sourceMachine)
            .summary("remove private key"));
        DynamicTasks.queue(SshEffectorTasks.ssh(String.format(
                "sed -i'' -e '/%s/d' ~/.ssh/authorized_keys",
                dumpId))
            .machine(destMachine)
            .summary("remove private key from authorized_keys")).asTask();
        // The task will fail if copyTask fails, but only after the private key is deleted.
        DynamicTasks.queue("check for successful copy", new Runnable() {
            @Override
            public void run() {
                copyTask.asTask().getUnchecked();
            }
        });
    }

    /**
     * Returns a future for a usable replication snapshot, creating a new one (from the master or
     * a healthy slave) when the recorded one is missing or stale. Snapshot creation is serialized
     * via {@link #lock}.
     */
    private Future<ReplicationSnapshot> getValidReplicationInfo() {
        // Acquire BEFORE entering the try/finally: if acquire() is interrupted the permit was
        // never taken, and releasing in that case would corrupt the semaphore's permit count.
        try {
            lock.acquire();
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
        try {
            ReplicationSnapshot replicationSnapshot = getReplicationInfoMasterConfig();
            if (replicationSnapshot == null) {
                replicationSnapshot = getAttributeBlocking(cluster, MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT);
            }
            if (!isReplicationInfoValid(replicationSnapshot)) {
                final MySqlNode snapshotNode = getSnapshotNode();
                final String dumpName = getDumpUniqueId() + ".sql";
                if (MySqlClusterUtils.IS_MASTER.apply(snapshotNode)) {
                    return createMasterReplicationSnapshot(snapshotNode, dumpName);
                } else {
                    return createSlaveReplicationSnapshot(snapshotNode, dumpName);
                }
            }
            return ConcurrentUtils.constantFuture(replicationSnapshot);
        } finally {
            lock.release();
        }
    }

    /**
     * Rebind backwards compatibility: migrates legacy per-master log-file/position sensors into a
     * {@link ReplicationSnapshot} on the cluster, clearing the old sensors. Returns {@code null}
     * when the legacy sensors are not set.
     * @deprecated since 0.9.0
     */
    @Deprecated
    private ReplicationSnapshot getReplicationInfoMasterConfig() {
        Entity master = getMaster();
        AttributeSensor<String> MASTER_LOG_FILE = Sensors.newStringSensor(
                "mysql.master.log_file", "The binary log file master is writing to");
        AttributeSensor<Integer> MASTER_LOG_POSITION = Sensors.newIntegerSensor(
                "mysql.master.log_position", "The position in the log file to start replication");
        String logFile = master.sensors().get(MASTER_LOG_FILE);
        Integer logPos = master.sensors().get(MASTER_LOG_POSITION);
        if (logFile != null && logPos != null) {
            ReplicationSnapshot replicationSnapshot = new ReplicationSnapshot(null, null, logFile, logPos);
            cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, replicationSnapshot);
            master.sensors().set(MASTER_LOG_FILE, null);
            master.sensors().set(MASTER_LOG_POSITION, null);
            return replicationSnapshot;
        }
        return null;
    }

    /**
     * Dumps the master with {@code --master-data=2} (embeds binlog coordinates as a comment in
     * the dump), then queues a task that greps those coordinates out of the dump and records the
     * resulting snapshot on the cluster.
     */
    private Future<ReplicationSnapshot> createMasterReplicationSnapshot(final MySqlNode master, final String dumpName) {
        log.info("MySql cluster " + cluster + ": generating new replication snapshot on master node " + master + " with name " + dumpName);
        String dumpOptions = SNAPSHOT_DUMP_OPTIONS + " --master-data=2" + getDumpDatabases(master);
        ImmutableMap<String, String> params = ImmutableMap.of(
                ExportDumpEffector.PATH.getName(), dumpName,
                ExportDumpEffector.ADDITIONAL_OPTIONS.getName(), dumpOptions);
        DynamicTasks.queue(Effectors.invocation(master, MySqlNode.EXPORT_DUMP, params));
        return DynamicTasks.queue("get master log info from dump", new Callable<ReplicationSnapshot>() {
            @Override
            public ReplicationSnapshot call() throws Exception {
                String masterInfo = DynamicTasks.queue(execSshTask(master, "grep -m1 'CHANGE MASTER TO' " + dumpName, "Extract master replication status from dump")
                        .requiringZeroAndReturningStdout()).asTask().getUnchecked();
                Matcher masterInfoMatcher = MASTER_INFO_PATTERN.matcher(masterInfo);
                if (!masterInfoMatcher.find() || masterInfoMatcher.groupCount() != 2) {
                    throw new IllegalStateException("Master dump doesn't contain replication info: " + masterInfo);
                }
                String masterLogFile = masterInfoMatcher.group(1);
                int masterLogPosition = Integer.parseInt(masterInfoMatcher.group(2));
                ReplicationSnapshot replicationSnapshot = new ReplicationSnapshot(master.getId(), dumpName, masterLogFile, masterLogPosition);
                cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, replicationSnapshot);
                return replicationSnapshot;
            }
        });
    }

    /**
     * Builds the mysqldump database-selection argument: the configured databases
     * (bash-escaped), or {@code --all-databases} when none are configured.
     */
    private String getDumpDatabases(MySqlNode node) {
        // The config will be inherited from the cluster
        Collection<String> dumpDbs = node.config().get(MySqlCluster.SLAVE_REPLICATE_DUMP_DB);
        if (dumpDbs != null && !dumpDbs.isEmpty()) {
            return " --databases " + Joiner.on(' ').join(Iterables.transform(dumpDbs, BashStringEscapes.wrapBash()));
        } else {
            return " --all-databases";
        }
    }

    /**
     * Dumps an existing slave, pausing its SQL thread for the duration so the dump is consistent
     * with a fixed master binlog position, then queues a task that reads that position from
     * {@code SHOW SLAVE STATUS} and records the resulting snapshot on the cluster.
     */
    private Future<ReplicationSnapshot> createSlaveReplicationSnapshot(final MySqlNode slave, final String dumpName) {
        MySqlClusterUtils.executeSqlOnNodeAsync(slave, "STOP SLAVE SQL_THREAD;");
        try {
            log.info("MySql cluster " + cluster + ": generating new replication snapshot on slave node " + slave + " with name " + dumpName);
            String dumpOptions = SNAPSHOT_DUMP_OPTIONS + getDumpDatabases(slave);
            ImmutableMap<String, String> params = ImmutableMap.of(
                    ExportDumpEffector.PATH.getName(), dumpName,
                    ExportDumpEffector.ADDITIONAL_OPTIONS.getName(), dumpOptions);
            DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.EXPORT_DUMP, params));
            return DynamicTasks.queue("get master log info from slave", new Callable<ReplicationSnapshot>() {
                @Override
                public ReplicationSnapshot call() throws Exception {
                    String slaveStatusRow = slave.executeScript("SHOW SLAVE STATUS \\G");
                    Map<String, String> slaveStatus = MySqlRowParser.parseSingle(slaveStatusRow);
                    String masterLogFile = slaveStatus.get("Relay_Master_Log_File");
                    int masterLogPosition = Integer.parseInt(slaveStatus.get("Exec_Master_Log_Pos"));
                    ReplicationSnapshot replicationSnapshot = new ReplicationSnapshot(slave.getId(), dumpName, masterLogFile, masterLogPosition);
                    cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, replicationSnapshot);
                    return replicationSnapshot;
                }
            });
        } finally {
            MySqlClusterUtils.executeSqlOnNodeAsync(slave, "START SLAVE SQL_THREAD;");
        }
    }

    /**
     * Chooses the node to dump from: the configured preferred source if it is still a member,
     * otherwise a random healthy slave, falling back to the master.
     */
    private MySqlNode getSnapshotNode() {
        String snapshotNodeId = cluster.getConfig(MySqlCluster.REPLICATION_PREFERRED_SOURCE);
        if (snapshotNodeId != null) {
            Optional<Entity> preferredNode = Iterables.tryFind(cluster.getMembers(), EntityPredicates.idEqualTo(snapshotNodeId));
            if (preferredNode.isPresent()) {
                return (MySqlNode) preferredNode.get();
            } else {
                // Log the cluster, not this task object, so the message identifies the entity.
                log.warn("MySql cluster " + cluster + " configured with preferred snapshot node " + snapshotNodeId + " but it's not a member. Defaulting to a random slave.");
            }
        }
        return getRandomSlave();
    }

    /** Returns a random healthy slave, or the master when no healthy slaves exist. */
    private MySqlNode getRandomSlave() {
        List<MySqlNode> slaves = getHealthySlaves();
        if (slaves.size() > 0) {
            return slaves.get(new Random().nextInt(slaves.size()));
        } else {
            return getMaster();
        }
    }

    /** Returns the non-master members whose SERVICE_UP sensor is true. */
    private ImmutableList<MySqlNode> getHealthySlaves() {
        return FluentIterable.from(cluster.getMembers())
                .filter(Predicates.not(MySqlClusterUtils.IS_MASTER))
                .filter(EntityPredicates.attributeEqualTo(MySqlNode.SERVICE_UP, Boolean.TRUE))
                .filter(MySqlNode.class)
                .toList();
    }

    /**
     * A snapshot is valid when the master still has its binlog file and, if the snapshot
     * references a dump, the dump's host entity is still a member and the dump file still exists.
     */
    private boolean isReplicationInfoValid(ReplicationSnapshot replicationSnapshot) {
        MySqlNode master = getMaster();
        String dataDir = Strings.nullToEmpty(master.getConfig(MySqlNode.DATA_DIR));
        if (!checkFileExistsOnEntity(master, Os.mergePathsUnix(dataDir, replicationSnapshot.getBinLogName()))) {
            return false;
        }
        if (replicationSnapshot.getEntityId() != null) {
            Optional<Entity> snapshotSlave = Iterables.tryFind(cluster.getChildren(), EntityPredicates.idEqualTo(replicationSnapshot.getEntityId()));
            if (!snapshotSlave.isPresent()) {
                log.info("MySql cluster " + cluster + " missing node " + replicationSnapshot.getEntityId() + " with last snapshot " + replicationSnapshot.getSnapshotPath() + ". Will generate new snapshot.");
                return false;
            }
            if (!checkFileExistsOnEntity(snapshotSlave.get(), replicationSnapshot.getSnapshotPath())) {
                log.info("MySql cluster " + cluster + ", node " + snapshotSlave.get() + " missing replication snapshot " + replicationSnapshot.getSnapshotPath() + ". Will generate new snapshot.");
                return false;
            }
        }
        return true;
    }

    /** Checks (blocking) whether {@code path} exists on the entity's machine, relative to its run dir. */
    private boolean checkFileExistsOnEntity(Entity entity, String path) {
        String cmd = BashCommands.chain(
                BashCommands.requireTest(String.format("-f \"%s\"", path), "File " + path + " doesn't exist."));
        String summary = "Check if file " + path + " exists";
        return DynamicTasks.queue(execSshTask(entity, cmd, summary).allowingNonZeroExitCode()).asTask().getUnchecked() == 0;
    }

    /** Builds an ssh task factory running {@code cmd} in the entity's run dir. */
    private ProcessTaskFactory<Integer> execSshTask(Entity entity, String cmd, String summary) {
        SshMachineLocation machine = EffectorTasks.getSshMachine(entity);
        return SshTasks.newSshExecTaskFactory(machine, "cd $RUN_DIR\n" + cmd)
                .allowingNonZeroExitCode()
                .environmentVariable("RUN_DIR", entity.getAttribute(SoftwareProcess.RUN_DIR))
                .summary(summary);
    }

    /** Blocks until the attribute is published on the entity and returns its value. */
    private <T> T getAttributeBlocking(Entity masterNode, AttributeSensor<T> att) {
        return DynamicTasks.queue(DependentConfiguration.attributeWhenReady(masterNode, att)).getUnchecked();
    }

    /** Generates a unique dump base name: random id plus a timestamp for human readability. */
    private String getDumpUniqueId() {
        return "replication-dump-" + Identifiers.makeRandomId(8) + "-" + new SimpleDateFormat("yyyy-MM-dd--HH-mm-ss").format(new Date());
    }
}