blob: 9b0f46de8f70eac76b5ebedfec55d7285508a972 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.stateless.retrieval;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.stateless.ExecuteStateless;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryClient;
import org.apache.nifi.registry.client.NiFiRegistryClientConfig;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
public class RegistryDataflowProvider implements DataflowProvider {
private final ComponentLog logger;
public RegistryDataflowProvider(final ComponentLog logger) {
this.logger = logger;
}
@Override
public VersionedFlowSnapshot retrieveDataflowContents(final ProcessContext context) throws IOException {
final SSLContextService sslContextService = context.getProperty(ExecuteStateless.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContext sslContext = sslContextService == null ? null : sslContextService.createContext();
final String url = context.getProperty(ExecuteStateless.REGISTRY_URL).getValue();
final NiFiRegistryClientConfig clientConfig = new NiFiRegistryClientConfig.Builder()
.baseUrl(url)
.connectTimeout(context.getProperty(ExecuteStateless.COMMS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue())
.readTimeout(context.getProperty(ExecuteStateless.COMMS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue())
.sslContext(sslContext)
.build();
final NiFiRegistryClient client = new JerseyNiFiRegistryClient.Builder()
.config(clientConfig)
.build();
final VersionedFlowSnapshot versionedFlowSnapshot;
final String bucketName = context.getProperty(ExecuteStateless.BUCKET).getValue();
final String flowName = context.getProperty(ExecuteStateless.FLOW_NAME).getValue();
final Integer flowVersion = context.getProperty(ExecuteStateless.FLOW_VERSION).asInteger();
try {
final String bucketId = getBucketId(client, bucketName);
final String flowId = getFlowId(client, flowName, bucketId);
logger.debug("Attempting to fetch dataflow from Registry at URL {}, Bucket {}, Flow {}, flowVersion {}", url, bucketId, flowId, flowVersion == null ? "<Latest>" : flowVersion);
if (flowVersion == null) {
versionedFlowSnapshot = client.getFlowSnapshotClient().getLatest(bucketId, flowId);
} else {
versionedFlowSnapshot = client.getFlowSnapshotClient().get(bucketId, flowId, flowVersion);
}
logger.debug("Successfully fetched dataflow from Registry at URL {}, Bucket {}, Flow {}, flowVersion {}", url, bucketId, flowId, flowVersion == null ? "<Latest>" : flowVersion);
} catch (final NiFiRegistryException e) {
throw new IOException("Failed to retrieve Flow Snapshot from Registry", e);
}
return versionedFlowSnapshot;
}
private String getFlowId(final NiFiRegistryClient client, final String flowName, final String bucketId) throws IOException {
final List<VersionedFlow> versionedFlows;
try {
versionedFlows = client.getFlowClient().getByBucket(bucketId);
} catch (NiFiRegistryException e) {
throw new IOException("Could not retrieve list of Flows from NiFi Registry for Bucket ID " + bucketId);
}
for (final VersionedFlow versionedFlow : versionedFlows) {
if (flowName.equals(versionedFlow.getName())) {
return versionedFlow.getIdentifier();
}
}
throw new IOException("Could not find a flow with the name '" + flowName + "' within bucket with ID '" + bucketId + "' in the given Registry");
}
private String getBucketId(final NiFiRegistryClient client, final String bucketName) throws IOException {
try {
final List<Bucket> allBuckets = client.getBucketClient().getAll();
final Optional<Bucket> optionalBucket = allBuckets.stream().filter(bkt -> bkt.getName().equals(bucketName)).findAny();
if (!optionalBucket.isPresent()) {
throw new IOException("Could not find a bucket with the name '" + bucketName + "' in the given Registry");
}
return optionalBucket.get().getIdentifier();
} catch (NiFiRegistryException e) {
throw new IOException("Failed to fetch buckets from NiFi Registry", e);
}
}
}