blob: 33479a5a1811ef561e7fc529cb6598c4c0832ad2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.master.replication;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.trace.TraceSamplers;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.master.Master;
import org.apache.htrace.impl.ProbabilitySampler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Daemon that periodically drives the master-side replication bookkeeping, keeping that work
 * separate from the {@link Master} itself. Each pass runs, in order, the {@link StatusMaker},
 * {@link WorkMaker}, {@link FinishedWorkUpdater} and {@link RemoveCompleteReplicationRecords},
 * then sleeps for {@link Property#MASTER_REPLICATION_SCAN_INTERVAL}. The loop continues while
 * {@link Master#stillMaster()} returns true, and stops early if the thread is interrupted while
 * sleeping between passes.
 */
public class ReplicationDriver extends Daemon {

  private static final Logger log = LoggerFactory.getLogger(ReplicationDriver.class);

  private final Master master;
  private final AccumuloConfiguration conf;

  // The helpers below are created lazily on the first pass that successfully obtains a client,
  // and are then reused by every later pass.
  private WorkMaker workMaker;
  private StatusMaker statusMaker;
  private FinishedWorkUpdater finishedWorkUpdater;
  private RemoveCompleteReplicationRecords rcrr;
  private AccumuloClient client;

  /**
   * Creates the driver thread; it does nothing until started.
   *
   * @param master the master this driver works on behalf of; its configuration is captured once,
   *     here, and reused for every pass
   */
  public ReplicationDriver(Master master) {
    super("Replication Driver");
    this.master = master;
    this.conf = master.getConfiguration();
  }

  @Override
  public void run() {
    ProbabilitySampler sampler = TraceSamplers
        .probabilitySampler(conf.getFraction(Property.REPLICATION_TRACE_PERCENT));

    long millisToWait = conf.getTimeInMillis(Property.REPLICATION_DRIVER_DELAY);
    log.debug("Waiting {}ms before starting main replication loop", millisToWait);
    UtilWaitThread.sleep(millisToWait);

    log.debug("Starting replication loop");

    while (master.stillMaster()) {
      if (null == workMaker) {
        try {
          client = master.getClient();
        } catch (AccumuloException | AccumuloSecurityException e) {
          // couldn't get a client, try again in a "short" amount of time
          log.warn("Error trying to get client to process replication records", e);
          UtilWaitThread.sleep(2000);
          continue;
        }

        statusMaker = new StatusMaker(client, master.getFileSystem());
        workMaker = new WorkMaker(master.getContext(), client);
        finishedWorkUpdater = new FinishedWorkUpdater(client);
        rcrr = new RemoveCompleteReplicationRecords(client);
      }

      Trace.on("masterReplicationDriver", sampler);
      try {
        runReplicationPass();
      } finally {
        // Close the trace even if something other than an Exception escapes a step.
        Trace.off();
      }

      // Sleep for a bit
      long sleepMillis = conf.getTimeInMillis(Property.MASTER_REPLICATION_SCAN_INTERVAL);
      log.debug("Sleeping for {}ms before re-running", sleepMillis);
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        // Restore the interrupt status and stop looping. Swallowing the interrupt (as before)
        // made this thread impossible to stop via interruption.
        Thread.currentThread().interrupt();
        log.error("Interrupted while sleeping", e);
        return;
      }
    }
  }

  /**
   * Runs one replication bookkeeping pass. Each step is isolated: an Exception thrown by one step
   * is logged and the remaining steps still run. The steps run sequentially on this thread, in
   * the order below.
   */
  private void runReplicationPass() {
    // Make status markers from replication records in metadata, removing entries in
    // metadata which are no longer needed (closed records)
    // This will end up creating the replication table too
    try {
      statusMaker.run();
    } catch (Exception e) {
      log.error("Caught Exception trying to create Replication status records", e);
    }

    // Tell the work maker to make work
    try {
      workMaker.run();
    } catch (Exception e) {
      log.error("Caught Exception trying to create Replication work records", e);
    }

    // Update the status records from the work records
    try {
      finishedWorkUpdater.run();
    } catch (Exception e) {
      log.error(
          "Caught Exception trying to update Replication records using finished work records", e);
    }

    // Clean up records we no longer need.
    // It must NOT be running at the same time as the StatusMaker or WorkMaker,
    // so it's important that we run these sequentially and not concurrently.
    try {
      rcrr.run();
    } catch (Exception e) {
      log.error("Caught Exception trying to remove finished Replication records", e);
    }
  }
}