blob: c25ddbfd62167ac59e3fa0faccf1f7454ba8f268 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.http;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
/**
 * Controller service that holds in-flight HTTP requests (request, response and
 * {@link AsyncContext}) in a concurrent map keyed by a caller-supplied identifier.
 *
 * <p>While the service is enabled, a single background thread periodically sweeps the
 * map and answers any request that has been outstanding for longer than the configured
 * "Request Expiration" with {@code 503 Service Unavailable}.</p>
 *
 * <p>The map itself is a {@link ConcurrentHashMap}; the capacity check in
 * {@link #register} is a separate, non-atomic step, so the configured maximum is a
 * best-effort limit under concurrent registration.</p>
 */
@Tags({"http", "request", "response"})
@SeeAlso(classNames = {
    "org.apache.nifi.processors.standard.HandleHttpRequest",
    "org.apache.nifi.processors.standard.HandleHttpResponse"})
@CapabilityDescription("Provides the ability to store and retrieve HTTP requests and responses external to a Processor, so that "
        + "multiple Processors can interact with the same HTTP request.")
public class StandardHttpContextMap extends AbstractControllerService implements HttpContextMap {

    /** Upper bound on the number of requests that may be registered at the same time. */
    public static final PropertyDescriptor MAX_OUTSTANDING_REQUESTS = new PropertyDescriptor.Builder()
            .name("Maximum Outstanding Requests")
            .description("The maximum number of HTTP requests that can be outstanding at any one time. Any attempt to register an additional HTTP Request will cause an error")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("5000")
            .build();

    /** How long a registered request may remain unanswered before the sweep evicts it. */
    public static final PropertyDescriptor REQUEST_EXPIRATION = new PropertyDescriptor.Builder()
            .name("Request Expiration")
            .description("Specifies how long an HTTP Request should be left unanswered before being evicted from the cache and being responded to with a Service Unavailable status code")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .defaultValue("1 min")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    /** Outstanding requests, keyed by the identifier passed to {@link #register}. */
    private final ConcurrentMap<String, Wrapper> wrapperMap = new ConcurrentHashMap<>();

    // Configuration snapshot taken in onConfigured(); volatile because these fields are
    // written by the enabling thread and read by request threads and the sweep thread.
    private volatile int maxSize = 5000;
    private volatile long maxRequestNanos;
    private volatile ScheduledExecutorService executor;

