blob: 66413f8897c05bee89427be3c9f0dad05f269862 [file] [log] [blame]
package org.apache.helix.rest.server;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
import com.codahale.metrics.Clock;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.jersey2.InstrumentedResourceMethodApplicationListener;
import com.codahale.metrics.jmx.JmxReporter;
import org.apache.helix.HelixException;
import org.apache.helix.rest.acl.AclRegister;
import org.apache.helix.rest.acl.NoopAclRegister;
import org.apache.helix.rest.common.ContextPropertyKeys;
import org.apache.helix.rest.common.HelixRestNamespace;
import org.apache.helix.rest.common.ServletType;
import org.apache.helix.rest.server.auditlog.AuditLogger;
import org.apache.helix.rest.server.authValidator.AuthValidator;
import org.apache.helix.rest.server.authValidator.NoopAuthValidator;
import org.apache.helix.rest.server.filters.AuditLogFilter;
import org.apache.helix.rest.server.filters.CORSFilter;
import org.apache.helix.rest.server.filters.ClusterAuthFilter;
import org.apache.helix.rest.server.filters.NamespaceAuthFilter;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelixRestServer {
private static Logger LOG = LoggerFactory.getLogger(HelixRestServer.class);
private static final String REST_DOMAIN = "org.apache.helix.rest";
private static final String CORS_ENABLED = "cors.enabled";
// TODO: consider moving the following static context to ServerContext or any other place
public static SSLContext REST_SERVER_SSL_CONTEXT;
private int _port;
private String _urlPrefix;
private Server _server;
private List<JmxReporter> _jmxReporterList;
private List<HelixRestNamespace> _helixNamespaces;
private ServletContextHandler _servletContextHandler;
private List<AuditLogger> _auditLoggers;
private AuthValidator _clusterAuthValidator;
private AuthValidator _namespaceAuthValidator;
private AclRegister _aclRegister;
// Key is name of namespace, value of the resource config of that namespace
private Map<String, ResourceConfig> _resourceConfigMap;
public HelixRestServer(String zkAddr, int port, String urlPrefix) {
this(zkAddr, port, urlPrefix, Collections.<AuditLogger>emptyList());
}
public HelixRestServer(String zkAddr, int port, String urlPrefix, List<AuditLogger> auditLoggers) {
// Create default namespace using zkAddr
ArrayList<HelixRestNamespace> namespaces = new ArrayList<>();
namespaces.add(new HelixRestNamespace(HelixRestNamespace.DEFAULT_NAMESPACE_NAME,
HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, zkAddr, true));
init(namespaces, port, urlPrefix, auditLoggers);
}
public HelixRestServer(List<HelixRestNamespace> namespaces, int port, String urlPrefix,
List<AuditLogger> auditLoggers) {
init(namespaces, port, urlPrefix, auditLoggers);
}
public HelixRestServer(List<HelixRestNamespace> namespaces, int port, String urlPrefix,
List<AuditLogger> auditLoggers, AuthValidator clusterAuthValidator,
AuthValidator namespaceAuthValidator, AclRegister aclRegister) {
init(namespaces, port, urlPrefix, auditLoggers, clusterAuthValidator, namespaceAuthValidator,
aclRegister);
}
private void init(List<HelixRestNamespace> namespaces, int port, String urlPrefix,
List<AuditLogger> auditLoggers) {
init(namespaces, port, urlPrefix, auditLoggers, new NoopAuthValidator(),
new NoopAuthValidator(), new NoopAclRegister());
}
private void init(List<HelixRestNamespace> namespaces, int port, String urlPrefix,
List<AuditLogger> auditLoggers, AuthValidator clusterAuthValidator,
AuthValidator namespaceAuthValidator, AclRegister aclRegister) {
if (namespaces.size() == 0) {
throw new IllegalArgumentException(
"No namespace specified! Please provide ZOOKEEPER address or namespace manifest.");
}
_port = port;
_urlPrefix = urlPrefix;
_server = new Server(_port);
_jmxReporterList = new ArrayList<>();
_auditLoggers = auditLoggers;
_resourceConfigMap = new HashMap<>();
_servletContextHandler = new ServletContextHandler(_server, _urlPrefix);
_helixNamespaces = namespaces;
_clusterAuthValidator = clusterAuthValidator;
_namespaceAuthValidator = namespaceAuthValidator;
_aclRegister = aclRegister;
// Initialize all namespaces.
// If there is not a default namespace (namespace.isDefault() is false),
// endpoint "/namespaces" will be disabled.
try {
for (HelixRestNamespace namespace : _helixNamespaces) {
if (namespace.isDefault()) {
LOG.info("Creating default servlet for default namespace");
prepareServlet(namespace, ServletType.DEFAULT_SERVLET);
} else {
LOG.info("Creating common servlet for namespace {}", namespace.getName());
prepareServlet(namespace, ServletType.COMMON_SERVLET);
}
}
} catch (Exception e) {
cleanupResourceConfigs();
throw e;
}
// Start special servlet for serving namespaces
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override public void run() {
shutdown();
}
}));
}
private void prepareServlet(HelixRestNamespace namespace, ServletType type) {
String resourceConfigMapKey = getResourceConfigMapKey(type, namespace);
if (_resourceConfigMap.containsKey(resourceConfigMapKey)) {
throw new IllegalArgumentException(
String.format("Duplicated namespace name \"%s\"", namespace.getName()));
}
// Prepare resource config
ResourceConfig config = getResourceConfig(namespace, type);
_resourceConfigMap.put(resourceConfigMapKey, config);
initMetricRegistry(config, namespace.getName());
// Initialize servlet
initServlet(config, String.format(type.getServletPathSpecTemplate(), namespace.getName()));
}
private String getResourceConfigMapKey(ServletType type, HelixRestNamespace namespace) {
return String.format("%s_%s", type.name(), namespace.getName());
}
protected ResourceConfig getResourceConfig(HelixRestNamespace namespace, ServletType type) {
ResourceConfig cfg = new ResourceConfig();
cfg.packages(type.getServletPackageArray());
cfg.setApplicationName(namespace.getName());
cfg.property(ContextPropertyKeys.SERVER_CONTEXT.name(),
new ServerContext(namespace.getMetadataStoreAddress(), namespace.isMultiZkEnabled(),
namespace.getMsdsEndpoint()));
if (type == ServletType.DEFAULT_SERVLET) {
cfg.property(ContextPropertyKeys.ALL_NAMESPACES.name(), _helixNamespaces);
}
cfg.property(ContextPropertyKeys.METADATA.name(), namespace);
cfg.property(ContextPropertyKeys.ACL_REGISTER.name(), _aclRegister);
if (Boolean.getBoolean(CORS_ENABLED)) {
// NOTE: CORS is disabled by default unless otherwise specified in System Properties.
cfg.register(new CORSFilter());
}
cfg.register(new AuditLogFilter(_auditLoggers));
cfg.register(new ClusterAuthFilter(_clusterAuthValidator));
cfg.register(new NamespaceAuthFilter(_namespaceAuthValidator));
return cfg;
}
/*
* Initialize metric registry and jmx reporter for each namespace.
*/
private void initMetricRegistry(ResourceConfig cfg, String namespace) {
MetricRegistry metricRegistry = new MetricRegistry();
// Set the sliding time window to be 1 minute for now
Supplier<Reservoir> reservoirSupplier = () -> {
return new SlidingTimeWindowReservoir(60, TimeUnit.SECONDS);
};
cfg.register(
new InstrumentedResourceMethodApplicationListener(metricRegistry, Clock.defaultClock(),
false, reservoirSupplier));
SharedMetricRegistries.add(namespace, metricRegistry);
// JmxReporter doesn't have an option to specify namespace for each servlet,
// we use a customized object name factory to get and insert namespace to object name.
JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry)
.inDomain(REST_DOMAIN)
.createsObjectNamesWith(new HelixRestObjectNameFactory(namespace))
.build();
jmxReporter.start();
_jmxReporterList.add(jmxReporter);
}
private void initServlet(ResourceConfig cfg, String servletPathSpec) {
ServletHolder servlet = new ServletHolder(new ServletContainer(cfg));
_servletContextHandler.addServlet(servlet, servletPathSpec);
}
public void start() throws HelixException, InterruptedException {
try {
_server.start();
} catch (Exception ex) {
LOG.error("Failed to start Helix rest server, " + ex);
throw new HelixException("Failed to start Helix rest server! " + ex);
}
LOG.info("Helix rest server started!");
}
public void join() {
if (_server != null) {
try {
_server.join();
} catch (InterruptedException e) {
LOG.warn("Join on Helix rest server get interrupted!" + e);
}
}
}
public synchronized void shutdown() {
if (_server != null) {
try {
_server.stop();
LOG.info("Helix rest server stopped!");
} catch (Exception ex) {
LOG.error("Failed to stop Helix rest server, " + ex);
}
}
_jmxReporterList.forEach(JmxReporter::stop);
_jmxReporterList.clear();
cleanupResourceConfigs();
}
private void cleanupResourceConfigs() {
for (Map.Entry<String, ResourceConfig> e : _resourceConfigMap.entrySet()) {
ServerContext ctx = (ServerContext) e.getValue().getProperty(ContextPropertyKeys.SERVER_CONTEXT.name());
if (ctx == null) {
LOG.info("Server context for servlet " + e.getKey() + " is null.");
} else {
LOG.info("Closing context for servlet " + e.getKey());
ctx.close();
}
}
}
public void setupSslServer(int port, SslContextFactory sslContextFactory) {
if (_server != null && port > 0) {
try {
HttpConfiguration https = new HttpConfiguration();
https.addCustomizer(new SecureRequestCustomizer());
ServerConnector sslConnector = new ServerConnector(
_server,
new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.asString()),
new HttpConnectionFactory(https));
sslConnector.setPort(port);
_server.addConnector(sslConnector);
LOG.info("Helix SSL rest server is ready to start.");
} catch (Exception ex) {
LOG.error("Failed to setup Helix SSL rest server, " + ex);
}
}
}
/**
* Register a SSLContext so that it could be used to create HTTPS clients.
* @param sslContext
*/
public void registerServerSSLContext(SSLContext sslContext) {
REST_SERVER_SSL_CONTEXT = sslContext;
}
}