| package org.apache.blur.command; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.blur.BlurConfiguration; |
| import org.apache.blur.command.commandtype.ClusterExecuteCommand; |
| import org.apache.blur.command.commandtype.ClusterIndexReadCommand; |
| import org.apache.blur.command.commandtype.ClusterServerReadCommand; |
| import org.apache.blur.command.commandtype.IndexReadCommand; |
| import org.apache.blur.command.commandtype.ServerReadCommand; |
| import org.apache.blur.server.LayoutFactory; |
| import org.apache.blur.server.TableContext; |
| import org.apache.blur.server.TableContextFactory; |
| import org.apache.blur.thrift.generated.CommandStatus; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.hadoop.conf.Configuration; |
| |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| @SuppressWarnings("unchecked") |
| public class ControllerCommandManager extends BaseCommandManager { |
| |
| public ControllerCommandManager(File tmpPath, String commandPath, int workerThreadCount, int driverThreadCount, |
| long connectionTimeout, Configuration configuration, String serverName) throws IOException { |
| super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout, configuration, serverName); |
| } |
| |
| public Response execute(final TableContextFactory tableContextFactory, LayoutFactory layoutFactory, |
| String commandName, ArgumentOverlay argumentOverlay, CommandStatus originalCommandStatusObject) |
| throws IOException, TimeoutException, ExceptionCollector { |
| final ControllerClusterContext context = createCommandContext(tableContextFactory, layoutFactory); |
| final Command<?> command = getCommandObject(commandName, argumentOverlay); |
| if (command == null) { |
| throw new IOException("Command with name [" + commandName + "] not found."); |
| } |
| return submitDriverCallable(new Callable<Response>() { |
| @Override |
| public Response call() throws Exception { |
| // For those commands that do not implement cluster command, run them in |
| // a base impl. |
| try { |
| |
| if (command instanceof IndexReadCommand) { |
| return executeIndexReadCommand(context, command); |
| } |
| if (command instanceof ServerReadCommand) { |
| return executeIndexReadCombiningCommand(context, command); |
| } |
| if (command instanceof ClusterIndexReadCommand) { |
| throw new RuntimeException("Not implemented"); |
| } |
| if (command instanceof ClusterServerReadCommand) { |
| CombiningContext combiningContext = getCombiningContext(tableContextFactory); |
| return executeClusterReadCombiningCommand(context, command, combiningContext); |
| } |
| if (command instanceof ClusterExecuteCommand) { |
| return executeClusterCommand(context, command); |
| } |
| |
| throw new IOException("Command type of [" + command.getClass() + "] not supported."); |
| } finally { |
| IOUtils.closeQuietly(context); |
| } |
| } |
| |
| }, command, originalCommandStatusObject, new AtomicBoolean(true)); |
| } |
| |
| private CombiningContext getCombiningContext(final TableContextFactory tableContextFactory) { |
| return new CombiningContext() { |
| |
| @Override |
| public TableContext getTableContext(String table) throws IOException { |
| return tableContextFactory.getTableContext(table); |
| } |
| |
| @Override |
| public BlurConfiguration getBlurConfiguration(String table) throws IOException { |
| return getTableContext(table).getBlurConfiguration(); |
| } |
| }; |
| } |
| |
| private Response executeClusterCommand(ClusterContext context, Command<?> command) throws IOException, |
| InterruptedException { |
| ClusterExecuteCommand<Object> clusterCommand = (ClusterExecuteCommand<Object>) command; |
| Object object = clusterCommand.clusterExecute(context); |
| return Response.createNewAggregateResponse(object); |
| } |
| |
| private Response executeIndexReadCommand(ClusterContext context, Command<?> command) throws IOException { |
| Map<Shard, Object> result = context.readIndexes((IndexReadCommand<Object>) command); |
| return Response.createNewShardResponse(result); |
| } |
| |
| private Response executeClusterReadCombiningCommand(ClusterContext context, Command<?> command, |
| CombiningContext combiningContext) throws IOException, InterruptedException { |
| Map<Server, Object> results = context.readServers((ClusterServerReadCommand<Object>) command); |
| ClusterServerReadCommand<Object> clusterReadCombiningCommand = (ClusterServerReadCommand<Object>) command; |
| Object result = clusterReadCombiningCommand.combine(combiningContext, (Map<? extends Location<?>, Object>) results); |
| return Response.createNewAggregateResponse(result); |
| } |
| |
| private Response executeIndexReadCombiningCommand(ClusterContext context, Command<?> command) throws IOException { |
| Map<Server, Object> result = context.readServers((ServerReadCommand<Object, Object>) command); |
| return Response.createNewServerResponse(result); |
| } |
| |
| private ControllerClusterContext createCommandContext(TableContextFactory tableContextFactory, |
| LayoutFactory layoutFactory) throws IOException { |
| return new ControllerClusterContext(tableContextFactory, layoutFactory, this); |
| } |
| |
| } |