blob: c191324d5a1eadaa536d204acd6f920169f52e42 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.gateway;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.gateway.audit.api.Action;
import org.apache.hadoop.gateway.audit.api.ActionOutcome;
import org.apache.hadoop.gateway.audit.api.AuditServiceFactory;
import org.apache.hadoop.gateway.audit.api.Auditor;
import org.apache.hadoop.gateway.audit.api.ResourceType;
import org.apache.hadoop.gateway.audit.log4j.audit.AuditConstants;
import org.apache.hadoop.gateway.config.GatewayConfig;
import org.apache.hadoop.gateway.config.impl.GatewayConfigImpl;
import org.apache.hadoop.gateway.deploy.DeploymentFactory;
import org.apache.hadoop.gateway.filter.CorrelationHandler;
import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
import org.apache.hadoop.gateway.i18n.resources.ResourcesFactory;
import org.apache.hadoop.gateway.services.GatewayServices;
import org.apache.hadoop.gateway.services.registry.ServiceRegistry;
import org.apache.hadoop.gateway.services.security.SSLService;
import org.apache.hadoop.gateway.services.topology.TopologyService;
import org.apache.hadoop.gateway.topology.Topology;
import org.apache.hadoop.gateway.topology.TopologyEvent;
import org.apache.hadoop.gateway.topology.TopologyListener;
import org.apache.hadoop.gateway.trace.AccessHandler;
import org.apache.hadoop.gateway.trace.ErrorHandler;
import org.apache.hadoop.gateway.trace.TraceHandler;
import org.apache.log4j.PropertyConfigurator;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.webapp.WebAppContext;
import org.jboss.shrinkwrap.api.exporter.ExplodedExporter;
import org.jboss.shrinkwrap.api.spec.WebArchive;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.ProviderException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
public class GatewayServer {
// Localized resources; used by printVersion to build the version message.
private static GatewayResources res = ResourcesFactory.get(GatewayResources.class);
// Message-based logger used throughout this class.
private static GatewayMessages log = MessagesFactory.get(GatewayMessages.class);
// Auditor used to record the outcome of topology deploy/undeploy/redeploy actions.
private static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor(AuditConstants.DEFAULT_AUDITOR_NAME,
AuditConstants.KNOX_SERVICE_NAME, AuditConstants.KNOX_COMPONENT_NAME);
// Most recently created server instance; assigned by startGateway.
private static GatewayServer server;
// Gateway services shared through static state; assigned by main and startGateway.
private static GatewayServices services;
// Build metadata read by getBuildHash/getBuildVersion; assigned by main.
private static Properties buildProperties;
// Embedded Jetty server; created in start().
private Server jetty;
// Error handler of the most recently deployed context; reassigned on every internalDeploy call.
private ErrorHandler errorHandler;
// Gateway configuration supplied to the constructor.
private GatewayConfig config;
// Handler collection holding every deployed topology context; created in start().
private ContextHandlerCollection contexts;
// Topology service obtained in start(); stopped again in stop().
private TopologyService monitor;
// Listener registered with the topology service in start().
private TopologyListener listener;
// Deployed web application contexts keyed by topology name; created in start().
private Map<String, WebAppContext> deployments;
/**
 * Command line entry point of the gateway.
 *
 * Configures logging, parses the command line and then either prints help,
 * prints the version, redeploys topologies or initializes (and, unless the
 * no-start option is given, starts) the gateway.
 *
 * A command line parse failure is logged and followed by the help text. Any
 * other failure is logged and terminates the JVM with exit status 1.
 *
 * @param args the raw command line arguments
 */
public static void main( String[] args ) {
  try {
    configureLogging();
    logSysProps();
    CommandLine cmd = GatewayCommandLine.parse( args );
    // Load the build metadata before dispatching on the options. It used to be
    // loaded only in the final branch, which made the version option always
    // report "unknown" for both version and hash.
    buildProperties = loadBuildProperties();
    if( cmd.hasOption( GatewayCommandLine.HELP_LONG ) ) {
      GatewayCommandLine.printHelp();
    } else if( cmd.hasOption( GatewayCommandLine.VERSION_LONG ) ) {
      printVersion();
    } else if( cmd.hasOption( GatewayCommandLine.REDEPLOY_LONG ) ) {
      // NOTE(review): redeployTopologies reads the static services field, which this
      // method only assigns in the final branch below - confirm this path is usable.
      redeployTopologies( cmd.getOptionValue( GatewayCommandLine.REDEPLOY_LONG ) );
    } else {
      services = instantiateGatewayServices();
      if( services == null ) {
        log.failedToInstantiateGatewayServices();
        // Fail with an explicit cause instead of an opaque NullPointerException
        // on the first use of services below. Handled by the generic catch block.
        throw new IllegalStateException( "No GatewayServices implementation could be instantiated." );
      }
      GatewayConfig config = new GatewayConfigImpl();
      if( config.isHadoopKerberosSecured() ) {
        configureKerberosSecurity( config );
      }
      Map<String,String> options = new HashMap<String,String>();
      options.put( GatewayCommandLine.PERSIST_LONG, Boolean.toString( cmd.hasOption( GatewayCommandLine.PERSIST_LONG ) ) );
      services.init( config, options );
      if( !cmd.hasOption( GatewayCommandLine.NOSTART_LONG ) ) {
        startGateway( config, services );
      }
    }
  } catch ( ParseException e ) {
    log.failedToParseCommandLine( e );
    GatewayCommandLine.printHelp();
  } catch ( Exception e ) {
    log.failedToStartGateway( e );
    // Make sure the process exits.
    System.exit(1);
  }
}
/**
 * Writes the gateway version message, built from the build version and
 * build hash, to standard output.
 */
private static void printVersion() {
  String version = getBuildVersion();
  String hash = getBuildHash();
  // I18N not required.
  System.out.println( res.gatewayVersionMessage( version, hash ) );
}
/**
 * Returns the "build.hash" entry of the loaded build properties.
 *
 * @return the build hash, or "unknown" when no build properties are loaded
 *         or the entry is absent
 */
public static String getBuildHash() {
  if( buildProperties == null ) {
    return "unknown";
  }
  return buildProperties.getProperty( "build.hash", "unknown" );
}
/**
 * Returns the "build.version" entry of the loaded build properties.
 *
 * @return the build version, or "unknown" when no build properties are loaded
 *         or the entry is absent
 */
public static String getBuildVersion() {
  if( buildProperties == null ) {
    return "unknown";
  }
  return buildProperties.getProperty( "build.version", "unknown" );
}
/**
 * Looks up a GatewayServices implementation through the ServiceLoader mechanism.
 *
 * @return the first implementation offered by the loader, or null when there is none
 */
private static GatewayServices instantiateGatewayServices() {
  Iterator<GatewayServices> candidates = ServiceLoader.load( GatewayServices.class ).iterator();
  return candidates.hasNext() ? candidates.next() : null;
}
/**
 * Returns the statically held gateway services.
 *
 * @return the current services instance; null if none has been assigned yet
 */
public static synchronized GatewayServices getGatewayServices() {
return services;
}
/**
 * Logs the name and current value of a single system property.
 *
 * @param name the system property name
 */
private static void logSysProp( String name ) {
  String value = System.getProperty( name );
  log.logSysProp( name, value );
}
/**
 * Logs a fixed set of system properties describing the user and the Java runtime.
 */
private static void logSysProps() {
  String[] names = {
    "user.name",
    "user.dir",
    "java.runtime.name",
    "java.runtime.version",
    "java.home"
  };
  for( String name : names ) {
    logSysProp( name );
  }
}
/**
 * Configures log4j from the location named by the "log4j.configuration"
 * system property.
 */
private static void configureLogging() {
  // The alternative of loading log4j.properties from the gateway configuration
  // directory is currently disabled; only the system property is honored.
  String configLocation = System.getProperty( "log4j.configuration" );
  PropertyConfigurator.configure( configLocation );
}
/**
 * Publishes the Kerberos related settings of the given configuration as
 * JVM system properties.
 *
 * @param config the gateway configuration providing the Kerberos settings
 */
private static void configureKerberosSecurity( GatewayConfig config ) {
  System.setProperty( GatewayConfig.HADOOP_KERBEROS_SECURED, "true" );

  String krb5Config = config.getKerberosConfig();
  System.setProperty( GatewayConfig.KRB5_CONFIG, krb5Config );

  String krb5Debug = Boolean.toString( config.isKerberosDebugEnabled() );
  System.setProperty( GatewayConfig.KRB5_DEBUG, krb5Debug );

  String loginConfig = config.getKerberosLoginConfig();
  System.setProperty( GatewayConfig.KRB5_LOGIN_CONFIG, loginConfig );

  System.setProperty( GatewayConfig.KRB5_USE_SUBJECT_CREDS_ONLY, "false" );
}
/**
 * Loads the "build.properties" resource from this class's class loader.
 *
 * Loading is best effort: a missing resource or a read failure yields the
 * properties read so far (possibly none) rather than an error.
 *
 * @return the loaded properties; never null
 */
private static Properties loadBuildProperties() {
  Properties properties = new Properties();
  InputStream inputStream = GatewayServer.class.getClassLoader().getResourceAsStream( "build.properties" );
  if( inputStream != null ) {
    try {
      properties.load( inputStream );
    } catch( IOException e ) {
      // Ignore: build metadata is optional.
    } finally {
      // Close in finally so the stream is released even when load() fails.
      try {
        inputStream.close();
      } catch( IOException ignored ) {
        // Nothing useful can be done about a failed close of a read-only stream.
      }
    }
  }
  return properties;
}
/**
 * Copies a resource from the system class path into a file.
 *
 * @param resource the name of the resource to read
 * @param file the file to write the resource content to
 * @throws IOException if the resource cannot be found or the copy fails
 */
private static void extractToFile( String resource, File file ) throws IOException {
  InputStream input = ClassLoader.getSystemResourceAsStream( resource );
  if( input == null ) {
    // Report the missing resource before the target file is created or truncated.
    throw new IOException( "Resource not found on the system class path: " + resource );
  }
  try {
    OutputStream output = new FileOutputStream( file );
    try {
      IOUtils.copy( input, output );
    } finally {
      output.close();
    }
  } finally {
    input.close();
  }
}
/**
 * Reloads the topologies known to the topology service and then asks it to
 * redeploy the named topology.
 *
 * @param topologyName the topology name handed to the topology service
 */
public static void redeployTopologies( String topologyName ) {
  GatewayServices gatewayServices = getGatewayServices();
  TopologyService topologyService = gatewayServices.getService( GatewayServices.TOPOLOGY_SERVICE );
  topologyService.reloadTopologies();
  topologyService.redeployTopologies( topologyName );
}
/**
 * Creates a new gateway server for the given configuration, starts the given
 * services and then starts the server.
 *
 * @param config the gateway configuration
 * @param svcs the services to start and install as the static services instance
 * @return the started server
 * @throws Exception if the services or the server fail to start
 */
public static GatewayServer startGateway( GatewayConfig config, GatewayServices svcs ) throws Exception {
  log.startingGateway();
  server = new GatewayServer( config );
  synchronized ( server ) {
    //KM[ The services are replaced unconditionally. A former
    // "only if services == null" guard was removed because it caused problems
    // with multiple services instances used in a single test process.
    //KM]
    services = svcs;
    services.start();
    DeploymentFactory.setGatewayServices( services );
    server.start();
    Connector firstConnector = server.jetty.getConnectors()[ 0 ];
    log.startedGateway( firstConnector.getLocalPort() );
    return server;
  }
}
/**
 * Creates a server for the given configuration without additional options.
 *
 * @param config the gateway configuration
 */
public GatewayServer( GatewayConfig config ) {
this(config, null);
}
/**
 * Creates a server for the given configuration and prepares its topology listener.
 *
 * @param config the gateway configuration
 * @param options currently not read by this constructor
 */
public GatewayServer( GatewayConfig config, Properties options ) {
this.config = config;
this.listener = new InternalTopologyListener();
}
/**
 * Builds the Jetty connector the gateway listens on.
 *
 * An SSL connector is obtained from the SSL service when SSL is enabled;
 * otherwise a plain select channel connector is used. Host, port and buffer
 * sizes are taken from the configuration.
 *
 * @param config the gateway configuration
 * @return the configured connector
 * @throws IOException if the configured address is not available
 */
private static Connector createConnector( final GatewayConfig config ) throws IOException {
  // Determine the socket address and check availability.
  InetSocketAddress address = config.getGatewayAddress();
  checkAddressAvailability( address );

  Connector connector;
  if( !config.isSSLEnabled() ) {
    connector = new SelectChannelConnector();
  } else {
    SSLService ssl = services.getService( "SSLService" );
    String keystoresDir = config.getGatewaySecurityDir() + File.separatorChar + "keystores";
    String keystoreFileName = keystoresDir + File.separatorChar + "gateway.jks";
    connector = (Connector) ssl.buildSSlConnector( keystoreFileName );
  }

  connector.setHost( address.getHostName() );
  connector.setPort( address.getPort() );

  connector.setRequestHeaderSize( config.getHttpServerRequestHeaderBuffer() );
  connector.setRequestBufferSize( config.getHttpServerRequestBuffer() );
  connector.setResponseHeaderSize( config.getHttpServerResponseHeaderBuffer() );
  connector.setResponseBufferSize( config.getHttpServerResponseBuffer() );
  return connector;
}
/**
 * Assembles the top level Jetty handlers.
 *
 * The result contains, in this order, a correlation handler (which wraps a
 * trace handler, which in turn wraps the given contexts) and a request log
 * handler backed by an AccessHandler.
 *
 * @param config the gateway configuration (not read by this method)
 * @param contexts the collection of deployed topology contexts
 * @return the handler collection to install on the server
 */
private static HandlerCollection createHandlers( final GatewayConfig config, final ContextHandlerCollection contexts ) {
  TraceHandler tracing = new TraceHandler();
  tracing.setHandler( contexts );
  tracing.setTracedBodyFilter( System.getProperty( "org.apache.knox.gateway.trace.body.status.filter" ) );

  CorrelationHandler correlation = new CorrelationHandler();
  correlation.setHandler( tracing );

  RequestLogHandler requestLog = new RequestLogHandler();
  requestLog.setRequestLog( new AccessHandler() );

  HandlerCollection handlers = new HandlerCollection();
  handlers.setHandlers( new Handler[]{ correlation, requestLog } );
  return handlers;
}
/**
 * Starts the embedded Jetty server and then begins loading and monitoring topologies.
 *
 * @throws Exception if Jetty cannot be started; an IOException is logged before it is rethrown
 */
private synchronized void start() throws Exception {
  // Global context handler plus a map tracking current deployments by cluster name.
  contexts = new ContextHandlerCollection();
  deployments = new ConcurrentHashMap<String, WebAppContext>();

  // Assemble and start Jetty.
  jetty = new Server();
  Connector connector = createConnector( config );
  jetty.addConnector( connector );
  HandlerCollection handlers = createHandlers( config, contexts );
  jetty.setHandler( handlers );
  QueuedThreadPool threadPool = new QueuedThreadPool( config.getThreadPoolMax() );
  jetty.setThreadPool( threadPool );

  try {
    jetty.start();
  } catch( IOException e ) {
    log.failedToStartGateway( e );
    throw e;
  }

  // Hook this server into the topology service.
  String topologiesPath = calculateAbsoluteTopologiesDir().getAbsolutePath();
  monitor = services.getService( GatewayServices.TOPOLOGY_SERVICE );
  monitor.addTopologyChangeListener( listener );

  // Load the current topologies.
  log.loadingTopologiesFromDirectory( topologiesPath );
  monitor.reloadTopologies();

  // Start the topology monitor.
  log.monitoringTopologyChangesInDirectory( topologiesPath );
  monitor.startMonitor();
}
/**
 * Stops the gateway services, the topology monitor and the Jetty server, in
 * that order, and waits for Jetty to finish.
 *
 * Components that were never created (for example because the server was
 * never started or start-up failed part way) are skipped instead of causing
 * a NullPointerException that would prevent the remaining steps from running.
 *
 * @throws Exception if one of the components fails to stop
 */
public synchronized void stop() throws Exception {
  log.stoppingGateway();
  if( services != null ) {
    services.stop();
  }
  if( monitor != null ) {
    monitor.stopMonitor();
  }
  if( jetty != null ) {
    jetty.stop();
    jetty.join();
  }
  log.stoppedGateway();
}
/**
 * Returns one socket address per Jetty connector, built from the connector's
 * host (when set) and its local port.
 *
 * @return the addresses, in connector order
 */
public InetSocketAddress[] getAddresses() {
  Connector[] connectors = jetty.getConnectors();
  InetSocketAddress[] addresses = new InetSocketAddress[ connectors.length ];
  for( int i = 0; i < connectors.length; i++ ) {
    Connector connector = connectors[ i ];
    String host = connector.getHost();
    int port = connector.getLocalPort();
    // Without an explicit host the wildcard address form is used.
    addresses[ i ] = ( host == null )
        ? new InetSocketAddress( port )
        : new InetSocketAddress( host, port );
  }
  return addresses;
}
/**
 * Creates a web application context for the given exploded WAR, registers it
 * under the topology name and starts it.
 *
 * The topology named "_default" is mounted at "/"; every other topology is
 * mounted at "/" + gateway path + "/" + topology name. A failure to start the
 * context is audited and logged, not rethrown.
 *
 * @param topology the topology being deployed
 * @param warFile the location of the topology's web application
 */
private synchronized void internalDeploy( Topology topology, File warFile ) {
  final String name = topology.getName();
  final String warPath = warFile.getAbsolutePath();

  errorHandler = new ErrorHandler();
  errorHandler.setShowStacks( false );
  errorHandler.setTracedBodyFilter( System.getProperty( "org.apache.knox.gateway.trace.body.status.filter" ) );

  WebAppContext context = new WebAppContext();
  context.setDefaultsDescriptor( null );
  String contextPath = name.equals( "_default" )
      ? "/"
      : "/" + config.getGatewayPath() + "/" + name;
  context.setContextPath( contextPath );
  context.setWar( warPath );
  context.setErrorHandler( errorHandler );

  // internalUndeploy( topology ); KNOX-152

  // The cluster attribute must be set first: getFrontendUri reads it from the context.
  context.setAttribute( GatewayServices.GATEWAY_CLUSTER_ATTRIBUTE, name );
  context.setAttribute( "org.apache.knox.gateway.frontend.uri", getFrontendUri( context, config ) );
  context.setAttribute( GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE, config );

  deployments.put( name, context );
  contexts.addHandler( context );
  try {
    context.start();
  } catch( Exception e ) {
    auditor.audit( Action.DEPLOY, topology.getName(), ResourceType.TOPOLOGY, ActionOutcome.FAILURE );
    log.failedToDeployTopology( name, e );
  }
}
/**
 * Removes and stops the deployed context of the given topology, if there is one.
 *
 * The topology's services are also removed from the service registry when a
 * registry is available. A failure to stop the context is audited and logged,
 * not rethrown.
 *
 * @param topology the topology to undeploy
 */
private synchronized void internalUndeploy( Topology topology ) {
  WebAppContext context = deployments.remove( topology.getName() );
  if( context == null ) {
    // Nothing is deployed under this name.
    return;
  }

  ServiceRegistry registry = getGatewayServices().getService( GatewayServices.SERVICE_REGISTRY_SERVICE );
  if( registry != null ) {
    registry.removeClusterServices( topology.getName() );
  }

  contexts.removeHandler( context );
  try {
    context.stop();
  } catch( Exception e ) {
    auditor.audit( Action.UNDEPLOY, topology.getName(), ResourceType.TOPOLOGY, ActionOutcome.FAILURE );
    log.failedToUndeployTopology( topology.getName(), e );
  }
}
/**
 * Reacts to topology change events by deploying or undeploying the affected topologies.
 * Implemented as an inner class to hide the handleTopologyEvent method from consumers
 * of GatewayServer.
 */
private class InternalTopologyListener implements TopologyListener {

