blob: 59f5b7cba81d5797c6c4f4b91d508114a107fa31 [file] [log] [blame]
package org.apache.blur.command;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.blur.BlurConfiguration;
import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.blur.server.LayoutFactory;
import org.apache.blur.server.TableContext;
import org.apache.blur.server.TableContextFactory;
import org.apache.blur.thirdparty.thrift_0_9_0.TException;
import org.apache.blur.thrift.BlurClientManager;
import org.apache.blur.thrift.ClientPool;
import org.apache.blur.thrift.Connection;
import org.apache.blur.thrift.UserConverter;
import org.apache.blur.thrift.generated.Arguments;
import org.apache.blur.thrift.generated.Blur.Client;
import org.apache.blur.thrift.generated.BlurException;
import org.apache.blur.thrift.generated.CommandStatus;
import org.apache.blur.thrift.generated.Response;
import org.apache.blur.thrift.generated.TimeoutException;
import org.apache.blur.thrift.generated.ValueObject;
import org.apache.blur.trace.Tracer;
import org.apache.blur.user.UserContext;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
public class ControllerClusterContext extends ClusterContext implements Closeable {
private final static Log LOG = LogFactory.getLog(ControllerClusterContext.class);
private final TableContextFactory _tableContextFactory;
private final Map<Server, ClientWithConnection> _clientMap;
private final ControllerCommandManager _manager;
private final LayoutFactory _layoutFactory;
private final BlurObjectSerDe _serDe = new BlurObjectSerDe();
static class ClientWithConnection {
final Client _client;
final Connection _connection;
ClientWithConnection(Client client, Connection connection) {
_client = client;
_connection = connection;
}
}
public ControllerClusterContext(TableContextFactory tableContextFactory, LayoutFactory layoutFactory,
ControllerCommandManager manager) throws IOException {
_tableContextFactory = tableContextFactory;
_clientMap = getBlurClientsForCluster(layoutFactory.getServerConnections());
_manager = manager;
_layoutFactory = layoutFactory;
}
private Map<Server, ClientWithConnection> getBlurClientsForCluster(Set<Connection> serverConnections)
throws IOException {
Map<Server, ClientWithConnection> clients = new HashMap<Server, ClientWithConnection>();
for (Connection serverConnection : serverConnections) {
try {
Client client = BlurClientManager.getClientPool().getClient(serverConnection);
client.refresh();
ClientWithConnection clientWithConnection = new ClientWithConnection(client, serverConnection);
clients.put(new Server(serverConnection.getHost() + ":" + serverConnection.getPort()), clientWithConnection);
} catch (TException e) {
throw new IOException(e);
}
}
return clients;
}
@Override
public TableContext getTableContext(String table) throws IOException {
return _tableContextFactory.getTableContext(table);
}
@Override
public <T> Map<Shard, T> readIndexes(IndexRead<T> command) throws IOException {
Map<Shard, Future<T>> futures = readIndexesAsync(command);
Map<Shard, T> result = new HashMap<Shard, T>();
return processFutures((Command<?>) command, futures, result);
}
@Override
public <T> Map<Server, T> readServers(ServerRead<?, T> command) throws IOException {
Map<Server, Future<T>> futures = readServersAsync(command);
Map<Server, T> result = new HashMap<Server, T>();
return processFutures((Command<?>) command, futures, result);
}
@Override
public void close() throws IOException {
ClientPool clientPool = BlurClientManager.getClientPool();
Collection<ClientWithConnection> values = _clientMap.values();
_clientMap.clear();
for (ClientWithConnection clientWithConnection : values) {
clientPool.returnClient(clientWithConnection._connection, clientWithConnection._client);
}
}
@SuppressWarnings("unchecked")
@Override
public <T> Map<Shard, Future<T>> readIndexesAsync(IndexRead<T> cmd) throws IOException {
final Command<?> command = (Command<?>) cmd;
_manager.validate(command);
Map<Shard, Future<T>> futureMap = new HashMap<Shard, Future<T>>();
Set<String> tables = command.routeTables(this);
Set<Shard> shards = command.routeShards(this, tables);
Map<Server, Client> clientMap = getClientMap(command, tables, shards);
final Arguments arguments = _manager.toArguments(command);
CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, UserConverter.toThriftUser(UserContext.getUser()));
for (Entry<Server, Client> e : clientMap.entrySet()) {
Server server = e.getKey();
final Client client = e.getValue();
Future<Map<Shard, T>> future = _manager.submitToExecutorService(new Callable<Map<Shard, T>>() {
@Override
public Map<Shard, T> call() throws Exception {
Response response = waitForResponse(client, command, arguments);
Map<Shard, Object> shardToThriftValue = CommandUtil.fromThriftToObjectShard(response.getShardToValue());
Map<Shard, Object> shardToValue = CommandUtil.fromThriftSupportedObjects(shardToThriftValue, _serDe);
return (Map<Shard, T>) shardToValue;
}
}, command, originalCommandStatusObject, new AtomicBoolean(true));
for (Shard shard : getShardsOnServer(server, tables, shards)) {
futureMap.put(shard, new ShardResultFuture<T>(shard, future));
}
}
return futureMap;
}
private Map<Server, Client> getClientMap(Command<?> command, Set<String> tables, Set<Shard> shards)
throws IOException {
Map<Server, Client> result = new HashMap<Server, Client>();
for (Entry<Server, ClientWithConnection> e : _clientMap.entrySet()) {
Server server = e.getKey();
if (_layoutFactory.isValidServer(server, tables, shards)) {
result.put(server, e.getValue()._client);
}
}
return result;
}
protected static Response waitForResponse(Client client, Command<?> command, Arguments arguments) throws TException {
// TODO This should likely be changed to run of a AtomicBoolean used for
// the status of commands.
Long executionId = null;
while (true) {
Tracer tracer = BlurClientManager.setupClientPreCall(client);
try {
if (executionId == null) {
return client.execute(command.getName(), arguments);
} else {
return client.reconnect(executionId);
}
} catch (BlurException e) {
throw e;
} catch (TimeoutException e) {
executionId = e.getInstanceExecutionId();
LOG.info("Execution fetch timed out, reconnecting using [{0}].", executionId);
} catch (TException e) {
throw e;
} finally {
if (tracer != null) {
tracer.done();
}
}
}
}
private Set<Shard> getShardsOnServer(Server server, Set<String> tables, Set<Shard> shards) throws IOException {
Set<Shard> serverLayout = _layoutFactory.getServerLayout(server);
Set<Shard> result = new HashSet<Shard>();
for (Shard shard : serverLayout) {
if (isValid(shard, tables, shards)) {
result.add(shard);
}
}
return result;
}
private boolean isValid(Shard shard, Set<String> tables, Set<Shard> shards) {
String table = shard.getTable();
if (!tables.contains(table)) {
return false;
}
if (shards.isEmpty()) {
return true;
} else {
return shards.contains(shard);
}
}
@SuppressWarnings("unchecked")
@Override
public <T> Map<Server, Future<T>> readServersAsync(ServerRead<?, T> cmd) throws IOException {
final Command<?> command = (Command<?>) cmd;
_manager.validate(command);
Map<Server, Future<T>> futureMap = new HashMap<Server, Future<T>>();
Set<String> tables = command.routeTables(this);
Set<Shard> shards = command.routeShards(this, tables);
Map<Server, Client> clientMap = getClientMap(command, tables, shards);
final Arguments arguments = _manager.toArguments(command);
CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, UserConverter.toThriftUser(UserContext.getUser()));
for (Entry<Server, Client> e : clientMap.entrySet()) {
Server server = e.getKey();
final Client client = e.getValue();
Future<T> future = _manager.submitToExecutorService(new Callable<T>() {
@Override
public T call() throws Exception {
Response response = waitForResponse(client, command, arguments);
ValueObject valueObject = response.getValue();
Object thriftObject = CommandUtil.toObject(valueObject);
return (T) _serDe.fromSupportedThriftObject(thriftObject);
}
}, command, originalCommandStatusObject, new AtomicBoolean(true));
futureMap.put(server, future);
}
return futureMap;
}
private <K, T> Map<K, T> processFutures(Command<?> command, Map<K, Future<T>> futures, Map<K, T> result)
throws IOException {
Throwable firstError = null;
for (Entry<K, Future<T>> e : futures.entrySet()) {
K key = e.getKey();
Future<T> future = e.getValue();
T value;
try {
value = future.get();
result.put(key, value);
} catch (InterruptedException ex) {
throw new IOException(ex);
} catch (ExecutionException ex) {
Throwable cause = ex.getCause();
if (firstError == null) {
firstError = cause;
}
LOG.error("Unknown call while executing command [{0}] on server or shard [{1}]", command, key);
}
}
if (firstError != null) {
throw new IOException(firstError);
}
return result;
}
@Override
public BlurConfiguration getBlurConfiguration(String table) throws IOException {
return _tableContextFactory.getTableContext(table).getBlurConfiguration();
}
@Override
public <T> T readIndex(IndexRead<T> command) throws IOException {
Future<T> future = readIndexAsync(command);
try {
return future.get();
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
}
@Override
public <T> Future<T> readIndexAsync(IndexRead<T> command) throws IOException {
throw new RuntimeException("Not Implemented.");
}
}