blob: 96919cb0a58f4d7aa4e0d3931c64733988b650b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.clientImpl.AuthenticationTokenIdentifier;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.DelegationTokenConfigSerializer;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.DeprecatedPropertyUtil;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.master.thrift.ManagerClientService;
import org.apache.accumulo.core.master.thrift.ManagerGoalState;
import org.apache.accumulo.core.master.thrift.ManagerMonitorInfo;
import org.apache.accumulo.core.master.thrift.ManagerState;
import org.apache.accumulo.core.master.thrift.TabletLoadState;
import org.apache.accumulo.core.master.thrift.TabletSplit;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.securityImpl.thrift.TDelegationToken;
import org.apache.accumulo.core.securityImpl.thrift.TDelegationTokenConfig;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.manager.tableOps.TraceRepo;
import org.apache.accumulo.manager.tserverOps.ShutdownTServer;
import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretManager;
import org.apache.accumulo.server.util.NamespacePropUtil;
import org.apache.accumulo.server.util.SystemPropUtil;
import org.apache.accumulo.server.util.TablePropUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.thrift.TException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.InvalidProtocolBufferException;
/**
 * Thrift handler for the manager's {@link ManagerClientService.Iface} RPCs: flush coordination,
 * table/namespace/system property changes, shutdown requests, tablet status reports from tablet
 * servers, delegation token generation and replication drain checks. Extends
 * {@link FateServiceHandler}.
 */