    /**
     * @return the two properties supported by this service, in display order
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>(2);
        properties.add(MAX_OUTSTANDING_REQUESTS);
        properties.add(REQUEST_EXPIRATION);
        return properties;
    }

    /**
     * Reads the configured limits and starts the background expiration sweep.
     * The sweep runs with a fixed delay of half the request expiration.
     *
     * @param context the configuration to read the property values from
     */
    @OnEnabled
    public void onConfigured(final ConfigurationContext context) {
        maxSize = context.getProperty(MAX_OUTSTANDING_REQUESTS).asInteger();

        executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(final Runnable r) {
                final Thread thread = Executors.defaultThreadFactory().newThread(r);
                // Name the thread after this service instance so it is identifiable in thread dumps.
                thread.setName("StandardHttpContextMap-" + getIdentifier());
                return thread;
            }
        });

        maxRequestNanos = context.getProperty(REQUEST_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
        final long scheduleNanos = maxRequestNanos / 2;
        executor.scheduleWithFixedDelay(new CleanupExpiredRequests(), scheduleNanos, scheduleNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Stops the background expiration sweep, if one was started.
     */
    @OnShutdown
    @OnDisabled
    public void cleanup() {
        if (executor != null) {
            executor.shutdown();
        }
    }

    /**
     * Registers an in-flight HTTP request under the given identifier.
     *
     * @param identifier key under which the request is stored
     * @param request the HTTP request
     * @param response the HTTP response that will eventually be written to
     * @param context the asynchronous context used to complete the request
     * @return {@code true} if the request was stored; {@code false} if the number of
     *         outstanding requests has already reached the configured maximum
     * @throws IllegalStateException if a request is already registered under {@code identifier}
     */
    @Override
    public boolean register(final String identifier, final HttpServletRequest request, final HttpServletResponse response, final AsyncContext context) {
        // Reject the request when the configured "Maximum Outstanding Requests" has been reached.
        // The size check and the insert below are not one atomic step, so this is a best-effort limit.
        if (wrapperMap.size() >= maxSize) {
            return false;
        }

        final Wrapper wrapper = new Wrapper(request, response, context);
        final Wrapper existing = wrapperMap.putIfAbsent(identifier, wrapper);
        if (existing != null) {
            throw new IllegalStateException("HTTP Request already registered with identifier " + identifier);
        }

        return true;
    }

    /**
     * Looks up the response object of a registered request.
     *
     * @param identifier key the request was registered under
     * @return the stored response, or {@code null} if nothing is registered under {@code identifier}
     */
    @Override
    public HttpServletResponse getResponse(final String identifier) {
        final Wrapper wrapper = wrapperMap.get(identifier);
        if (wrapper == null) {
            return null;
        }

        return wrapper.getResponse();
    }

    /**
     * Removes the request from the map and completes its {@link AsyncContext}.
     *
     * @param identifier key the request was registered under
     * @throws IllegalStateException if nothing is registered under {@code identifier}
     */
    @Override
    public void complete(final String identifier) {
        final Wrapper wrapper = wrapperMap.remove(identifier);
        if (wrapper == null) {
            throw new IllegalStateException("No HTTP Request registered with identifier " + identifier);
        }

        wrapper.getAsync().complete();
    }

    /**
     * Immutable holder for one outstanding request; records {@link System#nanoTime()}
     * at construction so its age can be measured later.
     */
    private static class Wrapper {

        @SuppressWarnings("unused")
        private final HttpServletRequest request;
        private final HttpServletResponse response;
        private final AsyncContext async;
        private final long nanoTimeAdded = System.nanoTime();

        public Wrapper(final HttpServletRequest request, final HttpServletResponse response, final AsyncContext async) {
            this.request = request;
            this.response = response;
            this.async = async;
        }

        public HttpServletResponse getResponse() {
            return response;
        }

        public AsyncContext getAsync() {
            return async;
        }

        public long getNanoTimeAdded() {
            return nanoTimeAdded;
        }
    }

    /**
     * Periodic task that evicts requests older than the configured expiration and
     * answers them with {@code 503 Service Unavailable}.
     */
    private class CleanupExpiredRequests implements Runnable {

        @Override
        public void run() {
            final long now = System.nanoTime();
            final long maxAgeNanos = maxRequestNanos;

            final Iterator<Map.Entry<String, Wrapper>> itr = wrapperMap.entrySet().iterator();
            while (itr.hasNext()) {
                final Map.Entry<String, Wrapper> entry = itr.next();
                final Wrapper wrapper = entry.getValue();

                // System.nanoTime() values are only meaningful as differences (the origin is
                // arbitrary and the value may wrap), so compare the elapsed time, not raw readings.
                final long ageNanos = now - wrapper.getNanoTimeAdded();
                if (ageNanos <= maxAgeNanos) {
                    continue;
                }

                // Conditional remove: only the thread that actually takes the mapping out of the
                // map may answer the request. If complete(identifier) removed it first, the
                // request has already been finished and must not be touched again here.
                if (!wrapperMap.remove(entry.getKey(), wrapper)) {
                    continue;
                }

                // send SERVICE_UNAVAILABLE
                try {
                    final AsyncContext async = wrapper.getAsync();
                    ((HttpServletResponse) async.getResponse()).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
                    async.complete();
                } catch (final Exception ignored) {
                    // we are trying to indicate that we are unavailable. If we have an exception and cannot respond,
                    // then so be it. Nothing to really do here.
                }
            }
        }
    }

    /**
     * @param timeUnit the unit to express the timeout in
     * @return the configured request expiration converted to {@code timeUnit}
     */
    @Override
    public long getRequestTimeout(final TimeUnit timeUnit) {
        return timeUnit.convert(maxRequestNanos, TimeUnit.NANOSECONDS);
    }
}