blob: ab5fc78c891e77a3340bf83c4a3f38b4ba36d041 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest;
import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse;
import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RowProcessor;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* This class demonstrates how to implement atomic read-modify-writes
* using {@link Region#processRowsWithLocks} and Coprocessor endpoints.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public abstract class BaseRowProcessorEndpoint<S extends Message, T extends Message>
extends RowProcessorService implements CoprocessorService, Coprocessor {
private RegionCoprocessorEnvironment env;
/**
* Pass a processor to region to process multiple rows atomically.
*
* The RowProcessor implementations should be the inner classes of your
* RowProcessorEndpoint. This way the RowProcessor can be class-loaded with
* the Coprocessor endpoint together.
*
* See {@code TestRowProcessorEndpoint} for example.
*
* The request contains information for constructing processor
* (see {@link #constructRowProcessorFromRequest}. The processor object defines
* the read-modify-write procedure.
*/
@Override
public void process(RpcController controller, ProcessRequest request,
RpcCallback<ProcessResponse> done) {
ProcessResponse resultProto = null;
try {
RowProcessor<S,T> processor = constructRowProcessorFromRequest(request);
Region region = env.getRegion();
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
region.processRowsWithLocks(processor, nonceGroup, nonce);
T result = processor.getResult();
ProcessResponse.Builder b = ProcessResponse.newBuilder();
b.setRowProcessorResult(result.toByteString());
resultProto = b.build();
} catch (Exception e) {
ResponseConverter.setControllerException(controller, new IOException(e));
}
done.run(resultProto);
}
@Override
public Service getService() {
return this;
}
/**
* Stores a reference to the coprocessor environment provided by the
* {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
* coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
* on a table region, so always expects this to be an instance of
* {@link RegionCoprocessorEnvironment}.
* @param env the environment provided by the coprocessor host
* @throws IOException if the provided environment is not an instance of
* {@code RegionCoprocessorEnvironment}
*/
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// nothing to do
}
@SuppressWarnings("unchecked")
RowProcessor<S,T> constructRowProcessorFromRequest(ProcessRequest request)
throws IOException {
String className = request.getRowProcessorClassName();
Class<?> cls;
try {
cls = Class.forName(className);
RowProcessor<S,T> ci = (RowProcessor<S,T>) cls.newInstance();
if (request.hasRowProcessorInitializerMessageName()) {
Class<?> imn = Class.forName(request.getRowProcessorInitializerMessageName())
.asSubclass(Message.class);
Method m;
try {
m = imn.getMethod("parseFrom", ByteString.class);
} catch (SecurityException e) {
throw new IOException(e);
} catch (NoSuchMethodException e) {
throw new IOException(e);
}
S s;
try {
s = (S)m.invoke(null,request.getRowProcessorInitializerMessage());
} catch (IllegalArgumentException e) {
throw new IOException(e);
} catch (InvocationTargetException e) {
throw new IOException(e);
}
ci.initialize(s);
}
return ci;
} catch (ClassNotFoundException e) {
throw new IOException(e);
} catch (InstantiationException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
}
}
}