Blur commands can have their status be returned through the thrift api and the commands execution can now be interrupted.
diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
index edee060..ad542ef 100644
--- a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
@@ -10,6 +10,7 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
@@ -20,6 +21,7 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
@@ -29,6 +31,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.blur.command.annotation.Description;
import org.apache.blur.concurrent.Executors;
@@ -36,6 +39,9 @@
import org.apache.blur.log.LogFactory;
import org.apache.blur.thrift.generated.Arguments;
import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.CommandStatus;
+import org.apache.blur.thrift.generated.CommandStatusState;
+import org.apache.blur.thrift.generated.User;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -85,9 +91,11 @@
protected final Configuration _configuration;
protected final BlurObjectSerDe _serDe = new BlurObjectSerDe();
protected final long _runningCacheTombstoneTime = TimeUnit.SECONDS.toMillis(60);
+ protected final String _serverName;
public BaseCommandManager(File tmpPath, String commandPath, int workerThreadCount, int driverThreadCount,
- long connectionTimeout, Configuration configuration) throws IOException {
+ long connectionTimeout, Configuration configuration, String serverName) throws IOException {
+ _serverName = serverName;
_configuration = configuration;
lookForCommandsToRegisterInClassPath();
_tmpPath = tmpPath;
@@ -125,8 +133,70 @@
}
}
- public List<String> commandStatusList(CommandStatusStateEnum commandStatus) {
- throw new RuntimeException("Not implemented.");
+ public CommandStatus getCommandStatus(String commandExecutionId) {
+ CommandStatus cso = findCommandStatusObject(commandExecutionId, _workerRunningMap.values());
+ if (cso != null) {
+ return cso;
+ }
+ return findCommandStatusObject(commandExecutionId, _driverRunningMap.values());
+ }
+
+ private CommandStatus findCommandStatusObject(String commandExecutionId, Collection<ResponseFuture<?>> values) {
+ Map<String, Map<CommandStatusState, Long>> serverStateMap = new HashMap<String, Map<CommandStatusState, Long>>();
+ CommandStatus commandStatus = null;
+ for (ResponseFuture<?> responseFuture : values) {
+ Command<?> commandExecuting = responseFuture.getCommandExecuting();
+ if (commandExecuting.getCommandExecutionId().equals(commandExecutionId)) {
+ if (commandStatus == null) {
+ CommandStatus originalCommandStatusObject = responseFuture.getOriginalCommandStatusObject();
+ String commandName = responseFuture.getCommandExecuting().getName();
+ Arguments arguments = originalCommandStatusObject.getArguments();
+ User user = originalCommandStatusObject.getUser();
+ commandStatus = new CommandStatus(commandExecutionId, commandName, arguments, serverStateMap, user);
+ }
+
+ CommandStatusState commandStatusStateEnum = getCommandStatusStateEnum(responseFuture);
+ Map<CommandStatusState, Long> map = serverStateMap.get(_serverName);
+ if (map == null) {
+ serverStateMap.put(_serverName, map = new HashMap<CommandStatusState, Long>());
+ }
+ Long l = map.get(commandStatusStateEnum);
+ if (l == null) {
+ map.put(commandStatusStateEnum, 1L);
+ } else {
+ map.put(commandStatusStateEnum, 1L + l);
+ }
+ }
+ }
+ return commandStatus;
+ }
+
+ public List<String> commandStatusList() {
+ Set<String> result = new TreeSet<String>();
+ result.addAll(getStatusList(_workerRunningMap.values()));
+ result.addAll(getStatusList(_driverRunningMap.values()));
+ return new ArrayList<String>(result);
+ }
+
+ private List<String> getStatusList(Collection<ResponseFuture<?>> values) {
+ List<String> result = new ArrayList<String>();
+ for (ResponseFuture<?> responseFuture : values) {
+ Command<?> commandExecuting = responseFuture.getCommandExecuting();
+ result.add(commandExecuting.getCommandExecutionId());
+ }
+ return result;
+ }
+
+ private CommandStatusState getCommandStatusStateEnum(ResponseFuture<?> responseFuture) {
+ if (responseFuture.isCancelled()) {
+ return CommandStatusState.INTERRUPTED;
+ } else {
+ if (responseFuture.isDone()) {
+ return CommandStatusState.COMPLETE;
+ } else {
+ return CommandStatusState.RUNNING;
+ }
+ }
}
private TimerTask getTimerTaskForRemovalOfOldCommands(final Map<Long, ResponseFuture<?>> runningMap) {
@@ -240,7 +310,7 @@
protected void copyLocal(FileSystem fileSystem, FileStatus fileStatus, File destDir) throws IOException {
Path path = fileStatus.getPath();
File file = new File(destDir, path.getName());
- if (fileStatus.isDir()) {
+ if (fileStatus.isDirectory()) {
if (!file.mkdirs()) {
LOG.error("Error while trying to create a sub directory [{0}].", file.getAbsolutePath());
throw new IOException("Error while trying to create a sub directory [" + file.getAbsolutePath() + "].");
@@ -259,7 +329,7 @@
}
protected BigInteger checkContents(FileStatus fileStatus, FileSystem fileSystem) throws IOException {
- if (fileStatus.isDir()) {
+ if (fileStatus.isDirectory()) {
LOG.debug("Scanning directory [{0}].", fileStatus.getPath());
BigInteger count = BigInteger.ZERO;
Path path = fileStatus.getPath();
@@ -329,12 +399,12 @@
}
}
- protected Response submitDriverCallable(Callable<Response> callable, Command<?> commandExecuting) throws IOException,
- TimeoutException, ExceptionCollector {
+ protected Response submitDriverCallable(Callable<Response> callable, Command<?> commandExecuting,
+ CommandStatus originalCommandStatusObject, AtomicBoolean running) throws IOException, TimeoutException, ExceptionCollector {
Future<Response> future = _executorServiceDriver.submit(callable);
Long instanceExecutionId = getInstanceExecutionId();
_driverRunningMap.put(instanceExecutionId, new ResponseFuture<Response>(_runningCacheTombstoneTime, future,
- commandExecuting));
+ commandExecuting, originalCommandStatusObject,running));
try {
return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
} catch (CancellationException e) {
@@ -368,11 +438,12 @@
}
}
- protected <T> Future<T> submitToExecutorService(Callable<T> callable, Command<?> commandExecuting) {
+ protected <T> Future<T> submitToExecutorService(Callable<T> callable, Command<?> commandExecuting,
+ CommandStatus originalCommandStatusObject, AtomicBoolean running) {
Future<T> future = _executorServiceWorker.submit(callable);
Long instanceExecutionId = getInstanceExecutionId();
_workerRunningMap.put(instanceExecutionId, new ResponseFuture<T>(_runningCacheTombstoneTime, future,
- commandExecuting));
+ commandExecuting, originalCommandStatusObject, running));
return future;
}
diff --git a/blur-core/src/main/java/org/apache/blur/command/CommandStatusStateEnum.java b/blur-core/src/main/java/org/apache/blur/command/CommandStatusStateEnum.java
deleted file mode 100644
index 575bb9d..0000000
--- a/blur-core/src/main/java/org/apache/blur/command/CommandStatusStateEnum.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.command;
-
-public enum CommandStatusStateEnum {
- RUNNING, INTERRUPTED, COMPLETE, BACK_PRESSURE_INTERRUPTED;
-}
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
index 192f743..b7a1a63 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
@@ -11,6 +11,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.blur.BlurConfiguration;
import org.apache.blur.log.Log;
@@ -25,6 +26,7 @@
import org.apache.blur.thrift.generated.Arguments;
import org.apache.blur.thrift.generated.Blur.Client;
import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.CommandStatus;
import org.apache.blur.thrift.generated.Response;
import org.apache.blur.thrift.generated.TimeoutException;
import org.apache.blur.thrift.generated.ValueObject;
@@ -131,6 +133,8 @@
Map<Server, Client> clientMap = getClientMap(command, tables, shards);
final Arguments arguments = _manager.toArguments(command);
+
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, null);
for (Entry<Server, Client> e : clientMap.entrySet()) {
Server server = e.getKey();
final Client client = e.getValue();
@@ -142,7 +146,7 @@
Map<Shard, Object> shardToValue = CommandUtil.fromThriftSupportedObjects(shardToThriftValue, _serDe);
return (Map<Shard, T>) shardToValue;
}
- }, command);
+ }, command, originalCommandStatusObject, new AtomicBoolean(true));
for (Shard shard : getShardsOnServer(server, tables, shards)) {
futureMap.put(shard, new ShardResultFuture<T>(shard, future));
}
@@ -222,6 +226,7 @@
Set<Shard> shards = command.routeShards(this, tables);
Map<Server, Client> clientMap = getClientMap(command, tables, shards);
final Arguments arguments = _manager.toArguments(command);
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, null);
for (Entry<Server, Client> e : clientMap.entrySet()) {
Server server = e.getKey();
final Client client = e.getValue();
@@ -233,7 +238,7 @@
Object thriftObject = CommandUtil.toObject(valueObject);
return (T) _serDe.fromSupportedThriftObject(thriftObject);
}
- }, command);
+ }, command, originalCommandStatusObject, new AtomicBoolean(true));
futureMap.put(server, future);
}
return futureMap;
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
index b11d09a..86d90cd 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
@@ -4,6 +4,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.blur.BlurConfiguration;
import org.apache.blur.command.commandtype.ClusterExecuteCommand;
@@ -14,6 +15,7 @@
import org.apache.blur.server.LayoutFactory;
import org.apache.blur.server.TableContext;
import org.apache.blur.server.TableContextFactory;
+import org.apache.blur.thrift.generated.CommandStatus;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
@@ -37,12 +39,13 @@
public class ControllerCommandManager extends BaseCommandManager {
public ControllerCommandManager(File tmpPath, String commandPath, int workerThreadCount, int driverThreadCount,
- long connectionTimeout, Configuration configuration) throws IOException {
- super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout, configuration);
+ long connectionTimeout, Configuration configuration, String serverName) throws IOException {
+ super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout, configuration, serverName);
}
public Response execute(final TableContextFactory tableContextFactory, LayoutFactory layoutFactory,
- String commandName, ArgumentOverlay argumentOverlay) throws IOException, TimeoutException, ExceptionCollector {
+ String commandName, ArgumentOverlay argumentOverlay, CommandStatus originalCommandStatusObject)
+ throws IOException, TimeoutException, ExceptionCollector {
final ControllerClusterContext context = createCommandContext(tableContextFactory, layoutFactory);
final Command<?> command = getCommandObject(commandName, argumentOverlay);
if (command == null) {
@@ -78,7 +81,7 @@
}
}
- }, command);
+ }, command, originalCommandStatusObject, new AtomicBoolean(true));
}
private CombiningContext getCombiningContext(final TableContextFactory tableContextFactory) {
diff --git a/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
index eb9b30d..a5a629e 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
@@ -20,19 +20,31 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.blur.thrift.generated.CommandStatus;
+
public class ResponseFuture<T> implements Future<T> {
private final Future<T> _future;
private final AtomicLong _timeWhenNotRunningObserved = new AtomicLong();
private final long _tombstone;
private final Command<?> _commandExecuting;
+ private final CommandStatus _originalCommandStatusObject;
+ private final AtomicBoolean _running;
- public ResponseFuture(long tombstone, Future<T> future, Command<?> commandExecuting) {
+ public ResponseFuture(long tombstone, Future<T> future, Command<?> commandExecuting,
+ CommandStatus originalCommandStatusObject, AtomicBoolean running) {
_tombstone = tombstone;
_future = future;
_commandExecuting = commandExecuting;
+ _originalCommandStatusObject = originalCommandStatusObject;
+ _running = running;
+ }
+
+ public CommandStatus getOriginalCommandStatusObject() {
+ return _originalCommandStatusObject;
}
public Command<?> getCommandExecuting() {
@@ -40,6 +52,7 @@
}
public boolean cancel(boolean mayInterruptIfRunning) {
+ _running.set(false);
return _future.cancel(mayInterruptIfRunning);
}
diff --git a/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
index d518523..5806874 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
@@ -26,14 +26,17 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.blur.BlurConfiguration;
import org.apache.blur.lucene.search.IndexSearcherCloseable;
+import org.apache.blur.manager.IndexManager;
import org.apache.blur.manager.IndexServer;
import org.apache.blur.manager.writer.BlurIndex;
import org.apache.blur.server.ShardServerContext;
import org.apache.blur.server.TableContext;
import org.apache.blur.server.TableContextFactory;
+import org.apache.blur.thrift.generated.CommandStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.lucene.index.IndexReader;
@@ -42,14 +45,16 @@
private final IndexServer _indexServer;
public ShardCommandManager(IndexServer indexServer, File tmpPath, String commandPath, int workerThreadCount,
- int driverThreadCount, long connectionTimeout, Configuration configuration) throws IOException {
- super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout, configuration);
+ int driverThreadCount, long connectionTimeout, Configuration configuration, String serverName) throws IOException {
+ super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout, configuration, serverName);
_indexServer = indexServer;
}
public Response execute(final TableContextFactory tableContextFactory, final String commandName,
- final ArgumentOverlay argumentOverlay) throws IOException, TimeoutException, ExceptionCollector {
+ final ArgumentOverlay argumentOverlay, CommandStatus originalCommandStatusObject) throws IOException,
+ TimeoutException, ExceptionCollector {
final ShardServerContext shardServerContext = getShardServerContext();
+ AtomicBoolean running = new AtomicBoolean(true);
final Command<?> command = getCommandObject(commandName, argumentOverlay);
Callable<Response> callable = new Callable<Response>() {
@Override
@@ -58,8 +63,9 @@
throw new IOException("Command with name [" + commandName + "] not found.");
}
if (command instanceof IndexRead || command instanceof ServerRead) {
- return toResponse(executeReadCommand(shardServerContext, command, tableContextFactory), command,
- getServerContext(tableContextFactory));
+ return toResponse(
+ executeReadCommand(shardServerContext, command, tableContextFactory, originalCommandStatusObject, running),
+ command, getServerContext(tableContextFactory));
}
throw new IOException("Command type of [" + command.getClass() + "] not supported.");
}
@@ -79,7 +85,7 @@
};
}
};
- return submitDriverCallable(callable, command);
+ return submitDriverCallable(callable, command, originalCommandStatusObject, running);
}
private ShardServerContext getShardServerContext() {
@@ -102,7 +108,8 @@
}
private Map<Shard, Object> executeReadCommand(ShardServerContext shardServerContext, Command<?> command,
- final TableContextFactory tableContextFactory) throws IOException, ExceptionCollector {
+ final TableContextFactory tableContextFactory, CommandStatus originalCommandStatusObject, AtomicBoolean running)
+ throws IOException, ExceptionCollector {
BaseContext context = new BaseContext() {
@Override
public TableContext getTableContext(String table) throws IOException {
@@ -139,14 +146,15 @@
Command<?> clone = command.clone();
if (clone instanceof IndexRead) {
final IndexRead<?> readCommand = (IndexRead<?>) clone;
- callable = getCallable(shardServerContext, tableContextFactory, table, shard, blurIndex, readCommand);
+ callable = getCallable(shardServerContext, tableContextFactory, table, shard, blurIndex, readCommand, running);
} else if (clone instanceof ServerRead) {
final ServerRead<?, ?> readCombiningCommand = (ServerRead<?, ?>) clone;
- callable = getCallable(shardServerContext, tableContextFactory, table, shard, blurIndex, readCombiningCommand);
+ callable = getCallable(shardServerContext, tableContextFactory, table, shard, blurIndex,
+ readCombiningCommand, running);
} else {
throw new IOException("Command type of [" + clone.getClass() + "] not supported.");
}
- Future<Object> future = submitToExecutorService(callable, clone);
+ Future<Object> future = submitToExecutorService(callable, clone, originalCommandStatusObject, running);
futureMap.put(shard, future);
}
}
@@ -189,7 +197,7 @@
private Callable<Object> getCallable(final ShardServerContext shardServerContext,
final TableContextFactory tableContextFactory, final String table, final Shard shard, final BlurIndex blurIndex,
- final ServerRead<?, ?> readCombiningCommand) {
+ final ServerRead<?, ?> readCombiningCommand, AtomicBoolean running) {
return new Callable<Object>() {
@Override
public Object call() throws Exception {
@@ -199,14 +207,15 @@
searcher = blurIndex.getIndexSearcher();
shardServerContext.setIndexSearcherClosable(table, shardId, searcher);
}
- return readCombiningCommand.execute(new ShardIndexContext(tableContextFactory, table, shard, searcher));
+ return readCombiningCommand
+ .execute(new ShardIndexContext(tableContextFactory, table, shard, searcher, running));
}
};
}
private Callable<Object> getCallable(final ShardServerContext shardServerContext,
final TableContextFactory tableContextFactory, final String table, final Shard shard, final BlurIndex blurIndex,
- final IndexRead<?> readCommand) {
+ final IndexRead<?> readCommand, AtomicBoolean running) {
return new Callable<Object>() {
@Override
public Object call() throws Exception {
@@ -217,7 +226,7 @@
searcher = blurIndex.getIndexSearcher();
shardServerContext.setIndexSearcherClosable(table, shardId, searcher);
}
- return readCommand.execute(new ShardIndexContext(tableContextFactory, table, shard, searcher));
+ return readCommand.execute(new ShardIndexContext(tableContextFactory, table, shard, searcher, running));
}
};
}
@@ -229,11 +238,13 @@
private final TableContextFactory _tableContextFactory;
private final String _table;
- public ShardIndexContext(TableContextFactory tableContextFactory, String table, Shard shard, IndexSearcherCloseable searcher) {
+ public ShardIndexContext(TableContextFactory tableContextFactory, String table, Shard shard,
+ IndexSearcherCloseable searcher, AtomicBoolean running) {
_tableContextFactory = tableContextFactory;
_table = table;
_shard = shard;
_searcher = searcher;
+ IndexManager.resetExitableReader(getIndexReader(), running);
}
@Override
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index de225d5..cca43a8 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -166,7 +166,7 @@
private final ExecutorService _facetExecutor;
private final ExecutorService _mutateExecutor;
- private final QueryStatusManager _statusManager = new QueryStatusManager();
+ private final QueryStatusManager _statusManager;
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final BlurPartitioner _blurPartitioner = new BlurPartitioner();
private final BlurFilterCache _filterCache;
@@ -184,8 +184,9 @@
public static AtomicBoolean DEBUG_RUN_SLOW = new AtomicBoolean(false);
public IndexManager(IndexServer indexServer, ClusterStatus clusterStatus, BlurFilterCache filterCache,
- int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, long statusCleanupTimerDelay,
- int facetThreadCount, DeepPagingCache deepPagingCache, MemoryAllocationWatcher memoryAllocationWatcher) {
+ int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, int facetThreadCount,
+ DeepPagingCache deepPagingCache, MemoryAllocationWatcher memoryAllocationWatcher, QueryStatusManager statusManager) {
+ _statusManager = statusManager;
_memoryAllocationWatcher = memoryAllocationWatcher;
_deepPagingCache = deepPagingCache;
_indexServer = indexServer;
@@ -219,8 +220,6 @@
_facetExecutor = Executors.newThreadPool(new SynchronousQueue<Runnable>(), "facet-execution", facetThreadCount);
}
- _statusManager.setStatusCleanupTimerDelay(statusCleanupTimerDelay);
- _statusManager.init();
LOG.info("Init Complete");
}
@@ -228,7 +227,6 @@
public synchronized void close() {
if (!_closed.get()) {
_closed.set(true);
- _statusManager.close();
_executor.shutdownNow();
_mutateExecutor.shutdownNow();
if (_facetExecutor != null) {
@@ -1296,7 +1294,7 @@
}
- private static boolean resetExitableReader(IndexReader indexReader, AtomicBoolean running) {
+ public static boolean resetExitableReader(IndexReader indexReader, AtomicBoolean running) {
if (indexReader instanceof ExitableReader) {
ExitableReader exitableReader = (ExitableReader) indexReader;
exitableReader.setRunning(running);
diff --git a/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
index bb419b2..a86858a 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
@@ -16,6 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -24,7 +25,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.blur.log.Log;
@@ -36,18 +37,19 @@
import org.apache.blur.utils.GCAction;
import org.apache.blur.utils.GCWatcher;
-public class QueryStatusManager {
+public class QueryStatusManager implements Closeable {
private static final Log LOG = LogFactory.getLog(QueryStatusManager.class);
private static final Object CONSTANT_VALUE = new Object();
- private Timer statusCleanupTimer;
- private long statusCleanupTimerDelay = TimeUnit.SECONDS.toMillis(10);
- private ConcurrentHashMap<QueryStatus, Object> currentQueryStatusCollection = new ConcurrentHashMap<QueryStatus, Object>();
-
- public void init() {
- statusCleanupTimer = new Timer("Query-Status-Cleanup", true);
- statusCleanupTimer.schedule(new TimerTask() {
+ private final Timer _statusCleanupTimer;
+ private final long _statusCleanupTimerDelay;
+ private final ConcurrentMap<QueryStatus, Object> _currentQueryStatusCollection = new ConcurrentHashMap<QueryStatus, Object>();
+
+ public QueryStatusManager(long statusCleanupTimerDelay) {
+ _statusCleanupTimerDelay = statusCleanupTimerDelay;
+ _statusCleanupTimer = new Timer("Query-Status-Cleanup", true);
+ _statusCleanupTimer.schedule(new TimerTask() {
@Override
public void run() {
try {
@@ -56,7 +58,7 @@
LOG.error("Unknown error while trying to cleanup finished queries.", e);
}
}
- }, statusCleanupTimerDelay, statusCleanupTimerDelay);
+ }, _statusCleanupTimerDelay, _statusCleanupTimerDelay);
GCWatcher.registerAction(new GCAction() {
@Override
public void takeAction() throws Exception {
@@ -65,14 +67,15 @@
});
}
+ @Override
public void close() {
- statusCleanupTimer.cancel();
- statusCleanupTimer.purge();
+ _statusCleanupTimer.cancel();
+ _statusCleanupTimer.purge();
}
public QueryStatus newQueryStatus(String table, BlurQuery blurQuery, int maxNumberOfThreads, AtomicBoolean running, User user) {
- QueryStatus queryStatus = new QueryStatus(statusCleanupTimerDelay, table, blurQuery, running, user);
- currentQueryStatusCollection.put(queryStatus, CONSTANT_VALUE);
+ QueryStatus queryStatus = new QueryStatus(_statusCleanupTimerDelay, table, blurQuery, running, user);
+ _currentQueryStatusCollection.put(queryStatus, CONSTANT_VALUE);
return queryStatus;
}
@@ -81,27 +84,23 @@
}
private void cleanupFinishedQueryStatuses() {
- LOG.debug("QueryStatus Start count [{0}].", currentQueryStatusCollection.size());
- Iterator<QueryStatus> iterator = currentQueryStatusCollection.keySet().iterator();
+ LOG.debug("QueryStatus Start count [{0}].", _currentQueryStatusCollection.size());
+ Iterator<QueryStatus> iterator = _currentQueryStatusCollection.keySet().iterator();
while (iterator.hasNext()) {
QueryStatus status = iterator.next();
if (status.isValidForCleanUp()) {
- currentQueryStatusCollection.remove(status);
+ _currentQueryStatusCollection.remove(status);
}
}
- LOG.debug("QueryStatus Finish count [{0}].", currentQueryStatusCollection.size());
+ LOG.debug("QueryStatus Finish count [{0}].", _currentQueryStatusCollection.size());
}
public long getStatusCleanupTimerDelay() {
- return statusCleanupTimerDelay;
- }
-
- public void setStatusCleanupTimerDelay(long statusCleanupTimerDelay) {
- this.statusCleanupTimerDelay = statusCleanupTimerDelay;
+ return _statusCleanupTimerDelay;
}
public void cancelQuery(String table, String uuid) {
- for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+ for (QueryStatus status : _currentQueryStatusCollection.keySet()) {
String userUuid = status.getUserUuid();
if (userUuid != null && userUuid.equals(uuid) && status.getTable().equals(table)) {
status.cancelQuery();
@@ -111,7 +110,7 @@
public List<BlurQueryStatus> currentQueries(String table) {
List<BlurQueryStatus> result = new ArrayList<BlurQueryStatus>();
- for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+ for (QueryStatus status : _currentQueryStatusCollection.keySet()) {
if (status.getTable().equals(table)) {
result.add(status.getQueryStatus());
}
@@ -120,7 +119,7 @@
}
public BlurQueryStatus queryStatus(String table, String uuid) {
- for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+ for (QueryStatus status : _currentQueryStatusCollection.keySet()) {
String userUuid = status.getUserUuid();
if (userUuid != null && userUuid.equals(uuid) && status.getTable().equals(table)) {
return status.getQueryStatus();
@@ -131,7 +130,7 @@
public List<String> queryStatusIdList(String table) {
Set<String> ids = new HashSet<String>();
- for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+ for (QueryStatus status : _currentQueryStatusCollection.keySet()) {
if (status.getTable().equals(table)) {
if (status.getUserUuid() != null) {
ids.add(status.getUserUuid());
@@ -143,7 +142,7 @@
public void stopAllQueriesForBackPressure() {
LOG.warn("Stopping all queries for back pressure.");
- for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+ for (QueryStatus status : _currentQueryStatusCollection.keySet()) {
QueryState state = status.getQueryStatus().getState();
if (state == QueryState.RUNNING) {
status.stopQueryForBackPressure();
diff --git a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
index ff40e5a..1e819ce 100644
--- a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
+++ b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
@@ -31,7 +31,6 @@
import org.apache.blur.thrift.generated.ColumnDefinition;
import org.apache.blur.thrift.generated.CommandDescriptor;
import org.apache.blur.thrift.generated.CommandStatus;
-import org.apache.blur.thrift.generated.CommandStatusState;
import org.apache.blur.thrift.generated.FetchResult;
import org.apache.blur.thrift.generated.Level;
import org.apache.blur.thrift.generated.Metric;
@@ -261,9 +260,9 @@
}
@Override
- public List<String> commandStatusList(int startingAt, short fetch, CommandStatusState state) throws BlurException,
+ public List<String> commandStatusList(int startingAt, short fetch) throws BlurException,
TException {
- return _iface.commandStatusList(startingAt, fetch, state);
+ return _iface.commandStatusList(startingAt, fetch);
}
@Override
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 8ee32af..f43be5b 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -112,9 +112,9 @@
import org.apache.blur.utils.BlurIterator;
import org.apache.blur.utils.BlurUtil;
import org.apache.blur.utils.ForkJoin;
-import org.apache.blur.utils.ShardUtil;
import org.apache.blur.utils.ForkJoin.Merger;
import org.apache.blur.utils.ForkJoin.ParallelCall;
+import org.apache.blur.utils.ShardUtil;
import org.apache.blur.zookeeper.WatchChildren;
import org.apache.blur.zookeeper.WatchChildren.OnChange;
import org.apache.blur.zookeeper.WatchNodeExistance;
@@ -1514,8 +1514,9 @@
throws BlurException, TException {
try {
BlurObject args = CommandUtil.toBlurObject(arguments);
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, commandName, arguments, null, null);
Response response = _commandManager.execute(getTableContextFactory(), getLayoutFactory(), commandName,
- new ArgumentOverlay(args, _serDe));
+ new ArgumentOverlay(args, _serDe), originalCommandStatusObject);
return CommandUtil.fromObjectToThrift(response, _serDe);
} catch (Exception e) {
if (e instanceof org.apache.blur.command.TimeoutException) {
@@ -1659,44 +1660,6 @@
}
@Override
- public List<String> commandStatusList(int startingAt, short fetch, CommandStatusState state) throws BlurException,
- TException {
- throw new BException("Not Implemented");
- }
-
- @Override
- public CommandStatus commandStatus(String commandExecutionId) throws BlurException, TException {
- throw new BException("Not Implemented");
- }
-
- @Override
- public void commandCancel(String commandExecutionId) throws BlurException, TException {
- throw new BException("Not Implemented");
- }
-
- // @Override
- // public void bulkMutateStart(final String bulkId) throws BlurException,
- // TException {
- // String cluster = getCluster(table);
- // try {
- // scatter(cluster, new BlurCommand<Void>() {
- // @Override
- // public Void call(Client client) throws BlurException, TException {
- // client.bulkMutateStart(bulkId);
- // return null;
- // }
- // });
- // } catch (Exception e) {
- // LOG.error("Unknown error while trying to get start a bulk mutate [{0}] [{1}]",
- // e, bulkId);
- // if (e instanceof BlurException) {
- // throw (BlurException) e;
- // }
- // throw new BException(e.getMessage(), e);
- // }
- // }
- //
- @Override
public void bulkMutateAdd(final String bulkId, final RowMutation mutation) throws BlurException, TException {
try {
String table = mutation.getTable();
@@ -1873,4 +1836,137 @@
throw new BException("Unknown error while trying to validate indexes for table [{0}]", e, table);
}
}
+
+ @Override
+ public List<String> commandStatusList(int startingAt, short fetch) throws BlurException, TException {
+ try {
+ List<String> shardClusterList = shardClusterList();
+ SortedSet<String> result = new TreeSet<String>();
+ result.addAll(_commandManager.commandStatusList());
+ for (String cluster : shardClusterList) {
+ result.addAll(scatterGather(cluster, new BlurCommand<List<String>>() {
+ @Override
+ public List<String> call(Client client) throws BlurException, TException {
+ return client.commandStatusList(0, Short.MAX_VALUE);
+ }
+ }, new Merger<List<String>>() {
+ @Override
+ public List<String> merge(BlurExecutorCompletionService<List<String>> service) throws BlurException {
+ SortedSet<String> ids = new TreeSet<String>();
+ while (service.getRemainingCount() > 0) {
+ Future<List<String>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
+ ids.addAll(service.getResultThrowException(future));
+ }
+ return new ArrayList<String>(ids);
+ }
+ }));
+ }
+ return new ArrayList<String>().subList(startingAt, fetch);
+ } catch (Exception e) {
+ throw new BException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public CommandStatus commandStatus(String commandExecutionId) throws BlurException, TException {
+ try {
+ List<String> shardClusterList = shardClusterList();
+ CommandStatus commandStatus = _commandManager.getCommandStatus(commandExecutionId);
+ for (String cluster : shardClusterList) {
+ CommandStatus cs = scatterGather(cluster, new BlurCommand<CommandStatus>() {
+ @Override
+ public CommandStatus call(Client client) throws BlurException, TException {
+ return client.commandStatus(commandExecutionId);
+ }
+ }, new Merger<CommandStatus>() {
+ @Override
+ public CommandStatus merge(BlurExecutorCompletionService<CommandStatus> service) throws BlurException {
+ CommandStatus commandStatus = null;
+ while (service.getRemainingCount() > 0) {
+ Future<CommandStatus> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
+ commandStatus = mergeCommandStatus(commandStatus, service.getResultThrowException(future));
+ }
+ return commandStatus;
+ }
+ });
+ commandStatus = mergeCommandStatus(commandStatus, cs);
+ }
+ return commandStatus;
+ } catch (Exception e) {
+ throw new BException(e.getMessage(), e);
+ }
+ }
+
+ private static CommandStatus mergeCommandStatus(CommandStatus cs1, CommandStatus cs2) {
+ if (cs1 == null && cs2 == null) {
+ return null;
+ } else if (cs1 == null) {
+ return cs2;
+ } else if (cs2 == null) {
+ return cs1;
+ } else {
+ Map<String, Map<CommandStatusState, Long>> serverStateMap1 = cs1.getServerStateMap();
+ Map<String, Map<CommandStatusState, Long>> serverStateMap2 = cs2.getServerStateMap();
+ Map<String, Map<CommandStatusState, Long>> merge = mergeServerStateMap(serverStateMap1, serverStateMap2);
+ return new CommandStatus(cs1.getExecutionId(), cs1.getCommandName(), cs1.getArguments(), merge, cs1.getUser());
+ }
+ }
+
+ private static Map<String, Map<CommandStatusState, Long>> mergeServerStateMap(
+ Map<String, Map<CommandStatusState, Long>> serverStateMap1,
+ Map<String, Map<CommandStatusState, Long>> serverStateMap2) {
+ Map<String, Map<CommandStatusState, Long>> result = new HashMap<String, Map<CommandStatusState, Long>>();
+ Set<String> keys = new HashSet<String>();
+ keys.addAll(serverStateMap1.keySet());
+ keys.addAll(serverStateMap2.keySet());
+ for (String key : keys) {
+ Map<CommandStatusState, Long> css1 = serverStateMap2.get(key);
+ Map<CommandStatusState, Long> css2 = serverStateMap2.get(key);
+ result.put(key, mergeCommandStatusState(css1, css2));
+ }
+ return result;
+ }
+
+ private static Map<CommandStatusState, Long> mergeCommandStatusState(Map<CommandStatusState, Long> css1,
+ Map<CommandStatusState, Long> css2) {
+ if (css1 == null && css2 == null) {
+ return new HashMap<CommandStatusState, Long>();
+ } else if (css1 == null) {
+ return css2;
+ } else if (css2 == null) {
+ return css1;
+ } else {
+ Map<CommandStatusState, Long> result = new HashMap<CommandStatusState, Long>(css1);
+ for (Entry<CommandStatusState, Long> e : css2.entrySet()) {
+ CommandStatusState key = e.getKey();
+ Long l = result.get(key);
+ Long value = e.getValue();
+ if (l == null) {
+ result.put(key, value);
+ } else {
+ result.put(key, l + value);
+ }
+ }
+ return result;
+ }
+ }
+
+ @Override
+ public void commandCancel(String commandExecutionId) throws BlurException, TException {
+ try {
+ List<String> shardClusterList = shardClusterList();
+ _commandManager.cancelCommand(commandExecutionId);
+ for (String cluster : shardClusterList) {
+ scatter(cluster, new BlurCommand<Void>() {
+ @Override
+ public Void call(Client client) throws BlurException, TException {
+ client.commandCancel(commandExecutionId);
+ return null;
+ }
+ });
+ }
+ } catch (Exception e) {
+ throw new BException(e.getMessage(), e);
+ }
+ }
}
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 625ed26..08b4400 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -35,7 +35,6 @@
import org.apache.blur.command.ArgumentOverlay;
import org.apache.blur.command.BlurObject;
import org.apache.blur.command.BlurObjectSerDe;
-import org.apache.blur.command.CommandStatusStateEnum;
import org.apache.blur.command.CommandUtil;
import org.apache.blur.command.Response;
import org.apache.blur.command.ShardCommandManager;
@@ -59,7 +58,6 @@
import org.apache.blur.thrift.generated.BlurResults;
import org.apache.blur.thrift.generated.CommandDescriptor;
import org.apache.blur.thrift.generated.CommandStatus;
-import org.apache.blur.thrift.generated.CommandStatusState;
import org.apache.blur.thrift.generated.FetchResult;
import org.apache.blur.thrift.generated.HighlightOptions;
import org.apache.blur.thrift.generated.Query;
@@ -71,6 +69,7 @@
import org.apache.blur.thrift.generated.TableStats;
import org.apache.blur.thrift.generated.TimeoutException;
import org.apache.blur.thrift.generated.User;
+import org.apache.blur.user.UserContext;
import org.apache.blur.utils.BlurConstants;
import org.apache.blur.utils.BlurUtil;
import org.apache.blur.utils.QueryCache;
@@ -615,7 +614,10 @@
}
};
BlurObject args = CommandUtil.toBlurObject(arguments);
- Response response = _commandManager.execute(tableContextFactory, commandName, new ArgumentOverlay(args, _serDe));
+ User thriftUser = UserConverter.toThriftUser(UserContext.getUser());
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, commandName, arguments, null, thriftUser);
+ Response response = _commandManager.execute(tableContextFactory, commandName, new ArgumentOverlay(args, _serDe),
+ originalCommandStatusObject);
return CommandUtil.fromObjectToThrift(response, _serDe);
} catch (Exception e) {
if (e instanceof org.apache.blur.command.TimeoutException) {
@@ -670,11 +672,10 @@
}
@Override
- public List<String> commandStatusList(int startingAt, short fetch, CommandStatusState state) throws BlurException,
- TException {
+ public List<String> commandStatusList(int startingAt, short fetch) throws BlurException, TException {
try {
- List<String> ids = _commandManager.commandStatusList(toCommandStatus(state));
- return ids.subList(startingAt, fetch);
+ List<String> ids = _commandManager.commandStatusList();
+ return ids.subList(startingAt, Math.min(ids.size(), fetch));
} catch (Exception e) {
throw new BException(e.getMessage(), e);
}
@@ -682,7 +683,11 @@
@Override
public CommandStatus commandStatus(String commandExecutionId) throws BlurException, TException {
- throw new BException("Not Implemented");
+ try {
+ return _commandManager.getCommandStatus(commandExecutionId);
+ } catch (Exception e) {
+ throw new BException(e.getMessage(), e);
+ }
}
@Override
@@ -694,10 +699,6 @@
}
}
- private CommandStatusStateEnum toCommandStatus(CommandStatusState state) {
- return CommandStatusStateEnum.valueOf(state.name());
- }
-
@Override
public void bulkMutateAdd(String bulkId, RowMutation rowMutation) throws BlurException, TException {
String table = rowMutation.getTable();
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
index a1c0f88..2df447c 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -110,7 +110,7 @@
Configuration config = BlurUtil.newHadoopConfiguration(configuration);
TableContext.setSystemBlurConfiguration(configuration);
TableContext.setSystemConfiguration(config);
-
+
Thread.setDefaultUncaughtExceptionHandler(new SimpleUncaughtExceptionHandler());
String bindAddress = configuration.get(BLUR_CONTROLLER_BIND_ADDRESS);
int configBindPort = configuration.getInt(BLUR_CONTROLLER_BIND_PORT, -1);
@@ -127,12 +127,11 @@
String nodeName = getNodeName(configuration, BLUR_CONTROLLER_HOSTNAME);
nodeName = nodeName + ":" + instanceBindPort;
configuration.set(BLUR_NODENAME, nodeName);
-
BlurQueryChecker queryChecker = new BlurQueryChecker(configuration);
final ZooKeeper zooKeeper = setupZookeeper(configuration, null);
-
+
final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(zooKeeper, configuration, config);
int timeout = configuration.getInt(BLUR_CONTROLLER_SHARD_CONNECTION_TIMEOUT, 60000);
@@ -157,7 +156,7 @@
}
final ControllerCommandManager controllerCommandManager = new ControllerCommandManager(tmpPath, commandPath,
numberOfControllerWorkerCommandThreads, numberOfControllerDriverCommandThreads, Connection.DEFAULT_TIMEOUT,
- config);
+ config, nodeName);
final BlurControllerServer controllerServer = new BlurControllerServer();
controllerServer.setClient(client);
@@ -187,7 +186,8 @@
Trace.setStorage(traceStorage);
Trace.setNodeName(nodeName);
- List<ServerSecurityFilter> serverSecurity = getServerSecurityList(configuration, ServerSecurityFilterFactory.ServerType.CONTROLLER);
+ List<ServerSecurityFilter> serverSecurity = getServerSecurityList(configuration,
+ ServerSecurityFilterFactory.ServerType.CONTROLLER);
Iface iface = BlurUtil.wrapFilteredBlurServer(configuration, controllerServer, false);
iface = ServerSecurityUtil.applySecurity(iface, serverSecurity, false);
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index b7f969c..0b4e290 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -88,6 +88,7 @@
import org.apache.blur.manager.indexserver.DistributedIndexServer;
import org.apache.blur.manager.indexserver.DistributedLayoutFactory;
import org.apache.blur.manager.indexserver.DistributedLayoutFactoryImpl;
+import org.apache.blur.manager.status.QueryStatusManager;
import org.apache.blur.memory.MemoryAllocationWatcher;
import org.apache.blur.memory.Watcher;
import org.apache.blur.metrics.JSONReporter;
@@ -252,9 +253,11 @@
}
};
+ QueryStatusManager statusManager = new QueryStatusManager(statusCleanupTimerDelay);
+
final IndexManager indexManager = new IndexManager(indexServer, clusterStatus, filterCache, maxHeapPerRowFetch,
- fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay, facetThreadCount,
- deepPagingCache, memoryAllocationWatcher);
+ fetchCount, indexManagerThreadCount, mutateThreadCount, facetThreadCount, deepPagingCache,
+ memoryAllocationWatcher, statusManager);
File tmpPath = getTmpPath(configuration);
int numberOfShardWorkerCommandThreads = configuration.getInt(BLUR_SHARD_COMMAND_WORKER_THREADS, 16);
@@ -266,7 +269,8 @@
LOG.info("Command Path was not set.");
}
final ShardCommandManager commandManager = new ShardCommandManager(indexServer, tmpPath, commandPath,
- numberOfShardWorkerCommandThreads, numberOfShardDriverCommandThreads, Connection.DEFAULT_TIMEOUT, config);
+ numberOfShardWorkerCommandThreads, numberOfShardDriverCommandThreads, Connection.DEFAULT_TIMEOUT, config,
+ nodeName);
clusterStatus.registerActionOnTableStateChange(new Action() {
@Override
@@ -361,7 +365,7 @@
ThreadWatcher threadWatcher = ThreadWatcher.instance();
quietClose(streamServer, makeCloseable(hdfsKeyValueTimer), makeCloseable(indexImporterTimer),
makeCloseable(indexBulkTimer), blockCacheDirectoryFactory, commandManager, traceStorage, server,
- shardServer, indexManager, indexServer, threadWatcher, clusterStatus, zooKeeper, httpServer);
+ shardServer, indexManager, statusManager, indexServer, threadWatcher, clusterStatus, zooKeeper, httpServer);
}
};
server.setShutdown(shutdown);
diff --git a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
index 862c509..64bb2d0 100644
--- a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
@@ -44,6 +44,7 @@
import org.apache.blur.server.TableContextFactory;
import org.apache.blur.thrift.generated.Arguments;
import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.CommandStatus;
import org.apache.blur.thrift.generated.RowMutation;
import org.apache.blur.thrift.generated.ShardState;
import org.apache.blur.thrift.generated.TableDescriptor;
@@ -83,7 +84,7 @@
@Before
public void setup() throws IOException {
_config = new Configuration();
- _manager = new ShardCommandManager(getIndexServer(), null, null, 10, 10, 1000, _config);
+ _manager = new ShardCommandManager(getIndexServer(), null, null, 10, 10, 1000, _config, "test");
}
@After
@@ -136,12 +137,14 @@
output.close();
}
ShardCommandManager manager = new ShardCommandManager(getIndexServer(), _tmpPath, _commandPath, 10, 10, 1000,
- _config);
+ _config, "test");
{
BlurObject args = new BlurObject();
args.put("table", "test");
ArgumentOverlay argumentOverlay = new ArgumentOverlay(args, new BlurObjectSerDe());
- Response response = manager.execute(getTableContextFactory(), "test", argumentOverlay);
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, "test", null, null, null);
+ Response response = manager.execute(getTableContextFactory(), "test", argumentOverlay,
+ originalCommandStatusObject);
Map<Shard, Object> shardResults = response.getShardResults();
for (Object o : shardResults.values()) {
assertEquals("test1", o);
@@ -163,7 +166,9 @@
BlurObject args = new BlurObject();
args.put("table", "test");
ArgumentOverlay argumentOverlay = new ArgumentOverlay(args, new BlurObjectSerDe());
- Response response = manager.execute(getTableContextFactory(), "test", argumentOverlay);
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, "test", null, null, null);
+ Response response = manager.execute(getTableContextFactory(), "test", argumentOverlay,
+ originalCommandStatusObject);
Map<Shard, Object> shardResults = response.getShardResults();
for (Object o : shardResults.values()) {
assertEquals("test2", o);
@@ -205,7 +210,8 @@
try {
if (instanceExecutionId == null) {
TableContextFactory tableContextFactory = getTableContextFactory();
- response = _manager.execute(tableContextFactory, "wait", argumentOverlay);
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, "test", null, null, null);
+ response = _manager.execute(tableContextFactory, "wait", argumentOverlay, originalCommandStatusObject);
} else {
response = _manager.reconnect(instanceExecutionId);
}
@@ -225,7 +231,8 @@
ArgumentOverlay argumentOverlay = new ArgumentOverlay(args, new BlurObjectSerDe());
TableContextFactory tableContextFactory = getTableContextFactory();
try {
- _manager.execute(tableContextFactory, "error", argumentOverlay);
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, "test", null, null, null);
+ _manager.execute(tableContextFactory, "error", argumentOverlay, originalCommandStatusObject);
fail();
} catch (ExceptionCollector e) {
Throwable t = e.getCause();
@@ -263,7 +270,8 @@
try {
Response response;
if (instanceExecutionId == null) {
- response = _manager.execute(tableContextFactory, "wait", argumentOverlay);
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, "test", null, null, null);
+ response = _manager.execute(tableContextFactory, "wait", argumentOverlay, originalCommandStatusObject);
} else {
response = _manager.reconnect(instanceExecutionId);
}
diff --git a/blur-core/src/test/java/org/apache/blur/command/WaitForSeconds.java b/blur-core/src/test/java/org/apache/blur/command/WaitForSeconds.java
index 3216d7f..338527b 100644
--- a/blur-core/src/test/java/org/apache/blur/command/WaitForSeconds.java
+++ b/blur-core/src/test/java/org/apache/blur/command/WaitForSeconds.java
@@ -21,14 +21,26 @@
import org.apache.blur.command.annotation.OptionalArgument;
import org.apache.blur.command.commandtype.IndexReadCommandSingleTable;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
public class WaitForSeconds extends IndexReadCommandSingleTable<Boolean> {
+ private static final Log LOG = LogFactory.getLog(WaitForSeconds.class);
+
+ public static void main(String[] args) throws IOException {
+ WaitForSeconds waitForSecond = new WaitForSeconds();
+ waitForSecond.setSeconds(30);
+ waitForSecond.setTable("test1");
+ waitForSecond.run("localhost:40010");
+ }
+
@OptionalArgument("The number of seconds to sleep, the default is 30 seconds.")
private int seconds = 30;
@Override
public Boolean execute(IndexContext context) throws IOException, InterruptedException {
+ LOG.info(Thread.currentThread().isInterrupted());
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
return true;
}
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
index e5bd42d..893d65b 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -51,6 +51,7 @@
import org.apache.blur.manager.clusterstatus.ClusterStatus;
import org.apache.blur.manager.indexserver.LocalIndexServer;
import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.manager.status.QueryStatusManager;
import org.apache.blur.memory.MemoryAllocationWatcher;
import org.apache.blur.memory.Watcher;
import org.apache.blur.server.TableContext;
@@ -106,6 +107,8 @@
private IndexManager indexManager;
private File base;
+ private QueryStatusManager _statusManager;
+
@Before
public void setUp() throws BlurException, IOException, InterruptedException {
TableContext.clear();
@@ -128,8 +131,10 @@
BlurFilterCache filterCache = new DefaultBlurFilterCache(new BlurConfiguration());
long statusCleanupTimerDelay = 1000;
- indexManager = new IndexManager(server, getClusterStatus(tableDescriptor), filterCache, 10000000, 100, 1, 1,
- statusCleanupTimerDelay, 0, new DeepPagingCache(), NOTHING);
+ _statusManager = new QueryStatusManager(statusCleanupTimerDelay);
+
+ indexManager = new IndexManager(server, getClusterStatus(tableDescriptor), filterCache, 10000000, 100, 1, 1, 0,
+ new DeepPagingCache(), NOTHING, _statusManager);
setupData();
}
@@ -231,6 +236,7 @@
@After
public void teardown() {
if (indexManager != null) {
+ _statusManager.close();
indexManager.close();
indexManager = null;
}
diff --git a/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java b/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
index 73ef9ed..29bb600 100644
--- a/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
@@ -41,7 +41,6 @@
import org.apache.blur.thrift.generated.ColumnDefinition;
import org.apache.blur.thrift.generated.CommandDescriptor;
import org.apache.blur.thrift.generated.CommandStatus;
-import org.apache.blur.thrift.generated.CommandStatusState;
import org.apache.blur.thrift.generated.FetchResult;
import org.apache.blur.thrift.generated.Level;
import org.apache.blur.thrift.generated.Metric;
@@ -463,8 +462,7 @@
}
@Override
- public List<String> commandStatusList(int startingAt, short fetch, CommandStatusState state)
- throws BlurException, TException {
+ public List<String> commandStatusList(int startingAt, short fetch) throws BlurException, TException {
throw new RuntimeException("Not implemented.");
}