blob: c557c2021b4000b3ac22f03ad6800db014bec91a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.Any;
import org.apache.hbase.thirdparty.com.google.protobuf.Internal;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* Helper to convert to/from ProcedureProtos
*/
@InterfaceAudience.Private
public final class ProcedureUtil {
private ProcedureUtil() {
  // Only static helpers live here; the private constructor keeps callers from instantiating it.
}
// ==========================================================================
// Reflection helpers to create/validate a Procedure object
// ==========================================================================
/**
 * Reflectively creates a procedure instance from its class name, using the public no-arg
 * constructor of that class.
 *
 * @param className name of the class to load via {@link Class#forName(String)}
 * @return a freshly constructed instance of the named class
 * @throws BadProcedureException if the class cannot be loaded, is not public, is not a
 *           {@link Procedure} subclass, has no public no-arg constructor, or its constructor
 *           fails; the underlying failure is attached as the cause
 */
private static Procedure<?> newProcedure(String className) throws BadProcedureException {
  try {
    Class<?> clazz = Class.forName(className);
    if (!Modifier.isPublic(clazz.getModifiers())) {
      throw new Exception("the " + clazz + " class is not public");
    }
    // Class#getConstructor only ever returns public constructors and throws
    // NoSuchMethodException when there is none, so the result needs neither a null check nor
    // a visibility check.
    @SuppressWarnings("rawtypes")
    Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor();
    return ctor.newInstance();
  } catch (Exception e) {
    // Every failure above (load, cast, lookup, instantiation) is reported the same way.
    throw new BadProcedureException(
      "The procedure class " + className + " must be accessible and have an empty constructor",
      e);
  }
}
/**
 * Verifies that the runtime class of the given procedure is public and declares a public no-arg
 * constructor.
 *
 * @param proc the procedure whose class is checked
 * @throws BadProcedureException if the class is not public or has no public no-arg constructor;
 *           the underlying failure is attached as the cause
 */
static void validateClass(Procedure<?> proc) throws BadProcedureException {
  try {
    Class<?> clazz = proc.getClass();
    if (!Modifier.isPublic(clazz.getModifiers())) {
      throw new Exception("the " + clazz + " class is not public");
    }
    // Invoked only for its failure mode: Class#getConstructor returns public constructors
    // exclusively and throws NoSuchMethodException when no public no-arg constructor exists,
    // so the returned object itself does not need to be inspected.
    clazz.getConstructor();
  } catch (Exception e) {
    throw new BadProcedureException("The procedure class " + proc.getClass().getName() +
      " must be accessible and have an empty constructor", e);
  }
}
// ==========================================================================
// convert to and from Procedure object
// ==========================================================================
/**
 * {@link ProcedureStateSerializer} that keeps procedure state in the stateMessage list of a
 * procedure builder. Each serialized message is packed into an {@link Any} and appended to the
 * list; each deserialize call unpacks the next entry of the list, in order.
 */
private static class StateSerializer implements ProcedureStateSerializer {
  private final ProcedureProtos.Procedure.Builder builder;

  /** Index of the state message that the next {@link #deserialize(Class)} call will read. */
  private int deserializeIndex;

  public StateSerializer(ProcedureProtos.Procedure.Builder builder) {
    this.builder = builder;
  }

  @Override
  public void serialize(Message message) throws IOException {
    builder.addStateMessage(Any.pack(message));
  }

  @Override
  public <M extends Message> M deserialize(Class<M> clazz) throws IOException {
    final int current = deserializeIndex;
    if (current >= builder.getStateMessageCount()) {
      throw new IOException("Invalid state message index: " + current);
    }
    // Advance before reading so the cursor moves on even when unpacking fails.
    deserializeIndex = current + 1;
    try {
      return builder.getStateMessage(current).unpack(clazz);
    } catch (InvalidProtocolBufferException e) {
      throw e.unwrapIOException();
    }
  }
}
/**
 * Read-only {@link ProcedureStateSerializer} for procedures that were persisted in the older
 * format, where the state is stored in the binary stateData field. Messages are read from the
 * supplied stream one after another as length-delimited protobuf messages.
 */
private static class CompatStateSerializer implements ProcedureStateSerializer {
  private final InputStream inputStream;

  public CompatStateSerializer(InputStream inputStream) {
    this.inputStream = inputStream;
  }

