blob: a917c19238c72786c5abf9121281b151c85e598f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.toolkit.persistence;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.nifi.registry.NiFiRegistry;
import org.apache.nifi.registry.db.DataSourceFactory;
import org.apache.nifi.registry.db.DatabaseMetadataService;
import org.apache.nifi.registry.db.entity.BucketEntity;
import org.apache.nifi.registry.db.entity.FlowEntity;
import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
import org.apache.nifi.registry.extension.ExtensionManager;
import org.apache.nifi.registry.flow.FlowPersistenceProvider;
import org.apache.nifi.registry.properties.NiFiRegistryProperties;
import org.apache.nifi.registry.provider.StandardProviderFactory;
import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext;
import org.apache.nifi.registry.service.MetadataService;
import org.apache.nifi.registry.service.mapper.BucketMappings;
import org.apache.nifi.registry.service.mapper.FlowMappings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
public class FlowPersistenceProviderMigrator {
private static final Logger log = LoggerFactory.getLogger(FlowPersistenceProviderMigrator.class);
public static final int PARSE_EXCEPTION = 1;
public void doMigrate(MetadataService fromMetadata, FlowPersistenceProvider fromProvider, FlowPersistenceProvider toProvider) {
for (BucketEntity bucket : fromMetadata.getAllBuckets()) {
for (FlowEntity flow : fromMetadata.getFlowsByBucket(bucket.getId())) {
for (FlowSnapshotEntity flowSnapshot : fromMetadata.getSnapshots(flow.getId())) {
StandardFlowSnapshotContext context = new StandardFlowSnapshotContext.Builder(
BucketMappings.map(bucket),
FlowMappings.map(bucket, flow),
FlowMappings.map(bucket, flowSnapshot)).build();
int version = flowSnapshot.getVersion();
toProvider.saveFlowContent(context, fromProvider.getFlowContent(bucket.getId(), flow.getId(), version));
log.info("Migrated flow {} version {}", flow.getName(), version);
}
}
}
}
public static void main(String[] args) {
Options options = new Options();
options.addOption("t", "to", true, "Providers xml to migrate to.");
CommandLineParser parser = new DefaultParser();
CommandLine commandLine = null;
try {
commandLine = parser.parse(options, args);
} catch (ParseException e) {
log.error("Unable to parse command line.", e);
new HelpFormatter().printHelp("persistence-toolkit [args]", options);
System.exit(PARSE_EXCEPTION);
}
NiFiRegistryProperties fromProperties = NiFiRegistry.initializeProperties(NiFiRegistry.getMasterKeyProvider());
DatabaseMetadataService fromMetadataService = new DatabaseMetadataService(new JdbcTemplate(new DataSourceFactory(fromProperties).getDataSource()));
FlowPersistenceProvider fromPersistenceProvider = createFlowPersistenceProvider(fromProperties);
FlowPersistenceProvider toPersistenceProvider = createFlowPersistenceProvider(createToProperties(commandLine, fromProperties));
new FlowPersistenceProviderMigrator().doMigrate(fromMetadataService, fromPersistenceProvider, toPersistenceProvider);
}
private static NiFiRegistryProperties createToProperties(CommandLine commandLine, NiFiRegistryProperties fromProperties) {
NiFiRegistryProperties toProperties = new NiFiRegistryProperties();
for (String propertyKey : fromProperties.getPropertyKeys()) {
toProperties.setProperty(propertyKey, fromProperties.getProperty(propertyKey));
}
toProperties.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, commandLine.getOptionValue('t'));
return toProperties;
}
private static FlowPersistenceProvider createFlowPersistenceProvider(NiFiRegistryProperties niFiRegistryProperties) {
ExtensionManager fromExtensionManager = new ExtensionManager(niFiRegistryProperties);
fromExtensionManager.discoverExtensions();
StandardProviderFactory fromProviderFactory = new StandardProviderFactory(niFiRegistryProperties, fromExtensionManager);
fromProviderFactory.initialize();
return fromProviderFactory.getFlowPersistenceProvider();
}
}