blob: 054b1935975368db058791102f3892f027e31636 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.tests.e2e.minisentry;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.service.thrift.SentryService;
import org.apache.sentry.service.thrift.SentryServiceFactory;
import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
public class InternalSentrySrv implements SentrySrv {
/**
 * Thrift server context that carries the id given to it at construction.
 * Instances are immutable.
 */
public static class SentryServerContext implements ServerContext {
  // Assigned once in the constructor and never changed; final so the value
  // is reliably visible to whichever thread reads it later.
  private final long contextId;

  /**
   * @param contextId id to associate with this context
   */
  public SentryServerContext(long contextId) {
    this.contextId = contextId;
  }

  /**
   * @return the id passed to the constructor
   */
  public long getContextId() {
    return contextId;
  }
}
/**
 * Thrift event handler class to track client connections to Sentry service.
 *
 * <p>Multiple client connections may be seen simultaneously, so the id
 * counter is atomic and every access to the list of connected clients is
 * guarded by the list's own monitor.
 */
public static class SentryThriftEvenHandler implements TServerEventHandler {
  // unique id for each client connection. We could see multiple simultaneous
  // client connections, so make it thread safe.
  private final AtomicLong clientId = new AtomicLong();
  // Ids of the clients currently connected; guarded by synchronized (clientList)
  private final List<Long> clientList = Lists.newArrayList();

  /**
   * Thrift callback when a new client is connecting.
   *
   * @return a {@link SentryServerContext} carrying the id assigned to the
   *         new connection
   */
  @Override
  public ServerContext createContext(TProtocol inputProto,
      TProtocol outputProto) {
    // Take the id exactly once: re-reading the counter afterwards could
    // observe an id that was handed to another connection in the meantime.
    final long newClientId = clientId.incrementAndGet();
    synchronized (clientList) {
      clientList.add(newClientId);
    }
    LOGGER.info("Client Connected: " + newClientId);
    return new SentryServerContext(newClientId);
  }

  /**
   * Thrift callback when a client is disconnecting.
   *
   * @param arg0 the context previously returned by {@code createContext}
   */
  @Override
  public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
    final long closedClientId = ((SentryServerContext) arg0).getContextId();
    synchronized (clientList) {
      // Box explicitly so it is obvious that remove(Object), not
      // remove(int index), is the overload being used.
      clientList.remove(Long.valueOf(closedClientId));
    }
    LOGGER.info("Client Disonnected: "
        + closedClientId);
  }

  @Override
  public void preServe() {
    // nothing to do before the server starts serving
  }

  @Override
  public void processContext(ServerContext arg0, TTransport arg1,
      TTransport arg2) {
    // individual requests are not tracked
  }

  /**
   * @return number of client ids currently in the connected list
   */
  public long getClientCount() {
    synchronized (clientList) {
      return clientList.size();
    }
  }

  /**
   * @return a snapshot copy of the ids currently in the connected list; the
   *         copy can be iterated while connections come and go
   */
  public List<Long> getClienList() {
    synchronized (clientList) {
      return Lists.newArrayList(clientList);
    }
  }

  /**
   * @return the current value of the id counter, i.e. how many ids have been
   *         handed out so far
   */
  public long getClientId() {
    return clientId.get();
  }
}
// Sentry services owned by this instance, addressed by their list index
private final List<SentryService> sentryServers = Lists.newArrayList();
// Created only in case of HA. Kept per instance (not static) so that one
// InternalSentrySrv neither overwrites nor reports the ZooKeeper server of
// another instance.
private TestingServer zkServer;
private static final Logger LOGGER = LoggerFactory
    .getLogger(InternalSentrySrv.class);
// Set to true at the end of the constructor; start/stop methods refuse to
// run while it is false
private boolean isActive = false;
/**
 * Creates {@code numServers} Sentry services, each from its own copy of
 * {@code sentryConf}. The services are created but not started.
 *
 * @param sentryConf base configuration; when more than one server is
 *        requested the HA settings are written into it before it is copied
 * @param numServers number of services to create, must be greater than 0
 * @throws IllegalArgumentException if {@code numServers} is not positive
 */
