blob: 8cf4f8afae65468bc5698c8a81356fa19efb94f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.splunk;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.splunk.HttpException;
import com.splunk.RequestMessage;
import com.splunk.ResponseMessage;
import com.splunk.SSLSecurityProtocol;
import com.splunk.Service;
import com.splunk.ServiceArgs;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
abstract class SplunkAPICall extends AbstractProcessor {
private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
private static final String HTTP_SCHEME = "http";
private static final String HTTPS_SCHEME = "https";
private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
static final String ACKNOWLEDGEMENT_ID_ATTRIBUTE = "splunk.acknowledgement.id";
static final String RESPONDED_AT_ATTRIBUTE = "splunk.responded.at";
static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
.name("Scheme")
.displayName("Scheme")
.description("The scheme for connecting to Splunk.")
.allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
.defaultValue(HTTPS_SCHEME)
.required(true)
.build();
static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.displayName("Hostname")
.description("The ip address or hostname of the Splunk server.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("localhost")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.displayName("HTTP Event Collector Port")
.description("The HTTP Event Collector HTTP Port Number.")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.defaultValue("8088")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
.name("Security Protocol")
.displayName("Security Protocol")
.description("The security protocol to use for communicating with Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
.defaultValue(TLS_1_2_VALUE.getValue())
.build();
static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
.name("Owner")
.displayName("Owner")
.description("The owner to pass to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
.name("Token")
.displayName("HTTP Event Collector Token")
.description("HTTP Event Collector token starting with the string Splunk. For example Splunk 1234578-abcd-1234-abcd-1234abcd")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.displayName("Username")
.description("The username to authenticate to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.displayName("Password")
.description("The password to authenticate to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.sensitive(true)
.build();
static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder()
.name("request-channel")
.displayName("Splunk Request Channel")
.description("Identifier of the used request channel.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
SCHEME,
HOSTNAME,
PORT,
SECURITY_PROTOCOL,
OWNER,
TOKEN,
USERNAME,
PASSWORD,
REQUEST_CHANNEL
);
private final JsonFactory jsonFactory = new JsonFactory();
private final ObjectMapper jsonObjectMapper = new ObjectMapper(jsonFactory);
private volatile ServiceArgs splunkServiceArguments;
private volatile Service splunkService;
private volatile String requestChannel;
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return SplunkAPICall.PROPERTIES;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
splunkServiceArguments = getSplunkServiceArgs(context);
splunkService = getSplunkService(splunkServiceArguments);
requestChannel = context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
}
private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
final ServiceArgs splunkServiceArguments = new ServiceArgs();
splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
if (context.getProperty(OWNER).isSet()) {
splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(TOKEN).isSet()) {
splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(USERNAME).isSet()) {
splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(PASSWORD).isSet()) {
splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
}
if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && context.getProperty(SECURITY_PROTOCOL).isSet()) {
splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));
}
return splunkServiceArguments;
}
protected Service getSplunkService(final ServiceArgs splunkServiceArguments) {
return Service.connect(splunkServiceArguments);
}
@OnStopped
public void onStopped() {
if (splunkService != null) {
splunkService.logout();
splunkService = null;
}
requestChannel = null;
splunkServiceArguments = null;
}
protected ResponseMessage call(final String endpoint, final RequestMessage request) {
request.getHeader().put(REQUEST_CHANNEL_HEADER_NAME, requestChannel);
try {
return splunkService.send(endpoint, request);
//Catch Stale connection exception, reinitialize, and retry
} catch (final HttpException e) {
getLogger().error("Splunk request status code: {}. Retrying the request.", new Object[] {e.getStatus()});
splunkService.logout();
splunkService = getSplunkService(splunkServiceArguments);
return splunkService.send(endpoint, request);
}
}
protected <T> T unmarshallResult(final InputStream responseBody, final Class<T> type) throws IOException {
return jsonObjectMapper.readValue(responseBody, type);
}
protected String marshalRequest(final Object request) throws IOException {
return jsonObjectMapper.writeValueAsString(request);
}
}