blob: b7575e3f471fa25b375664b6ef0e3ddb325dcac9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.proxy;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Properties;
import org.apache.accumulo.core.cli.Help;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.clientImpl.ClientConfConverter;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.proxy.thrift.AccumuloProxy;
import org.apache.accumulo.server.metrics.Metrics;
import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.rpc.TimedProcessor;
import org.apache.accumulo.server.rpc.UGIAssumingProcessor;
import org.apache.accumulo.start.spi.KeywordExecutable;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
import com.google.auto.service.AutoService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@AutoService(KeywordExecutable.class)
public class Proxy implements KeywordExecutable {
private static final Logger log = LoggerFactory.getLogger(Proxy.class);
public static final String USE_MINI_ACCUMULO_KEY = "useMiniAccumulo";
public static final String USE_MINI_ACCUMULO_DEFAULT = "false";
public static final String THRIFT_THREAD_POOL_SIZE_KEY = "numThreads";
// Default number of threads from THsHaServer.Args
public static final String THRIFT_THREAD_POOL_SIZE_DEFAULT = "5";
public static final String THRIFT_MAX_FRAME_SIZE_KEY = "maxFrameSize";
public static final String THRIFT_MAX_FRAME_SIZE_DEFAULT = "16M";
// Type of thrift server to create
public static final String THRIFT_SERVER_TYPE = "thriftServerType";
public static final String THRIFT_SERVER_TYPE_DEFAULT = "";
public static final ThriftServerType DEFAULT_SERVER_TYPE = ThriftServerType.getDefault();
public static final String THRIFT_SERVER_HOSTNAME = "thriftServerHostname";
public static final String THRIFT_SERVER_HOSTNAME_DEFAULT = "0.0.0.0";
public static class PropertiesConverter implements IStringConverter<Properties> {
@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN",
justification = "app is run in same security context as user providing the filename")
@Override
public Properties convert(String fileName) {
Properties prop = new Properties();
InputStream is;
try {
is = new FileInputStream(fileName);
try {
prop.load(is);
} finally {
is.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return prop;
}
}
public static class Opts extends Help {
@Parameter(names = "-p", description = "proxy.properties path",
converter = PropertiesConverter.class)
Properties proxyProps;
@Parameter(names = "-c", description = "accumulo-client.properties path",
converter = PropertiesConverter.class)
Properties clientProps;
}
@Override
public String keyword() {
return "proxy";
}
@Override
public UsageGroup usageGroup() {
return UsageGroup.PROCESS;
}
@Override
public String description() {
return "Starts Accumulo proxy";
}
@SuppressFBWarnings(value = "DM_EXIT", justification = "System.exit() from a main class is okay")
@Override
public void execute(final String[] args) throws Exception {
Opts opts = new Opts();
opts.parseArgs(Proxy.class.getName(), args);
Properties proxyProps = opts.proxyProps;
Properties clientProps = opts.clientProps;
boolean useMini = Boolean
.parseBoolean(proxyProps.getProperty(USE_MINI_ACCUMULO_KEY, USE_MINI_ACCUMULO_DEFAULT));
if (!proxyProps.containsKey("port")) {
System.err.println("No port property");
System.exit(1);
}
if (useMini) {
log.info("Creating mini cluster");
final File folder = Files.createTempDirectory(System.currentTimeMillis() + "").toFile();
final MiniAccumuloCluster accumulo = new MiniAccumuloCluster(folder, "secret");
accumulo.start();
clientProps = accumulo.getClientProperties();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
accumulo.stop();
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
} finally {
if (!folder.delete()) {
log.warn("Unexpected error removing {}", folder);
}
}
}));
} else if (clientProps == null) {
System.err.println("The '-c' option must be set with an accumulo-client.properties file or"
+ " proxy.properties must contain either useMiniAccumulo=true");
System.exit(1);
}
Class<? extends TProtocolFactory> protoFactoryClass = Class
.forName(
proxyProps.getProperty("protocolFactory", TCompactProtocol.Factory.class.getName()))
.asSubclass(TProtocolFactory.class);
TProtocolFactory protoFactory = protoFactoryClass.newInstance();
int port = Integer.parseInt(proxyProps.getProperty("port"));
String hostname = proxyProps.getProperty(THRIFT_SERVER_HOSTNAME,
THRIFT_SERVER_HOSTNAME_DEFAULT);
HostAndPort address = HostAndPort.fromParts(hostname, port);
proxyProps.putAll(clientProps);
ServerAddress server = createProxyServer(address, protoFactory, proxyProps);
// Wait for the server to come up
while (!server.server.isServing()) {
Thread.sleep(100);
}
log.info("Proxy server started on {}", server.getAddress());
while (server.server.isServing()) {
Thread.sleep(1000);
}
}
public static void main(String[] args) throws Exception {
new Proxy().execute(args);
}
public static ServerAddress createProxyServer(HostAndPort address,
TProtocolFactory protocolFactory, Properties props) throws Exception {
final int numThreads = Integer
.parseInt(props.getProperty(THRIFT_THREAD_POOL_SIZE_KEY, THRIFT_THREAD_POOL_SIZE_DEFAULT));
final long maxFrameSize = ConfigurationTypeHelper.getFixedMemoryAsBytes(
props.getProperty(THRIFT_MAX_FRAME_SIZE_KEY, THRIFT_MAX_FRAME_SIZE_DEFAULT));
final int simpleTimerThreadpoolSize = Integer
.parseInt(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE.getDefaultValue());
// How frequently to try to resize the thread pool
final long threadpoolResizeInterval = 1000L * 5;
// No timeout
final long serverSocketTimeout = 0L;
// Use the new hadoop metrics2 support
final String serverName = "Proxy", threadName = "Accumulo Thrift Proxy";
// create the implementation of the proxy interface
ProxyServer impl = new ProxyServer(props);
// Wrap the implementation -- translate some exceptions
AccumuloProxy.Iface wrappedImpl = TraceUtil.wrapService(impl);
// Create the processor from the implementation
TProcessor processor = new AccumuloProxy.Processor<>(wrappedImpl);
// Get the type of thrift server to instantiate
final String serverTypeStr = props.getProperty(THRIFT_SERVER_TYPE, THRIFT_SERVER_TYPE_DEFAULT);
ThriftServerType serverType = DEFAULT_SERVER_TYPE;
if (!THRIFT_SERVER_TYPE_DEFAULT.equals(serverTypeStr)) {
serverType = ThriftServerType.get(serverTypeStr);
}
SslConnectionParams sslParams = null;
SaslServerConnectionParams saslParams = null;
switch (serverType) {
case SSL:
sslParams = SslConnectionParams.forClient(ClientConfConverter.toAccumuloConf(props));
break;
case SASL:
if (!ClientProperty.SASL_ENABLED.getBoolean(props)) {
throw new IllegalStateException("SASL thrift server was requested but 'sasl.enabled' is"
+ " not set to true in configuration");
}
// Kerberos needs to be enabled to use it
if (!UserGroupInformation.isSecurityEnabled()) {
throw new IllegalStateException("Hadoop security is not enabled");
}
// Login via principal and keytab
final String kerberosPrincipal = ClientProperty.AUTH_PRINCIPAL.getValue(props);
final AuthenticationToken authToken = ClientProperty.getAuthenticationToken(props);
if (!(authToken instanceof KerberosToken)) {
throw new IllegalStateException("Kerberos authentication must be used with SASL");
}
final KerberosToken kerberosToken = (KerberosToken) authToken;
final String kerberosKeytab = kerberosToken.getKeytab().getAbsolutePath();
if (StringUtils.isBlank(kerberosPrincipal) || StringUtils.isBlank(kerberosKeytab)) {
throw new IllegalStateException(
String.format("Kerberos principal '%s' and keytab '%s'" + " must be provided",
kerberosPrincipal, kerberosKeytab));
}
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytab);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
log.info("Logged in as {}", ugi.getUserName());
// The kerberosPrimary set in the SASL server needs to match the principal we're logged in
// as.
final String shortName = ugi.getShortUserName();
log.info("Setting server primary to {}", shortName);
props.setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY.getKey(), shortName);
KerberosToken token = new KerberosToken();
saslParams = new SaslServerConnectionParams(props, token, null);
processor = new UGIAssumingProcessor(processor);
break;
default:
// nothing to do -- no extra configuration necessary
break;
}
// Hook up support for tracing for thrift calls
TimedProcessor timedProcessor = new TimedProcessor(
Metrics.initSystem(Proxy.class.getSimpleName()), processor, serverName, threadName);
// Create the thrift server with our processor and properties
return TServerUtils.startTServer(serverType, timedProcessor, protocolFactory, serverName,
threadName, numThreads, simpleTimerThreadpoolSize, threadpoolResizeInterval, maxFrameSize,
sslParams, saslParams, serverSocketTimeout, address);
}
}