NIFI-13105 Implemented FlowRegistryClient using GitHub for versioning
This closes #8765
Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
index 749e5b8..4436c55 100644
--- a/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
+++ b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
/**
* <p>
@@ -146,7 +147,7 @@
FlowRegistryBucket getBucket(FlowRegistryClientConfigurationContext context, BucketLocation bucketLocation) throws FlowRegistryException, IOException;
/**
- * Registers the given RegisteredFlow into the the Flow Registry.
+ * Registers the given RegisteredFlow into the Flow Registry.
*
* @param context Configuration context.
* @param flow The RegisteredFlow to add to the Registry.
@@ -252,4 +253,15 @@
* @throws IOException If there is issue with the communication between NiFi and the Flow Registry.
*/
Optional<String> getLatestVersion(FlowRegistryClientConfigurationContext context, FlowLocation flowLocation) throws FlowRegistryException, IOException;
+
+ /**
+ * Generates the id for registering a flow.
+ *
+ * @param flowName the name of the flow
+ * @return the generated id
+ */
+ default String generateFlowId(final String flowName) {
+ return UUID.randomUUID().toString();
+ }
+
}
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 00ee910..6501ad0 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -872,6 +872,12 @@
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-github-nar</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
<!-- AspectJ library needed by the Java Agent used for native library loading (see bootstrap.conf) -->
<dependency>
<groupId>org.aspectj</groupId>
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/pom.xml b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/pom.xml
new file mode 100644
index 0000000..be73f95
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/pom.xml
@@ -0,0 +1,43 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-github-bundle</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>nifi-github-extensions</artifactId>
+ <packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-jakarta-xmlbind-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.kohsuke</groupId>
+ <artifactId>github-api</artifactId>
+ <version>${github-api.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/FlowSnapshotSerializer.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/FlowSnapshotSerializer.java
new file mode 100644
index 0000000..9f464e1
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/FlowSnapshotSerializer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.github;
+
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Serializer for flow snapshots.
+ */
+public interface FlowSnapshotSerializer {
+
+ String serialize(final RegisteredFlowSnapshot flowSnapshot) throws IOException;
+
+ RegisteredFlowSnapshot deserialize(final InputStream inputStream) throws IOException;
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubAuthenticationType.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubAuthenticationType.java
new file mode 100644
index 0000000..964c376
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubAuthenticationType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.github;
+
+/**
+ * Enumeration of authentication types for the GitHub client.
+ */
+public enum GitHubAuthenticationType {
+
+ NONE,
+ PERSONAL_ACCESS_TOKEN,
+ APP_INSTALLATION_TOKEN;
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubCreateContentRequest.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubCreateContentRequest.java
new file mode 100644
index 0000000..f1864eb
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubCreateContentRequest.java
@@ -0,0 +1,101 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.github;
+
+import java.util.Objects;
+
+public class GitHubCreateContentRequest {
+
+ private final String branch;
+ private final String path;
+ private final String content;
+ private final String message;
+ private final String existingContentSha;
+
+ private GitHubCreateContentRequest(final Builder builder) {
+ this.branch = Objects.requireNonNull(builder.branch);
+ this.path = Objects.requireNonNull(builder.path);
+ this.content = Objects.requireNonNull(builder.content);
+ this.message = Objects.requireNonNull(builder.message);
+ // Will be null for create, and populated for update
+ this.existingContentSha = builder.existingContentSha;
+ }
+
+ public String getBranch() {
+ return branch;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getContent() {
+ return content;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public String getExistingContentSha() {
+ return existingContentSha;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+ private String branch;
+ private String path;
+ private String content;
+ private String message;
+ private String existingContentSha;
+
+ public Builder branch(final String branch) {
+ this.branch = branch;
+ return this;
+ }
+
+ public Builder path(final String path) {
+ this.path = path;
+ return this;
+ }
+
+ public Builder content(final String content) {
+ this.content = content;
+ return this;
+ }
+
+ public Builder message(final String message) {
+ this.message = message;
+ return this;
+ }
+
+ public Builder existingContentSha(final String existingSha) {
+ this.existingContentSha = existingSha;
+ return this;
+ }
+
+ public GitHubCreateContentRequest build() {
+ return new GitHubCreateContentRequest(this);
+ }
+ }
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubFlowRegistryClient.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubFlowRegistryClient.java
new file mode 100644
index 0000000..34fce1d
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubFlowRegistryClient.java
@@ -0,0 +1,677 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.nifi.github;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.registry.flow.AbstractFlowRegistryClient;
+import org.apache.nifi.registry.flow.AuthorizationException;
+import org.apache.nifi.registry.flow.BucketLocation;
+import org.apache.nifi.registry.flow.FlowAlreadyExistsException;
+import org.apache.nifi.registry.flow.FlowLocation;
+import org.apache.nifi.registry.flow.FlowRegistryBranch;
+import org.apache.nifi.registry.flow.FlowRegistryBucket;
+import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext;
+import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext;
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.FlowRegistryPermissions;
+import org.apache.nifi.registry.flow.FlowVersionLocation;
+import org.apache.nifi.registry.flow.RegisterAction;
+import org.apache.nifi.registry.flow.RegisteredFlow;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
+import org.kohsuke.github.GHCommit;
+import org.kohsuke.github.GHContent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link org.apache.nifi.registry.flow.FlowRegistryClient} that uses GitHub for version controlling flows.
+ */
+public class GitHubFlowRegistryClient extends AbstractFlowRegistryClient {
+
+ static final PropertyDescriptor GITHUB_API_URL = new PropertyDescriptor.Builder()
+ .name("GitHub API URL")
+ .description("The URL of the GitHub API")
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .defaultValue("https://api.github.com/")
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor REPOSITORY_NAME = new PropertyDescriptor.Builder()
+ .name("Repository Name")
+ .description("The name of the repository")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor REPOSITORY_OWNER = new PropertyDescriptor.Builder()
+ .name("Repository Owner")
+ .description("The owner of the repository")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor REPOSITORY_BRANCH = new PropertyDescriptor.Builder()
+ .name("Default Branch")
+ .description("The default branch to use for this client")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue("main")
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor REPOSITORY_PATH = new PropertyDescriptor.Builder()
+ .name("Repository Path")
+ .description("The path with in the repository that this client will use to store all data. " +
+ "If left blank, then the root of the repository will be used.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .required(false)
+ .build();
+
+ static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
+ .name("Authentication Type")
+ .description("The type of authentication to use for accessing GitHub")
+ .allowableValues(GitHubAuthenticationType.values())
+ .defaultValue(GitHubAuthenticationType.NONE.name())
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor PERSONAL_ACCESS_TOKEN = new PropertyDescriptor.Builder()
+ .name("Personal Access Token")
+ .description("The personal access token to use for authentication")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .required(true)
+ .sensitive(true)
+ .dependsOn(AUTHENTICATION_TYPE, GitHubAuthenticationType.PERSONAL_ACCESS_TOKEN.name())
+ .build();
+
+ static final PropertyDescriptor APP_INSTALLATION_TOKEN = new PropertyDescriptor.Builder()
+ .name("App Installation Token")
+ .description("The app installation token to use for authentication")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .required(true)
+ .sensitive(true)
+ .dependsOn(AUTHENTICATION_TYPE, GitHubAuthenticationType.APP_INSTALLATION_TOKEN.name())
+ .build();
+
+ static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
+ GITHUB_API_URL,
+ REPOSITORY_OWNER,
+ REPOSITORY_NAME,
+ REPOSITORY_BRANCH,
+ REPOSITORY_PATH,
+ AUTHENTICATION_TYPE,
+ PERSONAL_ACCESS_TOKEN,
+ APP_INSTALLATION_TOKEN
+ );
+
+ static final String DEFAULT_BUCKET_NAME = "default";
+ static final String DEFAULT_BUCKET_KEEP_FILE_PATH = DEFAULT_BUCKET_NAME + "/.keep";
+ static final String DEFAULT_BUCKET_KEEP_FILE_CONTENT = "Do Not Delete";
+ static final String DEFAULT_BUCKET_KEEP_FILE_MESSAGE = "Creating default bucket";
+
+ static final String REGISTER_FLOW_MESSAGE_PREFIX = "Registering Flow";
+ static final String REGISTER_FLOW_MESSAGE_FORMAT = REGISTER_FLOW_MESSAGE_PREFIX + " [%s]";
+ static final String DEREGISTER_FLOW_MESSAGE_FORMAT = "Deregistering Flow [%s]";
+ static final String DEFAULT_FLOW_SNAPSHOT_MESSAGE_FORMAT = "Saving Flow Snapshot %s";
+ static final String SNAPSHOT_FILE_EXTENSION = ".json";
+ static final String SNAPSHOT_FILE_PATH_FORMAT = "%s/%s" + SNAPSHOT_FILE_EXTENSION;
+ static final String FLOW_CONTENTS_GROUP_ID = "flow-contents-group";
+
+ static final String STORAGE_LOCATION_PREFIX = "git@github.com:";
+ static final String STORAGE_LOCATION_FORMAT = STORAGE_LOCATION_PREFIX + "%s/%s.git";
+
+ private volatile FlowSnapshotSerializer flowSnapshotSerializer;
+ private volatile GitHubRepositoryClient repositoryClient;
+ private final AtomicBoolean clientInitialized = new AtomicBoolean(false);
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+ final String repoPath = validationContext.getProperty(REPOSITORY_PATH).getValue();
+ if (repoPath != null && (repoPath.startsWith("/") || repoPath.endsWith("/"))) {
+ results.add(new ValidationResult.Builder()
+ .subject(REPOSITORY_PATH.getDisplayName())
+ .valid(false)
+ .explanation("Path can not start or end with /")
+ .build());
+ }
+
+ return results;
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+ synchronized (this) {
+ clientInitialized.set(false);
+ }
+ }
+
+ @Override
+ public void initialize(final FlowRegistryClientInitializationContext context) {
+ super.initialize(context);
+ flowSnapshotSerializer = createFlowSnapshotSerializer(context);
+ }
+
+ // protected to allow for overriding from tests
+ protected FlowSnapshotSerializer createFlowSnapshotSerializer(final FlowRegistryClientInitializationContext initializationContext) {
+ return new JacksonFlowSnapshotSerializer();
+ }
+
+ @Override
+ public boolean isStorageLocationApplicable(final FlowRegistryClientConfigurationContext context, final String location) {
+ return location != null && location.startsWith(STORAGE_LOCATION_PREFIX);
+ }
+
+ @Override
+ public boolean isBranchingSupported(final FlowRegistryClientConfigurationContext context) {
+ return true;
+ }
+
+ @Override
+ public Set<FlowRegistryBranch> getBranches(final FlowRegistryClientConfigurationContext context) throws FlowRegistryException, IOException {
+ final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+ verifyReadPermissions(repositoryClient);
+
+ return repositoryClient.getBranches().stream()
+ .map(branchName -> {
+ final FlowRegistryBranch flowRegistryBranch = new FlowRegistryBranch();
+ flowRegistryBranch.setName(branchName);
+ return flowRegistryBranch;
+ }).collect(Collectors.toSet());
+ }
+
+ @Override
+ public FlowRegistryBranch getDefaultBranch(final FlowRegistryClientConfigurationContext context) {
+ final FlowRegistryBranch defaultBranch = new FlowRegistryBranch();
+ defaultBranch.setName(context.getProperty(REPOSITORY_BRANCH).getValue());
+ return defaultBranch;
+ }
+
+ @Override
+ public Set<FlowRegistryBucket> getBuckets(final FlowRegistryClientConfigurationContext context, final String branch) throws IOException, FlowRegistryException {
+ final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+ verifyReadPermissions(repositoryClient);
+
+ final Set<FlowRegistryBucket> buckets = repositoryClient.getTopLevelDirectoryNames(branch).stream()
+ .map(bucketName -> createFlowRegistryBucket(repositoryClient, bucketName))
+ .collect(Collectors.toSet());
+
+ // if the repository has no top-level directories, then return a default bucket entry, this won't exist in the repository until the first time a flow is saved to it
+ return buckets.isEmpty() ? Set.of(createFlowRegistryBucket(repositoryClient, DEFAULT_BUCKET_NAME)) : buckets;
+ }
+
+ @Override
+ public FlowRegistryBucket getBucket(final FlowRegistryClientConfigurationContext context, final BucketLocation bucketLocation) throws FlowRegistryException, IOException {
+ final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+ verifyReadPermissions(repositoryClient);
+ return createFlowRegistryBucket(repositoryClient, bucketLocation.getBucketId());
+ }
+
+ @Override
+ public RegisteredFlow registerFlow(final FlowRegistryClientConfigurationContext context, final RegisteredFlow flow) throws FlowRegistryException, IOException {
+ final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+ verifyWritePermissions(repositoryClient);
+
+ final String branch = flow.getBranch();
+ final FlowLocation flowLocation = new FlowLocation(branch, flow.getBucketIdentifier(), flow.getIdentifier());
+ final String filePath = getSnapshotFilePath(flowLocation);
+ final String commitMessage = REGISTER_FLOW_MESSAGE_FORMAT.formatted(flow.getIdentifier());
+
+ final Optional<String> existingFileSha = repositoryClient.getContentSha(filePath, branch);
+ if (existingFileSha.isPresent()) {
+ throw new FlowAlreadyExistsException("Another flow is already registered at [" + filePath + "] on branch [" + branch + "]");
+ }
+
+ // Clear values we don't want in the json stored in GitHub
+ final String originalBucketId = flow.getBucketIdentifier();
+ flow.setBucketIdentifier(null);
+ flow.setBucketName(null);
+ flow.setBranch(null);
+
+ final RegisteredFlowSnapshot flowSnapshot = new RegisteredFlowSnapshot();
+ flowSnapshot.setFlow(flow);
+
+ final GitHubCreateContentRequest request = GitHubCreateContentRequest.builder()
+ .branch(branch)
+ .path(filePath)
+ .content(flowSnapshotSerializer.serialize(flowSnapshot))
+ .message(commitMessage)
+ .build();
+
+ repositoryClient.createContent(request);
+
+ // Re-populate fields before returning
+ flow.setBucketName(originalBucketId);
+ flow.setBucketIdentifier(originalBucketId);
+ flow.setBranch(branch);
+
+ return flow;
+ }
+
+ @Override
+ public RegisteredFlow deregisterFlow(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation) throws FlowRegistryException, IOException {
+ final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+ verifyWritePermissions(repositoryClient);
+
+ final String branch = flowLocation.getBranch();
+ final String filePath = getSnapshotFilePath(flowLocation);
+ final String commitMessage = DEREGISTER_FLOW_MESSAGE_FORMAT.formatted(flowLocation.getFlowId());
+ final GHContent deletedSnapshotContent = repositoryClient.deleteContent(filePath, commitMessage, branch);
+
+ final RegisteredFlowSnapshot deletedSnapshot = getSnapshot(deletedSnapshotContent.read());
+ updateBucketReferences(repositoryClient, deletedSnapshot, flowLocation.getBucketId());
+ return deletedSnapshot.getFlow();
+ }
+
+ @Override
+ public RegisteredFlow getFlow(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation) throws FlowRegistryException, IOException {
+ final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+ verifyReadPermissions(repositoryClient);
+
+ final String branch = flowLocation.getBranch();
+ final String filePath = getSnapshotFilePath(flowLocation);
+
+ final RegisteredFlowSnapshot existingSnapshot = getSnapshot(filePath, branch);
+ populateFlowAndSnapshotMetadata(existingSnapshot, flowLocation);
+ updateBucketReferences(repositoryClient, existingSnapshot, flowLocation.getBucketId());
+
+ final RegisteredFlow registeredFlow = existingSnapshot.getFlow();
+ registeredFlow.setBranch(branch);
+ return registeredFlow;
+ }
+
+ @Override
+ public Set<RegisteredFlow> getFlows(final FlowRegistryClientConfigurationContext context, final BucketLocation bucketLocation) throws IOException, FlowRegistryException {
+ final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+ verifyReadPermissions(repositoryClient);
+
+ final String branch = bucketLocation.getBranch();
+ final String bucketId = bucketLocation.getBucketId();
+
+ return repositoryClient.getFileNames(bucketId, branch).stream()
+ .filter(filename -> filename.endsWith(SNAPSHOT_FILE_EXTENSION))
+ .map(filename -> mapToRegisteredFlow(bucketLocation, filename))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final FlowVersionLocation flowVersionLocation)
+ throws FlowRegistryException, IOException {
+ final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+ verifyReadPermissions(repositoryClient);
+
+ final String version = flowVersionLocation.getVersion();
+ final String filePath = getSnapshotFilePath(flowVersionLocation);
+
+ final InputStream inputStream = repositoryClient.getContentFromCommit(filePath, version);
+ final RegisteredFlowSnapshot flowSnapshot = getSnapshot(inputStream);
+ populateFlowAndSnapshotMetadata(flowSnapshot, flowVersionLocation);
+
+ // populate values that aren't store in GitHub
+ flowSnapshot.getSnapshotMetadata().setVersion(version);
+ flowSnapshot.getSnapshotMetadata().setBranch(flowVersionLocation.getBranch());
+ flowSnapshot.getFlow().setBranch(flowVersionLocation.getBranch());
+
+ // populate outgoing bucket references
+ updateBucketReferences(repositoryClient, flowSnapshot, flowVersionLocation.getBucketId());
+
+ // determine if the version is the "latest" version by comparing to the response of getLatestVersion
+ final String latestVersion = getLatestVersion(context, flowVersionLocation).orElse(null);
+ flowSnapshot.setLatest(version.equals(latestVersion));
+
+ return flowSnapshot;
+ }
+
+ @Override
+ public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot, final RegisterAction action)
+ throws FlowRegistryException, IOException {
+ final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+ verifyWritePermissions(repositoryClient);
+
+ final RegisteredFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
+ final String branch = snapshotMetadata.getBranch();
+ final FlowLocation flowLocation = new FlowLocation(snapshotMetadata.getBranch(), snapshotMetadata.getBucketIdentifier(), snapshotMetadata.getFlowIdentifier());
+ final String filePath = getSnapshotFilePath(flowLocation);
+ final String previousSha = repositoryClient.getContentSha(filePath, branch).orElse(null);
+
+ final String snapshotComments = snapshotMetadata.getComments();
+ final String commitMessage = StringUtils.isBlank(snapshotComments) ? DEFAULT_FLOW_SNAPSHOT_MESSAGE_FORMAT.formatted(flowLocation.getFlowId()) : snapshotComments;
+
+ final RegisteredFlowSnapshot existingSnapshot = getSnapshot(filePath, branch);
+ populateFlowAndSnapshotMetadata(existingSnapshot, flowLocation);
+
+ final RegisteredFlow existingFlow = existingSnapshot.getFlow();
+ existingFlow.setBranch(null);
+ flowSnapshot.setFlow(existingFlow);
+
+ // Clear values we don't want stored in the json in GitHub
+ flowSnapshot.setBucket(null);
+ flowSnapshot.getSnapshotMetadata().setBucketIdentifier(null);
+ flowSnapshot.getSnapshotMetadata().setBranch(null);
+ flowSnapshot.getSnapshotMetadata().setVersion(null);
+ flowSnapshot.getSnapshotMetadata().setComments(null);
+ flowSnapshot.getSnapshotMetadata().setTimestamp(0);
+
+ // replace the id of the top level group and all of its references with a constant value prior to serializing to avoid
+ // unnecessary diffs when different instances of the same flow are imported and have different top-level PG ids
+ final String originalFlowContentsGroupId = replaceGroupId(flowSnapshot.getFlowContents(), FLOW_CONTENTS_GROUP_ID);
+ final Position originalFlowContentsPosition = replacePosition(flowSnapshot.getFlowContents(), new Position(0, 0));
+
+ final GitHubCreateContentRequest createContentRequest = GitHubCreateContentRequest.builder()
+ .branch(branch)
+ .path(filePath)
+ .content(flowSnapshotSerializer.serialize(flowSnapshot))
+ .message(commitMessage)
+ .existingContentSha(previousSha)
+ .build();
+
+ final String createContentCommitSha = repositoryClient.createContent(createContentRequest);
+
+ final VersionedFlowCoordinates versionedFlowCoordinates = new VersionedFlowCoordinates();
+ versionedFlowCoordinates.setRegistryId(getIdentifier());
+ versionedFlowCoordinates.setBranch(flowLocation.getBranch());
+ versionedFlowCoordinates.setBucketId(flowLocation.getBucketId());
+ versionedFlowCoordinates.setFlowId(flowLocation.getFlowId());
+ versionedFlowCoordinates.setVersion(createContentCommitSha);
+ versionedFlowCoordinates.setStorageLocation(getStorageLocation(repositoryClient));
+
+ flowSnapshot.getFlowContents().setVersionedFlowCoordinates(versionedFlowCoordinates);
+ flowSnapshot.getFlow().setBranch(branch);
+ flowSnapshot.getSnapshotMetadata().setBranch(branch);
+ flowSnapshot.getSnapshotMetadata().setVersion(createContentCommitSha);
+ flowSnapshot.setLatest(true);
+
+ // populate outgoing bucket references
+ updateBucketReferences(repositoryClient, flowSnapshot, flowLocation.getBucketId());
+
+ // set back to the original id so that the returned snapshot is has the correct values from what was passed in
+ replaceGroupId(flowSnapshot.getFlowContents(), originalFlowContentsGroupId);
+ replacePosition(flowSnapshot.getFlowContents(), originalFlowContentsPosition);
+
+ return flowSnapshot;
+ }
+
+ @Override
+ public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation)
+ throws FlowRegistryException, IOException {
+ final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+ verifyReadPermissions(repositoryClient);
+
+ final String branch = flowLocation.getBranch();
+ final String filePath = getSnapshotFilePath(flowLocation);
+
+ final Set<RegisteredFlowSnapshotMetadata> snapshotMetadataSet = new LinkedHashSet<>();
+ for (final GHCommit ghCommit : repositoryClient.getCommits(filePath, branch)) {
+ final RegisteredFlowSnapshotMetadata snapshotMetadata = createSnapshotMetadata(ghCommit, flowLocation);
+ if (snapshotMetadata.getComments() != null && snapshotMetadata.getComments().startsWith(REGISTER_FLOW_MESSAGE_PREFIX)) {
+ continue;
+ }
+ snapshotMetadataSet.add(snapshotMetadata);
+ }
+ return snapshotMetadataSet;
+ }
+
+ @Override
+ public Optional<String> getLatestVersion(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation) throws FlowRegistryException, IOException {
+ final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+ verifyReadPermissions(repositoryClient);
+
+ final String branch = flowLocation.getBranch();
+ final String filePath = getSnapshotFilePath(flowLocation);
+
+ final List<GHCommit> commits = repositoryClient.getCommits(filePath, branch);
+ final String latestVersion = commits.isEmpty() ? null : commits.getFirst().getSHA1();
+ return Optional.ofNullable(latestVersion);
+ }
+
+ @Override
+ public String generateFlowId(final String flowName) {
+ return flowName.trim()
+ .replaceAll("\\s", "-") // replace whitespace with -
+ .replaceAll("[^a-zA-Z0-9-]", "") // replace all other invalid chars with empty string
+ .replaceAll("(-)\\1+", "$1"); // replace consecutive - with single -
+ }
+
+ private FlowRegistryBucket createFlowRegistryBucket(final GitHubRepositoryClient repositoryClient, final String name) {
+ final FlowRegistryPermissions bucketPermissions = new FlowRegistryPermissions();
+ bucketPermissions.setCanRead(repositoryClient.getCanRead());
+ bucketPermissions.setCanWrite(repositoryClient.getCanWrite());
+ bucketPermissions.setCanDelete(repositoryClient.getCanWrite());
+
+ final FlowRegistryBucket bucket = new FlowRegistryBucket();
+ bucket.setIdentifier(name);
+ bucket.setName(name);
+ bucket.setPermissions(bucketPermissions);
+ return bucket;
+ }
+
+ private RegisteredFlowSnapshotMetadata createSnapshotMetadata(final GHCommit ghCommit, final FlowLocation flowLocation) throws IOException {
+ final GHCommit.ShortInfo shortInfo = ghCommit.getCommitShortInfo();
+
+ final RegisteredFlowSnapshotMetadata snapshotMetadata = new RegisteredFlowSnapshotMetadata();
+ snapshotMetadata.setBranch(flowLocation.getBranch());
+ snapshotMetadata.setBucketIdentifier(flowLocation.getBucketId());
+ snapshotMetadata.setFlowIdentifier(flowLocation.getFlowId());
+ snapshotMetadata.setVersion(ghCommit.getSHA1());
+ snapshotMetadata.setAuthor(ghCommit.getAuthor().getLogin());
+ snapshotMetadata.setComments(shortInfo.getMessage());
+ snapshotMetadata.setTimestamp(shortInfo.getCommitDate().getTime());
+ return snapshotMetadata;
+ }
+
+ private RegisteredFlow mapToRegisteredFlow(final BucketLocation bucketLocation, final String filename) {
+ final String branch = bucketLocation.getBranch();
+ final String bucketId = bucketLocation.getBucketId();
+ final String flowId = filename.replace(SNAPSHOT_FILE_EXTENSION, "");
+
+ final RegisteredFlow registeredFlow = new RegisteredFlow();
+ registeredFlow.setIdentifier(flowId);
+ registeredFlow.setName(flowId);
+ registeredFlow.setBranch(branch);
+ registeredFlow.setBucketIdentifier(bucketId);
+ registeredFlow.setBucketName(bucketId);
+ return registeredFlow;
+ }
+
+ private String getSnapshotFilePath(final FlowLocation flowLocation) {
+ return SNAPSHOT_FILE_PATH_FORMAT.formatted(flowLocation.getBucketId(), flowLocation.getFlowId());
+ }
+
+ private RegisteredFlowSnapshot getSnapshot(final String filePath, final String branch) throws IOException, FlowRegistryException {
+ try (final InputStream contentInputStream = repositoryClient.getContentFromBranch(filePath, branch)) {
+ return flowSnapshotSerializer.deserialize(contentInputStream);
+ }
+ }
+
+ private RegisteredFlowSnapshot getSnapshot(final InputStream inputStream) throws IOException {
+ try {
+ return flowSnapshotSerializer.deserialize(inputStream);
+ } finally {
+ IOUtils.closeQuietly(inputStream);
+ }
+ }
+
+ private Position replacePosition(final VersionedProcessGroup group, final Position newPosition) {
+ final Position originalPosition = group.getPosition();
+ group.setPosition(newPosition);
+ return originalPosition;
+ }
+
+ private String replaceGroupId(final VersionedProcessGroup group, final String newGroupId) {
+ final String originalGroupId = group.getIdentifier();
+ group.setIdentifier(newGroupId);
+
+ replaceGroupId(group.getProcessGroups(), newGroupId);
+ replaceGroupId(group.getRemoteProcessGroups(), newGroupId);
+ replaceGroupId(group.getProcessors(), newGroupId);
+ replaceGroupId(group.getFunnels(), newGroupId);
+ replaceGroupId(group.getLabels(), newGroupId);
+ replaceGroupId(group.getInputPorts(), newGroupId);
+ replaceGroupId(group.getOutputPorts(), newGroupId);
+ replaceGroupId(group.getControllerServices(), newGroupId);
+ replaceGroupId(group.getConnections(), newGroupId);
+
+ if (group.getConnections() != null) {
+ for (final VersionedConnection connection : group.getConnections()) {
+ replaceGroupId(connection.getSource(), originalGroupId, newGroupId);
+ replaceGroupId(connection.getDestination(), originalGroupId, newGroupId);
+ }
+ }
+
+ return originalGroupId;
+ }
+
+ private <T extends VersionedComponent> void replaceGroupId(final Collection<T> components, final String newGroupIdentifier) {
+ if (components == null) {
+ return;
+ }
+ components.forEach(c -> c.setGroupIdentifier(newGroupIdentifier));
+ }
+
+ private void replaceGroupId(final ConnectableComponent connectableComponent, final String originalGroupId, final String newGroupId) {
+ if (connectableComponent == null) {
+ return;
+ }
+
+ if (originalGroupId.equals(connectableComponent.getGroupId())) {
+ connectableComponent.setGroupId(newGroupId);
+ }
+ }
+
+ private void updateBucketReferences(final GitHubRepositoryClient repositoryClient, final RegisteredFlowSnapshot flowSnapshot, final String bucketId) {
+ final FlowRegistryBucket bucket = createFlowRegistryBucket(repositoryClient, bucketId);
+ flowSnapshot.setBucket(bucket);
+
+ final RegisteredFlow flow = flowSnapshot.getFlow();
+ flow.setBucketName(bucketId);
+ flow.setBucketIdentifier(bucketId);
+
+ final RegisteredFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
+ snapshotMetadata.setBucketIdentifier(bucketId);
+ }
+
+ // Ensures the snapshot has non-null flow and metadata fields, which would only be null if taking a flow from "Download Flow Definition" and adding directly to GitHub
+ private void populateFlowAndSnapshotMetadata(final RegisteredFlowSnapshot flowSnapshot, final FlowLocation flowLocation) {
+ if (flowSnapshot.getFlow() == null) {
+ final RegisteredFlow registeredFlow = new RegisteredFlow();
+ registeredFlow.setName(flowLocation.getFlowId());
+ registeredFlow.setIdentifier(flowLocation.getFlowId());
+ flowSnapshot.setFlow(registeredFlow);
+ }
+ if (flowSnapshot.getSnapshotMetadata() == null) {
+ final RegisteredFlowSnapshotMetadata snapshotMetadata = new RegisteredFlowSnapshotMetadata();
+ snapshotMetadata.setFlowIdentifier(flowLocation.getFlowId());
+ flowSnapshot.setSnapshotMetadata(snapshotMetadata);
+ }
+ }
+
+ private String getStorageLocation(final GitHubRepositoryClient repositoryClient) {
+ return STORAGE_LOCATION_FORMAT.formatted(repositoryClient.getRepoOwner(), repositoryClient.getRepoName());
+ }
+
+ private void verifyWritePermissions(final GitHubRepositoryClient repositoryClient) throws AuthorizationException {
+ if (!repositoryClient.getCanWrite()) {
+ throw new AuthorizationException("Client does not have write access to the GitHub repository");
+ }
+ }
+
+ private void verifyReadPermissions(final GitHubRepositoryClient repositoryClient) throws AuthorizationException {
+ if (!repositoryClient.getCanRead()) {
+ throw new AuthorizationException("Client does not have read access to the GitHub repository");
+ }
+ }
+
+ private synchronized GitHubRepositoryClient getRepositoryClient(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
+ if (!clientInitialized.get()) {
+ getLogger().info("Initializing GitHub repository client");
+ repositoryClient = createRepositoryClient(context);
+ clientInitialized.set(true);
+ initializeDefaultBucket(context);
+ }
+ return repositoryClient;
+ }
+
+ // protected so can be overridden during tests
+ protected GitHubRepositoryClient createRepositoryClient(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
+ return GitHubRepositoryClient.builder()
+ .apiUrl(context.getProperty(GITHUB_API_URL).getValue())
+ .authenticationType(GitHubAuthenticationType.valueOf(context.getProperty(AUTHENTICATION_TYPE).getValue()))
+ .personalAccessToken(context.getProperty(PERSONAL_ACCESS_TOKEN).getValue())
+ .appInstallationToken(context.getProperty(APP_INSTALLATION_TOKEN).getValue())
+ .repoOwner(context.getProperty(REPOSITORY_OWNER).getValue())
+ .repoName(context.getProperty(REPOSITORY_NAME).getValue())
+ .repoPath(context.getProperty(REPOSITORY_PATH).getValue())
+ .build();
+ }
+
+ // If the client has write permissions to the repo, then ensure the directory for the default bucket is present and if not create it,
+ // otherwise the client can only be used to import flows from the repo and won't be able to set up the default bucket
+ private void initializeDefaultBucket(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
+ if (!repositoryClient.getCanWrite()) {
+ getLogger().info("GitHub repository client does not have write permissions to the repository, skipping setup of default bucket");
+ return;
+ }
+
+ final String branch = context.getProperty(REPOSITORY_BRANCH).getValue();
+ final Set<String> bucketDirectoryNames = repositoryClient.getTopLevelDirectoryNames(branch);
+ if (!bucketDirectoryNames.isEmpty()) {
+ getLogger().info("Found existing buckets, skipping setup of default bucket");
+ return;
+ }
+
+ getLogger().info("Creating default bucket in repo [{}/{}] on branch [{}]", repositoryClient.getRepoOwner(), repositoryClient.getRepoName(), branch);
+
+ repositoryClient.createContent(
+ GitHubCreateContentRequest.builder()
+ .branch(branch)
+ .path(DEFAULT_BUCKET_KEEP_FILE_PATH)
+ .content(DEFAULT_BUCKET_KEEP_FILE_CONTENT)
+ .message(DEFAULT_BUCKET_KEEP_FILE_MESSAGE)
+ .build()
+ );
+ }
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java
new file mode 100644
index 0000000..fc85db0
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java
@@ -0,0 +1,471 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.github;
+
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.kohsuke.github.GHCommit;
+import org.kohsuke.github.GHContent;
+import org.kohsuke.github.GHContentUpdateResponse;
+import org.kohsuke.github.GHMyself;
+import org.kohsuke.github.GHPermissionType;
+import org.kohsuke.github.GHRef;
+import org.kohsuke.github.GHRepository;
+import org.kohsuke.github.GitHub;
+import org.kohsuke.github.GitHubBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Client to encapsulate access to a GitHub Repository through the Hub4j GitHub client.
+ */
+public class GitHubRepositoryClient {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GitHubRepositoryClient.class);
+
+ private static final String BRANCH_REF_PATTERN = "refs/heads/%s";
+ private static final int COMMIT_PAGE_SIZE = 50;
+
+ private final String repoOwner;
+ private final String repoName;
+ private final String repoPath;
+
+ private final GitHub gitHub;
+ private final GHRepository repository;
+ private final GitHubAuthenticationType authenticationType;
+ private final boolean canRead;
+ private final boolean canWrite;
+
+ private GitHubRepositoryClient(final Builder builder) throws IOException, FlowRegistryException {
+ final String apiUrl = Objects.requireNonNull(builder.apiUrl, "API URL is required");
+ final GitHubBuilder gitHubBuilder = new GitHubBuilder().withEndpoint(apiUrl);
+
+ repoPath = builder.repoPath;
+ repoOwner = Objects.requireNonNull(builder.repoOwner, "Repository Owner is required");
+ repoName = Objects.requireNonNull(builder.repoName, "Repository Name is required");
+ authenticationType = Objects.requireNonNull(builder.authenticationType, "Authentication Type is required");
+
+ switch (authenticationType) {
+ case PERSONAL_ACCESS_TOKEN -> gitHubBuilder.withOAuthToken(builder.personalAccessToken);
+ case APP_INSTALLATION_TOKEN -> gitHubBuilder.withAppInstallationToken(builder.appInstallationToken);
+ }
+
+ gitHub = gitHubBuilder.build();
+
+ final String fullRepoName = repoOwner + "/" + repoName;
+ try {
+ repository = gitHub.getRepository(fullRepoName);
+ } catch (final FileNotFoundException fnf) {
+ throw new FlowRegistryException("Repository [" + fullRepoName + "] not found");
+ }
+
+ // if anonymous then we assume the client has read permissions, otherwise the call to getRepository above would have failed
+ // if not anonymous then we get the identity of the current user and then ask for the permissions the current user has on the repo
+ if (gitHub.isAnonymous()) {
+ canRead = true;
+ canWrite = false;
+ } else {
+ final GHMyself currentUser = gitHub.getMyself();
+ canRead = repository.hasPermission(currentUser, GHPermissionType.READ);
+ canWrite = repository.hasPermission(currentUser, GHPermissionType.WRITE);
+ }
+ }
+
+ /**
+ * @return the repo owner
+ */
+ public String getRepoOwner() {
+ return repoOwner;
+ }
+
+ /**
+ * @return the repo name
+ */
+ public String getRepoName() {
+ return repoName;
+ }
+
+ /**
+ * @return the authentication type this client is configured with
+ */
+ public GitHubAuthenticationType getAuthenticationType() {
+ return authenticationType;
+ }
+
+ /**
+ * @return true if the repository is readable by configured credentials
+ */
+ public boolean getCanRead() {
+ return canRead;
+ }
+
+ /**
+ * @return true if the repository is writable by the configured credentials
+ */
+ public boolean getCanWrite() {
+ return canWrite;
+ }
+
+ /**
+ * Creates the content specified by the given builder.
+ *
+ * @param request the request for the content to create
+ * @return the update response
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ * @throws FlowRegistryException if a non I/O error happens calling GitHub
+ */
+ public String createContent(final GitHubCreateContentRequest request) throws IOException, FlowRegistryException {
+ final String branch = request.getBranch();
+ final String resolvedPath = getResolvedPath(request.getPath());
+ LOGGER.debug("Creating content at path [{}] on branch [{}] in repo [{}] ", resolvedPath, branch, repository.getName());
+ return execute(() -> {
+ try {
+ final GHContentUpdateResponse response = repository.createContent()
+ .branch(branch)
+ .path(resolvedPath)
+ .content(request.getContent())
+ .message(request.getMessage())
+ .sha(request.getExistingContentSha())
+ .commit();
+ return response.getCommit().getSha();
+ } catch (final FileNotFoundException fnf) {
+ throwPathOrBranchNotFound(fnf, resolvedPath, branch);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Gets the names of all the branches in the repo.
+ *
+ * @return the set of all branches in the repo
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ * @throws FlowRegistryException if a non I/O error happens calling GitHub
+ */
+ public Set<String> getBranches() throws IOException, FlowRegistryException {
+ LOGGER.debug("Getting branches for repo [{}]", repository.getName());
+ return execute(() -> repository.getBranches().keySet());
+ }
+
+ /**
+ * Gets an InputStream to read the latest content of the given path from the given branch.
+ * The returned stream already contains the contents of the requested file.
+ *
+ * @param path the path to the content
+ * @param branch the branch
+ * @return an input stream containing the contents of the path
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ * @throws FlowRegistryException if a non I/O error happens calling GitHub
+ */
+ public InputStream getContentFromBranch(final String path, final String branch) throws IOException, FlowRegistryException {
+ final String resolvedPath = getResolvedPath(path);
+ final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+ LOGGER.debug("Getting content for [{}] from branch [{}] in repo [{}] ", resolvedPath, branch, repository.getName());
+
+ return execute(() -> {
+ try {
+ final GHContent ghContent = repository.getFileContent(resolvedPath, branchRef);
+ return ghContent.read();
+ } catch (final FileNotFoundException fnf) {
+ throwPathOrBranchNotFound(fnf, resolvedPath, branchRef);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Gets the content of the given path from the given commit.
+ * The returned stream already contains the contents of the requested file.
+ *
+ * @param path the path to the content
+ * @param commitSha the commit SHA
+ * @return an input stream containing the contents of the path
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ * @throws FlowRegistryException if a non I/O error happens calling GitHub
+ */
+ public InputStream getContentFromCommit(final String path, final String commitSha) throws IOException, FlowRegistryException {
+ final String resolvedPath = getResolvedPath(path);
+ LOGGER.debug("Getting content for [{}] from commit [{}] in repo [{}] ", resolvedPath, commitSha, repository.getName());
+
+ return execute(() -> {
+ try {
+ final GHContent ghContent = repository.getFileContent(resolvedPath, commitSha);
+ return ghContent.read();
+ } catch (final FileNotFoundException fnf) {
+ throw new FlowRegistryException("Path [" + resolvedPath + "] or Commit [" + commitSha + "] not found", fnf);
+ }
+ });
+ }
+
+ /**
+ * Gets the commits for a given path on a given branch.
+ *
+ * @param path the path
+ * @param branch the branch
+ * @return the list of commits for the given path
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ * @throws FlowRegistryException if a non I/O error happens calling GitHub
+ */
+ public List<GHCommit> getCommits(final String path, final String branch) throws IOException, FlowRegistryException {
+ final String resolvedPath = getResolvedPath(path);
+ final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+ LOGGER.debug("Getting commits for [{}] from branch [{}] in repo [{}]", resolvedPath, branch, repository.getName());
+
+ return execute(() -> {
+ try {
+ final GHRef branchGhRef = repository.getRef(branchRef);
+ return repository.queryCommits()
+ .path(resolvedPath)
+ .from(branchGhRef.getObject().getSha())
+ .pageSize(COMMIT_PAGE_SIZE)
+ .list()
+ .toList();
+ } catch (final FileNotFoundException fnf) {
+ throwPathOrBranchNotFound(fnf, resolvedPath, branchRef);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Gets the top-level directory names, which are the directories at the root of the repo, or within the prefix if specified.
+ *
+ * @param branch the branch
+ * @return the set of directory names
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ * @throws FlowRegistryException if a non I/O error happens calling GitHub
+ */
+ public Set<String> getTopLevelDirectoryNames(final String branch) throws IOException, FlowRegistryException {
+ return getDirectoryItems("", branch, GHContent::isDirectory);
+ }
+
+ /**
+ * Gets the names of the directories contained within the given directory.
+ *
+ * @param directory the directory to list
+ * @param branch the branch
+ * @return the set of directory names
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ * @throws FlowRegistryException if a non I/O error happens calling GitHub
+ */
+ public Set<String> getDirectoryNames(final String directory, final String branch) throws IOException, FlowRegistryException {
+ return getDirectoryItems(directory, branch, GHContent::isDirectory);
+ }
+
+ /**
+ * Gets the names of the directories container within the given directory.
+ *
+ * @param directory the directory to list
+ * @param branch the branch
+ * @return the set of file names
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ * @throws FlowRegistryException if a non I/O error happens calling GitHub
+ */
+ public Set<String> getFileNames(final String directory, final String branch) throws IOException, FlowRegistryException {
+ return getDirectoryItems(directory, branch, GHContent::isFile);
+ }
+
+ /**
+ * Get the names of the items in the given directory on the given branch, filtered by the provided filter.
+ *
+ * @param directory the directory
+ * @param branch the branch
+ * @param filter the filter to determine which items get included
+ * @return the set of item names
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ * @throws FlowRegistryException if a non I/O error happens calling GitHub
+ */
+ private Set<String> getDirectoryItems(final String directory, final String branch, final Predicate<GHContent> filter) throws IOException, FlowRegistryException {
+ final String resolvedDirectory = getResolvedPath(directory);
+ final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+ LOGGER.debug("Getting directory items for [{}] from branch [{}] in repo [{}] ", resolvedDirectory, branch, repository.getName());
+
+ return execute(() -> {
+ try {
+ return repository.getDirectoryContent(resolvedDirectory, branchRef).stream()
+ .filter(filter)
+ .map(GHContent::getName)
+ .collect(Collectors.toSet());
+ } catch (final FileNotFoundException fnf) {
+ throwPathOrBranchNotFound(fnf, resolvedDirectory, branchRef);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Gets the current SHA for the given path from the given branch.
+ *
+ * @param path the path to the content
+ * @param branch the branch
+ * @return current sha for the given file, or empty optional
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ */
+ public Optional<String> getContentSha(final String path, final String branch) throws IOException, FlowRegistryException {
+ final String resolvedPath = getResolvedPath(path);
+ final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+ LOGGER.debug("Getting content SHA for [{}] from branch [{}] in repo [{}] ", resolvedPath, branch, repository.getName());
+
+ return execute(() -> {
+ try {
+ final GHContent ghContent = repository.getFileContent(resolvedPath, branchRef);
+ return Optional.of(ghContent.getSha());
+ } catch (final FileNotFoundException e) {
+ LOGGER.warn("Unable to get content SHA for [{}] from branch [{}] because content does not exist", resolvedPath, branch);
+ return Optional.empty();
+ }
+ });
+ }
+
+ /**
+ * Deletes the contents for the given file on the given branch.
+ *
+ * @param filePath the file path to delete
+ * @param commitMessage the commit message for the delete commit
+ * @param branch the branch to delete from
+ * @return the deleted content
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ * @throws FlowRegistryException if a non I/O error happens calling GitHub
+ */
+ public GHContent deleteContent(final String filePath, final String commitMessage, final String branch) throws FlowRegistryException, IOException {
+ final String resolvedPath = getResolvedPath(filePath);
+ LOGGER.debug("Deleting file [{}] in repo [{}] on branch [{}]", resolvedPath, repository.getName(), branch);
+ return execute(() -> {
+ try {
+ GHContent ghContent = repository.getFileContent(resolvedPath);
+ ghContent.delete(commitMessage, branch);
+ return ghContent;
+ } catch (final FileNotFoundException fnf) {
+ throwPathOrBranchNotFound(fnf, resolvedPath, branch);
+ return null;
+ }
+ });
+ }
+
+ private String getResolvedPath(final String path) {
+ return repoPath == null ? path : repoPath + "/" + path;
+ }
+
+ private void throwPathOrBranchNotFound(final FileNotFoundException fileNotFoundException, final String path, final String branch) throws FlowRegistryException {
+ throw new FlowRegistryException("Path [" + path + "] or Branch [" + branch + "] not found", fileNotFoundException);
+ }
+
+ private <T> T execute(final GHRequest<T> action) throws FlowRegistryException, IOException {
+ try {
+ return action.execute();
+ } catch (final FlowRegistryException | IOException e) {
+ throw e;
+ } catch (final Exception e) {
+ throw new FlowRegistryException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Functional interface for making a request to GitHub which may throw IOException.
+ *
+ * @param <T> the result of the request
+ */
+ private interface GHRequest<T> {
+
+ T execute() throws IOException, FlowRegistryException;
+
+ }
+
+ /**
+ * @return a new builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for the repository client.
+ */
+ public static class Builder {
+
+ private String apiUrl;
+ private GitHubAuthenticationType authenticationType;
+ private String personalAccessToken;
+ private String appInstallationToken;
+ private String repoOwner;
+ private String repoName;
+ private String repoPath;
+
+ public Builder apiUrl(final String apiUrl) {
+ this.apiUrl = apiUrl;
+ return this;
+ }
+
+ public Builder authenticationType(final GitHubAuthenticationType authenticationType) {
+ this.authenticationType = authenticationType;
+ return this;
+ }
+
+ public Builder personalAccessToken(final String personalAccessToken) {
+ this.personalAccessToken = personalAccessToken;
+ return this;
+ }
+
+ public Builder appInstallationToken(final String appInstallationToken) {
+ this.appInstallationToken = appInstallationToken;
+ return this;
+ }
+
+ public Builder repoOwner(final String repoOwner) {
+ this.repoOwner = repoOwner;
+ return this;
+ }
+
+ public Builder repoName(final String repoName) {
+ this.repoName = repoName;
+ return this;
+ }
+
+ public Builder repoPath(final String repoPath) {
+ this.repoPath = repoPath;
+ return this;
+ }
+
+ public GitHubRepositoryClient build() throws IOException, FlowRegistryException {
+ return new GitHubRepositoryClient(this);
+ }
+
+ }
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/JacksonFlowSnapshotSerializer.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/JacksonFlowSnapshotSerializer.java
new file mode 100644
index 0000000..b21b9f7
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/JacksonFlowSnapshotSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.github;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implementation of {@link FlowSnapshotSerializer} that is Jackson's ObjectMapper.
+ */
+public class JacksonFlowSnapshotSerializer implements FlowSnapshotSerializer {
+
+ private static final ObjectMapper OBJECT_MAPPER = JsonMapper.builder()
+ .serializationInclusion(JsonInclude.Include.NON_NULL)
+ .defaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL))
+ .annotationIntrospector(new JakartaXmlBindAnnotationIntrospector(TypeFactory.defaultInstance()))
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true)
+ .enable(SerializationFeature.INDENT_OUTPUT)
+ .addModule(new VersionedComponentModule())
+ .build();
+
+ @Override
+ public String serialize(final RegisteredFlowSnapshot flowSnapshot) throws IOException {
+ return OBJECT_MAPPER.writeValueAsString(flowSnapshot);
+ }
+
+ @Override
+ public RegisteredFlowSnapshot deserialize(final InputStream inputStream) throws IOException {
+ return OBJECT_MAPPER.readValue(inputStream, RegisteredFlowSnapshot.class);
+ }
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/VersionedComponentModule.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/VersionedComponentModule.java
new file mode 100644
index 0000000..813c6ba
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/VersionedComponentModule.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.github;
+
+import com.fasterxml.jackson.databind.BeanDescription;
+import com.fasterxml.jackson.databind.SerializationConfig;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.BeanPropertyWriter;
+import com.fasterxml.jackson.databind.ser.BeanSerializerModifier;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.VersionedComponent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Jackson Module to customize serialization of versioned component objects.
+ */
+public class VersionedComponentModule extends SimpleModule {
+
+ private static final Set<String> EXCLUDE_JSON_FIELDS = Set.of("instanceIdentifier", "instanceGroupId");
+
+ @Override
+ public void setupModule(final SetupContext context) {
+ super.setupModule(context);
+ context.addBeanSerializerModifier(new VersionedComponentBeanSerializerModifier());
+ }
+
+ private static class VersionedComponentBeanSerializerModifier extends BeanSerializerModifier {
+ @Override
+ public List<BeanPropertyWriter> changeProperties(final SerializationConfig config, final BeanDescription beanDesc, final List<BeanPropertyWriter> beanProperties) {
+ if (!VersionedComponent.class.isAssignableFrom(beanDesc.getBeanClass())
+ && !ConnectableComponent.class.isAssignableFrom(beanDesc.getBeanClass())) {
+ return super.changeProperties(config, beanDesc, beanProperties);
+ }
+
+ final List<BeanPropertyWriter> includedProperties = new ArrayList<>();
+ for (final BeanPropertyWriter property : beanProperties) {
+ if (!EXCLUDE_JSON_FIELDS.contains(property.getName())) {
+ includedProperties.add(property);
+ }
+ }
+ return includedProperties;
+ }
+ }
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient
new file mode 100644
index 0000000..bd467f5
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.github.GitHubFlowRegistryClient
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
new file mode 100644
index 0000000..0b09a96
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.github;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.registry.flow.FlowRegistryBucket;
+import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext;
+import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext;
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.RegisterAction;
+import org.apache.nifi.registry.flow.RegisteredFlow;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class GitHubFlowRegistryClientTest {
+
+ static final String DEFAULT_REPO_PATH = "some-path";
+ static final String DEFAULT_REPO_BRANCH = "some-branch";
+
+ private GitHubRepositoryClient repositoryClient;
+ private FlowSnapshotSerializer flowSnapshotSerializer;
+ private GitHubFlowRegistryClient flowRegistryClient;
+ private FlowRegistryClientConfigurationContext clientConfigurationContext;
+ private ComponentLog componentLog;
+
+ @BeforeEach
+ public void setup() throws IOException, FlowRegistryException {
+ repositoryClient = mock(GitHubRepositoryClient.class);
+ flowSnapshotSerializer = mock(FlowSnapshotSerializer.class);
+ flowRegistryClient = new TestableGitHubRepositoryClient(repositoryClient, flowSnapshotSerializer);
+
+ clientConfigurationContext = mock(FlowRegistryClientConfigurationContext.class);
+ componentLog = mock(ComponentLog.class);
+
+ final FlowRegistryClientInitializationContext initializationContext = mock(FlowRegistryClientInitializationContext.class);
+ when(initializationContext.getLogger()).thenReturn(componentLog);
+ flowRegistryClient.initialize(initializationContext);
+
+ when(repositoryClient.getCanRead()).thenReturn(true);
+ when(repositoryClient.getCanWrite()).thenReturn(true);
+ when(repositoryClient.getTopLevelDirectoryNames(anyString())).thenReturn(Set.of("existing-bucket"));
+ }
+
+ @Test
+ public void testRegisterFlow() throws IOException, FlowRegistryException {
+ setupClientConfigurationContextWithDefaults();
+
+ final String serializedSnapshotContent = "placeholder";
+ when(flowSnapshotSerializer.serialize(any(RegisteredFlowSnapshot.class))).thenReturn(serializedSnapshotContent);
+
+ final RegisteredFlow incomingFlow = createIncomingRegisteredFlow();
+ final RegisteredFlow resultFlow = flowRegistryClient.registerFlow(clientConfigurationContext, incomingFlow);
+ assertEquals(incomingFlow.getIdentifier(), resultFlow.getIdentifier());
+ assertEquals(incomingFlow.getName(), resultFlow.getName());
+ assertEquals(incomingFlow.getBucketIdentifier(), resultFlow.getBucketIdentifier());
+ assertEquals(incomingFlow.getBucketName(), resultFlow.getBucketName());
+ assertEquals(incomingFlow.getBranch(), resultFlow.getBranch());
+
+ final ArgumentCaptor<GitHubCreateContentRequest> argumentCaptor = ArgumentCaptor.forClass(GitHubCreateContentRequest.class);
+ verify(repositoryClient).createContent(argumentCaptor.capture());
+
+ final GitHubCreateContentRequest capturedArgument = argumentCaptor.getValue();
+ assertEquals(incomingFlow.getBranch(), capturedArgument.getBranch());
+ assertEquals(GitHubFlowRegistryClient.SNAPSHOT_FILE_PATH_FORMAT.formatted(incomingFlow.getBucketIdentifier(), incomingFlow.getIdentifier()), capturedArgument.getPath());
+ assertEquals(serializedSnapshotContent, capturedArgument.getContent());
+ assertNull(capturedArgument.getExistingContentSha());
+ }
+
+ @Test
+ public void testRegisterFlowSnapshot() throws IOException, FlowRegistryException {
+ setupClientConfigurationContextWithDefaults();
+
+ final RegisteredFlow incomingFlow = createIncomingRegisteredFlow();
+
+ final RegisteredFlowSnapshotMetadata incomingMetadata = new RegisteredFlowSnapshotMetadata();
+ incomingMetadata.setBranch(incomingFlow.getBranch());
+ incomingMetadata.setBucketIdentifier(incomingFlow.getBucketIdentifier());
+ incomingMetadata.setFlowIdentifier(incomingFlow.getIdentifier());
+ incomingMetadata.setComments("Unit test");
+
+ final RegisteredFlowSnapshot incomingSnapshot = new RegisteredFlowSnapshot();
+ incomingSnapshot.setFlow(incomingFlow);
+ incomingSnapshot.setSnapshotMetadata(incomingMetadata);
+ incomingSnapshot.setFlowContents(new VersionedProcessGroup());
+
+ final String snapshotFilePath = GitHubFlowRegistryClient.SNAPSHOT_FILE_PATH_FORMAT.formatted(incomingFlow.getBucketIdentifier(), incomingFlow.getIdentifier());
+ when(repositoryClient.getContentFromBranch(snapshotFilePath, DEFAULT_REPO_BRANCH)).thenReturn(new ByteArrayInputStream(new byte[0]));
+ when(flowSnapshotSerializer.deserialize(any(InputStream.class))).thenReturn(incomingSnapshot);
+
+ final String serializedSnapshotContent = "placeholder";
+ when(flowSnapshotSerializer.serialize(any(RegisteredFlowSnapshot.class))).thenReturn(serializedSnapshotContent);
+
+ final String commitSha = "commitSha";
+ when(repositoryClient.createContent(any(GitHubCreateContentRequest.class))).thenReturn(commitSha);
+
+ final RegisteredFlowSnapshot resultSnapshot = flowRegistryClient.registerFlowSnapshot(clientConfigurationContext, incomingSnapshot, RegisterAction.COMMIT);
+ assertNotNull(resultSnapshot);
+
+ final RegisteredFlow resultFlow = resultSnapshot.getFlow();
+ assertNotNull(resultFlow);
+ assertEquals(incomingFlow.getIdentifier(), resultFlow.getIdentifier());
+ assertEquals(incomingFlow.getName(), resultFlow.getName());
+ assertEquals(incomingFlow.getBucketIdentifier(), resultFlow.getBucketIdentifier());
+ assertEquals(incomingFlow.getBucketName(), resultFlow.getBucketName());
+ assertEquals(incomingFlow.getBranch(), resultFlow.getBranch());
+
+ final RegisteredFlowSnapshotMetadata resultMetadata = resultSnapshot.getSnapshotMetadata();
+ assertNotNull(resultMetadata);
+ assertEquals(incomingMetadata.getBranch(), resultMetadata.getBranch());
+ assertEquals(incomingMetadata.getBucketIdentifier(), resultMetadata.getBucketIdentifier());
+ assertEquals(incomingMetadata.getFlowIdentifier(), resultMetadata.getFlowIdentifier());
+ assertEquals(incomingMetadata.getComments(), resultMetadata.getComments());
+
+ final FlowRegistryBucket resultBucket = resultSnapshot.getBucket();
+ assertNotNull(resultBucket);
+ assertEquals(incomingMetadata.getBucketIdentifier(), resultBucket.getIdentifier());
+ assertEquals(incomingMetadata.getBucketIdentifier(), resultBucket.getName());
+ }
+
+ private RegisteredFlow createIncomingRegisteredFlow() {
+ final RegisteredFlow incomingFlow = new RegisteredFlow();
+ incomingFlow.setIdentifier("Flow1");
+ incomingFlow.setName("Flow1");
+ incomingFlow.setBucketIdentifier("Bucket1");
+ incomingFlow.setBucketName("Bucket1");
+ incomingFlow.setBranch(DEFAULT_REPO_BRANCH);
+ return incomingFlow;
+ }
+
+ private void setupClientConfigurationContext(final String repoPath, final String branch) {
+ final PropertyValue repoPathPropertyValue = createMockPropertyValue(repoPath);
+ when(clientConfigurationContext.getProperty(GitHubFlowRegistryClient.REPOSITORY_PATH)).thenReturn(repoPathPropertyValue);
+
+ final PropertyValue branchPropertyValue = createMockPropertyValue(branch);
+ when(clientConfigurationContext.getProperty(GitHubFlowRegistryClient.REPOSITORY_BRANCH)).thenReturn(branchPropertyValue);
+ }
+
+ private void setupClientConfigurationContextWithDefaults() {
+ setupClientConfigurationContext(DEFAULT_REPO_PATH, DEFAULT_REPO_BRANCH);
+ }
+
+ private PropertyValue createMockPropertyValue(final String value) {
+ final PropertyValue propertyValue = mock(PropertyValue.class);
+ when(propertyValue.getValue()).thenReturn(value);
+ return propertyValue;
+ }
+
+ private static class TestableGitHubRepositoryClient extends GitHubFlowRegistryClient {
+ private final GitHubRepositoryClient repositoryClient;
+ private final FlowSnapshotSerializer flowSnapshotSerializer;
+
+ public TestableGitHubRepositoryClient(final GitHubRepositoryClient repositoryClient, final FlowSnapshotSerializer flowSnapshotSerializer) {
+ this.repositoryClient = repositoryClient;
+ this.flowSnapshotSerializer = flowSnapshotSerializer;
+ }
+
+ @Override
+ protected GitHubRepositoryClient createRepositoryClient(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
+ return repositoryClient;
+ }
+
+ @Override
+ protected FlowSnapshotSerializer createFlowSnapshotSerializer(final FlowRegistryClientInitializationContext initializationContext) {
+ return flowSnapshotSerializer;
+ }
+ }
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-nar/pom.xml b/nifi-extension-bundles/nifi-github-bundle/nifi-github-nar/pom.xml
new file mode 100644
index 0000000..25b10f5
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-nar/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-github-bundle</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-github-nar</artifactId>
+ <packaging>nar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-github-extensions</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-shared-nar</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+</project>
+
diff --git a/nifi-extension-bundles/nifi-github-bundle/pom.xml b/nifi-extension-bundles/nifi-github-bundle/pom.xml
new file mode 100644
index 0000000..d33cf43
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>nifi-standard-shared-bom</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../nifi-standard-shared-bundle/nifi-standard-shared-bom</relativePath>
+ </parent>
+
+ <artifactId>nifi-github-bundle</artifactId>
+ <packaging>pom</packaging>
+
+ <properties>
+ <github-api.version>1.321</github-api.version>
+ </properties>
+
+ <modules>
+ <module>nifi-github-extensions</module>
+ <module>nifi-github-nar</module>
+ </modules>
+</project>
diff --git a/nifi-extension-bundles/pom.xml b/nifi-extension-bundles/pom.xml
index 192bccb..c999eec 100755
--- a/nifi-extension-bundles/pom.xml
+++ b/nifi-extension-bundles/pom.xml
@@ -104,5 +104,6 @@
<module>nifi-jolt-bundle</module>
<module>nifi-questdb-bundle</module>
<module>nifi-protobuf-bundle</module>
+ <module>nifi-github-bundle</module>
</modules>
</project>
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java
index a099a15..62d6d0f 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java
@@ -462,6 +462,11 @@
}
@Override
+ public String generateFlowId(final String flowName) throws IOException, FlowRegistryException {
+ return node.generateFlowId(flowName);
+ }
+
+ @Override
public void setComponent(final LoggableComponent<FlowRegistryClient> component) {
node.setComponent(component);
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
index 0fbe4a3..4fed6f6 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
@@ -285,6 +285,11 @@
}
@Override
+ public String generateFlowId(final String flowName) throws IOException, FlowRegistryException {
+ return execute(() -> client.get().getComponent().generateFlowId(flowName));
+ }
+
+ @Override
public void setComponent(final LoggableComponent<FlowRegistryClient> component) {
client.set(component);
}
@@ -299,7 +304,7 @@
}
final List<String> validationProblems = validationResults.stream()
- .map(e -> e.getExplanation())
+ .map(ValidationResult::getExplanation)
.collect(Collectors.toList());
throw new FlowRegistryInvalidException(validationProblems);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java
index bc4b584..e58457e 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java
@@ -63,5 +63,7 @@
Set<RegisteredFlowSnapshotMetadata> getFlowVersions(FlowRegistryClientUserContext context, FlowLocation flowLocation) throws FlowRegistryException, IOException;
Optional<String> getLatestVersion(FlowRegistryClientUserContext context, FlowLocation flowLocation) throws FlowRegistryException, IOException;
+ String generateFlowId(String flowName) throws IOException, FlowRegistryException;
+
void setComponent(LoggableComponent<FlowRegistryClient> component);
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index c88dc88..0244136 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -394,7 +394,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -3276,7 +3275,7 @@
final FlowRegistryBranch defaultBranch = flowRegistryDAO.getDefaultBranchForUser(clientUserContext, registryClientId);
return flowRegistryDAO.getFlowVersionsForUser(clientUserContext, registryClientId, defaultBranch.getName(), bucketId, flowId).stream()
.map(md -> createVersionedFlowSnapshotMetadataEntity(registryClientId, md))
- .collect(Collectors.toSet());
+ .collect(Collectors.toCollection(LinkedHashSet::new));
}
@Override
@@ -5021,7 +5020,6 @@
final Map<String, ParameterProviderReference> parameterProviderReferences = new HashMap<>();
final Map<String, VersionedParameterContext> parameterContexts = createVersionedParameterContexts(processGroup, parameterProviderReferences);
- final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId();
final String registryId = requestEntity.getVersionedFlow().getRegistryId();
final String selectedBranch;
@@ -5040,7 +5038,7 @@
versionedFlow.setDescription(versionedFlowDto.getDescription());
versionedFlow.setLastModifiedTimestamp(versionedFlow.getCreatedTimestamp());
versionedFlow.setName(versionedFlowDto.getFlowName());
- versionedFlow.setIdentifier(flowId);
+ versionedFlow.setIdentifier(versionedFlowDto.getFlowId());
// Add the Versioned Flow and first snapshot to the Flow Registry
final RegisteredFlowSnapshot registeredSnapshot;
@@ -5069,7 +5067,7 @@
// then we need to capture the created versioned flow information as a partial successful result.
if (registerNewFlow) {
try {
- final FlowLocation flowLocation = new FlowLocation(selectedBranch, versionedFlowDto.getBucketId(), flowId);
+ final FlowLocation flowLocation = new FlowLocation(selectedBranch, versionedFlowDto.getBucketId(), registeredFlow.getIdentifier());
final FlowRegistryClientNode flowRegistryClientNode = flowRegistryDAO.getFlowRegistryClient(registryId);
flowRegistryClientNode.deregisterFlow(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), flowLocation);
} catch (final IOException | FlowRegistryException e2) {
@@ -5357,6 +5355,8 @@
}
try {
+ final String generatedId = registry.generateFlowId(flow.getName());
+ flow.setIdentifier(generatedId);
return registry.registerFlow(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), flow);
} catch (final IOException | FlowRegistryException e) {
throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 352c976..a6d04a1 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -1247,7 +1247,7 @@
final RegisteredFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
final VersionControlInformationDTO versionControlInfo = new VersionControlInformationDTO();
- versionControlInfo.setBranch(requestVci.getBranch());
+ versionControlInfo.setBranch(metadata.getBranch());
versionControlInfo.setBucketId(metadata.getBucketIdentifier());
versionControlInfo.setBucketName(bucket.getName());
versionControlInfo.setFlowDescription(flow.getDescription());
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
index 5f4dba8..430f902 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
@@ -198,7 +198,12 @@
final FlowLocation flowLocation = new FlowLocation(branch, bucketId, flowId);
final Set<RegisteredFlowSnapshotMetadata> flowVersions = flowRegistry.getFlowVersions(context, flowLocation);
- final Set<RegisteredFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>(Comparator.comparingLong(RegisteredFlowSnapshotMetadata::getTimestamp));
+
+ // if somehow the timestamp of two versions is exactly the same, then we use version as a secondary comparison,
+ // otherwise one of the objects won't be added to the set since compareTo returns 0 indicating it already exists
+ final Set<RegisteredFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>(
+ Comparator.comparingLong(RegisteredFlowSnapshotMetadata::getTimestamp)
+ .thenComparing(RegisteredFlowSnapshotMetadata::getVersion));
sortedFlowVersions.addAll(flowVersions);
return sortedFlowVersions;
} catch (final IOException | FlowRegistryException ioe) {