/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.accumulo.master.replication;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.impl.Table;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.InvalidProtocolBufferException;

/**
* Reads replication records from the replication table and creates work records which include
* target replication system information.
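 * <p>
 * Intended to be invoked periodically (in the master, a replication driver is one such caller);
 * a minimal usage sketch, where {@code context} and {@code client} are the constructor
 * arguments:
 *
 * <pre>
 * WorkMaker workMaker = new WorkMaker(context, client);
 * workMaker.run();
 * </pre>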
*/
public class WorkMaker {
private static final Logger log = LoggerFactory.getLogger(WorkMaker.class);
private final ServerContext context;
private final AccumuloClient client;
private BatchWriter writer;
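
/**
 * @param context
 *          server context, used to look up each table's configured replication targets
 * @param client
 *          client used to read from and write to the replication table
 */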
public WorkMaker(ServerContext context, AccumuloClient client) {
this.context = context;
this.client = client;
}
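
/**
 * Scans the status section of the replication table and, for each status record that still
 * requires work, writes one work record per replication target configured on the source table.
 * Returns immediately if the replication table is not yet online.
 */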
public void run() {
if (!ReplicationTable.isOnline(client)) {
log.debug("Replication table is not yet online");
return;
}
Span span = Trace.start("replicationWorkMaker");
try {
final Scanner s;
try {
s = ReplicationTable.getScanner(client);
if (null == writer) {
setBatchWriter(ReplicationTable.getBatchWriter(client));
}
} catch (ReplicationTableOfflineException e) {
log.warn("Replication table was online, but not anymore");
writer = null;
return;
}
// Only pull records about data that has been ingested and is ready for replication
StatusSection.limit(s);
TableConfiguration tableConf;
Text file = new Text();
for (Entry<Key,Value> entry : s) {
// Extract the useful bits from the status key
StatusSection.getFile(entry.getKey(), file);
Table.ID tableId = StatusSection.getTableId(entry.getKey());
log.debug("Processing replication status record for {} on table {}", file, tableId);
Status status;
try {
status = Status.parseFrom(entry.getValue().get());
} catch (InvalidProtocolBufferException e) {
log.error("Could not parse protobuf for {} from table {}", file, tableId);
continue;
}
// Don't create the record if we have nothing to do.
// TODO push this check into a server-side filter
if (!shouldCreateWork(status)) {
log.debug("Not creating work: {}", status);
continue;
}
// Get the table configuration for the table specified by the status record
tableConf = context.getServerConfFactory().getTableConfiguration(tableId);
// getTableConfiguration returns null if the table no longer exists
if (null == tableConf) {
continue;
}
// Pull the relevant replication targets
// TODO Cache this instead of pulling it every time
Map<String,String> replicationTargets = getReplicationTargets(tableConf);
// If we have targets, we need to make a work record
// TODO Don't create work if the status is only a new-file entry (nothing to replicate yet)
// -- Another scanner over the WorkSection can make this relatively cheap
if (!replicationTargets.isEmpty()) {
Span workSpan = Trace.start("createWorkMutations");
try {
addWorkRecord(file, entry.getValue(), replicationTargets, tableId);
} finally {
workSpan.stop();
}
} else {
log.warn("No configured targets for table with ID {}", tableId);
}
}
} finally {
span.stop();
}
}
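
/**
 * Sets the writer used to add work records to the replication table (also allows tests to
 * inject their own writer).
 */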
protected void setBatchWriter(BatchWriter bw) {
this.writer = bw;
}
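
/**
 * Extracts the replication targets from a table's configuration by stripping the
 * {@link Property#TABLE_REPLICATION_TARGET} prefix from each matching property key. For
 * example (the peer name and remote identifier here are illustrative), a property
 * {@code table.replication.target.peer1=5} yields the entry {@code peer1 -> 5}.
 */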
protected Map<String,String> getReplicationTargets(TableConfiguration tableConf) {
final Map<String,String> props = tableConf
.getAllPropertiesWithPrefix(Property.TABLE_REPLICATION_TARGET);
final Map<String,String> targets = new HashMap<>();
final int propKeyLength = Property.TABLE_REPLICATION_TARGET.getKey().length();
for (Entry<String,String> prop : props.entrySet()) {
targets.put(prop.getKey().substring(propKeyLength), prop.getValue());
}
return targets;
}

/**
 * @return true if a work entry should be created for this status
 */
protected boolean shouldCreateWork(Status status) {
// Only creating work when there is work to do (regardless of closed status) is safe
// as long as the ReplicaSystem implementation is correctly observing
// that a file is completely replicated only when the file is closed
return StatusUtil.isWorkRequired(status);
}
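
/**
 * Serializes a {@link ReplicationTarget} for each configured target and adds them, in a single
 * mutation keyed by the file, to the work section of the replication table.
 */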
protected void addWorkRecord(Text file, Value v, Map<String,String> targets,
Table.ID sourceTableId) {
log.info("Adding work records for {} to targets {}", file, targets);
try {
Mutation m = new Mutation(file);
ReplicationTarget target = new ReplicationTarget();
DataOutputBuffer buffer = new DataOutputBuffer();
Text t = new Text();
for (Entry<String,String> entry : targets.entrySet()) {
buffer.reset();
// Set up the writable
target.setPeerName(entry.getKey());
target.setRemoteIdentifier(entry.getValue());
target.setSourceTableId(sourceTableId);
target.write(buffer);
// Throw it in a text for the mutation
t.set(buffer.getData(), 0, buffer.getLength());
// Add it to the work section
WorkSection.add(m, t, v);
}
try {
writer.addMutation(m);
} catch (MutationsRejectedException e) {
log.warn("Failed to write work mutations for replication, will retry", e);
}
} catch (IOException e) {
log.warn("Failed to serialize data to Text, will retry", e);
} finally {
try {
writer.flush();
} catch (MutationsRejectedException e) {
log.warn("Failed to write work mutations for replication, will retry", e);
}
}
}
}