/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.accumulo.core.clientImpl;

import static java.util.Objects.requireNonNull;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.ReplicationOperations;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
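
/**
 * Client-side implementation of {@link ReplicationOperations}: replication peers are defined via
 * instance properties, while draining and referenced-file discovery work against the manager and
 * the metadata table.
 *
 * <p>
 * A minimal usage sketch, assuming an existing {@code AccumuloClient client}; the peer name and
 * ReplicaSystem configuration below are illustrative placeholders:
 *
 * <pre>
 * ReplicationOperations repl = client.replicationOperations();
 * repl.addPeer("peer1", "org.apache.accumulo.tserver.replication.AccumuloReplicaSystem,...");
 * repl.drain("mytable"); // blocks until the table's current WALs are fully replicated
 * </pre>
 */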
public class ReplicationOperationsImpl implements ReplicationOperations {
  private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImpl.class);

  private final ClientContext context;

  public ReplicationOperationsImpl(ClientContext context) {
    requireNonNull(context);
    this.context = context;
  }
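
  /**
   * Defines a replication peer, storing the given ReplicaSystem configuration under the
   * {@link Property#REPLICATION_PEERS} instance property for the given peer name.
   */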
  @Override
  public void addPeer(final String name, final String replicaType)
      throws AccumuloException, AccumuloSecurityException {
    requireNonNull(name);
    requireNonNull(replicaType);
    context.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + name,
        replicaType);
  }
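
  /**
   * Removes the instance property that defines the named replication peer.
   */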
  @Override
  public void removePeer(final String name) throws AccumuloException, AccumuloSecurityException {
    requireNonNull(name);
    context.instanceOperations().removeProperty(Property.REPLICATION_PEERS.getKey() + name);
  }
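
  /**
   * Blocks until all data in the WALs currently referenced by the given table has been fully
   * replicated.
   */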
  @Override
  public void drain(String tableName)
      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
    requireNonNull(tableName);

    Set<String> wals = referencedFiles(tableName);

    drain(tableName, wals);
  }
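
  /**
   * Blocks until the data in the given WALs has been fully replicated for the given table,
   * polling the manager once per second.
   */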
  @Override
  public void drain(final String tableName, final Set<String> wals)
      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
    requireNonNull(tableName);

    final TInfo tinfo = TraceUtil.traceInfo();
    final TCredentials rpcCreds = context.rpcCreds();

    // Ask the manager if the table is fully replicated given these WALs, but don't poll inside
    // the manager
    boolean drained = false;
    while (!drained) {
      drained = getManagerDrain(tinfo, rpcCreds, tableName, wals);

      if (!drained) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException("Thread interrupted", e);
        }
      }
    }
  }
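
  /**
   * Makes a single RPC to the manager asking whether the given WALs have been fully replicated
   * for the table; the manager answers immediately rather than blocking.
   */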
  protected boolean getManagerDrain(final TInfo tinfo, final TCredentials rpcCreds,
      final String tableName, final Set<String> wals)
      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
    return ManagerClient.execute(context,
        client -> client.drainReplicationTable(tinfo, rpcCreds, tableName, wals));
  }
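
  /**
   * Looks up the table's id by name, retrying until the name-to-id mapping is visible.
   */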
  protected TableId getTableId(AccumuloClient client, String tableName)
      throws TableNotFoundException {
    TableOperations tops = client.tableOperations();

    if (!tops.exists(tableName)) {
      throw new TableNotFoundException(null, tableName, null);
    }

    String tableId = null;
    while (tableId == null) {
      tableId = tops.tableIdMap().get(tableName);
      if (tableId == null) {
        sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
      }
    }

    return TableId.of(tableId);
  }
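
  /**
   * Returns the set of WAL files which must be fully replicated before the given table is
   * drained: the WALs currently referenced by the table's tablets, plus the WALs already queued
   * for this table in the metadata table's replication section.
   */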
  @Override
  public Set<String> referencedFiles(String tableName) throws TableNotFoundException {
    requireNonNull(tableName);

    log.debug("Collecting referenced files for replication of table {}", tableName);

    TableId tableId = getTableId(context, tableName);
    log.debug("Found id of {} for name {}", tableId, tableName);

    Set<String> wals = new HashSet<>();

    // Get the WALs currently referenced by the table; try-with-resources guarantees the scanner
    // is closed even if configuring it throws
    try (BatchScanner metaBs =
        context.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)) {
      metaBs.setRanges(Collections.singleton(TabletsSection.getRange(tableId)));
      metaBs.fetchColumnFamily(LogColumnFamily.NAME);
      for (Entry<Key,Value> entry : metaBs) {
        LogEntry logEntry = LogEntry.fromMetaWalEntry(entry);
        wals.add(new Path(logEntry.filename).toString());
      }
    }

    // And the WALs that need to be replicated for this table
    try (BatchScanner metaBs =
        context.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)) {
      metaBs.setRanges(Collections.singleton(ReplicationSection.getRange()));
      metaBs.fetchColumnFamily(ReplicationSection.COLF);
      Text buffer = new Text();
      for (Entry<Key,Value> entry : metaBs) {
        if (tableId.equals(ReplicationSection.getTableId(entry.getKey()))) {
          ReplicationSection.getFile(entry.getKey(), buffer);
          wals.add(buffer.toString());
        }
      }
    }

    return wals;
  }
}