public InternalSentrySrv(Configuration sentryConf, int numServers)
    throws Exception {
  // Reject a non-positive server count before doing any work
  if (numServers <= 0) {
    throw new IllegalArgumentException("Invalid number of Servers: "
        + numServers + " ,must be > 0");
  }

  // More than one server means HA: start a Curator TestingServer and point
  // the configuration at it
  final boolean haRequested = numServers > 1;
  if (haRequested) {
    zkServer = new TestingServer();
    zkServer.start();
    sentryConf.setBoolean(ServerConfig.SENTRY_HA_ENABLED, true);
    sentryConf.set(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
        zkServer.getConnectString());
  }

  int created = 0;
  while (created < numServers) {
    Configuration perServerConf = new Configuration(sentryConf);
    SentryService service = new SentryServiceFactory().create(perServerConf);
    // Record the address the service reports in its own configuration copy
    perServerConf.set(ClientConfig.SERVER_RPC_ADDRESS,
        service.getAddress().getHostName());
    perServerConf.setInt(ClientConfig.SERVER_RPC_PORT,
        service.getAddress().getPort());
    sentryServers.add(service);
    created++;
  }
  isActive = true;
}
/**
 * Starts every server, in index order, via {@link #start(int)}.
 *
 * @throws IllegalStateException if this instance is not active
 */
@Override
public void startAll() throws Exception {
  if (!isActive) {
    throw new IllegalStateException("SentrySrv is no longer active");
  }
  int next = 0;
  while (next < sentryServers.size()) {
    start(next);
    next++;
  }
}
/**
 * Starts the server at index {@code serverNum}, waits until it reports that
 * it is running, then installs a fresh {@link SentryThriftEvenHandler} on it.
 *
 * @param serverNum index of the server to start
 * @throws IllegalStateException if this instance is not active
 * @throws TimeoutException if the server is still not running after
 *         60 seconds
 */
@Override
public void start(int serverNum) throws Exception {
  if (!isActive) {
    throw new IllegalStateException("SentrySrv is no longer active");
  }
  SentryService sentryServer = sentryServers.get(serverNum);
  sentryServer.start();
  // wait for startup; System.nanoTime() is used for the elapsed time so the
  // timeout is unaffected by wall-clock adjustments made while waiting
  final long startupTimeoutNanos = 60L * 1000L * 1000L * 1000L;
  final long startNanos = System.nanoTime();
  while (!sentryServer.isRunning()) {
    // The running state is re-checked before the deadline, so a server that
    // came up during the last sleep is not reported as timed out.
    if (System.nanoTime() - startNanos > startupTimeoutNanos) {
      throw new TimeoutException("Server did not start after 60 seconds");
    }
    Thread.sleep(1000);
  }
  sentryServer.setThriftEventHandler(new SentryThriftEvenHandler());
}
/**
 * Stops every server, in index order, via {@link #stop(int)}. A failure to
 * stop one server does not prevent the attempt on the remaining ones.
 *
 * @throws IllegalStateException if this instance is not active, or if at
 *         least one server failed to stop; in the latter case the first
 *         failure is attached as the cause
 */
@Override
public void stopAll() throws Exception {
  if (!isActive) {
    throw new IllegalStateException("SentrySrv is no longer active");
  }
  Exception firstFailure = null;
  for (int sentryServerNum = 0; sentryServerNum < sentryServers.size(); sentryServerNum++) {
    try {
      stop(sentryServerNum);
    } catch (Exception e) {
      // Keep the exception in the log so the reason for the failure is not lost
      LOGGER.error("Sentry Server " + sentryServerNum + " failed to stop", e);
      if (firstFailure == null) {
        firstFailure = e;
      }
    }
  }
  if (firstFailure != null) {
    throw new IllegalStateException(
        "At least one of the servers failed to stop cleanly", firstFailure);
  }
}
/**
 * Stops the server at index {@code serverNum}.
 *
 * @param serverNum index of the server to stop
 * @throws IllegalStateException if this instance is not active
 */
