blob: e1d46ef92eef64ca8f468e0bfb390a4ee8e09c0b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleMasterProcedureManager extends MasterProcedureManager {
public static final String SIMPLE_SIGNATURE = "simple_test";
public static final String SIMPLE_DATA = "simple_test_data";
private static final Logger LOG = LoggerFactory.getLogger(SimpleMasterProcedureManager.class);
private MasterServices master;
private ProcedureCoordinator coordinator;
private boolean done;
@Override
public void stop(String why) {
LOG.info("stop: " + why);
}
@Override
public boolean isStopped() {
return false;
}
@Override
public void initialize(MasterServices master, MetricsMaster metricsMaster)
throws KeeperException, IOException, UnsupportedOperationException {
this.master = master;
this.done = false;
// setup the default procedure coordinator
String name = master.getServerName().toString();
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(
master.getZooKeeper(), getProcedureSignature(), name);
this.coordinator = new ProcedureCoordinator(comms, tpool);
}
@Override
public String getProcedureSignature() {
return SIMPLE_SIGNATURE;
}
@Override
public byte[] execProcedureWithRet(ProcedureDescription desc) throws IOException {
this.done = false;
// start the process on the RS
ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
List<String> servers = new ArrayList<>();
for (ServerName sn : serverNames) {
servers.add(sn.toString());
}
Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers);
if (proc == null) {
String msg = "Failed to submit distributed procedure for '"
+ getProcedureSignature() + "'";
LOG.error(msg);
throw new IOException(msg);
}
HashMap<String, byte[]> returnData = null;
try {
// wait for the procedure to complete. A timer thread is kicked off that should cancel this
// if it takes too long.
returnData = proc.waitForCompletedWithRet();
LOG.info("Done waiting - exec procedure for " + desc.getInstance());
this.done = true;
} catch (InterruptedException e) {
ForeignException ee =
new ForeignException("Interrupted while waiting for procdure to finish", e);
monitor.receive(ee);
Thread.currentThread().interrupt();
} catch (ForeignException e) {
monitor.receive(e);
}
// return the first value for testing
return returnData.values().iterator().next();
}
@Override
public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
throws IOException {}
@Override
public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
return done;
}
}