blob: 5f4a6999058e1a69e5a512528b33d3b5bc5cfbf3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.server;
import static org.apache.lens.server.api.LensConfConstants.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Constructor;
import java.util.*;
import java.util.concurrent.*;
import org.apache.lens.api.error.ErrorCollection;
import org.apache.lens.api.error.ErrorCollectionFactory;
import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.ServiceProvider;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.events.LensEventService;
import org.apache.lens.server.api.metrics.MetricsService;
import org.apache.lens.server.api.util.LensUtil;
import org.apache.lens.server.metrics.MetricsServiceImpl;
import org.apache.lens.server.model.LogSegregationContext;
import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext;
import org.apache.lens.server.session.LensSessionImpl;
import org.apache.lens.server.stats.StatisticsService;
import org.apache.lens.server.user.UserConfigLoaderFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.Service;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* Manage lifecycle of all Lens services
*/
@Slf4j
public class LensServices extends CompositeService implements ServiceProvider {
/** The Constant LENS_SERVICES_NAME. */
public static final String LENS_SERVICES_NAME = "lens_services";
/** Constant for FileSystem auto close on shutdown config */
private static final String FS_AUTOMATIC_CLOSE = "fs.automatic.close";
private static final String FS_IO_FILE_BUFFER_SIZE = "io.file.buffer.size";
/** The instance. */
private static LensServices instance = new LensServices(LENS_SERVICES_NAME,
new MappedDiagnosticLogSegregationContext());
/** The conf. */
private HiveConf conf;
/** The cli service. */
private CLIService cliService;
/** The services. */
private final Map<String, Service> services = new LinkedHashMap<String, Service>();
/** The lens services. */
private final List<BaseLensService> lensServices = new ArrayList<BaseLensService>();
/** The persist dir. */
private Path persistDir;
/** The persistence file system. */
private FileSystem persistenceFS;
/** The stopping. */
private boolean stopping = false;
/**
* The metrics service.
*/
private MetricsService metricsService;
/**
* The Constant SERVER_STATE_PERSISTENCE_ERRORS.
*/
public static final String SERVER_STATE_PERSISTENCE_ERRORS = "total-server-state-persistence-errors";
/**
* The Constant SERVER_STATE_PERSISTENCE_ERRORS.
*/
public static final String KDC_LOGIN_ERRORS = "total-kdc-login-errors";
/** The service mode. */
@Getter
@Setter
private ServiceMode serviceMode;
/** Scheduled Executor which persists the server state periodically*/
private ScheduledExecutorService serverSnapshotScheduler;
/** Scheduled Executor to refresh kerberos tgt*/
private ScheduledExecutorService kerberosTgtScheduler;
/* Lock for synchronizing persistence of LensServices state */
private final Object statePersistenceLock = new Object();
@Getter
private ErrorCollection errorCollection;
private boolean isServerStatePersistenceEnabled;
private long serverStatePersistenceInterval;
private long serverKdcLoginInterval;
@Getter
private final LogSegregationContext logSegregationContext;
/**
* Incr counter.
*
* @param counter the counter
*/
private void incrCounter(String counter) {
getMetricService().incrCounter(LensServices.class, counter);
}
/**
* Gets counter value.
*
* @param counter the counter
*/
private long getCounter(String counter) {
return getMetricService().getCounter(LensServices.class, counter);
}
public static LensException processLensException(LensException exc) {
if (exc != null) {
exc.buildLensErrorTO(get().getErrorCollection());
}
return exc;
}
/**
* The Enum ServiceMode.
*/
public enum ServiceMode {
/** The read only. */
READ_ONLY, // All requests on sesssion resource and Only GET requests on all other resources
/** The metastore readonly. */
METASTORE_READONLY, // Only GET requests on metastore service and
// all other requests on other services are accepted
/** The metastore nodrop. */
METASTORE_NODROP, // DELETE requests on metastore are not accepted
/** The open. */
OPEN // All requests are accepted
}
/**
* Instantiates a new lens services.
*
* @param name the name
*/
public LensServices(String name, @NonNull final LogSegregationContext logSegregationContext) {
super(name);
this.logSegregationContext = logSegregationContext;
}
// This is only for test, to simulate a restart of the server
public static void setInstance(LensServices newInstance) {
instance = newInstance;
}
/*
* (non-Javadoc)
*
* @see org.apache.hive.service.CompositeService#init(org.apache.hadoop.hive.conf.HiveConf)
*/
@SuppressWarnings("unchecked")
@Override
public synchronized void init(HiveConf hiveConf) {
if (getServiceState() == STATE.NOTINITED) {
initializeErrorCollection();
conf = hiveConf;
conf.setVar(HiveConf.ConfVars.HIVE_SESSION_IMPL_CLASSNAME, LensSessionImpl.class.getCanonicalName());
serviceMode = conf.getEnum(SERVER_MODE,
ServiceMode.valueOf(DEFAULT_SERVER_MODE));
cliService = new CLIService(null);
UserConfigLoaderFactory.init(conf);
// Add default services
addService(cliService);
addService(new EventServiceImpl(LensEventService.NAME));
addService(new MetricsServiceImpl(MetricsService.NAME));
addService(new StatisticsService(StatisticsService.STATS_SVC_NAME));
// Add configured services, these are instances of LensService which need a CLIService instance
// for session management
String[] serviceNames = conf.getStrings(SERVICE_NAMES);
for (String sName : serviceNames) {
try {
String serviceClassName = conf.get(getServiceImplConfKey(sName));
if (StringUtils.isBlank(serviceClassName)) {
log.warn("Invalid class for service {} class={}", sName, serviceClassName);
continue;
}
Class<?> cls = Class.forName(serviceClassName);
if (BaseLensService.class.isAssignableFrom(cls)) {
Class<? extends BaseLensService> serviceClass = (Class<? extends BaseLensService>) cls;
log.info("Adding {} service with {}", sName, serviceClass);
Constructor<?> constructor = serviceClass.getConstructor(CLIService.class);
BaseLensService service = (BaseLensService) constructor.newInstance(cliService);
addService(service);
lensServices.add(service);
} else if (Service.class.isAssignableFrom(cls)) {
Class<? extends Service> serviceClass = (Class<? extends Service>) cls;
// Assuming default constructor
Service svc = serviceClass.newInstance();
addService(svc);
} else {
log.warn("Unsupported service class {} for service {}", serviceClassName, sName);
}
} catch (Exception e) {
log.warn("Could not add service:{}", sName, e);
throw new RuntimeException("Could not add service:" + sName, e);
}
}
for (Service svc : getServices()) {
services.put(svc.getName(), svc);
}
// This will init all services in the order they were added
super.init(conf);
// setup persisted state
isServerStatePersistenceEnabled = conf.getBoolean(SERVER_STATE_PERSISTENCE_ENABLED,
DEFAULT_SERVER_STATE_PERSISTENCE_ENABLED);
if (isServerStatePersistenceEnabled) {
String persistPathStr = conf.get(SERVER_STATE_PERSIST_LOCATION,
DEFAULT_SERVER_STATE_PERSIST_LOCATION);
persistDir = new Path(persistPathStr);
try {
Configuration configuration = new Configuration(conf);
configuration.setBoolean(FS_AUTOMATIC_CLOSE, false);
int outStreamBufferSize = conf.getInt(STATE_PERSIST_OUT_STREAM_BUFF_SIZE,
DEFAULT_STATE_PERSIST_OUT_STREAM_BUFF_SIZE);
configuration.setInt(FS_IO_FILE_BUFFER_SIZE, outStreamBufferSize);
log.info("STATE_PERSIST_OUT_STREAM_BUFF_SIZE IN BYTES:{}", outStreamBufferSize);
persistenceFS = FileSystem.newInstance(persistDir.toUri(), configuration);
setupPersistedState();
} catch (Exception e) {
log.error("Could not recover from persisted state", e);
throw new RuntimeException("Could not recover from persisted state", e);
}
serverStatePersistenceInterval = conf.getLong(SERVER_STATE_PERSISTENCE_INTERVAL_MILLIS,
DEFAULT_SERVER_STATE_PERSISTENCE_INTERVAL_MILLIS);
}
log.info("Initialized services: {}", services.keySet().toString());
}
}
/**
* Setup KDC logint thread.
*
*/
private void enableKDCLoginThread() {
try {
LensUtil.refreshLensTGT(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
serverKdcLoginInterval = conf.getInt(LensConfConstants.KDC_LOGIN_SERVICE_INTERVAL_IN_MINUTES,
LensConfConstants.DEFAULT_KDC_LOGIN_SERVICE_INTERVAL_IN_MINUTES);
ThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("Lens-server-refresh-tgt-Thread-%d")
.daemon(true)
.priority(Thread.NORM_PRIORITY)
.build();
kerberosTgtScheduler = Executors.newSingleThreadScheduledExecutor(factory);
kerberosTgtScheduler.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
final String runId = UUID.randomUUID().toString();
logSegregationContext.setLogSegregationId(runId);
LensUtil.refreshLensTGT(conf);
log.info("KDC login successful for lens.");
} catch (Exception e) {
incrCounter(SERVER_STATE_PERSISTENCE_ERRORS);
log.error("Unable to login to KDC...", e);
}
}
}, 0, serverKdcLoginInterval, TimeUnit.MINUTES);
}
/*
* (non-Javadoc)
*
* @see org.apache.hive.service.CompositeService#start()
*/
public synchronized void start() {
if (cliService.getHiveConf().getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION)
.equals(HiveAuthFactory.AuthTypes.KERBEROS.toString())) {
enableKDCLoginThread();
log.info("Enabled kerberos tgt login at {} minutes interval", serverKdcLoginInterval);
}
if (getServiceState() != STATE.STARTED) {
super.start();
}
if (!isServerStatePersistenceEnabled) {
log.info("Server restart is not enabled. Not persisting lens server state");
} else {
ThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("Lens-server-snapshotter-Thread-%d")
.daemon(true)
.priority(Thread.NORM_PRIORITY)
.build();
serverSnapshotScheduler = Executors.newSingleThreadScheduledExecutor(factory);
serverSnapshotScheduler.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
final String runId = UUID.randomUUID().toString();
logSegregationContext.setLogSegregationId(runId);
persistLensServiceState();
log.info("SnapShot of Lens Services created");
} catch (Exception e) {
incrCounter(SERVER_STATE_PERSISTENCE_ERRORS);
log.error("Unable to persist lens server state", e);
}
}
}, serverStatePersistenceInterval, serverStatePersistenceInterval, TimeUnit.MILLISECONDS);
log.info("Enabled periodic persistence of lens server state at {} millis interval",
serverStatePersistenceInterval);
}
}
/**
* Setup persisted state.
*
* @throws IOException Signals that an I/O exception has occurred.
* @throws ClassNotFoundException the class not found exception
*/
private void setupPersistedState() throws IOException, ClassNotFoundException {
for (BaseLensService service : lensServices) {
ObjectInputStream in = null;
Path path = getServicePersistPath(service);
try {
try {
in = new ObjectInputStream(persistenceFS.open(path));
} catch (FileNotFoundException fe) {
log.warn("Persisted state not available for service: {} at: {}", service.getName(), path);
continue;
}
service.readExternal(in);
log.info("Recovered service {} from persisted state {}", service.getName(), path);
} finally {
if (in != null) {
in.close();
}
}
}
}
/**
* Persist lens service state.
*/
private void persistLensServiceState() {
synchronized (statePersistenceLock) {
log.info("Persisting server state in {}", persistDir);
String now = "" + System.currentTimeMillis();
for (BaseLensService service : lensServices) {
try {
persistState(service, now);
} catch (Exception e) {
incrCounter(SERVER_STATE_PERSISTENCE_ERRORS);
log.error("Error while persisting state for service {}", service.getName(), e);
}
}
}
}
private void persistState(BaseLensService service, String time) throws IOException {
log.info("Persisting state of service: {}", service.getName());
Path serviceWritePath = new Path(persistDir, service.getName() + ".out" + "." + time);
ObjectOutputStream out = null;
try {
out = new ObjectOutputStream(persistenceFS.create(serviceWritePath));
service.writeExternal(out);
} finally {
if (out != null) {
out.close();
}
}
Path servicePath = getServicePersistPath(service);
if (persistenceFS.exists(servicePath)) {
// delete the destination first, because rename is no-op in HDFS, if destination exists
if (!persistenceFS.delete(servicePath, true)) {
throw new IOException("Failed to delete " + servicePath);
}
}
if (!persistenceFS.rename(serviceWritePath, servicePath)) {
throw new IOException("Failed to rename " + serviceWritePath + " to " + servicePath);
}
log.info("Persisted service {} to [{}]", service.getName(), servicePath);
}
/**
* Gets the service persist path.
*
* @param service the service
* @return the service persist path
*/
private Path getServicePersistPath(BaseLensService service) {
return new Path(persistDir, service.getName() + ".final");
}
/*
* (non-Javadoc)
*
* @see org.apache.hive.service.CompositeService#stop()
*/
public synchronized void stop() {
if (getServiceState() != STATE.STOPPED) {
log.info("Stopping lens server");
stopping = true;
for (BaseLensService service : lensServices) {
service.prepareStopping();
}
if (isServerStatePersistenceEnabled) {
try {
//1. shutdown serverSnapshotScheduler gracefully by allowing already triggered task (if any) to finish
if (serverSnapshotScheduler != null) {
serverSnapshotScheduler.shutdown();
try { //Wait for shutdown. Shutdown should be immediate in case no task is running at this point
while (!serverSnapshotScheduler.awaitTermination(1, TimeUnit.MINUTES)) {
log.info("Waiting for Lens-server-snapshotter to shutdown gracefully...");
}
} catch (InterruptedException e) {
log.error("Lens-server-snapshotter interrupted while shutting down", e);
}
log.info("Lens-server-snapshotter was shutdown");
} else {
log.info("Lens-server-snapshotter wasn't started, so no need to shutdown");
}
//2. persist the latest state of all the services
persistLensServiceState();
} finally {
try {
persistenceFS.close();
log.info("Persistence File system object close complete");
} catch (Exception e) {
log.error("Error while closing Persistence File system object", e);
}
}
}
super.stop();
}
}
public STATE getServiceState() {
return super.getServiceState();
}
public boolean isStopping() {
return stopping;
}
/**
* Gets the.
*
* @return the lens services
*/
public static LensServices get() {
return instance;
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.ServiceProvider#getService(java.lang.String)
*/
@Override
@SuppressWarnings("unchecked")
public <T extends Service> T getService(String sName) {
return (T) services.get(sName);
}
public List<BaseLensService> getLensServices() {
return lensServices;
}
private void initializeErrorCollection() {
try {
errorCollection = new ErrorCollectionFactory().createErrorCollection();
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not create error collection.", e);
}
}
private MetricsService getMetricService() {
if (metricsService == null) {
metricsService = LensServices.get().getService(MetricsService.NAME);
if (metricsService == null) {
throw new NullPointerException("Could not get metrics service");
}
}
return metricsService;
}
}