blob: 86d90cdbff68fd513b872e97f3f8c13574bae2d6 [file] [log] [blame]
package org.apache.blur.command;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.blur.BlurConfiguration;
import org.apache.blur.command.commandtype.ClusterExecuteCommand;
import org.apache.blur.command.commandtype.ClusterIndexReadCommand;
import org.apache.blur.command.commandtype.ClusterServerReadCommand;
import org.apache.blur.command.commandtype.IndexReadCommand;
import org.apache.blur.command.commandtype.ServerReadCommand;
import org.apache.blur.server.LayoutFactory;
import org.apache.blur.server.TableContext;
import org.apache.blur.server.TableContextFactory;
import org.apache.blur.thrift.generated.CommandStatus;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
@SuppressWarnings("unchecked")
public class ControllerCommandManager extends BaseCommandManager {
public ControllerCommandManager(File tmpPath, String commandPath, int workerThreadCount, int driverThreadCount,
long connectionTimeout, Configuration configuration, String serverName) throws IOException {
super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout, configuration, serverName);
}
public Response execute(final TableContextFactory tableContextFactory, LayoutFactory layoutFactory,
String commandName, ArgumentOverlay argumentOverlay, CommandStatus originalCommandStatusObject)
throws IOException, TimeoutException, ExceptionCollector {
final ControllerClusterContext context = createCommandContext(tableContextFactory, layoutFactory);
final Command<?> command = getCommandObject(commandName, argumentOverlay);
if (command == null) {
throw new IOException("Command with name [" + commandName + "] not found.");
}
return submitDriverCallable(new Callable<Response>() {
@Override
public Response call() throws Exception {
// For those commands that do not implement cluster command, run them in
// a base impl.
try {
if (command instanceof IndexReadCommand) {
return executeIndexReadCommand(context, command);
}
if (command instanceof ServerReadCommand) {
return executeIndexReadCombiningCommand(context, command);
}
if (command instanceof ClusterIndexReadCommand) {
throw new RuntimeException("Not implemented");
}
if (command instanceof ClusterServerReadCommand) {
CombiningContext combiningContext = getCombiningContext(tableContextFactory);
return executeClusterReadCombiningCommand(context, command, combiningContext);
}
if (command instanceof ClusterExecuteCommand) {
return executeClusterCommand(context, command);
}
throw new IOException("Command type of [" + command.getClass() + "] not supported.");
} finally {
IOUtils.closeQuietly(context);
}
}
}, command, originalCommandStatusObject, new AtomicBoolean(true));
}
private CombiningContext getCombiningContext(final TableContextFactory tableContextFactory) {
return new CombiningContext() {
@Override
public TableContext getTableContext(String table) throws IOException {
return tableContextFactory.getTableContext(table);
}
@Override
public BlurConfiguration getBlurConfiguration(String table) throws IOException {
return getTableContext(table).getBlurConfiguration();
}
};
}
private Response executeClusterCommand(ClusterContext context, Command<?> command) throws IOException,
InterruptedException {
ClusterExecuteCommand<Object> clusterCommand = (ClusterExecuteCommand<Object>) command;
Object object = clusterCommand.clusterExecute(context);
return Response.createNewAggregateResponse(object);
}
private Response executeIndexReadCommand(ClusterContext context, Command<?> command) throws IOException {
Map<Shard, Object> result = context.readIndexes((IndexReadCommand<Object>) command);
return Response.createNewShardResponse(result);
}
private Response executeClusterReadCombiningCommand(ClusterContext context, Command<?> command,
CombiningContext combiningContext) throws IOException, InterruptedException {
Map<Server, Object> results = context.readServers((ClusterServerReadCommand<Object>) command);
ClusterServerReadCommand<Object> clusterReadCombiningCommand = (ClusterServerReadCommand<Object>) command;
Object result = clusterReadCombiningCommand.combine(combiningContext, (Map<? extends Location<?>, Object>) results);
return Response.createNewAggregateResponse(result);
}
private Response executeIndexReadCombiningCommand(ClusterContext context, Command<?> command) throws IOException {
Map<Server, Object> result = context.readServers((ServerReadCommand<Object, Object>) command);
return Response.createNewServerResponse(result);
}
private ControllerClusterContext createCommandContext(TableContextFactory tableContextFactory,
LayoutFactory layoutFactory) throws IOException {
return new ControllerClusterContext(tableContextFactory, layoutFactory, this);
}
}