  /**
   * Not supported: this serializer exists only to read the old binary state data.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void serialize(Message message) throws IOException {
    throw new UnsupportedOperationException(
      "CompatStateSerializer can only deserialize state data");
  }

  /**
   * Parses the next delimited message of the requested type from the underlying stream.
   *
   * @param clazz the message class to parse
   * @return whatever {@code Parser#parseDelimitedFrom} yields for the stream
   * @throws IOException if the stream cannot be read or the bytes are not a valid message
   */
  @SuppressWarnings("unchecked")
  @Override
  public <M extends Message> M deserialize(Class<M> clazz) throws IOException {
    // The parser comes from the default instance looked up by clazz, so it parses M; the cast
    // is only needed because getParserForType() is not declared with that type.
    Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType();
    try {
      return parser.parseDelimitedFrom(inputStream);
    } catch (InvalidProtocolBufferException e) {
      throw e.unwrapIOException();
    }
  }
}
/**
 * Builds the protobuf form of the given procedure.
 * <p/>
 * Used by ProcedureStore implementations.
 *
 * @param proc the procedure to convert; must not be null
 * @return the protobuf message describing {@code proc}, including its serialized state
 * @throws IOException propagated from validating the procedure class or from the procedure
 *           serializing its state
 */
public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc)
    throws IOException {
  Preconditions.checkArgument(proc != null);
  validateClass(proc);

  // Fields that are always written.
  final ProcedureProtos.Procedure.Builder protoBuilder = ProcedureProtos.Procedure.newBuilder();
  protoBuilder.setClassName(proc.getClass().getName());
  protoBuilder.setProcId(proc.getProcId());
  protoBuilder.setState(proc.getState());
  protoBuilder.setSubmittedTime(proc.getSubmittedTime());
  protoBuilder.setLastUpdate(proc.getLastUpdate());

  // Optional fields, written only when the procedure reports them.
  if (proc.hasParent()) {
    protoBuilder.setParentId(proc.getParentProcId());
  }
  if (proc.hasTimeout()) {
    protoBuilder.setTimeout(proc.getTimeout());
  }
  if (proc.hasOwner()) {
    protoBuilder.setOwner(proc.getOwner());
  }

  final int[] stackIndexes = proc.getStackIndexes();
  if (stackIndexes != null) {
    for (int stackIndex : stackIndexes) {
      protoBuilder.addStackId(stackIndex);
    }
  }

  if (proc.hasException()) {
    final RemoteProcedureException failure = proc.getException();
    protoBuilder.setException(
      RemoteProcedureException.toProto(failure.getSource(), failure.getCause()));
  }

  final byte[] resultBytes = proc.getResult();
  if (resultBytes != null) {
    protoBuilder.setResult(UnsafeByteOperations.unsafeWrap(resultBytes));
  }

  // The procedure appends its own state messages to the builder through the serializer.
  proc.serializeStateData(new StateSerializer(protoBuilder));

  final NonceKey nonceKey = proc.getNonceKey();
  if (nonceKey != null) {
    protoBuilder.setNonceGroup(nonceKey.getNonceGroup());
    protoBuilder.setNonce(nonceKey.getNonce());
  }

  if (proc.hasLock()) {
    protoBuilder.setLocked(true);
  }
  if (proc.isBypass()) {
    protoBuilder.setBypass(true);
  }

  return protoBuilder.build();
}
/**
 * Rebuilds a procedure object from its protobuf form.
 * <p/>
 * Used by ProcedureStore implementations.
 * <p/>
 * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className,
 * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of
 * it by storing the data only on insert().
 *
 * @param proto the protobuf message to convert
 * @return a new procedure of the class named in {@code proto}, populated from its fields
 * @throws IOException propagated from instantiating the procedure class or from the procedure
 *           deserializing its state
 */
public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto)
    throws IOException {
  // Instantiate the concrete class recorded in the message.
  final Procedure<?> procedure = newProcedure(proto.getClassName());

  // Fields that are always copied.
  procedure.setProcId(proto.getProcId());
  procedure.setState(proto.getState());
  procedure.setSubmittedTime(proto.getSubmittedTime());
  procedure.setLastUpdate(proto.getLastUpdate());

  // Optional fields, copied only when present in the message.
  if (proto.hasParentId()) {
    procedure.setParentProcId(proto.getParentId());
  }
  if (proto.hasOwner()) {
    procedure.setOwner(proto.getOwner());
  }
  if (proto.hasTimeout()) {
    procedure.setTimeout(proto.getTimeout());
  }
  if (proto.getStackIdCount() > 0) {
    procedure.setStackIndexes(proto.getStackIdList());
  }

  if (proto.hasException()) {
    assert procedure.getState() == ProcedureProtos.ProcedureState.FAILED ||
      procedure.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK :
      "The procedure must be failed (waiting to rollback) or rolledback";
    procedure.setFailure(RemoteProcedureException.fromProto(proto.getException()));
  }

  if (proto.hasResult()) {
    procedure.setResult(proto.getResult().toByteArray());
  }

  if (proto.getNonce() != HConstants.NO_NONCE) {
    procedure.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
  }

  if (proto.getLocked()) {
    procedure.lockedWhenLoading();
  }
  if (proto.getBypass()) {
    procedure.bypass(null);
  }

  // State messages take precedence; the binary stateData field is only consulted when no state
  // message is present. With neither, the procedure's state is left untouched.
  if (proto.getStateMessageCount() > 0) {
    procedure.deserializeStateData(new StateSerializer(proto.toBuilder()));
  } else if (proto.hasStateData()) {
    final InputStream stateStream = proto.getStateData().newInput();
    procedure.deserializeStateData(new CompatStateSerializer(stateStream));
  }

  return procedure;
}
// ==========================================================================
// convert from LockedResource object
// ==========================================================================
/**
 * Maps a {@link LockedResourceType} to the protobuf enum constant carrying the same name.
 *
 * @param resourceType the resource type to convert
 * @return the protobuf constant looked up by {@code resourceType.name()}
 */
