| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.processors.standard.servlets; |
| |
| import com.google.common.base.Strings; |
| import com.google.common.collect.ImmutableList; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.flowfile.attributes.StandardFlowFileMediaType; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.ProcessSessionFactory; |
| import org.apache.nifi.processors.standard.ListenHTTP; |
| import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper; |
| import org.apache.nifi.processors.standard.exception.ListenHttpException; |
| import org.apache.nifi.schema.access.SchemaNotFoundException; |
| import org.apache.nifi.serialization.MalformedRecordException; |
| import org.apache.nifi.serialization.RecordReader; |
| import org.apache.nifi.serialization.RecordReaderFactory; |
| import org.apache.nifi.serialization.RecordSetWriter; |
| import org.apache.nifi.serialization.RecordSetWriterFactory; |
| import org.apache.nifi.serialization.record.RecordSet; |
| import org.apache.nifi.stream.io.StreamThrottler; |
| import org.apache.nifi.stream.io.StreamUtils; |
| import org.apache.nifi.util.FlowFileUnpackager; |
| import org.apache.nifi.util.FlowFileUnpackagerV1; |
| import org.apache.nifi.util.FlowFileUnpackagerV2; |
| import org.apache.nifi.util.FlowFileUnpackagerV3; |
| import org.eclipse.jetty.server.Request; |
| |
| import javax.servlet.AsyncContext; |
| import javax.servlet.MultipartConfigElement; |
| import javax.servlet.ServletConfig; |
| import javax.servlet.ServletContext; |
| import javax.servlet.ServletException; |
| import javax.servlet.http.HttpServlet; |
| import javax.servlet.http.HttpServletRequest; |
| import javax.servlet.http.HttpServletResponse; |
| import javax.servlet.http.Part; |
| import javax.ws.rs.Path; |
| import javax.ws.rs.core.MediaType; |
| import java.io.BufferedInputStream; |
| import java.io.BufferedOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.charset.StandardCharsets; |
| import java.security.cert.X509Certificate; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.regex.Pattern; |
| import java.util.zip.GZIPInputStream; |
| |
| @Path("") |
| public class ListenHTTPServlet extends HttpServlet { |
| |
| private static final long serialVersionUID = 5329940480987723163L; |
| |
| public static final String FLOWFILE_CONFIRMATION_HEADER = "x-prefer-acknowledge-uri"; |
| public static final String LOCATION_HEADER_NAME = "Location"; |
| public static final String DEFAULT_FOUND_SUBJECT = "none"; |
| public static final String DEFAULT_FOUND_ISSUER = "none"; |
| public static final String LOCATION_URI_INTENT_NAME = "x-location-uri-intent"; |
| public static final String LOCATION_URI_INTENT_VALUE = "flowfile-hold"; |
| public static final int FILES_BEFORE_CHECKING_DESTINATION_SPACE = 5; |
| public static final String ACCEPT_HEADER_NAME = "Accept"; |
| public static final String ACCEPT_HEADER_VALUE = String.format("%s,%s,%s,%s,*/*;q=0.8", |
| StandardFlowFileMediaType.VERSION_3.getMediaType(), |
| StandardFlowFileMediaType.VERSION_2.getMediaType(), |
| StandardFlowFileMediaType.VERSION_1.getMediaType(), |
| StandardFlowFileMediaType.VERSION_UNSPECIFIED.getMediaType()); |
| public static final String ACCEPT_ENCODING_NAME = "Accept-Encoding"; |
| public static final String ACCEPT_ENCODING_VALUE = "gzip"; |
| public static final String GZIPPED_HEADER = "flowfile-gzipped"; |
| public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version"; |
| public static final String PROTOCOL_VERSION = "3"; |
| |
| private final AtomicLong filesReceived = new AtomicLong(0L); |
| private final AtomicBoolean spaceAvailable = new AtomicBoolean(true); |
| |
| private ComponentLog logger; |
| private AtomicReference<ProcessSessionFactory> sessionFactoryHolder; |
| private volatile ProcessContext processContext; |
| private Pattern authorizedPattern; |
| private Pattern authorizedIssuerPattern; |
| private Pattern headerPattern; |
| private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap; |
| private StreamThrottler streamThrottler; |
| private String basePath; |
| private int returnCode; |
| private long multipartRequestMaxSize; |
| private int multipartReadBufferSize; |
| private int port; |
| private RecordReaderFactory readerFactory; |
| private RecordSetWriterFactory writerFactory; |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void init(final ServletConfig config) throws ServletException { |
| final ServletContext context = config.getServletContext(); |
| this.logger = (ComponentLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); |
| this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER); |
| this.processContext = (ProcessContext) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER); |
| this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); |
| this.authorizedIssuerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN); |
| this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN); |
| this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); |
| this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); |
| this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH); |
| this.returnCode = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE); |
| this.multipartRequestMaxSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE); |
| this.multipartReadBufferSize = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE); |
| this.port = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PORT); |
| this.readerFactory = processContext.getProperty(ListenHTTP.RECORD_READER).asControllerService(RecordReaderFactory.class); |
| this.writerFactory = processContext.getProperty(ListenHTTP.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); |
| } |
| |
| @Override |
| protected void doHead(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { |
| if (request.getLocalPort() == port) { |
| response.addHeader(ACCEPT_ENCODING_NAME, ACCEPT_ENCODING_VALUE); |
| response.addHeader(ACCEPT_HEADER_NAME, ACCEPT_HEADER_VALUE); |
| response.addHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION); |
| } else { |
| super.doHead(request, response); |
| } |
| } |
| |
| @Override |
| protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { |
| |
| if (request.getLocalPort() != port) { |
| super.doPost(request, response); |
| return; |
| } |
| |
| final ProcessContext context = processContext; |
| |
| ProcessSessionFactory sessionFactory; |
| do { |
| sessionFactory = sessionFactoryHolder.get(); |
| if (sessionFactory == null) { |
| try { |
| Thread.sleep(10); |
| } catch (final InterruptedException e) { |
| } |
| } |
| } while (sessionFactory == null); |
| |
| final ProcessSession session = sessionFactory.createSession(); |
| String foundSubject = null; |
| String foundIssuer = null; |
| try { |
| final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE; |
| if (n == 0 || !spaceAvailable.get()) { |
| if (context.getAvailableRelationships().isEmpty()) { |
| spaceAvailable.set(false); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Received request from " + request.getRemoteHost() + " but no space available; Indicating Service Unavailable"); |
| } |
| response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); |
| return; |
| } else { |
| spaceAvailable.set(true); |
| } |
| } |
| response.setHeader("Content-Type", MediaType.TEXT_PLAIN); |
| |
| final boolean contentGzipped = Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER)); |
| |
| final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); |
| foundSubject = DEFAULT_FOUND_SUBJECT; |
| foundIssuer = DEFAULT_FOUND_ISSUER; |
| if (certs != null && certs.length > 0) { |
| for (final X509Certificate cert : certs) { |
| foundSubject = cert.getSubjectDN().getName(); |
| foundIssuer = cert.getIssuerDN().getName(); |
| if (authorizedPattern.matcher(foundSubject).matches()) { |
| if (authorizedIssuerPattern.matcher(foundIssuer).matches()) { |
| break; |
| } else { |
| logger.warn("Access Forbidden [Issuer not authorized] Host [{}] Subject [{}] Issuer [{}]", request.getRemoteHost(), foundSubject, foundIssuer); |
| response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on issuer dn"); |
| return; |
| } |
| } else { |
| logger.warn("Access Forbidden [Subject not authorized] Host [{}] Subject [{}] Issuer [{}]", request.getRemoteHost(), foundSubject, foundIssuer); |
| response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on subject dn"); |
| return; |
| } |
| } |
| } |
| |
| final String destinationVersion = request.getHeader(PROTOCOL_VERSION_HEADER); |
| Integer protocolVersion = null; |
| if (destinationVersion != null) { |
| try { |
| protocolVersion = Integer.valueOf(destinationVersion); |
| } catch (final NumberFormatException e) { |
| // Value was invalid. Treat as if the header were missing. |
| } |
| } |
| |
| final boolean destinationIsLegacyNiFi = (protocolVersion == null); |
| final boolean createHold = Boolean.parseBoolean(request.getHeader(FLOWFILE_CONFIRMATION_HEADER)); |
| final String contentType = request.getContentType(); |
| |
| final InputStream unthrottled = contentGzipped ? new GZIPInputStream(request.getInputStream()) : request.getInputStream(); |
| |
| final InputStream in = (streamThrottler == null) ? unthrottled : streamThrottler.newThrottledInputStream(unthrottled); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped); |
| } |
| |
| Set<FlowFile> flowFileSet; |
| if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains("multipart/form-data")) { |
| flowFileSet = handleMultipartRequest(request, session, foundSubject, foundIssuer); |
| } else { |
| flowFileSet = handleRequest(request, session, foundSubject, foundIssuer, destinationIsLegacyNiFi, contentType, in); |
| } |
| proceedFlow(request, response, session, foundSubject, foundIssuer, createHold, flowFileSet); |
| } catch (final Throwable t) { |
| handleException(request, response, session, foundSubject, foundIssuer, t); |
| } |
| } |
| |
| private void handleException(final HttpServletRequest request, final HttpServletResponse response, |
| final ProcessSession session, final String foundSubject, final String foundIssuer, final Throwable t) throws IOException { |
| session.rollback(); |
| logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] IssuerDN [{}] due to {}", request.getRemoteHost(), foundSubject, foundIssuer, t); |
| if (t instanceof ListenHttpException) { |
| final int returnCode = ((ListenHttpException) t).getReturnCode(); |
| response.sendError(returnCode, t.toString()); |
| } else { |
| response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString()); |
| } |
| } |
| |
| private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject, String foundIssuer) |
| throws IOException, IllegalStateException, ServletException { |
| if (isRecordProcessing()) { |
| logger.debug("Record processing will not be utilized while processing multipart request. Request URI: {}", request.getRequestURI()); |
| } |
| Set<FlowFile> flowFileSet = new HashSet<>(); |
| String tempDir = System.getProperty("java.io.tmpdir"); |
| request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize)); |
| List<Part> requestParts = ImmutableList.copyOf(request.getParts()); |
| for (int i = 0; i < requestParts.size(); i++) { |
| Part part = requestParts.get(i); |
| FlowFile flowFile = session.create(); |
| try (OutputStream flowFileOutputStream = session.write(flowFile)) { |
| StreamUtils.copy(part.getInputStream(), flowFileOutputStream); |
| } |
| flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, foundIssuer, flowFile); |
| flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size()); |
| flowFileSet.add(flowFile); |
| } |
| return flowFileSet; |
| } |
| |
| private FlowFile savePartDetailsAsAttributes(final ProcessSession session, final Part part, final FlowFile flowFile, final int sequenceNumber, final int allPartsCount) { |
| final Map<String, String> attributes = new HashMap<>(); |
| for (String headerName : part.getHeaderNames()) { |
| final String headerValue = part.getHeader(headerName); |
| putAttribute(attributes, "http.headers.multipart." + headerName, headerValue); |
| } |
| putAttribute(attributes, "http.multipart.size", part.getSize()); |
| putAttribute(attributes, "http.multipart.content.type", part.getContentType()); |
| putAttribute(attributes, "http.multipart.name", part.getName()); |
| putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName()); |
| putAttribute(attributes, "http.multipart.fragments.sequence.number", sequenceNumber + 1); |
| putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount); |
| return session.putAllAttributes(flowFile, attributes); |
| } |
| |
| private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session, String foundSubject, String foundIssuer, |
| final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) throws IOException { |
| FlowFile flowFile; |
| String holdUuid = null; |
| final AtomicBoolean hasMoreData = new AtomicBoolean(false); |
| final FlowFileUnpackager unpackager = getFlowFileUnpackager(contentType); |
| |
| final Set<FlowFile> flowFileSet = new HashSet<>(); |
| |
| do { |
| final long startNanos = System.nanoTime(); |
| final Map<String, String> attributes = new HashMap<>(); |
| flowFile = session.create(); |
| |
| final OutputStream out = session.write(flowFile); |
| |
| try (final BufferedOutputStream bos = new BufferedOutputStream(out, 65536)) { |
| if (unpackager == null) { |
| if (isRecordProcessing()) { |
| processRecord(in, flowFile, out); |
| } else { |
| IOUtils.copy(in, bos); |
| hasMoreData.set(false); |
| } |
| } else { |
| if (isRecordProcessing()) { |
| logger.debug("Record processing will not be utilized while processing with unpackager. Request URI: {}", request.getRequestURI()); |
| } |
| attributes.putAll(unpackager.unpackageFlowFile(in, bos)); |
| |
| if (destinationIsLegacyNiFi) { |
| if (attributes.containsKey("nf.file.name")) { |
| // for backward compatibility with old nifi... |
| attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name")); |
| } |
| |
| if (attributes.containsKey("nf.file.path")) { |
| attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path")); |
| } |
| } |
| |
| hasMoreData.set(unpackager.hasMoreData()); |
| } |
| } |
| |
| |
| final long transferNanos = System.nanoTime() - startNanos; |
| final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); |
| |
| // put metadata on flowfile |
| final String nameVal = request.getHeader(CoreAttributes.FILENAME.key()); |
| if (StringUtils.isNotBlank(nameVal)) { |
| attributes.put(CoreAttributes.FILENAME.key(), nameVal); |
| } |
| |
| String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key()); |
| if (sourceSystemFlowFileIdentifier != null) { |
| sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier; |
| |
| // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's |
| // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event |
| attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); |
| } |
| |
| flowFile = session.putAllAttributes(flowFile, attributes); |
| flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, foundIssuer, flowFile); |
| final String details = String.format("Remote DN=%s, Issuer DN=%s", foundSubject, foundIssuer); |
| session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, details, transferMillis); |
| flowFileSet.add(flowFile); |
| |
| if (holdUuid == null) { |
| holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key()); |
| } |
| } while (hasMoreData.get()); |
| return flowFileSet; |
| } |
| |
| protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response, |
| final ProcessSession session, final String foundSubject, final String foundIssuer, final boolean createHold, |
| final Set<FlowFile> flowFileSet) throws IOException { |
| if (createHold) { |
| String uuid = UUID.randomUUID().toString(); |
| |
| if (flowFileMap.containsKey(uuid)) { |
| uuid = UUID.randomUUID().toString(); |
| } |
| |
| final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost()); |
| FlowFileEntryTimeWrapper previousWrapper; |
| do { |
| previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper); |
| if (previousWrapper != null) { |
| uuid = UUID.randomUUID().toString(); |
| } |
| } while (previousWrapper != null); |
| |
| response.setStatus(HttpServletResponse.SC_SEE_OTHER); |
| final String ackUri = "/" + basePath + "/holds/" + uuid; |
| response.addHeader(LOCATION_HEADER_NAME, ackUri); |
| response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); |
| response.getOutputStream().write(ackUri.getBytes(StandardCharsets.UTF_8)); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; placed hold on these {} files with ID {}", |
| flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, flowFileSet.size(), uuid); |
| } |
| } else { |
| logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; transferring to 'success'", |
| request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer); |
| |
| session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS); |
| |
| final AsyncContext asyncContext = request.startAsync(); |
| session.commitAsync(() -> { |
| response.setStatus(this.returnCode); |
| asyncContext.complete(); |
| }, t -> { |
| logger.error("Failed to commit session. Returning error response to Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]", |
| request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, t); |
| response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); |
| asyncContext.complete(); |
| } |
| ); |
| } |
| } |
| |
| protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session, |
| final String foundSubject, final String foundIssuer, FlowFile flowFile) { |
| Map<String, String> attributes = new HashMap<>(); |
| addMatchingRequestHeaders(request, attributes); |
| flowFile = session.putAllAttributes(flowFile, attributes); |
| flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost()); |
| flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI()); |
| flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject); |
| flowFile = session.putAttribute(flowFile, "restlistener.remote.issuer.dn", foundIssuer); |
| return flowFile; |
| } |
| |
| private void processRecord(InputStream in, FlowFile flowFile, OutputStream out) { |
| try (final RecordReader reader = readerFactory.createRecordReader(flowFile, new BufferedInputStream(in), logger)) { |
| final RecordSet recordSet = reader.createRecordSet(); |
| try (final RecordSetWriter writer = writerFactory.createWriter(logger, reader.getSchema(), out, flowFile)) { |
| writer.write(recordSet); |
| } |
| } catch (IOException | MalformedRecordException e) { |
| throw new ListenHttpException("Could not process record.", e, HttpServletResponse.SC_BAD_REQUEST); |
| } catch (SchemaNotFoundException e) { |
| throw new ListenHttpException("Could not find schema.", e, HttpServletResponse.SC_INTERNAL_SERVER_ERROR); |
| } |
| } |
| |
| private FlowFileUnpackager getFlowFileUnpackager(String contentType) { |
| final FlowFileUnpackager unpackager; |
| if (StandardFlowFileMediaType.VERSION_3.getMediaType().equals(contentType)) { |
| unpackager = new FlowFileUnpackagerV3(); |
| } else if (StandardFlowFileMediaType.VERSION_2.getMediaType().equals(contentType)) { |
| unpackager = new FlowFileUnpackagerV2(); |
| } else if (StringUtils.startsWith(contentType, StandardFlowFileMediaType.VERSION_UNSPECIFIED.getMediaType())) { |
| unpackager = new FlowFileUnpackagerV1(); |
| } else { |
| unpackager = null; |
| } |
| return unpackager; |
| } |
| |
| private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) { |
| // put arbitrary headers on flow file |
| for (Enumeration<String> headerEnum = request.getHeaderNames(); |
| headerEnum.hasMoreElements(); ) { |
| String headerName = headerEnum.nextElement(); |
| if (headerPattern != null && headerPattern.matcher(headerName).matches()) { |
| String headerValue = request.getHeader(headerName); |
| attributes.put(headerName, headerValue); |
| } |
| } |
| } |
| |
| |
| |
| private void putAttribute(final Map<String, String> map, final String key, final Object value) { |
| if (value == null) { |
| return; |
| } |
| |
| putAttribute(map, key, value.toString()); |
| } |
| |
| private void putAttribute(final Map<String, String> map, final String key, final String value) { |
| if (value == null) { |
| return; |
| } |
| |
| map.put(key, value); |
| } |
| |
| private boolean isRecordProcessing() { |
| return readerFactory != null && writerFactory != null; |
| } |
| } |