blob: 0532179cfb40c5281fb381912b7431baafa86aa1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.clientImpl;
import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.clientImpl.thrift.ClientService;
import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.fate.zookeeper.ServiceLock;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerClient {
private static final Logger log = LoggerFactory.getLogger(ServerClient.class);
public static <T> T execute(ClientContext context, ClientExecReturn<T,ClientService.Client> exec)
throws AccumuloException, AccumuloSecurityException {
return execute(context, new ClientService.Client.Factory(), exec);
}
public static <CT extends TServiceClient,RT> RT execute(ClientContext context,
TServiceClientFactory<CT> factory, ClientExecReturn<RT,CT> exec)
throws AccumuloException, AccumuloSecurityException {
try {
return executeRaw(context, factory, exec);
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (AccumuloException e) {
throw e;
} catch (Exception e) {
throw new AccumuloException(e);
}
}
public static void executeVoid(ClientContext context, ClientExec<ClientService.Client> exec)
throws AccumuloException, AccumuloSecurityException {
try {
executeRawVoid(context, exec);
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (AccumuloException e) {
throw e;
} catch (Exception e) {
throw new AccumuloException(e);
}
}
public static <T> T executeRaw(ClientContext context,
ClientExecReturn<T,ClientService.Client> exec) throws Exception {
return executeRaw(context, new ClientService.Client.Factory(), exec);
}
public static <CT extends TServiceClient,RT> RT executeRaw(ClientContext context,
TServiceClientFactory<CT> factory, ClientExecReturn<RT,CT> exec) throws Exception {
while (true) {
CT client = null;
String server = null;
try {
Pair<String,CT> pair = ServerClient.getConnection(context, factory, true);
server = pair.getFirst();
client = pair.getSecond();
return exec.execute(client);
} catch (TApplicationException tae) {
throw new AccumuloServerException(server, tae);
} catch (TTransportException tte) {
log.debug("ClientService request failed " + server + ", retrying ... ", tte);
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
if (client != null)
ServerClient.close(client);
}
}
}
public static void executeRawVoid(ClientContext context, ClientExec<ClientService.Client> exec)
throws Exception {
while (true) {
ClientService.Client client = null;
String server = null;
try {
Pair<String,Client> pair =
ServerClient.getConnection(context, new ClientService.Client.Factory(), true);
server = pair.getFirst();
client = pair.getSecond();
exec.execute(client);
break;
} catch (TApplicationException tae) {
throw new AccumuloServerException(server, tae);
} catch (TTransportException tte) {
log.debug("ClientService request failed " + server + ", retrying ... ", tte);
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} finally {
if (client != null)
ServerClient.close(client);
}
}
}
static volatile boolean warnedAboutTServersBeingDown = false;
public static <CT extends TServiceClient> Pair<String,CT> getConnection(ClientContext context,
TServiceClientFactory<CT> factory, boolean preferCachedConnections)
throws TTransportException {
checkArgument(context != null, "context is null");
long rpcTimeout = context.getClientTimeoutInMillis();
// create list of servers
ArrayList<ThriftTransportKey> servers = new ArrayList<>();
// add tservers
ZooCache zc = context.getZooCache();
for (String tserver : zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)) {
var zLocPath =
ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + tserver);
byte[] data = zc.getLockData(zLocPath);
if (data != null) {
String strData = new String(data, UTF_8);
if (!strData.equals("manager"))
servers.add(new ThriftTransportKey(
new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context));
}
}
boolean opened = false;
try {
Pair<String,TTransport> pair =
ThriftTransportPool.getInstance().getAnyTransport(servers, preferCachedConnections);
CT client = ThriftUtil.createClient(factory, pair.getSecond());
opened = true;
warnedAboutTServersBeingDown = false;
return new Pair<>(pair.getFirst(), client);
} finally {
if (!opened) {
if (!warnedAboutTServersBeingDown) {
if (servers.isEmpty()) {
log.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
} else {
log.warn("Failed to find an available server in the list of servers: {}", servers);
}
warnedAboutTServersBeingDown = true;
}
}
}
}
public static void close(TServiceClient client) {
if (client != null && client.getInputProtocol() != null
&& client.getInputProtocol().getTransport() != null) {
ThriftTransportPool.getInstance().returnTransport(client.getInputProtocol().getTransport());
} else {
log.debug("Attempt to close null connection to a server", new Exception());
}
}
}