public static LockServiceProtos.LockedResourceType convertToProtoResourceType(
    LockedResourceType resourceType) {
  final String constantName = resourceType.name();
  return LockServiceProtos.LockedResourceType.valueOf(constantName);
}
/**
 * Maps a {@link LockType} to the protobuf enum constant carrying the same name.
 *
 * @param lockType the lock type to convert
 * @return the protobuf constant looked up by {@code lockType.name()}
 */
public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) {
  final String constantName = lockType.name();
  return LockServiceProtos.LockType.valueOf(constantName);
}
/**
 * Builds the protobuf form of a locked resource, including the exclusive lock owner (when there
 * is one) and every waiting procedure, each converted with
 * {@link #convertToProtoProcedure(Procedure)}.
 *
 * @param lockedResource the locked resource to convert
 * @return the protobuf message describing {@code lockedResource}
 * @throws IOException propagated from converting the owner or a waiting procedure
 */
public static LockServiceProtos.LockedResource convertToProtoLockedResource(
    LockedResource lockedResource) throws IOException {
  final LockServiceProtos.LockedResource.Builder protoBuilder =
    LockServiceProtos.LockedResource.newBuilder();

  protoBuilder.setResourceType(convertToProtoResourceType(lockedResource.getResourceType()));
  protoBuilder.setResourceName(lockedResource.getResourceName());
  protoBuilder.setLockType(convertToProtoLockType(lockedResource.getLockType()));

  // The owner field is left unset when no procedure holds the exclusive lock.
  final Procedure<?> exclusiveOwner = lockedResource.getExclusiveLockOwnerProcedure();
  if (exclusiveOwner != null) {
    protoBuilder.setExclusiveLockOwnerProcedure(convertToProtoProcedure(exclusiveOwner));
  }

  protoBuilder.setSharedLockCount(lockedResource.getSharedLockCount());

  for (Procedure<?> waiter : lockedResource.getWaitingProcedures()) {
    protoBuilder.addWaitingProcedures(convertToProtoProcedure(waiter));
  }

  return protoBuilder.build();
}
/**
 * Configuration key for the sleep interval, in milliseconds, handed to the retry counter built
 * by {@link #createRetryCounter(Configuration)}.
 */
public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS =
"hbase.procedure.retry.sleep.interval.ms";
/** Default for {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS}: 1 second. */
public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000;
/**
 * Configuration key for the maximum sleep time, in milliseconds, handed to the retry counter
 * built by {@link #createRetryCounter(Configuration)}.
 */
public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
"hbase.procedure.retry.max.sleep.time.ms";
/** Default for {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS}: 10 minutes. */
public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
TimeUnit.MINUTES.toMillis(10);
/**
 * Creates the retry counter used to compute backoff times. The counter is configured with the
 * {@link ExponentialBackoffPolicyWithLimit} policy; the sleep interval defaults to 1 second and
 * the maximum sleep time to 10 minutes.
 * <p/>
 * For UTs, you can lower {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and
 * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} so retries happen more often and the UT does not
 * time out.
 *
 * @param conf configuration the two settings are read from
 * @return a new retry counter built from those settings
 */
public static RetryCounter createRetryCounter(Configuration conf) {
  final long baseSleepMs =
    conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS);
  final long sleepCapMs =
    conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS);

  // Apply the setters one at a time, carrying forward whatever each one returns.
  RetryConfig config = new RetryConfig();
  config = config.setSleepInterval(baseSleepMs);
  config = config.setMaxSleepTime(sleepCapMs);
  config = config.setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());

  return new RetryCounter(config);
}
/**
 * Tells whether the given serialized procedure counts as finished: it must have no parent id and
 * its state must be either ROLLEDBACK or SUCCESS.
 *
 * @param proc the protobuf procedure to inspect
 * @return {@code true} for a parent-less procedure in state ROLLEDBACK or SUCCESS,
 *         {@code false} otherwise
 */
public static boolean isFinished(ProcedureProtos.Procedure proc) {
  // A procedure with a parent is never reported as finished here, whatever its state.
  if (proc.hasParentId()) {
    return false;
  }
  final ProcedureProtos.ProcedureState state = proc.getState();
  return state == ProcedureProtos.ProcedureState.ROLLEDBACK
    || state == ProcedureProtos.ProcedureState.SUCCESS;
}
}