public class ManagerClientServiceHandler extends FateServiceHandler
    implements ManagerClientService.Iface {

  private static final Logger log = Manager.log;
  // Dedicated logger name so replication-drain tracing can be configured separately.
  private static final Logger drainLog =
      LoggerFactory.getLogger("org.apache.accumulo.manager.ManagerDrainImpl");

  protected ManagerClientServiceHandler(Manager manager) {
    super(manager);
  }

  /**
   * Increments the flush id stored in ZooKeeper for a table and returns the resulting id. The
   * returned id is what {@link #waitForFlush} compares tablet flush ids against.
   *
   * @throws ThriftSecurityException if the caller is not permitted to flush the table
   * @throws ThriftTableOperationException NOTFOUND if the table or its flush-id node does not
   *         exist; OTHER for any other failure while updating ZooKeeper
   */
  @Override
  public long initiateFlush(TInfo tinfo, TCredentials c, String tableIdStr)
      throws ThriftSecurityException, ThriftTableOperationException {
    TableId tableId = TableId.of(tableIdStr);
    NamespaceId namespaceId = getNamespaceIdFromTableId(TableOperation.FLUSH, tableId);
    if (!manager.security.canFlush(c, tableId, namespaceId)) {
      throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
    }

    String zTablePath = Constants.ZROOT + "/" + manager.getInstanceID() + Constants.ZTABLES + "/"
        + tableId + Constants.ZTABLE_FLUSH_ID;

    ZooReaderWriter zoo = manager.getContext().getZooReaderWriter();
    byte[] fid;
    try {
      // The mutator parses the stored id and writes back id + 1; the bytes returned by
      // mutateExisting are decoded below as the flush id handed back to the caller.
      fid = zoo.mutateExisting(zTablePath, currentValue -> {
        long flushID = Long.parseLong(new String(currentValue, UTF_8));
        return Long.toString(flushID + 1).getBytes(UTF_8);
      });
    } catch (NoNodeException nne) {
      throw new ThriftTableOperationException(tableId.canonical(), null, TableOperation.FLUSH,
          TableOperationExceptionType.NOTFOUND, null);
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // The broad catch would otherwise swallow the interrupt; restore it for the caller.
        Thread.currentThread().interrupt();
      }
      Manager.log.warn("{}", e.getMessage(), e);
      throw new ThriftTableOperationException(tableId.canonical(), null, TableOperation.FLUSH,
          TableOperationExceptionType.OTHER, null);
    }
    // Decode with the same charset used to encode the value in the mutator above.
    return Long.parseLong(new String(fid, UTF_8));
  }

  /**
   * Requests flushes of a table's row range and polls the metadata table until every tablet in the
   * range that is hosted or has write-ahead logs reports a flush id of at least {@code flushID}.
   * Polling stops after {@code maxLoops} iterations even if tablets are still pending.
   *
   * <p>The first iteration asks every current tablet server to flush. Later iterations (after a
   * 50ms sleep) only ask servers that host tablets still below {@code flushID}. For the root table
   * only the first round of flush requests is sent.
   *
   * @param startRowBB start row of the range; may be null
   * @param endRowBB end row of the range; may be null
   * @param flushID flush id tablets must reach
   * @param maxLoops maximum number of flush/poll iterations
   * @throws ThriftSecurityException if the caller is not permitted to flush the table
   * @throws ThriftTableOperationException BAD_RANGE if start row is not less than end row;
   *         NOTFOUND if the table cannot be found
   */
  @Override
  public void waitForFlush(TInfo tinfo, TCredentials c, String tableIdStr, ByteBuffer startRowBB,
      ByteBuffer endRowBB, long flushID, long maxLoops)
      throws ThriftSecurityException, ThriftTableOperationException {
    TableId tableId = TableId.of(tableIdStr);
    NamespaceId namespaceId = getNamespaceIdFromTableId(TableOperation.FLUSH, tableId);
    if (!manager.security.canFlush(c, tableId, namespaceId)) {
      throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
    }

    Text startRow = ByteBufferUtil.toText(startRowBB);
    Text endRow = ByteBufferUtil.toText(endRowBB);

    if (endRow != null && startRow != null && startRow.compareTo(endRow) >= 0) {
      throw new ThriftTableOperationException(tableId.canonical(), null, TableOperation.FLUSH,
          TableOperationExceptionType.BAD_RANGE, "start row must be less than end row");
    }

    Set<TServerInstance> serversToFlush = new HashSet<>(manager.tserverSet.getCurrentServers());

    for (long l = 0; l < maxLoops; l++) {

      for (TServerInstance instance : serversToFlush) {
        try {
          final TServerConnection server = manager.tserverSet.getConnection(instance);
          if (server != null) {
            server.flush(manager.managerLock, tableId, ByteBufferUtil.toBytes(startRowBB),
                ByteBufferUtil.toBytes(endRowBB));
          }
        } catch (TException ex) {
          Manager.log.error(ex.toString());
        }
      }

      if (tableId.equals(RootTable.ID)) {
        break; // this code does not properly handle the root tablet. See #798
      }

      if (l == maxLoops - 1) {
        break;
      }

      sleepUninterruptibly(50, TimeUnit.MILLISECONDS);

      // Rebuilt below with only the servers hosting tablets that have not reached flushID.
      serversToFlush.clear();

      try (TabletsMetadata tablets =
          TabletsMetadata.builder().forTable(tableId).overlapping(startRow, endRow)
              .fetch(FLUSH_ID, LOCATION, LOGS, PREV_ROW).build(manager.getContext())) {
        int tabletsToWaitFor = 0;
        int tabletCount = 0;

        for (TabletMetadata tablet : tablets) {
          int logs = tablet.getLogs().size();

          // when tablet is not online and has no logs, there is no reason to wait for it
          if ((tablet.hasCurrent() || logs > 0) && tablet.getFlushId().orElse(-1) < flushID) {
            tabletsToWaitFor++;
            if (tablet.hasCurrent()) {
              serversToFlush.add(tablet.getLocation());
            }
          }

          tabletCount++;
        }

        if (tabletsToWaitFor == 0) {
          break;
        }

        // TODO detect case of table offline AND tablets w/ logs? - ACCUMULO-1296

        if (tabletCount == 0 && !Tables.exists(manager.getContext(), tableId)) {
          throw new ThriftTableOperationException(tableId.canonical(), null, TableOperation.FLUSH,
              TableOperationExceptionType.NOTFOUND, null);
        }

      } catch (TabletDeletedException e) {
        // Tablets changed underneath the scan; log and retry on the next iteration.
        Manager.log.debug("Failed to scan {} table to wait for flush {}", MetadataTable.NAME,
            tableId, e);
      }
    }

  }

  /**
   * Looks up the namespace of a table.
   *
   * @throws ThriftTableOperationException NOTFOUND (for {@code tableOp}) if the table is unknown
   */
  private NamespaceId getNamespaceIdFromTableId(TableOperation tableOp, TableId tableId)
      throws ThriftTableOperationException {
    try {
      return Tables.getNamespaceId(manager.getContext(), tableId);
    } catch (TableNotFoundException e) {
      throw new ThriftTableOperationException(tableId.canonical(), null, tableOp,
          TableOperationExceptionType.NOTFOUND, e.getMessage());
    }
  }

  @Override
  public ManagerMonitorInfo getManagerStats(TInfo info, TCredentials credentials) {
    return manager.getManagerMonitorInfo();
  }

  @Override
  public void removeTableProperty(TInfo info, TCredentials credentials, String tableName,
      String property) throws ThriftSecurityException, ThriftTableOperationException {
    alterTableProperty(credentials, tableName, property, null, TableOperation.REMOVE_PROPERTY);
  }

  @Override
  public void setTableProperty(TInfo info, TCredentials credentials, String tableName,
      String property, String value) throws ThriftSecurityException, ThriftTableOperationException {
    alterTableProperty(credentials, tableName, property, value, TableOperation.SET_PROPERTY);
  }

  /**
   * Stops the manager. When {@code stopTabletServers} is true, first sets the goal state to
   * CLEAN_STOP and blocks, waking on manager events or once per second, until no tablet servers
   * remain in the live set.
   *
   * @throws ThriftSecurityException if the caller may not perform system actions
   */
  @Override
  public void shutdown(TInfo info, TCredentials c, boolean stopTabletServers)
      throws ThriftSecurityException {
    if (!manager.security.canPerformSystemActions(c)) {
      throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
    }
    if (stopTabletServers) {
      manager.setManagerGoalState(ManagerGoalState.CLEAN_STOP);
      EventCoordinator.Listener eventListener = manager.nextEvent.getListener();
      do {
        eventListener.waitForEvents(Manager.ONE_SECOND);
      } while (manager.tserverSet.size() > 0);
    }
    manager.setManagerState(ManagerState.STOP);
  }

  /**
   * Shuts down a single tablet server by running a {@link ShutdownTServer} FATE operation and
   * waiting for it to complete. Unless {@code force} is set, returns without doing anything when
   * no connection to the named server is available.
   *
   * @throws ThriftSecurityException if the caller may not perform system actions
   */
  @Override
  public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer, boolean force)
      throws ThriftSecurityException {
    if (!manager.security.canPerformSystemActions(c)) {
      throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
    }

    final TServerInstance doomed = manager.tserverSet.find(tabletServer);
    if (!force) {
      final TServerConnection server = manager.tserverSet.getConnection(doomed);
      if (server == null) {
        Manager.log.warn("No server found for name {}", tabletServer);
        return;
      }
    }

    long tid = manager.fate.startTransaction();

    log.debug("Seeding FATE op to shutdown {} with tid {}", tabletServer, tid);

    manager.fate.seedTransaction(tid, new TraceRepo<>(new ShutdownTServer(doomed, force)), false);
    manager.fate.waitForCompletion(tid);
    manager.fate.delete(tid);

    log.debug("FATE op shutting down {} finished", tabletServer);
  }

  /**
   * Handles a split report from a tablet server: cancels any pending migration of the old tablet
   * and, if the reporting server is currently known, signals a manager event.
   */
  @Override
  public void reportSplitExtent(TInfo info, TCredentials credentials, String serverName,
      TabletSplit split) {
    KeyExtent oldTablet = KeyExtent.fromThrift(split.oldTablet);
    if (manager.migrations.remove(oldTablet) != null) {
      Manager.log.info("Canceled migration of {}", split.oldTablet);
    }
    for (TServerInstance instance : manager.tserverSet.getCurrentServers()) {
      if (serverName.equals(instance.getHostPort())) {
        manager.nextEvent.event("%s reported split %s, %s", serverName,
            KeyExtent.fromThrift(split.newTablets.get(0)),
            KeyExtent.fromThrift(split.newTablets.get(1)));
        return;
      }
    }
    Manager.log.warn("Got a split from a server we don't recognize: {}", serverName);
  }

  /**
   * Handles a tablet load/unload status report: successful state changes signal a manager event,
   * failures are logged.
   */
  @Override
  public void reportTabletStatus(TInfo info, TCredentials credentials, String serverName,
      TabletLoadState status, TKeyExtent ttablet) {
    KeyExtent tablet = KeyExtent.fromThrift(ttablet);

    switch (status) {
      case LOAD_FAILURE:
        Manager.log.error("{} reports assignment failed for tablet {}", serverName, tablet);
        break;
      case LOADED:
        manager.nextEvent.event("tablet %s was loaded on %s", tablet, serverName);
        break;
      case UNLOADED:
        manager.nextEvent.event("tablet %s was unloaded from %s", tablet, serverName);
        break;
      case UNLOAD_ERROR:
        Manager.log.error("{} reports unload failed for tablet {}", serverName, tablet);
        break;
      case UNLOAD_FAILURE_NOT_SERVING:
        // parameterized logging makes an explicit isTraceEnabled guard unnecessary
        Manager.log.trace("{} reports unload failed: not serving tablet, could be a split: {}",
            serverName, tablet);
        break;
      case CHOPPED:
        manager.nextEvent.event("tablet %s chopped", tablet);
        break;
    }
  }

  @Override
  public void setManagerGoalState(TInfo info, TCredentials c, ManagerGoalState state)
      throws ThriftSecurityException {
    if (!manager.security.canPerformSystemActions(c)) {
      throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
    }

    manager.setManagerGoalState(state);
  }

  /**
   * Removes a system property and refreshes any manager plugin affected by it.
   *
   * @throws ThriftSecurityException if the caller may not perform system actions
   * @throws RuntimeException wrapping any failure to remove the property
   */
  @Override
  public void removeSystemProperty(TInfo info, TCredentials c, String property)
      throws ThriftSecurityException {
    if (!manager.security.canPerformSystemActions(c)) {
      throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
    }

    try {
      SystemPropUtil.removeSystemProperty(manager.getContext(), property);
      updatePlugins(property);
    } catch (Exception e) {
      Manager.log.error("Problem removing config property in zookeeper", e);
      // keep the cause so the underlying failure is not lost
      throw new RuntimeException(e.getMessage(), e);
    }
  }

  /**
   * Sets a system property and refreshes any manager plugin affected by it.
   *
   * @throws ThriftSecurityException if the caller may not perform system actions
   * @throws IllegalArgumentException propagated unchanged from property validation
   * @throws TException wrapping any other failure
   */
  @Override
  public void setSystemProperty(TInfo info, TCredentials c, String property, String value)
      throws TException {
    if (!manager.security.canPerformSystemActions(c)) {
      throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
    }

    try {
      SystemPropUtil.setSystemProperty(manager.getContext(), property, value);
      updatePlugins(property);
    } catch (IllegalArgumentException iae) {
      // throw the exception here so it is not caught and converted to a generic TException
      throw iae;
    } catch (Exception e) {
      Manager.log.error("Problem setting config property in zookeeper", e);
      throw new TException(e.getMessage(), e);
    }
  }

  @Override
  public void setNamespaceProperty(TInfo tinfo, TCredentials credentials, String ns,
      String property, String value) throws ThriftSecurityException, ThriftTableOperationException {
    alterNamespaceProperty(credentials, ns, property, value, TableOperation.SET_PROPERTY);
  }

  @Override
  public void removeNamespaceProperty(TInfo tinfo, TCredentials credentials, String ns,
      String property) throws ThriftSecurityException, ThriftTableOperationException {
    alterNamespaceProperty(credentials, ns, property, null, TableOperation.REMOVE_PROPERTY);
  }

  /**
   * Sets ({@code value != null}) or removes ({@code value == null}) a namespace property after
   * checking the caller's permission.
   *
   * @throws ThriftSecurityException if the caller may not alter the namespace
   * @throws ThriftTableOperationException if the namespace is unknown or the update fails
   */
  private void alterNamespaceProperty(TCredentials c, String namespace, String property,
      String value, TableOperation op)
      throws ThriftSecurityException, ThriftTableOperationException {

    NamespaceId namespaceId =
        ClientServiceHandler.checkNamespaceId(manager.getContext(), namespace, op);

    if (!manager.security.canAlterNamespace(c, namespaceId)) {
      throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
    }

    try {
      if (value == null) {
        NamespacePropUtil.removeNamespaceProperty(manager.getContext(), namespaceId, property);
      } else {
        NamespacePropUtil.setNamespaceProperty(manager.getContext(), namespaceId, property, value);
      }
    } catch (KeeperException.NoNodeException e) {
      // race condition... namespace no longer exists? This call will throw an exception if the
      // namespace was deleted:
      ClientServiceHandler.checkNamespaceId(manager.getContext(), namespace, op);
      log.info("Error altering namespace property", e);
      throw new ThriftTableOperationException(namespaceId.canonical(), namespace, op,
          TableOperationExceptionType.OTHER, "Problem altering namespace property");
    } catch (Exception e) {
      log.error("Problem altering namespace property", e);
      throw new ThriftTableOperationException(namespaceId.canonical(), namespace, op,
          TableOperationExceptionType.OTHER, "Problem altering namespace property");
    }
  }

  /**
   * Sets or removes a table property after checking the caller's permission. A null or empty
   * {@code value} removes the property.
   *
   * @throws ThriftSecurityException if the caller may not alter the table
   * @throws ThriftTableOperationException if the table is unknown, the property is rejected, or
   *         the update fails
   */
  private void alterTableProperty(TCredentials c, String tableName, String property, String value,
      TableOperation op) throws ThriftSecurityException, ThriftTableOperationException {
    final TableId tableId = ClientServiceHandler.checkTableId(manager.getContext(), tableName, op);
    NamespaceId namespaceId = getNamespaceIdFromTableId(op, tableId);
    if (!manager.security.canAlterTable(c, tableId, namespaceId)) {
      throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
    }

    try {
      if (value == null || value.isEmpty()) {
        TablePropUtil.removeTableProperty(manager.getContext(), tableId, property);
      } else if (!TablePropUtil.setTableProperty(manager.getContext(), tableId, property, value)) {
        throw new Exception("Invalid table property.");
      }
    } catch (KeeperException.NoNodeException e) {
      // race condition... table no longer exists? This call will throw an exception if the table
      // was deleted:
      ClientServiceHandler.checkTableId(manager.getContext(), tableName, op);
      log.info("Error altering table property", e);
      throw new ThriftTableOperationException(tableId.canonical(), tableName, op,
          TableOperationExceptionType.OTHER, "Problem altering table property");
    } catch (Exception e) {
      log.error("Problem altering table property", e);
      throw new ThriftTableOperationException(tableId.canonical(), tableName, op,
          TableOperationExceptionType.OTHER, "Problem altering table property");
    }
  }

  /**
   * Reinitializes the tablet balancer if {@code property}, after resolving any deprecated name,
   * is the manager tablet balancer property.
   */
  private void updatePlugins(String property) {
    // resolve without warning; any warnings should have already occurred
    String resolved =
        DeprecatedPropertyUtil.getReplacementName(property, (unusedLog, replacement) -> {});
    if (resolved.equals(Property.MANAGER_TABLET_BALANCER.getKey())) {
      manager.initializeBalancer();
      log.info("tablet balancer changed to {}", manager.getBalancerClass().getName());
    }
  }

  @Override
  public void waitForBalance(TInfo tinfo) {
    manager.waitForBalance();
  }

  /** Returns the host:port of every tablet server the manager considers online. */
  @Override
  public List<String> getActiveTservers(TInfo tinfo, TCredentials credentials) {
    Set<TServerInstance> tserverInstances = manager.onlineTabletServers();
    List<String> servers = new ArrayList<>(tserverInstances.size());
    for (TServerInstance tserverInstance : tserverInstances) {
      servers.add(tserverInstance.getHostPort());
    }
    return servers;
  }

  /**
   * Generates a delegation token for the calling principal.
   *
   * @throws ThriftSecurityException if the caller may not obtain delegation tokens
   * @throws TException if delegation tokens are unavailable or token generation fails
   */
  @Override
  public TDelegationToken getDelegationToken(TInfo tinfo, TCredentials credentials,
      TDelegationTokenConfig tConfig) throws ThriftSecurityException, TException {
    if (!manager.security.canObtainDelegationToken(credentials)) {
      throw new ThriftSecurityException(credentials.getPrincipal(),
          SecurityErrorCode.PERMISSION_DENIED);
    }

    // Make sure we're actually generating the secrets to make delegation tokens
    // Round-about way to verify that SASL is also enabled.
    if (!manager.delegationTokensAvailable()) {
      throw new TException("Delegation tokens are not available for use");
    }

    final DelegationTokenConfig config = DelegationTokenConfigSerializer.deserialize(tConfig);
    final AuthenticationTokenSecretManager secretManager = manager.getContext().getSecretManager();
    try {
      Entry<Token<AuthenticationTokenIdentifier>,AuthenticationTokenIdentifier> pair =
          secretManager.generateToken(credentials.principal, config);

      return new TDelegationToken(ByteBuffer.wrap(pair.getKey().getPassword()),
          pair.getValue().getThriftIdentifier());
    } catch (Exception e) {
      throw new TException(e.getMessage(), e);
    }
  }

  /**
   * Reports whether the given WALs of a table are fully replicated. The replication section of
   * the metadata table is checked first; only if every relevant entry there is fully replicated is
   * the replication table checked.
   *
   * @param tableName name of the table whose replication is checked
   * @param logsToWatch WAL file names to consider; entries for other files are ignored
   * @return true if no relevant entry in either table is still pending replication
   */
  @Override
  public boolean drainReplicationTable(TInfo tinfo, TCredentials credentials, String tableName,
      Set<String> logsToWatch) throws TException {
    AccumuloClient client = manager.getContext();

    final Text tableId = new Text(getTableId(manager.getContext(), tableName).canonical());

    drainLog.trace("Waiting for {} to be replicated for {}", logsToWatch, tableId);

    drainLog.trace("Reading from metadata table");
    final Set<Range> range = Collections.singleton(new Range(ReplicationSection.getRange()));
    // try-with-resources also closes the scanner if configuring it throws
    try (BatchScanner bs =
        client.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)) {
      bs.setRanges(range);
      bs.fetchColumnFamily(ReplicationSection.COLF);
      // Return immediately if there are records in metadata for these WALs
      if (!allReferencesReplicated(bs, tableId, logsToWatch)) {
        return false;
      }
    } catch (TableNotFoundException e) {
      throw new RuntimeException("Could not read metadata table", e);
    }

    drainLog.trace("reading from replication table");
    try (BatchScanner bs =
        client.createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4)) {
      bs.setRanges(Collections.singleton(new Range()));
      // No records in metadata, check replication table
      return allReferencesReplicated(bs, tableId, logsToWatch);
    } catch (TableNotFoundException e) {
      throw new RuntimeException("Replication table was not found", e);
    }
  }

  /**
   * Resolves a table name to its id.
   *
   * @throws ThriftTableOperationException if the table does not exist
   */
  protected TableId getTableId(ClientContext context, String tableName)
      throws ThriftTableOperationException {
    return ClientServiceHandler.checkTableId(context, tableName, null);
  }

  /**
   * Checks whether every replication status entry from {@code bs} that belongs to {@code tableId}
   * (matched against the column qualifier) and names a file in {@code relevantLogs} is fully
   * replicated.
   *
   * <p>The file name depends on the column family. For {@link ReplicationSection#COLF} it is the
   * row with the section prefix removed. For {@link OrderSection#NAME} it is decoded from the key.
   * Otherwise it is the row itself. Entries whose value cannot be parsed as a {@link Status} are
   * logged at trace level and skipped.
   *
   * @return false as soon as a relevant entry is not fully replicated; true otherwise
   */
  protected boolean allReferencesReplicated(BatchScanner bs, Text tableId,
      Set<String> relevantLogs) {
    // Reused buffers: rowHolder first receives the column qualifier, then the row.
    Text rowHolder = new Text();
    Text colfHolder = new Text();
    for (Entry<Key,Value> entry : bs) {
      drainLog.trace("Got key {}", entry.getKey().toStringNoTruncate());

      entry.getKey().getColumnQualifier(rowHolder);
      if (!tableId.equals(rowHolder)) {
        continue;
      }

      entry.getKey().getRow(rowHolder);
      entry.getKey().getColumnFamily(colfHolder);

      String file;
      if (colfHolder.equals(ReplicationSection.COLF)) {
        file = rowHolder.toString();
        file = file.substring(ReplicationSection.getRowPrefix().length());
      } else if (colfHolder.equals(OrderSection.NAME)) {
        file = OrderSection.getFile(entry.getKey(), rowHolder);
        long timeClosed = OrderSection.getTimeClosed(entry.getKey(), rowHolder);
        drainLog.trace("Order section: {} and {}", timeClosed, file);
      } else {
        file = rowHolder.toString();
      }

      // Skip files that we didn't observe when we started (new files/data)
      if (relevantLogs.contains(file)) {
        drainLog.trace("Found file that we *do* care about {}", file);
      } else {
        drainLog.trace("Found file that we didn't care about {}", file);
        continue;
      }

      try {
        Status stat = Status.parseFrom(entry.getValue().get());
        if (!StatusUtil.isFullyReplicated(stat)) {
          drainLog.trace("{} and {} is not replicated", file, ProtobufUtil.toString(stat));
          return false;
        }
        drainLog.trace("{} and {} is replicated", file, ProtobufUtil.toString(stat));
      } catch (InvalidProtocolBufferException e) {
        // NOTE(review): an unparseable status is skipped rather than treated as pending.
        drainLog.trace("Could not parse protobuf for {}", entry.getKey(), e);
      }
    }

    return true;
  }
}