@Override
public void stop(int serverNum) throws Exception {
  if (isActive) {
    sentryServers.get(serverNum).stop();
    return;
  }
  throw new IllegalStateException("SentrySrv is no longer active");
}
/**
 * Stops all servers and the ZooKeeper test server (if any), forgets them and
 * marks this instance inactive. Failures are logged and do not abort the
 * shutdown of the remaining services.
 */
@Override
public void close() {
  for (SentryService sentryServer : sentryServers) {
    try {
      sentryServer.stop();
    } catch (Exception e) {
      LOGGER.error("Error stoping Sentry service ", e);
    }
  }
  if (zkServer != null) {
    try {
      // close() rather than stop(): per the Curator TestingServer
      // documentation it also removes the server's temp directory
      zkServer.close();
    } catch (IOException e) {
      LOGGER.warn("Error stoping ZK service ", e);
    } finally {
      // Drop the reference so a repeated close() does not stop it again and
      // a shut-down ZooKeeper server is no longer reported as available
      zkServer = null;
    }
  }
  sentryServers.clear();
  isActive = false;
}
/**
 * Looks up a server by its index.
 *
 * @param serverNum index into the list of servers
 * @return the server stored at that index
 */
@Override
public SentryService get(int serverNum) {
  final SentryService requested = sentryServers.get(serverNum);
  return requested;
}
/**
 * @return the connect string of the ZooKeeper test server
 * @throws IOException if no ZooKeeper test server is held
 */
@Override
public String getZKQuorum() throws Exception {
  if (zkServer != null) {
    return zkServer.getConnectString();
  }
  throw new IOException("Sentry HA is not enabled");
}
/**
 * @return {@code true} exactly when a ZooKeeper test server reference is held
 */
@Override
public boolean isHaEnabled() {
  final boolean zkPresent = (zkServer != null);
  return zkPresent;
}
/**
 * Reports how many clients the event handler of one server currently lists
 * as connected, logging the handler's id counter and each listed id.
 *
 * @param serverNum index of the server to inspect
 * @return the handler's current client count
 */
@Override
public long getNumActiveClients(int serverNum) {
  final SentryService server = get(serverNum);
  final SentryThriftEvenHandler handler =
      (SentryThriftEvenHandler) server.getThriftEventHandler();
  LOGGER.warn("Total clients: " + handler.getClientId());
  final List<Long> connectedIds = handler.getClienList();
  for (Long connectedId : connectedIds) {
    LOGGER.warn("Got clients: " + connectedId);
  }
  return handler.getClientCount();
}
/**
 * @return the sum of {@link #getNumActiveClients(int)} over all servers,
 *         visited in index order
 */
@Override
public long getNumActiveClients() {
  long activeSum = 0;
  int idx = 0;
  while (idx < sentryServers.size()) {
    activeSum = activeSum + getNumActiveClients(idx);
    idx++;
  }
  return activeSum;
}
/**
 * @return the sum of {@link #getTotalClients(int)} over all servers,
 *         visited in index order
 */
@Override
public long getTotalClients() {
  long grandTotal = 0;
  int idx = 0;
  while (idx < sentryServers.size()) {
    grandTotal = grandTotal + getTotalClients(idx);
    idx++;
  }
  return grandTotal;
}
/**
 * @param serverNum index of the server to inspect
 * @return the current value of the id counter kept by that server's
 *         {@link SentryThriftEvenHandler}
 */
@Override
public long getTotalClients(int serverNum) {
  final SentryService server = get(serverNum);
  final SentryThriftEvenHandler handler =
      (SentryThriftEvenHandler) server.getThriftEventHandler();
  return handler.getClientId();
}
}