blob: bf0e0d4fcbc2cf682adb469bba2a450246b8e24f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.master.replication;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.util.Collection;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.WorkAssigner;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.InvalidProtocolBufferException;
/**
 * Common base for {@link WorkAssigner}s that hand replication work out through a
 * {@link DistributedWorkQueue}.
 *
 * <p>
 * Each call to {@link #assignWork()} walks the {@link OrderSection} of the replication table. For
 * every file found there, it reads the matching {@link WorkSection} records. For each
 * {@link ReplicationTarget}, it then either queues new work or removes work that no longer needs
 * replication from the subclass's internal state. Subclasses decide how work is queued and
 * tracked via the abstract hooks at the bottom of this class.
 */
public abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner {

  private static final Logger log = LoggerFactory.getLogger(DistributedWorkQueueWorkAssigner.class);

  /**
   * @param status
   *          replication status read from a work record
   * @return true if the status still requires replication; delegates to
   *         {@link StatusUtil#isWorkRequired(Status)}
   */
  protected boolean isWorkRequired(Status status) {
    return StatusUtil.isWorkRequired(status);
  }

  /** Client used to read the replication table; set by {@link #configure}. */
  protected AccumuloClient client;
  /** Configuration set by {@link #configure}. */
  protected AccumuloConfiguration conf;
  /** Queue that work is submitted to; created by {@link #assignWork()} if not already set. */
  protected DistributedWorkQueue workQueue;
  /**
   * Once {@link #getQueueSize()} exceeds this value, {@link #createWork()} stops adding work.
   * Refreshed from {@link Property#REPLICATION_MAX_WORK_QUEUE} on every {@link #assignWork()}.
   */
  protected int maxQueueSize;
  /** Cache over the work queue's ZooReaderWriter; created by {@link #assignWork()} if unset. */
  protected ZooCache zooCache;

  /*
   * Getters/setters for testing purposes
   */

  protected AccumuloClient getClient() {
    return client;
  }

  protected void setClient(AccumuloClient client) {
    this.client = client;
  }

  protected AccumuloConfiguration getConf() {
    return conf;
  }

  protected void setConf(AccumuloConfiguration conf) {
    this.conf = conf;
  }

  protected DistributedWorkQueue getWorkQueue() {
    return workQueue;
  }

  protected void setWorkQueue(DistributedWorkQueue workQueue) {
    this.workQueue = workQueue;
  }

  protected int getMaxQueueSize() {
    return maxQueueSize;
  }

  protected void setMaxQueueSize(int maxQueueSize) {
    this.maxQueueSize = maxQueueSize;
  }

  protected ZooCache getZooCache() {
    return zooCache;
  }

  protected void setZooCache(ZooCache zooCache) {
    this.zooCache = zooCache;
  }

  /**
   * Initialize the DistributedWorkQueue at the instance's ZooKeeper root followed by
   * {@link ReplicationConstants#ZOO_WORK_QUEUE}.
   *
   * @param conf
   *          configuration handed to the new {@link DistributedWorkQueue}
   */
  protected void initializeWorkQueue(AccumuloConfiguration conf) {
    workQueue = new DistributedWorkQueue(
        ZooUtil.getRoot(client.getInstanceID()) + ReplicationConstants.ZOO_WORK_QUEUE, conf);
  }

  /**
   * Store the configuration and client used by subsequent {@link #assignWork()} calls.
   */
  @Override
  public void configure(AccumuloConfiguration conf, AccumuloClient client) {
    this.conf = conf;
    this.client = client;
  }

  /**
   * Run one assignment pass.
   *
   * <p>
   * The pass lazily creates the work queue, lets the subclass initialize its queued-work state,
   * and lazily creates the {@link ZooCache}. It then refreshes {@link #maxQueueSize}, creates new
   * work from the replication table, and finally lets the subclass clean up finished work.
   */
  @Override
  public void assignWork() {
    if (null == workQueue) {
      initializeWorkQueue(conf);
    }

    initializeQueuedWork();

    if (null == zooCache) {
      zooCache = new ZooCache(workQueue.getZooReaderWriter());
    }

    // Get the maximum number of entries we want to queue work for (or the default)
    this.maxQueueSize = conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);

    // Scan over the work records, adding the work to the queue
    createWork();

    // Keep the state of the work we queued correct
    cleanupFinishedWork();
  }

  /**
   * Scan over the {@link OrderSection} of the replication table and, for each file, its
   * {@link WorkSection} records. Work is queued for targets with data to replicate when
   * {@link #shouldQueueWork(ReplicationTarget)} allows it. Queued work that no longer requires
   * replication is removed from the internal state.
   *
   * <p>
   * The pass stops early when the queue grows beyond {@link #maxQueueSize} or when the
   * replication table goes offline. All scanners opened here are closed before returning.
   */
  protected void createWork() {
    // Create a scanner over the replication table's order entries
    Scanner s;
    try {
      s = ReplicationTable.getScanner(client);
    } catch (ReplicationTableOfflineException e) {
      // no work to do; replication is off
      return;
    }

    try {
      OrderSection.limit(s);

      // Scratch buffer reused while decoding row/column components of each key
      Text buffer = new Text();
      for (Entry<Key,Value> orderEntry : s) {
        // If queued work is not being drained, stop adding more work entries instead of growing
        // the queue without bound
        if (getQueueSize() > maxQueueSize) {
          log.warn("Queued replication work exceeds configured maximum ({}),"
              + " sleeping to allow work to occur", maxQueueSize);
          return;
        }

        String file = OrderSection.getFile(orderEntry.getKey(), buffer);
        OrderSection.getTableId(orderEntry.getKey(), buffer);
        String sourceTableId = buffer.toString();

        log.info("Determining if {} from {} needs to be replicated", file, sourceTableId);

        if (!createWorkForFile(file, buffer)) {
          // Replication table went offline; abandon this pass
          return;
        }
      }
    } finally {
      s.close();
    }
  }

  /**
   * Examine every {@link WorkSection} record for {@code file}. For each
   * {@link ReplicationTarget}, either queue new work or remove completed work from the internal
   * state.
   *
   * @param file
   *          file named by the current {@link OrderSection} entry
   * @param buffer
   *          scratch {@link Text} reused while decoding work record keys
   * @return false if the replication table was offline and the caller should stop this pass,
   *         true otherwise
   */
  private boolean createWorkForFile(String file, Text buffer) {
    Scanner workScanner;
    try {
      workScanner = ReplicationTable.getScanner(client);
    } catch (ReplicationTableOfflineException e) {
      log.warn("Replication table is offline. Will retry...");
      // Back off before the next assignment pass
      sleepUninterruptibly(5, TimeUnit.SECONDS);
      return false;
    }

    try {
      WorkSection.limit(workScanner);
      workScanner.setRange(Range.exact(file));

      // The file is the same for every work record read below, so derive its path once
      Path p = new Path(file);
      String filename = p.getName();

      int newReplicationTasksSubmitted = 0;
      int workEntriesRead = 0;
      // For a file, we can concurrently replicate it to multiple targets
      for (Entry<Key,Value> workEntry : workScanner) {
        workEntriesRead++;
        Status status;
        try {
          status = StatusUtil.fromValue(workEntry.getValue());
        } catch (InvalidProtocolBufferException e) {
          log.warn("Could not deserialize protobuf from work entry for {} to {}, will retry", file,
              ReplicationTarget.from(workEntry.getKey().getColumnQualifier()), e);
          continue;
        }

        // Get the ReplicationTarget for this Work record
        ReplicationTarget target = WorkSection.getTarget(workEntry.getKey(), buffer);
        String key = DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename, target);

        if (!shouldQueueWork(target)) {
          // New work can't be submitted for this target, but finished work can still be dropped
          if (!isWorkRequired(status)) {
            removeQueuedWorkIfPresent(target, key);
          }
          continue;
        }

        // If there is work to do
        if (isWorkRequired(status)) {
          if (queueWork(p, target)) {
            newReplicationTasksSubmitted++;
          }
        } else {
          log.debug("Not queueing work for {} to {} because {} doesn't need replication", file,
              target, ProtobufUtil.toString(status));
          removeQueuedWorkIfPresent(target, key);
        }
      }

      log.debug("Read {} replication entries from the WorkSection of the replication table",
          workEntriesRead);
      log.info("Assigned {} replication work entries for {}", newReplicationTasksSubmitted, file);
    } finally {
      workScanner.close();
    }
    return true;
  }

  /**
   * Remove {@code queueKey} from the internal state for {@code target} if it is currently
   * tracked there as queued work.
   *
   * @param target
   *          target the work was queued for
   * @param queueKey
   *          key built by {@link DistributedWorkQueueWorkAssignerHelper#getQueueKey}
   */
  private void removeQueuedWorkIfPresent(ReplicationTarget target, String queueKey) {
    // Get the file (if any) currently being replicated to the given peer for the given source
    // table
    Collection<String> keysBeingReplicated = getQueuedWork(target);
    if (keysBeingReplicated.contains(queueKey)) {
      log.debug("Removing {} from replication state to {} because replication is complete",
          queueKey, target.getPeerName());
      this.removeQueuedWork(target, queueKey);
    }
  }

  /**
   * @param target
   *          target a work record refers to
   * @return true if replication work for the given {@link ReplicationTarget} may be submitted
   *         now
   */
  protected abstract boolean shouldQueueWork(ReplicationTarget target);

  /**
   * @return the size of the queued work; {@link #createWork()} stops adding work once this
   *         exceeds {@link #maxQueueSize}
   */
  protected abstract int getQueueSize();

  /**
   * Set up any internal state before using the WorkAssigner. Called at the start of every
   * {@link #assignWork()} pass.
   */
  protected abstract void initializeQueuedWork();

  /**
   * Queue the given work for the target
   *
   * @param path
   *          File to replicate
   * @param target
   *          Target for the work
   * @return True if the work was queued, false otherwise
   */
  protected abstract boolean queueWork(Path path, ReplicationTarget target);

  /**
   * @param target
   *          Target for the work
   * @return Queued work keys for the given target; must not be null
   */
  protected abstract Set<String> getQueuedWork(ReplicationTarget target);

  /**
   * Remove the given work from the internal state
   *
   * @param target
   *          Target the work was queued for
   * @param queueKey
   *          key built by {@link DistributedWorkQueueWorkAssignerHelper#getQueueKey}
   */
  protected abstract void removeQueuedWork(ReplicationTarget target, String queueKey);

  /**
   * Remove finished replication work from the internal state. Called at the end of every
   * {@link #assignWork()} pass.
   */
  protected abstract void cleanupFinishedWork();
}