  /**
   * Handles the events in order while holding the enclosing server's monitor.
   * DELETED events remove the topology's deployment; every other event type
   * creates or refreshes it.
   *
   * @param events the topology events to process
   */
  @Override
  public void handleTopologyEvent( List<TopologyEvent> events ) {
    synchronized ( GatewayServer.this ) {
      for( TopologyEvent event : events ) {
        Topology topology = event.getTopology();
        File deployDir = calculateAbsoluteDeploymentsDir();
        if( event.getType().equals( TopologyEvent.Type.DELETED ) ) {
          handleDeleteDeployment( topology, deployDir );
        } else {
          handleCreateDeployment( topology, deployDir );
        }
      }
    }
  }

  /**
   * Undeploys the topology and deletes its deployment directories, i.e. the
   * entries of deployDir named topology name + ".war." + hexadecimal suffix.
   *
   * @param topology the deleted topology
   * @param deployDir the directory holding the deployments
   */
  private void handleDeleteDeployment( Topology topology, File deployDir ) {
    // Quote the name so that regex metacharacters in it (e.g. '.') are matched
    // literally and cannot select another topology's deployment for deletion.
    String regex = Pattern.quote( topology.getName() ) + "\\.war\\.[0-9A-Fa-f]+";
    File[] files = deployDir.listFiles( new WarDirFilter( regex ) );
    if( files != null ) {
      auditor.audit( Action.UNDEPLOY, topology.getName(), ResourceType.TOPOLOGY,
          ActionOutcome.UNAVAILABLE );
      for( File file : files ) {
        log.deletingDeployment( file.getAbsolutePath() );
        internalUndeploy( topology );
        FileUtils.deleteQuietly( file );
      }
    }
  }

