blob: 60bdbf9999afee2f327cf5cd2101a87e43483ec9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket.jetty;
import dto.SessionInfo;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessageRouter;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import util.HeaderMapExtractor;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@Tags({"WebSocket", "Jetty", "client"})
@CapabilityDescription("Implementation of WebSocketClientService." +
        " This service uses Jetty WebSocket client module to provide" +
        " WebSocket session management throughout the application.")
/**
 * Controller service that opens WebSocket sessions with a Jetty {@link WebSocketClient} and
 * periodically re-establishes sessions that are no longer known to their message router.
 *
 * <p>Connection attempts (explicit ones and those made by session maintenance) are serialized
 * through a single lock. Configuration derived in {@link #startClient(ConfigurationContext)} is
 * stored in volatile fields because it is read from the maintenance thread and from callers of
 * {@code connect}.</p>
 */
public class JettyWebSocketClient extends AbstractJettyWebSocketService implements WebSocketClientService {

    /** Target WebSocket URI; may contain Expression Language evaluated against flow file attributes. */
    public static final PropertyDescriptor WS_URI = new PropertyDescriptor.Builder()
            .name("websocket-uri")
            .displayName("WebSocket URI")
            .description("The WebSocket URI this client connects to.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.URI_VALIDATOR)
            .addValidator((subject, input, context) -> {
                final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject);
                if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
                    // The scheme cannot be checked until the expression is evaluated.
                    result.explanation("Expression Language Present").valid(true);
                } else {
                    result.explanation("Protocol should be either 'ws' or 'wss'.")
                            .valid(input != null && (input.startsWith("ws://") || input.startsWith("wss://")));
                }
                return result.build();
            })
            .build();

    /** Maximum time to wait for a connection attempt to complete. */
    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
            .name("connection-timeout")
            .displayName("Connection Timeout")
            .description("The timeout to connect the WebSocket URI.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("3 sec")
            .build();

    /** Period of the background task that reconnects lost sessions. */
    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
            .name("session-maintenance-interval")
            .displayName("Session Maintenance Interval")
            .description("The interval between session maintenance activities." +
                    " A WebSocket session established with a WebSocket server can be terminated due to different reasons" +
                    " including restarting the WebSocket server or timing out inactive sessions." +
                    " This session maintenance activity is periodically executed in order to reconnect those lost sessions," +
                    " so that a WebSocket client can reuse the same session id transparently after it reconnects successfully. " +
                    " The maintenance activity is executed until corresponding processors or this controller service is stopped.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("10 sec")
            .build();

    /** Optional user name for the Basic Authentication header. */
    public static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder()
            .name("user-name")
            .displayName("User Name")
            .description("The user name for Basic Authentication.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .build();

    /** Optional (sensitive) password for the Basic Authentication header. */
    public static final PropertyDescriptor USER_PASSWORD = new PropertyDescriptor.Builder()
            .name("user-password")
            .displayName("User Password")
            .description("The user password for Basic Authentication.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .sensitive(true)
            .build();

    /** Charset used to encode "user:password" before Base64 encoding. */
    public static final PropertyDescriptor AUTH_CHARSET = new PropertyDescriptor.Builder()
            .name("authentication-charset")
            .displayName("Authentication Header Charset")
            .description("The charset for Basic Authentication header base64 string.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .defaultValue("US-ASCII")
            .build();

    /** Optional HTTP proxy host; must be set together with {@link #PROXY_PORT}. */
    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
            .name("proxy-host")
            .displayName("HTTP Proxy Host")
            .description("The host name of the HTTP Proxy.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .build();

    /** Optional HTTP proxy port; must be set together with {@link #PROXY_HOST}. */
    public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
            .name("proxy-port")
            .displayName("HTTP Proxy Port")
            .description("The port number of the HTTP Proxy.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.PORT_VALIDATOR)
            .build();

    private static final List<PropertyDescriptor> properties;

    static {
        // Inherited descriptors first, followed by the client-specific ones in display order.
        final List<PropertyDescriptor> props = new ArrayList<>(getAbstractPropertyDescriptors());
        props.add(WS_URI);
        props.add(SSL_CONTEXT);
        props.add(CONNECTION_TIMEOUT);
        props.add(SESSION_MAINTENANCE_INTERVAL);
        props.add(USER_NAME);
        props.add(USER_PASSWORD);
        props.add(AUTH_CHARSET);
        props.add(PROXY_HOST);
        props.add(PROXY_PORT);
        properties = Collections.unmodifiableList(props);
    }

    /** Sessions this service is responsible for keeping alive, keyed by client id. */
    private final Map<String, SessionInfo> activeSessions = new ConcurrentHashMap<>();

    /** Serializes connection attempts made by callers and by session maintenance. */
    private final ReentrantLock connectionLock = new ReentrantLock();

    // The fields below are written by the lifecycle methods and read by the maintenance thread
    // and by connect() callers, hence volatile.
    private volatile WebSocketClient client;
    private volatile URI webSocketUri;
    private volatile String authorizationHeader;
    private volatile long connectionTimeoutMillis;
    private volatile ScheduledExecutorService sessionMaintenanceScheduler;
    private volatile ConfigurationContext configurationContext;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Builds and starts the Jetty client from the given configuration and schedules the
     * periodic session maintenance task.
     *
     * @param context the configuration to read property values from
     * @throws Exception if the client cannot be created or started, or the configured URI is malformed
     */
    @OnEnabled
    @Override
    public void startClient(final ConfigurationContext context) throws Exception {
        configurationContext = context;

        final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
        SslContextFactory sslContextFactory = null;
        if (sslService != null) {
            sslContextFactory = createSslFactory(sslService, false, false, null);
        }

        final HttpClient httpClient = new HttpClient(sslContextFactory);
        final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
        final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
        if (proxyHost != null && proxyPort != null) {
            httpClient.getProxyConfiguration().getProxies().add(new HttpProxy(proxyHost, proxyPort));
        }

        final WebSocketClient newClient = new WebSocketClient(httpClient);
        client = newClient;
        configurePolicy(context, newClient.getPolicy());

        // Basic Authentication is only applied when both user name and password are provided.
        final String userName = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue();
        final String userPassword = context.getProperty(USER_PASSWORD).evaluateAttributeExpressions().getValue();
        if (!StringUtils.isEmpty(userName) && !StringUtils.isEmpty(userPassword)) {
            final String charsetName = context.getProperty(AUTH_CHARSET).evaluateAttributeExpressions().getValue();
            if (StringUtils.isEmpty(charsetName)) {
                throw new IllegalArgumentException(AUTH_CHARSET.getDisplayName() + " was not specified.");
            }
            final Charset charset = Charset.forName(charsetName);
            final String base64String = Base64.getEncoder().encodeToString((userName + ":" + userPassword).getBytes(charset));
            authorizationHeader = "Basic " + base64String;
        } else {
            authorizationHeader = null;
        }

        newClient.start();
        activeSessions.clear();

        // No flow file is available here, so the URI is evaluated against an empty attribute map.
        webSocketUri = new URI(context.getProperty(WS_URI).evaluateAttributeExpressions(new HashMap<>()).getValue());
        connectionTimeoutMillis = context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);

        final long sessionMaintenanceInterval = context.getProperty(SESSION_MAINTENANCE_INTERVAL).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);

        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        sessionMaintenanceScheduler = scheduler;
        scheduler.scheduleAtFixedRate(() -> {
            try {
                maintainSessions();
            } catch (final Exception e) {
                // Catch everything: an exception escaping the task would cancel all future runs.
                getLogger().warn("Failed to maintain sessions due to {}", new Object[]{e}, e);
            }
        }, sessionMaintenanceInterval, sessionMaintenanceInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Requires the HTTP proxy host and port to be either both set or both unset.
     *
     * @param validationContext the context providing the current property values
     * @return the validation failures found; empty when the proxy settings are consistent
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>(1);
        final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
        final boolean proxyPortSet = validationContext.getProperty(PROXY_PORT).isSet();
        if (proxyHostSet != proxyPortSet) {
            results.add(new ValidationResult.Builder().subject("HTTP Proxy Host and Port").valid(false).explanation(
                    "If HTTP Proxy Host or HTTP Proxy Port is set, both must be set").build());
        }
        return results;
    }

    /**
     * Stops session maintenance, forgets the tracked sessions and stops the Jetty client.
     *
     * @throws Exception if stopping the Jetty client fails; the client reference is dropped regardless
     */
    @OnDisabled
    @OnShutdown
    @Override
    public void stopClient() throws Exception {
        // Stop scheduling maintenance before clearing state, so that no new maintenance run
        // starts against the cleared session map.
        final ScheduledExecutorService scheduler = sessionMaintenanceScheduler;
        sessionMaintenanceScheduler = null;
        if (scheduler != null) {
            try {
                scheduler.shutdown();
            } catch (Exception e) {
                getLogger().warn("Failed to shutdown session maintainer due to {}", new Object[]{e}, e);
            }
        }

        activeSessions.clear();

        final WebSocketClient currentClient = client;
        if (currentClient == null) {
            return;
        }

        try {
            currentClient.stop();
        } finally {
            // Drop the reference even if stop() fails so that the client is not reused.
            client = null;
        }
    }

    /**
     * Connects the given client id using no flow file attributes.
     *
     * @param clientId the id whose registered router receives the session's events
     * @throws IOException if the connection cannot be established
     */
    @Override
    public void connect(final String clientId) throws IOException {
        connect(clientId, null, Collections.emptyMap());
    }

    /**
     * Connects the given client id, evaluating the WebSocket URI against the given attributes.
     *
     * @param clientId the id whose registered router receives the session's events
     * @param flowFileAttributes attributes used for URI evaluation and request headers;
     *                           {@code null} is treated as an empty map
     * @throws IOException if the connection cannot be established
     */
    @Override
    public void connect(final String clientId, final Map<String, String> flowFileAttributes) throws IOException {
        connect(clientId, null, flowFileAttributes);
    }

    /**
     * Opens a session for the client id and records it in the active sessions.
     *
     * @param clientId the id whose registered router receives the session's events
     * @param sessionId the session id to assign to the listener, or {@code null} for a new connection
     * @param flowFileAttributes attributes used for URI evaluation and request headers;
     *                           {@code null} is treated as an empty map
     * @throws IOException if the connection attempt fails, times out or is interrupted
     * @throws ProcessException if the evaluated URI is malformed
     * @throws IllegalStateException if no router is registered for the client id or the client is not started
     */
    private void connect(final String clientId, final String sessionId, final Map<String, String> flowFileAttributes) throws IOException {
        final Map<String, String> attributes = flowFileAttributes == null
                ? Collections.<String, String>emptyMap()
                : flowFileAttributes;

        // Resolve into a local first: the shared field must not be touched outside the lock,
        // otherwise concurrent callers could connect to each other's URI.
        final URI targetUri;
        try {
            targetUri = new URI(configurationContext.getProperty(WS_URI).evaluateAttributeExpressions(attributes).getValue());
        } catch (URISyntaxException e) {
            throw new ProcessException("Could not create websocket URI", e);
        }

        connectionLock.lock();
        try {
            webSocketUri = targetUri;

            final WebSocketMessageRouter router;
            try {
                router = routers.getRouterOrFail(clientId);
            } catch (WebSocketConfigurationException e) {
                throw new IllegalStateException("Failed to get router due to: " + e, e);
            }

            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
            listener.setSessionId(sessionId);

            final ClientUpgradeRequest request = new ClientUpgradeRequest();
            if (!attributes.isEmpty()) {
                request.setHeaders(HeaderMapExtractor.getHeaderMap(attributes));
            }
            // Set after the attribute-derived headers so that the configured credentials are applied last.
            if (!StringUtils.isEmpty(authorizationHeader)) {
                request.setHeader(HttpHeader.AUTHORIZATION.asString(), authorizationHeader);
            }

            final WebSocketClient currentClient = client;
            if (currentClient == null) {
                throw new IllegalStateException("WebSocket client is not started.");
            }

            final Future<Session> pendingSession = currentClient.connect(listener, targetUri, request);
            getLogger().info("Connecting to : {}", targetUri);

            final Session session;
            try {
                session = pendingSession.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                pendingSession.cancel(true);
                // Preserve the interrupt for the caller's thread.
                Thread.currentThread().interrupt();
                throw new IOException("Failed to connect " + targetUri + " due to: " + e, e);
            } catch (Exception e) {
                // Best effort: do not leave an abandoned attempt pending after a timeout or failure.
                pendingSession.cancel(true);
                throw new IOException("Failed to connect " + targetUri + " due to: " + e, e);
            }
            getLogger().info("Connected, session={}", session);
            activeSessions.put(clientId, new SessionInfo(listener.getSessionId(), attributes));
        } finally {
            connectionLock.unlock();
        }
    }

    /**
     * Reconnects every tracked session that its router no longer contains, and discards
     * client ids for which no router is registered any more.
     *
     * @throws Exception if a reconnect attempt fails; remaining sessions are retried on the next run
     */
    void maintainSessions() throws Exception {
        if (client == null) {
            return;
        }

        final ComponentLog logger = getLogger();

        connectionLock.lock();
        try {
            // Loop through existing sessions and reconnect.
            for (final String clientId : activeSessions.keySet()) {
                final WebSocketMessageRouter router;
                try {
                    router = routers.getRouterOrFail(clientId);
                } catch (final WebSocketConfigurationException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", clientId);
                    }
                    activeSessions.remove(clientId);
                    continue;
                }

                final SessionInfo sessionInfo = activeSessions.get(clientId);
                if (sessionInfo == null) {
                    // Removed concurrently (e.g. the service is being stopped); nothing to maintain.
                    continue;
                }

                // If this session is still alive, do nothing.
                if (!router.containsSession(sessionInfo.getSessionId())) {
                    // This session is no longer active, reconnect it.
                    // If it fails, the sessionId will remain in activeSessions, and retries later.
                    // This reconnect attempt is continued until user explicitly stops a processor or this controller service.
                    connect(clientId, sessionInfo.getSessionId(), sessionInfo.getFlowFileAttributes());
                }
            }
        } finally {
            connectionLock.unlock();
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Session maintenance completed. activeSessions={}", activeSessions);
        }
    }

    /**
     * Returns the URI most recently resolved for a connection (or at start-up).
     *
     * @return the URI as a string, or {@code null} if no URI has been resolved yet
     */
    @Override
    public String getTargetUri() {
        final URI currentUri = webSocketUri;
        return currentUri == null ? null : currentUri.toString();
    }
}