  /**
   * Deploys the topology. When no deployment directory exists yet for the
   * topology's current timestamp a new web archive is generated and exploded
   * there; otherwise the existing directory is redeployed. The configured
   * default topology is additionally deployed under the name "_default".
   * Every failure is audited and logged, not rethrown.
   *
   * @param topology the topology to deploy
   * @param deployDir the directory holding the deployments
   */
  private void handleCreateDeployment( Topology topology, File deployDir ) {
    try {
      File warDir = calculateDeploymentDir( topology );
      if( !warDir.exists() ) {
        auditor.audit( Action.DEPLOY, topology.getName(), ResourceType.TOPOLOGY, ActionOutcome.UNAVAILABLE );

        // KNOX-564 - Topology should fail to deploy with no providers configured.
        if( topology.getProviders().isEmpty() ) {
          throw new ProviderException( "No providers found inside topology." );
        }

        log.deployingTopology( topology.getName(), warDir.getAbsolutePath() );
        internalUndeploy( topology ); // KNOX-152
        WebArchive war = DeploymentFactory.createDeployment( config, topology );
        if( !deployDir.exists() && !deployDir.mkdirs() && !deployDir.isDirectory() ) {
          throw new IOException( "Failed to create deployment directory: " + deployDir.getAbsolutePath() );
        }
        // Export under a temporary name first, then move into place.
        File tmp = war.as( ExplodedExporter.class ).exportExploded( deployDir, warDir.getName() + ".tmp" );
        if( !tmp.renameTo( warDir ) ) {
          // Do not deploy from a directory that was never put in place.
          FileUtils.deleteQuietly( tmp );
          throw new IOException( "Failed to rename " + tmp.getAbsolutePath() + " to " + warDir.getAbsolutePath() );
        }
        internalDeploy( topology, warDir );
        if( topology.getName().equals( config.getDefaultTopologyName() ) ) {
          // Deploy the default topology a second time under the reserved name,
          // always restoring its real name afterwards.
          topology.setName( "_default" );
          try {
            handleCreateDeployment( topology, deployDir );
          } finally {
            topology.setName( config.getDefaultTopologyName() );
          }
        }
        log.deployedTopology( topology.getName() );
      } else {
        auditor.audit( Action.REDEPLOY, topology.getName(), ResourceType.TOPOLOGY, ActionOutcome.UNAVAILABLE );
        log.redeployingTopology( topology.getName(), warDir.getAbsolutePath() );
        internalDeploy( topology, warDir );
        log.redeployedTopology( topology.getName() );
      }
    } catch( Throwable e ) {
      auditor.audit( Action.DEPLOY, topology.getName(), ResourceType.TOPOLOGY, ActionOutcome.FAILURE );
      log.failedToDeployTopology( topology.getName(), e );
    }
  }
}
/**
 * Resolves the configured topology directory to an absolute file.
 *
 * @param config the gateway configuration
 * @return the absolute form of the configured topology directory
 */
private static File calculateAbsoluteTopologiesDir( GatewayConfig config ) {
  return new File( config.getGatewayTopologyDir() ).getAbsoluteFile();
}
/**
 * Resolves the configured deployment directory to an absolute file.
 *
 * @param config the gateway configuration
 * @return the absolute form of the configured deployment directory
 */
private static File calculateAbsoluteDeploymentsDir( GatewayConfig config ) {
  return new File( config.getGatewayDeploymentDir() ).getAbsoluteFile();
}
/**
 * Resolves the absolute topology directory using this server's configuration.
 *
 * @return the absolute topology directory
 */
private File calculateAbsoluteTopologiesDir() {
return calculateAbsoluteTopologiesDir( config );
}
/**
 * Resolves the absolute deployment directory using this server's configuration.
 *
 * @return the absolute deployment directory
 */
private File calculateAbsoluteDeploymentsDir() {
return calculateAbsoluteDeploymentsDir( config );
}
/**
 * Determines the deployment directory of a topology: the topology's
 * deployment name inside the absolute deployments directory.
 *
 * @param topology the topology
 * @return the directory the topology is (to be) deployed in
 */
private File calculateDeploymentDir( Topology topology ) {
  File parent = calculateAbsoluteDeploymentsDir();
  String child = calculateDeploymentName( topology );
  return new File( parent, child );
}
/**
 * Builds the deployment name of a topology: its name, followed by ".war."
 * and its timestamp in hexadecimal.
 *
 * @param topology the topology
 * @return the deployment name
 */
private String calculateDeploymentName( Topology topology ) {
  String topologyName = topology.getName();
  String timestampHex = Long.toHexString( topology.getTimestamp() );
  return topologyName + ".war." + timestampHex;
}
/**
 * Verifies that the given address can be bound by briefly binding a server
 * socket to it and closing that socket again.
 *
 * @param address the address to probe
 * @throws IOException if the address cannot be bound
 */
private static void checkAddressAvailability( InetSocketAddress address ) throws IOException {
  ServerSocket socket = new ServerSocket();
  try {
    socket.bind( address );
  } finally {
    // Release the socket even when bind() fails.
    socket.close();
  }
}
private class WarDirFilter implements FilenameFilter {
Pattern pattern;
WarDirFilter( String regex ) {
pattern = Pattern.compile( regex );
}
@Override
public boolean accept( File dir, String name ) {
return pattern.matcher( name ).matches();
}
}
/**
 * Computes the frontend URI of a deployed context: the configured frontend
 * URL with the context's cluster (topology) name appended as a path segment.
 *
 * @param context the context whose cluster attribute supplies the topology name
 * @param config the gateway configuration supplying the frontend URL
 * @return the frontend URI, or null when no frontend URL is configured
 * @throws IllegalArgumentException if the resulting string is not a valid URI
 */
public URI getFrontendUri( WebAppContext context, GatewayConfig config ) {
  String frontendStr = config.getFrontendUrl();
  if( frontendStr == null || frontendStr.trim().isEmpty() ) {
    return null;
  }
  String topoName = (String)context.getAttribute( GatewayServices.GATEWAY_CLUSTER_ATTRIBUTE );
  String base = frontendStr.trim();
  // Insert a separator only when the configured URL does not already end with one.
  String separator = base.endsWith( "/" ) ? "" : "/";
  try {
    return new URI( base + separator + topoName );
  } catch( URISyntaxException e ) {
    throw new IllegalArgumentException( e );
  }
}
}