NIFI-13105 Implemented FlowRegistryClient using GitHub for versioning

This closes #8765

Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
index 749e5b8..4436c55 100644
--- a/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
+++ b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 
 /**
  * <p>
@@ -146,7 +147,7 @@
     FlowRegistryBucket getBucket(FlowRegistryClientConfigurationContext context, BucketLocation bucketLocation) throws FlowRegistryException, IOException;
 
     /**
-     * Registers the given RegisteredFlow into the the Flow Registry.
+     * Registers the given RegisteredFlow into the Flow Registry.
      *
      * @param context Configuration context.
      * @param flow The RegisteredFlow to add to the Registry.
@@ -252,4 +253,15 @@
      * @throws IOException If there is issue with the communication between NiFi and the Flow Registry.
      */
     Optional<String> getLatestVersion(FlowRegistryClientConfigurationContext context, FlowLocation flowLocation) throws FlowRegistryException, IOException;
+
+    /**
+     * Generates the id for registering a flow.
+     *
+     * @param flowName the name of the flow
+     * @return the generated id
+     */
+    default String generateFlowId(final String flowName) {
+        return UUID.randomUUID().toString();
+    }
+
 }
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 00ee910..6501ad0 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -872,6 +872,12 @@
             <version>2.0.0-SNAPSHOT</version>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-github-nar</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
         <!-- AspectJ library needed by the Java Agent used for native library loading (see bootstrap.conf) -->
         <dependency>
             <groupId>org.aspectj</groupId>
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/pom.xml b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/pom.xml
new file mode 100644
index 0000000..be73f95
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/pom.xml
@@ -0,0 +1,43 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-github-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-github-extensions</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-jakarta-xmlbind-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.kohsuke</groupId>
+            <artifactId>github-api</artifactId>
+            <version>${github-api.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/FlowSnapshotSerializer.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/FlowSnapshotSerializer.java
new file mode 100644
index 0000000..9f464e1
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/FlowSnapshotSerializer.java
@@ -0,0 +1,34 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.github;
+
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Serializer for flow snapshots.
+ */
+public interface FlowSnapshotSerializer {
+
+    String serialize(final RegisteredFlowSnapshot flowSnapshot) throws IOException;
+
+    RegisteredFlowSnapshot deserialize(final InputStream inputStream) throws IOException;
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubAuthenticationType.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubAuthenticationType.java
new file mode 100644
index 0000000..964c376
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubAuthenticationType.java
@@ -0,0 +1,29 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.github;
+
+/**
+ * Enumeration of authentication types for the GitHub client.
+ */
+public enum GitHubAuthenticationType {
+
+    NONE,
+    PERSONAL_ACCESS_TOKEN,
+    APP_INSTALLATION_TOKEN;
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubCreateContentRequest.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubCreateContentRequest.java
new file mode 100644
index 0000000..f1864eb
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubCreateContentRequest.java
@@ -0,0 +1,101 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.nifi.github;
+
+import java.util.Objects;
+
+public class GitHubCreateContentRequest {
+
+    private final String branch;
+    private final String path;
+    private final String content;
+    private final String message;
+    private final String existingContentSha;
+
+    private GitHubCreateContentRequest(final Builder builder) {
+        this.branch = Objects.requireNonNull(builder.branch);
+        this.path = Objects.requireNonNull(builder.path);
+        this.content = Objects.requireNonNull(builder.content);
+        this.message = Objects.requireNonNull(builder.message);
+        // Will be null for create, and populated for update
+        this.existingContentSha = builder.existingContentSha;
+    }
+
+    public String getBranch() {
+        return branch;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public String getExistingContentSha() {
+        return existingContentSha;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static final class Builder {
+        private String branch;
+        private String path;
+        private String content;
+        private String message;
+        private String existingContentSha;
+
+        public Builder branch(final String branch) {
+            this.branch = branch;
+            return this;
+        }
+
+        public Builder path(final String path) {
+            this.path = path;
+            return this;
+        }
+
+        public Builder content(final String content) {
+            this.content = content;
+            return this;
+        }
+
+        public Builder message(final String message) {
+            this.message = message;
+            return this;
+        }
+
+        public Builder existingContentSha(final String existingSha) {
+            this.existingContentSha = existingSha;
+            return this;
+        }
+
+        public GitHubCreateContentRequest build() {
+            return new GitHubCreateContentRequest(this);
+        }
+    }
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubFlowRegistryClient.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubFlowRegistryClient.java
new file mode 100644
index 0000000..34fce1d
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubFlowRegistryClient.java
@@ -0,0 +1,677 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.nifi.github;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.registry.flow.AbstractFlowRegistryClient;
+import org.apache.nifi.registry.flow.AuthorizationException;
+import org.apache.nifi.registry.flow.BucketLocation;
+import org.apache.nifi.registry.flow.FlowAlreadyExistsException;
+import org.apache.nifi.registry.flow.FlowLocation;
+import org.apache.nifi.registry.flow.FlowRegistryBranch;
+import org.apache.nifi.registry.flow.FlowRegistryBucket;
+import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext;
+import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext;
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.FlowRegistryPermissions;
+import org.apache.nifi.registry.flow.FlowVersionLocation;
+import org.apache.nifi.registry.flow.RegisterAction;
+import org.apache.nifi.registry.flow.RegisteredFlow;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
+import org.kohsuke.github.GHCommit;
+import org.kohsuke.github.GHContent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link org.apache.nifi.registry.flow.FlowRegistryClient} that uses GitHub for version controlling flows.
+ */
+public class GitHubFlowRegistryClient extends AbstractFlowRegistryClient {
+
+    static final PropertyDescriptor GITHUB_API_URL = new PropertyDescriptor.Builder()
+            .name("GitHub API URL")
+            .description("The URL of the GitHub API")
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .defaultValue("https://api.github.com/")
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor REPOSITORY_NAME = new PropertyDescriptor.Builder()
+            .name("Repository Name")
+            .description("The name of the repository")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor REPOSITORY_OWNER = new PropertyDescriptor.Builder()
+            .name("Repository Owner")
+            .description("The owner of the repository")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor REPOSITORY_BRANCH = new PropertyDescriptor.Builder()
+            .name("Default Branch")
+            .description("The default branch to use for this client")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue("main")
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor REPOSITORY_PATH = new PropertyDescriptor.Builder()
+            .name("Repository Path")
+            .description("The path with in the repository that this client will use to store all data. " +
+                    "If left blank, then the root of the repository will be used.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
+            .name("Authentication Type")
+            .description("The type of authentication to use for accessing GitHub")
+            .allowableValues(GitHubAuthenticationType.values())
+            .defaultValue(GitHubAuthenticationType.NONE.name())
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PERSONAL_ACCESS_TOKEN = new PropertyDescriptor.Builder()
+            .name("Personal Access Token")
+            .description("The personal access token to use for authentication")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .dependsOn(AUTHENTICATION_TYPE, GitHubAuthenticationType.PERSONAL_ACCESS_TOKEN.name())
+            .build();
+
+    static final PropertyDescriptor APP_INSTALLATION_TOKEN = new PropertyDescriptor.Builder()
+            .name("App Installation Token")
+            .description("The app installation token to use for authentication")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .dependsOn(AUTHENTICATION_TYPE, GitHubAuthenticationType.APP_INSTALLATION_TOKEN.name())
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
+            GITHUB_API_URL,
+            REPOSITORY_OWNER,
+            REPOSITORY_NAME,
+            REPOSITORY_BRANCH,
+            REPOSITORY_PATH,
+            AUTHENTICATION_TYPE,
+            PERSONAL_ACCESS_TOKEN,
+            APP_INSTALLATION_TOKEN
+    );
+
+    static final String DEFAULT_BUCKET_NAME = "default";
+    static final String DEFAULT_BUCKET_KEEP_FILE_PATH = DEFAULT_BUCKET_NAME + "/.keep";
+    static final String DEFAULT_BUCKET_KEEP_FILE_CONTENT = "Do Not Delete";
+    static final String DEFAULT_BUCKET_KEEP_FILE_MESSAGE = "Creating default bucket";
+
+    static final String REGISTER_FLOW_MESSAGE_PREFIX = "Registering Flow";
+    static final String REGISTER_FLOW_MESSAGE_FORMAT = REGISTER_FLOW_MESSAGE_PREFIX + " [%s]";
+    static final String DEREGISTER_FLOW_MESSAGE_FORMAT = "Deregistering Flow [%s]";
+    static final String DEFAULT_FLOW_SNAPSHOT_MESSAGE_FORMAT = "Saving Flow Snapshot %s";
+    static final String SNAPSHOT_FILE_EXTENSION = ".json";
+    static final String SNAPSHOT_FILE_PATH_FORMAT = "%s/%s" + SNAPSHOT_FILE_EXTENSION;
+    static final String FLOW_CONTENTS_GROUP_ID = "flow-contents-group";
+
+    static final String STORAGE_LOCATION_PREFIX = "git@github.com:";
+    static final String STORAGE_LOCATION_FORMAT = STORAGE_LOCATION_PREFIX + "%s/%s.git";
+
+    private volatile FlowSnapshotSerializer flowSnapshotSerializer;
+    private volatile GitHubRepositoryClient repositoryClient;
+    private final AtomicBoolean clientInitialized = new AtomicBoolean(false);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        final String repoPath = validationContext.getProperty(REPOSITORY_PATH).getValue();
+        if (repoPath != null && (repoPath.startsWith("/") || repoPath.endsWith("/"))) {
+            results.add(new ValidationResult.Builder()
+                    .subject(REPOSITORY_PATH.getDisplayName())
+                    .valid(false)
+                    .explanation("Path can not start or end with /")
+                    .build());
+        }
+
+        return results;
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+        synchronized (this) {
+            clientInitialized.set(false);
+        }
+    }
+
+    @Override
+    public void initialize(final FlowRegistryClientInitializationContext context) {
+        super.initialize(context);
+        flowSnapshotSerializer = createFlowSnapshotSerializer(context);
+    }
+
+    // protected to allow for overriding from tests
+    protected FlowSnapshotSerializer createFlowSnapshotSerializer(final FlowRegistryClientInitializationContext initializationContext) {
+        return new JacksonFlowSnapshotSerializer();
+    }
+
+    @Override
+    public boolean isStorageLocationApplicable(final FlowRegistryClientConfigurationContext context, final String location) {
+        return location != null && location.startsWith(STORAGE_LOCATION_PREFIX);
+    }
+
+    @Override
+    public boolean isBranchingSupported(final FlowRegistryClientConfigurationContext context) {
+        return true;
+    }
+
+    @Override
+    public Set<FlowRegistryBranch> getBranches(final FlowRegistryClientConfigurationContext context) throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyReadPermissions(repositoryClient);
+
+        return repositoryClient.getBranches().stream()
+                .map(branchName -> {
+                    final FlowRegistryBranch flowRegistryBranch = new FlowRegistryBranch();
+                    flowRegistryBranch.setName(branchName);
+                    return flowRegistryBranch;
+                }).collect(Collectors.toSet());
+    }
+
+    @Override
+    public FlowRegistryBranch getDefaultBranch(final FlowRegistryClientConfigurationContext context) {
+        final FlowRegistryBranch defaultBranch = new FlowRegistryBranch();
+        defaultBranch.setName(context.getProperty(REPOSITORY_BRANCH).getValue());
+        return defaultBranch;
+    }
+
+    @Override
+    public Set<FlowRegistryBucket> getBuckets(final FlowRegistryClientConfigurationContext context, final String branch) throws IOException, FlowRegistryException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyReadPermissions(repositoryClient);
+
+        final Set<FlowRegistryBucket> buckets = repositoryClient.getTopLevelDirectoryNames(branch).stream()
+                .map(bucketName -> createFlowRegistryBucket(repositoryClient, bucketName))
+                .collect(Collectors.toSet());
+
+        // if the repository has no top-level directories, then return a default bucket entry, this won't exist in the repository until the first time a flow is saved to it
+        return buckets.isEmpty() ? Set.of(createFlowRegistryBucket(repositoryClient, DEFAULT_BUCKET_NAME)) : buckets;
+    }
+
+    @Override
+    public FlowRegistryBucket getBucket(final FlowRegistryClientConfigurationContext context, final BucketLocation bucketLocation) throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyReadPermissions(repositoryClient);
+        return createFlowRegistryBucket(repositoryClient, bucketLocation.getBucketId());
+    }
+
+    @Override
+    public RegisteredFlow registerFlow(final FlowRegistryClientConfigurationContext context, final RegisteredFlow flow) throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyWritePermissions(repositoryClient);
+
+        final String branch = flow.getBranch();
+        final FlowLocation flowLocation = new FlowLocation(branch, flow.getBucketIdentifier(), flow.getIdentifier());
+        final String filePath = getSnapshotFilePath(flowLocation);
+        final String commitMessage = REGISTER_FLOW_MESSAGE_FORMAT.formatted(flow.getIdentifier());
+
+        final Optional<String> existingFileSha = repositoryClient.getContentSha(filePath, branch);
+        if (existingFileSha.isPresent()) {
+            throw new FlowAlreadyExistsException("Another flow is already registered at [" + filePath + "] on branch [" + branch + "]");
+        }
+
+        // Clear values we don't want in the json stored in GitHub
+        final String originalBucketId = flow.getBucketIdentifier();
+        flow.setBucketIdentifier(null);
+        flow.setBucketName(null);
+        flow.setBranch(null);
+
+        final RegisteredFlowSnapshot flowSnapshot = new RegisteredFlowSnapshot();
+        flowSnapshot.setFlow(flow);
+
+        final GitHubCreateContentRequest request = GitHubCreateContentRequest.builder()
+                .branch(branch)
+                .path(filePath)
+                .content(flowSnapshotSerializer.serialize(flowSnapshot))
+                .message(commitMessage)
+                .build();
+
+        repositoryClient.createContent(request);
+
+        // Re-populate fields before returning
+        flow.setBucketName(originalBucketId);
+        flow.setBucketIdentifier(originalBucketId);
+        flow.setBranch(branch);
+
+        return flow;
+    }
+
+    @Override
+    public RegisteredFlow deregisterFlow(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation) throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyWritePermissions(repositoryClient);
+
+        final String branch = flowLocation.getBranch();
+        final String filePath = getSnapshotFilePath(flowLocation);
+        final String commitMessage = DEREGISTER_FLOW_MESSAGE_FORMAT.formatted(flowLocation.getFlowId());
+        final GHContent deletedSnapshotContent = repositoryClient.deleteContent(filePath, commitMessage, branch);
+
+        final RegisteredFlowSnapshot deletedSnapshot = getSnapshot(deletedSnapshotContent.read());
+        updateBucketReferences(repositoryClient, deletedSnapshot, flowLocation.getBucketId());
+        return deletedSnapshot.getFlow();
+    }
+
+    @Override
+    public RegisteredFlow getFlow(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation) throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyReadPermissions(repositoryClient);
+
+        final String branch = flowLocation.getBranch();
+        final String filePath = getSnapshotFilePath(flowLocation);
+
+        final RegisteredFlowSnapshot existingSnapshot = getSnapshot(filePath, branch);
+        populateFlowAndSnapshotMetadata(existingSnapshot, flowLocation);
+        updateBucketReferences(repositoryClient, existingSnapshot, flowLocation.getBucketId());
+
+        final RegisteredFlow registeredFlow = existingSnapshot.getFlow();
+        registeredFlow.setBranch(branch);
+        return registeredFlow;
+    }
+
+    @Override
+    public Set<RegisteredFlow> getFlows(final FlowRegistryClientConfigurationContext context, final BucketLocation bucketLocation) throws IOException, FlowRegistryException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyReadPermissions(repositoryClient);
+
+        final String branch = bucketLocation.getBranch();
+        final String bucketId = bucketLocation.getBucketId();
+
+        return repositoryClient.getFileNames(bucketId, branch).stream()
+                .filter(filename -> filename.endsWith(SNAPSHOT_FILE_EXTENSION))
+                .map(filename -> mapToRegisteredFlow(bucketLocation, filename))
+                .collect(Collectors.toSet());
+    }
+
+    @Override
+    public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final FlowVersionLocation flowVersionLocation)
+            throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyReadPermissions(repositoryClient);
+
+        final String version = flowVersionLocation.getVersion();
+        final String filePath = getSnapshotFilePath(flowVersionLocation);
+
+        final InputStream inputStream = repositoryClient.getContentFromCommit(filePath, version);
+        final RegisteredFlowSnapshot flowSnapshot = getSnapshot(inputStream);
+        populateFlowAndSnapshotMetadata(flowSnapshot, flowVersionLocation);
+
+        // populate values that aren't store in GitHub
+        flowSnapshot.getSnapshotMetadata().setVersion(version);
+        flowSnapshot.getSnapshotMetadata().setBranch(flowVersionLocation.getBranch());
+        flowSnapshot.getFlow().setBranch(flowVersionLocation.getBranch());
+
+        // populate outgoing bucket references
+        updateBucketReferences(repositoryClient, flowSnapshot, flowVersionLocation.getBucketId());
+
+        // determine if the version is the "latest" version by comparing to the response of getLatestVersion
+        final String latestVersion = getLatestVersion(context, flowVersionLocation).orElse(null);
+        flowSnapshot.setLatest(version.equals(latestVersion));
+
+        return flowSnapshot;
+    }
+
+    @Override
+    public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot, final RegisterAction action)
+            throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyWritePermissions(repositoryClient);
+
+        final RegisteredFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
+        final String branch = snapshotMetadata.getBranch();
+        final FlowLocation flowLocation = new FlowLocation(snapshotMetadata.getBranch(), snapshotMetadata.getBucketIdentifier(), snapshotMetadata.getFlowIdentifier());
+        final String filePath = getSnapshotFilePath(flowLocation);
+        final String previousSha = repositoryClient.getContentSha(filePath, branch).orElse(null);
+
+        final String snapshotComments = snapshotMetadata.getComments();
+        final String commitMessage = StringUtils.isBlank(snapshotComments) ? DEFAULT_FLOW_SNAPSHOT_MESSAGE_FORMAT.formatted(flowLocation.getFlowId()) : snapshotComments;
+
+        final RegisteredFlowSnapshot existingSnapshot = getSnapshot(filePath, branch);
+        populateFlowAndSnapshotMetadata(existingSnapshot, flowLocation);
+
+        final RegisteredFlow existingFlow = existingSnapshot.getFlow();
+        existingFlow.setBranch(null);
+        flowSnapshot.setFlow(existingFlow);
+
+        // Clear values we don't want stored in the json in GitHub
+        flowSnapshot.setBucket(null);
+        flowSnapshot.getSnapshotMetadata().setBucketIdentifier(null);
+        flowSnapshot.getSnapshotMetadata().setBranch(null);
+        flowSnapshot.getSnapshotMetadata().setVersion(null);
+        flowSnapshot.getSnapshotMetadata().setComments(null);
+        flowSnapshot.getSnapshotMetadata().setTimestamp(0);
+
+        // replace the id of the top level group and all of its references with a constant value prior to serializing to avoid
+        // unnecessary diffs when different instances of the same flow are imported and have different top-level PG ids
+        final String originalFlowContentsGroupId = replaceGroupId(flowSnapshot.getFlowContents(), FLOW_CONTENTS_GROUP_ID);
+        final Position originalFlowContentsPosition = replacePosition(flowSnapshot.getFlowContents(), new Position(0, 0));
+
+        final GitHubCreateContentRequest createContentRequest = GitHubCreateContentRequest.builder()
+                .branch(branch)
+                .path(filePath)
+                .content(flowSnapshotSerializer.serialize(flowSnapshot))
+                .message(commitMessage)
+                .existingContentSha(previousSha)
+                .build();
+
+        final String createContentCommitSha = repositoryClient.createContent(createContentRequest);
+
+        final VersionedFlowCoordinates versionedFlowCoordinates = new VersionedFlowCoordinates();
+        versionedFlowCoordinates.setRegistryId(getIdentifier());
+        versionedFlowCoordinates.setBranch(flowLocation.getBranch());
+        versionedFlowCoordinates.setBucketId(flowLocation.getBucketId());
+        versionedFlowCoordinates.setFlowId(flowLocation.getFlowId());
+        versionedFlowCoordinates.setVersion(createContentCommitSha);
+        versionedFlowCoordinates.setStorageLocation(getStorageLocation(repositoryClient));
+
+        flowSnapshot.getFlowContents().setVersionedFlowCoordinates(versionedFlowCoordinates);
+        flowSnapshot.getFlow().setBranch(branch);
+        flowSnapshot.getSnapshotMetadata().setBranch(branch);
+        flowSnapshot.getSnapshotMetadata().setVersion(createContentCommitSha);
+        flowSnapshot.setLatest(true);
+
+        // populate outgoing bucket references
+        updateBucketReferences(repositoryClient, flowSnapshot, flowLocation.getBucketId());
+
+        // set back to the original id so that the returned snapshot is has the correct values from what was passed in
+        replaceGroupId(flowSnapshot.getFlowContents(), originalFlowContentsGroupId);
+        replacePosition(flowSnapshot.getFlowContents(), originalFlowContentsPosition);
+
+        return flowSnapshot;
+    }
+
+    @Override
+    public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation)
+            throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyReadPermissions(repositoryClient);
+
+        final String branch = flowLocation.getBranch();
+        final String filePath = getSnapshotFilePath(flowLocation);
+
+        final Set<RegisteredFlowSnapshotMetadata> snapshotMetadataSet = new LinkedHashSet<>();
+        for (final GHCommit ghCommit : repositoryClient.getCommits(filePath, branch)) {
+            final RegisteredFlowSnapshotMetadata snapshotMetadata = createSnapshotMetadata(ghCommit, flowLocation);
+            if (snapshotMetadata.getComments() != null && snapshotMetadata.getComments().startsWith(REGISTER_FLOW_MESSAGE_PREFIX)) {
+                continue;
+            }
+            snapshotMetadataSet.add(snapshotMetadata);
+        }
+        return snapshotMetadataSet;
+    }
+
+    @Override
+    public Optional<String> getLatestVersion(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation) throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyReadPermissions(repositoryClient);
+
+        final String branch = flowLocation.getBranch();
+        final String filePath = getSnapshotFilePath(flowLocation);
+
+        final List<GHCommit> commits = repositoryClient.getCommits(filePath, branch);
+        final String latestVersion = commits.isEmpty() ? null : commits.getFirst().getSHA1();
+        return Optional.ofNullable(latestVersion);
+    }
+
+    @Override
+    public String generateFlowId(final String flowName) {
+        return flowName.trim()
+                .replaceAll("\\s", "-") // replace whitespace with -
+                .replaceAll("[^a-zA-Z0-9-]", "") // replace all other invalid chars with empty string
+                .replaceAll("(-)\\1+", "$1"); // replace consecutive - with single -
+    }
+
+    private FlowRegistryBucket createFlowRegistryBucket(final GitHubRepositoryClient repositoryClient, final String name) {
+        final FlowRegistryPermissions bucketPermissions = new FlowRegistryPermissions();
+        bucketPermissions.setCanRead(repositoryClient.getCanRead());
+        bucketPermissions.setCanWrite(repositoryClient.getCanWrite());
+        bucketPermissions.setCanDelete(repositoryClient.getCanWrite());
+
+        final FlowRegistryBucket bucket = new FlowRegistryBucket();
+        bucket.setIdentifier(name);
+        bucket.setName(name);
+        bucket.setPermissions(bucketPermissions);
+        return bucket;
+    }
+
+    private RegisteredFlowSnapshotMetadata createSnapshotMetadata(final GHCommit ghCommit, final FlowLocation flowLocation) throws IOException {
+        final GHCommit.ShortInfo shortInfo = ghCommit.getCommitShortInfo();
+
+        final RegisteredFlowSnapshotMetadata snapshotMetadata = new RegisteredFlowSnapshotMetadata();
+        snapshotMetadata.setBranch(flowLocation.getBranch());
+        snapshotMetadata.setBucketIdentifier(flowLocation.getBucketId());
+        snapshotMetadata.setFlowIdentifier(flowLocation.getFlowId());
+        snapshotMetadata.setVersion(ghCommit.getSHA1());
+        snapshotMetadata.setAuthor(ghCommit.getAuthor().getLogin());
+        snapshotMetadata.setComments(shortInfo.getMessage());
+        snapshotMetadata.setTimestamp(shortInfo.getCommitDate().getTime());
+        return snapshotMetadata;
+    }
+
+    private RegisteredFlow mapToRegisteredFlow(final BucketLocation bucketLocation, final String filename) {
+        final String branch = bucketLocation.getBranch();
+        final String bucketId = bucketLocation.getBucketId();
+        final String flowId = filename.replace(SNAPSHOT_FILE_EXTENSION, "");
+
+        final RegisteredFlow registeredFlow = new RegisteredFlow();
+        registeredFlow.setIdentifier(flowId);
+        registeredFlow.setName(flowId);
+        registeredFlow.setBranch(branch);
+        registeredFlow.setBucketIdentifier(bucketId);
+        registeredFlow.setBucketName(bucketId);
+        return registeredFlow;
+    }
+
+    private String getSnapshotFilePath(final FlowLocation flowLocation) {
+        return SNAPSHOT_FILE_PATH_FORMAT.formatted(flowLocation.getBucketId(), flowLocation.getFlowId());
+    }
+
+    private RegisteredFlowSnapshot getSnapshot(final String filePath, final String branch) throws IOException, FlowRegistryException {
+        try (final InputStream contentInputStream = repositoryClient.getContentFromBranch(filePath, branch)) {
+            return flowSnapshotSerializer.deserialize(contentInputStream);
+        }
+    }
+
+    private RegisteredFlowSnapshot getSnapshot(final InputStream inputStream) throws IOException {
+        try {
+            return flowSnapshotSerializer.deserialize(inputStream);
+        } finally {
+            IOUtils.closeQuietly(inputStream);
+        }
+    }
+
+    private Position replacePosition(final VersionedProcessGroup group, final Position newPosition) {
+        final Position originalPosition = group.getPosition();
+        group.setPosition(newPosition);
+        return originalPosition;
+    }
+
+    private String replaceGroupId(final VersionedProcessGroup group, final String newGroupId) {
+        final String originalGroupId = group.getIdentifier();
+        group.setIdentifier(newGroupId);
+
+        replaceGroupId(group.getProcessGroups(), newGroupId);
+        replaceGroupId(group.getRemoteProcessGroups(), newGroupId);
+        replaceGroupId(group.getProcessors(), newGroupId);
+        replaceGroupId(group.getFunnels(), newGroupId);
+        replaceGroupId(group.getLabels(), newGroupId);
+        replaceGroupId(group.getInputPorts(), newGroupId);
+        replaceGroupId(group.getOutputPorts(), newGroupId);
+        replaceGroupId(group.getControllerServices(), newGroupId);
+        replaceGroupId(group.getConnections(), newGroupId);
+
+        if (group.getConnections() != null) {
+            for (final VersionedConnection connection : group.getConnections()) {
+                replaceGroupId(connection.getSource(), originalGroupId, newGroupId);
+                replaceGroupId(connection.getDestination(), originalGroupId, newGroupId);
+            }
+        }
+
+        return originalGroupId;
+    }
+
+    private <T extends VersionedComponent> void replaceGroupId(final Collection<T> components, final String newGroupIdentifier) {
+        if (components == null) {
+            return;
+        }
+        components.forEach(c -> c.setGroupIdentifier(newGroupIdentifier));
+    }
+
+    private void replaceGroupId(final ConnectableComponent connectableComponent, final String originalGroupId, final String newGroupId) {
+        if (connectableComponent == null) {
+            return;
+        }
+
+        if (originalGroupId.equals(connectableComponent.getGroupId())) {
+            connectableComponent.setGroupId(newGroupId);
+        }
+    }
+
+    private void updateBucketReferences(final GitHubRepositoryClient repositoryClient, final RegisteredFlowSnapshot flowSnapshot, final String bucketId) {
+        final FlowRegistryBucket bucket = createFlowRegistryBucket(repositoryClient, bucketId);
+        flowSnapshot.setBucket(bucket);
+
+        final RegisteredFlow flow = flowSnapshot.getFlow();
+        flow.setBucketName(bucketId);
+        flow.setBucketIdentifier(bucketId);
+
+        final RegisteredFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
+        snapshotMetadata.setBucketIdentifier(bucketId);
+    }
+
+    // Ensures the snapshot has non-null flow and metadata fields, which would only be null if taking a flow from "Download Flow Definition" and adding directly to GitHub
+    private void populateFlowAndSnapshotMetadata(final RegisteredFlowSnapshot flowSnapshot, final FlowLocation flowLocation) {
+        if (flowSnapshot.getFlow() == null) {
+            final RegisteredFlow registeredFlow = new RegisteredFlow();
+            registeredFlow.setName(flowLocation.getFlowId());
+            registeredFlow.setIdentifier(flowLocation.getFlowId());
+            flowSnapshot.setFlow(registeredFlow);
+        }
+        if (flowSnapshot.getSnapshotMetadata() == null) {
+            final RegisteredFlowSnapshotMetadata snapshotMetadata = new RegisteredFlowSnapshotMetadata();
+            snapshotMetadata.setFlowIdentifier(flowLocation.getFlowId());
+            flowSnapshot.setSnapshotMetadata(snapshotMetadata);
+        }
+    }
+
+    private String getStorageLocation(final GitHubRepositoryClient repositoryClient) {
+        return STORAGE_LOCATION_FORMAT.formatted(repositoryClient.getRepoOwner(), repositoryClient.getRepoName());
+    }
+
+    private void verifyWritePermissions(final GitHubRepositoryClient repositoryClient) throws AuthorizationException {
+        if (!repositoryClient.getCanWrite()) {
+            throw new AuthorizationException("Client does not have write access to the GitHub repository");
+        }
+    }
+
+    private void verifyReadPermissions(final GitHubRepositoryClient repositoryClient) throws AuthorizationException {
+        if (!repositoryClient.getCanRead()) {
+            throw new AuthorizationException("Client does not have read access to the GitHub repository");
+        }
+    }
+
+    private synchronized GitHubRepositoryClient getRepositoryClient(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
+        if (!clientInitialized.get()) {
+            getLogger().info("Initializing GitHub repository client");
+            repositoryClient = createRepositoryClient(context);
+            clientInitialized.set(true);
+            initializeDefaultBucket(context);
+        }
+        return repositoryClient;
+    }
+
+    // protected so can be overridden during tests
+    protected GitHubRepositoryClient createRepositoryClient(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
+        return GitHubRepositoryClient.builder()
+                .apiUrl(context.getProperty(GITHUB_API_URL).getValue())
+                .authenticationType(GitHubAuthenticationType.valueOf(context.getProperty(AUTHENTICATION_TYPE).getValue()))
+                .personalAccessToken(context.getProperty(PERSONAL_ACCESS_TOKEN).getValue())
+                .appInstallationToken(context.getProperty(APP_INSTALLATION_TOKEN).getValue())
+                .repoOwner(context.getProperty(REPOSITORY_OWNER).getValue())
+                .repoName(context.getProperty(REPOSITORY_NAME).getValue())
+                .repoPath(context.getProperty(REPOSITORY_PATH).getValue())
+                .build();
+    }
+
+    // If the client has write permissions to the repo, then ensure the directory for the default bucket is present and if not create it,
+    // otherwise the client can only be used to import flows from the repo and won't be able to set up the default bucket
+    private void initializeDefaultBucket(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
+        if (!repositoryClient.getCanWrite()) {
+            getLogger().info("GitHub repository client does not have write permissions to the repository, skipping setup of default bucket");
+            return;
+        }
+
+        final String branch = context.getProperty(REPOSITORY_BRANCH).getValue();
+        final Set<String> bucketDirectoryNames = repositoryClient.getTopLevelDirectoryNames(branch);
+        if (!bucketDirectoryNames.isEmpty()) {
+            getLogger().info("Found existing buckets, skipping setup of default bucket");
+            return;
+        }
+
+        getLogger().info("Creating default bucket in repo [{}/{}] on branch [{}]", repositoryClient.getRepoOwner(), repositoryClient.getRepoName(), branch);
+
+        repositoryClient.createContent(
+                GitHubCreateContentRequest.builder()
+                        .branch(branch)
+                        .path(DEFAULT_BUCKET_KEEP_FILE_PATH)
+                        .content(DEFAULT_BUCKET_KEEP_FILE_CONTENT)
+                        .message(DEFAULT_BUCKET_KEEP_FILE_MESSAGE)
+                        .build()
+        );
+    }
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java
new file mode 100644
index 0000000..fc85db0
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java
@@ -0,0 +1,471 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.nifi.github;
+
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.kohsuke.github.GHCommit;
+import org.kohsuke.github.GHContent;
+import org.kohsuke.github.GHContentUpdateResponse;
+import org.kohsuke.github.GHMyself;
+import org.kohsuke.github.GHPermissionType;
+import org.kohsuke.github.GHRef;
+import org.kohsuke.github.GHRepository;
+import org.kohsuke.github.GitHub;
+import org.kohsuke.github.GitHubBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Client to encapsulate access to a GitHub Repository through the Hub4j GitHub client.
+ */
+public class GitHubRepositoryClient {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GitHubRepositoryClient.class);
+
+    private static final String BRANCH_REF_PATTERN = "refs/heads/%s";
+    private static final int COMMIT_PAGE_SIZE = 50;
+
+    private final String repoOwner;
+    private final String repoName;
+    private final String repoPath;
+
+    private final GitHub gitHub;
+    private final GHRepository repository;
+    private final GitHubAuthenticationType authenticationType;
+    private final boolean canRead;
+    private final boolean canWrite;
+
+    private GitHubRepositoryClient(final Builder builder) throws IOException, FlowRegistryException {
+        final String apiUrl = Objects.requireNonNull(builder.apiUrl, "API URL is required");
+        final GitHubBuilder gitHubBuilder = new GitHubBuilder().withEndpoint(apiUrl);
+
+        repoPath = builder.repoPath;
+        repoOwner = Objects.requireNonNull(builder.repoOwner, "Repository Owner is required");
+        repoName = Objects.requireNonNull(builder.repoName, "Repository Name is required");
+        authenticationType = Objects.requireNonNull(builder.authenticationType, "Authentication Type is required");
+
+        switch (authenticationType) {
+            case PERSONAL_ACCESS_TOKEN -> gitHubBuilder.withOAuthToken(builder.personalAccessToken);
+            case APP_INSTALLATION_TOKEN -> gitHubBuilder.withAppInstallationToken(builder.appInstallationToken);
+        }
+
+        gitHub = gitHubBuilder.build();
+
+        final String fullRepoName = repoOwner + "/" + repoName;
+        try {
+            repository = gitHub.getRepository(fullRepoName);
+        } catch (final FileNotFoundException fnf) {
+            throw new FlowRegistryException("Repository [" + fullRepoName + "] not found");
+        }
+
+        // if anonymous then we assume the client has read permissions, otherwise the call to getRepository above would have failed
+        // if not anonymous then we get the identity of the current user and then ask for the permissions the current user has on the repo
+        if (gitHub.isAnonymous()) {
+            canRead = true;
+            canWrite = false;
+        } else {
+            final GHMyself currentUser = gitHub.getMyself();
+            canRead = repository.hasPermission(currentUser, GHPermissionType.READ);
+            canWrite = repository.hasPermission(currentUser, GHPermissionType.WRITE);
+        }
+    }
+
+    /**
+     * @return the repo owner
+     */
+    public String getRepoOwner() {
+        return repoOwner;
+    }
+
+    /**
+     * @return the repo name
+     */
+    public String getRepoName() {
+        return repoName;
+    }
+
+    /**
+     * @return the authentication type this client is configured with
+     */
+    public GitHubAuthenticationType getAuthenticationType() {
+        return authenticationType;
+    }
+
+    /**
+     * @return true if the repository is readable by configured credentials
+     */
+    public boolean getCanRead() {
+        return canRead;
+    }
+
+    /**
+     * @return true if the repository is writable by the configured credentials
+     */
+    public boolean getCanWrite() {
+        return canWrite;
+    }
+
+    /**
+     * Creates the content specified by the given builder.
+     *
+     * @param request the request for the content to create
+     * @return the update response
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public String createContent(final GitHubCreateContentRequest request) throws IOException, FlowRegistryException {
+        final String branch = request.getBranch();
+        final String resolvedPath = getResolvedPath(request.getPath());
+        LOGGER.debug("Creating content at path [{}] on branch [{}] in repo [{}] ", resolvedPath, branch, repository.getName());
+        return execute(() -> {
+            try {
+                final GHContentUpdateResponse response = repository.createContent()
+                        .branch(branch)
+                        .path(resolvedPath)
+                        .content(request.getContent())
+                        .message(request.getMessage())
+                        .sha(request.getExistingContentSha())
+                        .commit();
+                return response.getCommit().getSha();
+            } catch (final FileNotFoundException fnf) {
+                throwPathOrBranchNotFound(fnf, resolvedPath, branch);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Gets the names of all the branches in the repo.
+     *
+     * @return the set of all branches in the repo
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public Set<String> getBranches() throws IOException, FlowRegistryException {
+        LOGGER.debug("Getting branches for repo [{}]", repository.getName());
+        return execute(() -> repository.getBranches().keySet());
+    }
+
+    /**
+     * Gets an InputStream to read the latest content of the given path from the given branch.
+     * The returned stream already contains the contents of the requested file.
+     *
+     * @param path the path to the content
+     * @param branch the branch
+     * @return an input stream containing the contents of the path
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public InputStream getContentFromBranch(final String path, final String branch) throws IOException, FlowRegistryException {
+        final String resolvedPath = getResolvedPath(path);
+        final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+        LOGGER.debug("Getting content for [{}] from branch [{}] in repo [{}] ", resolvedPath, branch, repository.getName());
+
+        return execute(() -> {
+            try {
+                final GHContent ghContent = repository.getFileContent(resolvedPath, branchRef);
+                return ghContent.read();
+            } catch (final FileNotFoundException fnf) {
+                throwPathOrBranchNotFound(fnf, resolvedPath, branchRef);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Gets the content of the given path from the given commit.
+     * The returned stream already contains the contents of the requested file.
+     *
+     * @param path the path to the content
+     * @param commitSha the commit SHA
+     * @return an input stream containing the contents of the path
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public InputStream getContentFromCommit(final String path, final String commitSha) throws IOException, FlowRegistryException {
+        final String resolvedPath = getResolvedPath(path);
+        LOGGER.debug("Getting content for [{}] from commit [{}] in repo [{}] ", resolvedPath, commitSha, repository.getName());
+
+        return execute(() -> {
+            try {
+                final GHContent ghContent = repository.getFileContent(resolvedPath, commitSha);
+                return ghContent.read();
+            } catch (final FileNotFoundException fnf) {
+                throw new FlowRegistryException("Path [" + resolvedPath + "] or Commit [" + commitSha + "] not found", fnf);
+            }
+        });
+    }
+
+    /**
+     * Gets the commits for a given path on a given branch.
+     *
+     * @param path the path
+     * @param branch the branch
+     * @return the list of commits for the given path
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public List<GHCommit> getCommits(final String path, final String branch) throws IOException, FlowRegistryException {
+        final String resolvedPath = getResolvedPath(path);
+        final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+        LOGGER.debug("Getting commits for [{}] from branch [{}] in repo [{}]", resolvedPath, branch, repository.getName());
+
+        return execute(() -> {
+            try {
+                final GHRef branchGhRef = repository.getRef(branchRef);
+                return repository.queryCommits()
+                        .path(resolvedPath)
+                        .from(branchGhRef.getObject().getSha())
+                        .pageSize(COMMIT_PAGE_SIZE)
+                        .list()
+                        .toList();
+            } catch (final FileNotFoundException fnf) {
+                throwPathOrBranchNotFound(fnf, resolvedPath, branchRef);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Gets the top-level directory names, which are the directories at the root of the repo, or within the prefix if specified.
+     *
+     * @param branch the branch
+     * @return the set of directory names
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public Set<String> getTopLevelDirectoryNames(final String branch) throws IOException, FlowRegistryException {
+        return getDirectoryItems("", branch, GHContent::isDirectory);
+    }
+
+    /**
+     * Gets the names of the directories contained within the given directory.
+     *
+     * @param directory the directory to list
+     * @param branch the branch
+     * @return the set of directory names
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public Set<String> getDirectoryNames(final String directory, final String branch) throws IOException, FlowRegistryException {
+        return getDirectoryItems(directory, branch, GHContent::isDirectory);
+    }
+
+    /**
+     * Gets the names of the directories container within the given directory.
+     *
+     * @param directory the directory to list
+     * @param branch the branch
+     * @return the set of file names
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public Set<String> getFileNames(final String directory, final String branch) throws IOException, FlowRegistryException {
+        return getDirectoryItems(directory, branch, GHContent::isFile);
+    }
+
+    /**
+     * Get the names of the items in the given directory on the given branch, filtered by the provided filter.
+     *
+     * @param directory the directory
+     * @param branch the branch
+     * @param filter the filter to determine which items get included
+     * @return the set of item names
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    private Set<String> getDirectoryItems(final String directory, final String branch, final Predicate<GHContent> filter) throws IOException, FlowRegistryException {
+        final String resolvedDirectory = getResolvedPath(directory);
+        final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+        LOGGER.debug("Getting directory items for [{}] from branch [{}] in repo [{}] ", resolvedDirectory, branch, repository.getName());
+
+        return execute(() -> {
+            try {
+                return repository.getDirectoryContent(resolvedDirectory, branchRef).stream()
+                        .filter(filter)
+                        .map(GHContent::getName)
+                        .collect(Collectors.toSet());
+            } catch (final FileNotFoundException fnf) {
+                throwPathOrBranchNotFound(fnf, resolvedDirectory, branchRef);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Gets the current SHA for the given path from the given branch.
+     *
+     * @param path the path to the content
+     * @param branch the branch
+     * @return current sha for the given file, or empty optional
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     */
+    public Optional<String> getContentSha(final String path, final String branch) throws IOException, FlowRegistryException {
+        final String resolvedPath = getResolvedPath(path);
+        final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+        LOGGER.debug("Getting content SHA for [{}] from branch [{}] in repo [{}] ", resolvedPath, branch, repository.getName());
+
+        return execute(() -> {
+            try {
+                final GHContent ghContent = repository.getFileContent(resolvedPath, branchRef);
+                return Optional.of(ghContent.getSha());
+            } catch (final FileNotFoundException e) {
+                LOGGER.warn("Unable to get content SHA for [{}] from branch [{}] because content does not exist", resolvedPath, branch);
+                return Optional.empty();
+            }
+        });
+    }
+
+    /**
+     * Deletes the contents for the given file on the given branch.
+     *
+     * @param filePath the file path to delete
+     * @param commitMessage the commit message for the delete commit
+     * @param branch the branch to delete from
+     * @return the deleted content
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public GHContent deleteContent(final String filePath, final String commitMessage, final String branch) throws FlowRegistryException, IOException {
+        final String resolvedPath = getResolvedPath(filePath);
+        LOGGER.debug("Deleting file [{}] in repo [{}] on branch [{}]", resolvedPath, repository.getName(), branch);
+        return execute(() -> {
+            try {
+                GHContent ghContent = repository.getFileContent(resolvedPath);
+                ghContent.delete(commitMessage, branch);
+                return ghContent;
+            } catch (final FileNotFoundException fnf) {
+                throwPathOrBranchNotFound(fnf, resolvedPath, branch);
+                return null;
+            }
+        });
+    }
+
+    private String getResolvedPath(final String path) {
+        return repoPath == null ? path : repoPath + "/" + path;
+    }
+
+    private void throwPathOrBranchNotFound(final FileNotFoundException fileNotFoundException, final String path, final String branch) throws FlowRegistryException {
+        throw new FlowRegistryException("Path [" + path + "] or Branch [" + branch + "] not found", fileNotFoundException);
+    }
+
+    private <T> T execute(final GHRequest<T> action) throws FlowRegistryException, IOException {
+        try {
+            return action.execute();
+        } catch (final FlowRegistryException | IOException e) {
+            throw e;
+        } catch (final Exception e) {
+            throw new FlowRegistryException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Functional interface for making a request to GitHub which may throw IOException.
+     *
+     * @param <T> the result of the request
+     */
+    private interface GHRequest<T> {
+
+        T execute() throws IOException, FlowRegistryException;
+
+    }
+
+    /**
+     * @return a new builder
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for the repository client.
+     */
+    public static class Builder {
+
+        private String apiUrl;
+        private GitHubAuthenticationType authenticationType;
+        private String personalAccessToken;
+        private String appInstallationToken;
+        private String repoOwner;
+        private String repoName;
+        private String repoPath;
+
+        public Builder apiUrl(final String apiUrl) {
+            this.apiUrl = apiUrl;
+            return this;
+        }
+
+        public Builder authenticationType(final GitHubAuthenticationType authenticationType) {
+            this.authenticationType = authenticationType;
+            return this;
+        }
+
+        public Builder personalAccessToken(final String personalAccessToken) {
+            this.personalAccessToken = personalAccessToken;
+            return this;
+        }
+
+        public Builder appInstallationToken(final String appInstallationToken) {
+            this.appInstallationToken = appInstallationToken;
+            return this;
+        }
+
+        public Builder repoOwner(final String repoOwner) {
+            this.repoOwner = repoOwner;
+            return this;
+        }
+
+        public Builder repoName(final String repoName) {
+            this.repoName = repoName;
+            return this;
+        }
+
+        public Builder repoPath(final String repoPath) {
+            this.repoPath = repoPath;
+            return this;
+        }
+
+        public GitHubRepositoryClient build() throws IOException, FlowRegistryException {
+            return new GitHubRepositoryClient(this);
+        }
+
+    }
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/JacksonFlowSnapshotSerializer.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/JacksonFlowSnapshotSerializer.java
new file mode 100644
index 0000000..b21b9f7
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/JacksonFlowSnapshotSerializer.java
@@ -0,0 +1,58 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.github;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implementation of {@link FlowSnapshotSerializer} that is Jackson's ObjectMapper.
+ */
+public class JacksonFlowSnapshotSerializer implements FlowSnapshotSerializer {
+
+    private static final ObjectMapper OBJECT_MAPPER = JsonMapper.builder()
+            .serializationInclusion(JsonInclude.Include.NON_NULL)
+            .defaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL))
+            .annotationIntrospector(new JakartaXmlBindAnnotationIntrospector(TypeFactory.defaultInstance()))
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true)
+            .enable(SerializationFeature.INDENT_OUTPUT)
+            .addModule(new VersionedComponentModule())
+            .build();
+
+    @Override
+    public String serialize(final RegisteredFlowSnapshot flowSnapshot) throws IOException {
+        return OBJECT_MAPPER.writeValueAsString(flowSnapshot);
+    }
+
+    @Override
+    public RegisteredFlowSnapshot deserialize(final InputStream inputStream) throws IOException {
+        return OBJECT_MAPPER.readValue(inputStream, RegisteredFlowSnapshot.class);
+    }
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/VersionedComponentModule.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/VersionedComponentModule.java
new file mode 100644
index 0000000..813c6ba
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/VersionedComponentModule.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.nifi.github;
+
+import com.fasterxml.jackson.databind.BeanDescription;
+import com.fasterxml.jackson.databind.SerializationConfig;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.BeanPropertyWriter;
+import com.fasterxml.jackson.databind.ser.BeanSerializerModifier;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.VersionedComponent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Jackson Module to customize serialization of versioned component objects.
+ */
+public class VersionedComponentModule extends SimpleModule {
+
+    private static final Set<String> EXCLUDE_JSON_FIELDS = Set.of("instanceIdentifier", "instanceGroupId");
+
+    @Override
+    public void setupModule(final SetupContext context) {
+        super.setupModule(context);
+        context.addBeanSerializerModifier(new VersionedComponentBeanSerializerModifier());
+    }
+
+    private static class VersionedComponentBeanSerializerModifier extends BeanSerializerModifier {
+        @Override
+        public List<BeanPropertyWriter> changeProperties(final SerializationConfig config, final BeanDescription beanDesc, final List<BeanPropertyWriter> beanProperties) {
+            if (!VersionedComponent.class.isAssignableFrom(beanDesc.getBeanClass())
+                    && !ConnectableComponent.class.isAssignableFrom(beanDesc.getBeanClass())) {
+                return super.changeProperties(config, beanDesc, beanProperties);
+            }
+
+            final List<BeanPropertyWriter> includedProperties = new ArrayList<>();
+            for (final BeanPropertyWriter property : beanProperties) {
+                if (!EXCLUDE_JSON_FIELDS.contains(property.getName())) {
+                    includedProperties.add(property);
+                }
+            }
+            return includedProperties;
+        }
+    }
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient
new file mode 100644
index 0000000..bd467f5
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.github.GitHubFlowRegistryClient
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
new file mode 100644
index 0000000..0b09a96
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
@@ -0,0 +1,202 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.github;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.registry.flow.FlowRegistryBucket;
+import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext;
+import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext;
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.RegisterAction;
+import org.apache.nifi.registry.flow.RegisteredFlow;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class GitHubFlowRegistryClientTest {
+
+    static final String DEFAULT_REPO_PATH = "some-path";
+    static final String DEFAULT_REPO_BRANCH = "some-branch";
+
+    private GitHubRepositoryClient repositoryClient;
+    private FlowSnapshotSerializer flowSnapshotSerializer;
+    private GitHubFlowRegistryClient flowRegistryClient;
+    private FlowRegistryClientConfigurationContext clientConfigurationContext;
+    private ComponentLog componentLog;
+
+    @BeforeEach
+    public void setup() throws IOException, FlowRegistryException {
+        repositoryClient = mock(GitHubRepositoryClient.class);
+        flowSnapshotSerializer = mock(FlowSnapshotSerializer.class);
+        flowRegistryClient = new TestableGitHubRepositoryClient(repositoryClient, flowSnapshotSerializer);
+
+        clientConfigurationContext = mock(FlowRegistryClientConfigurationContext.class);
+        componentLog = mock(ComponentLog.class);
+
+        final FlowRegistryClientInitializationContext initializationContext = mock(FlowRegistryClientInitializationContext.class);
+        when(initializationContext.getLogger()).thenReturn(componentLog);
+        flowRegistryClient.initialize(initializationContext);
+
+        when(repositoryClient.getCanRead()).thenReturn(true);
+        when(repositoryClient.getCanWrite()).thenReturn(true);
+        when(repositoryClient.getTopLevelDirectoryNames(anyString())).thenReturn(Set.of("existing-bucket"));
+    }
+
+    @Test
+    public void testRegisterFlow() throws IOException, FlowRegistryException {
+        setupClientConfigurationContextWithDefaults();
+
+        final String serializedSnapshotContent = "placeholder";
+        when(flowSnapshotSerializer.serialize(any(RegisteredFlowSnapshot.class))).thenReturn(serializedSnapshotContent);
+
+        final RegisteredFlow incomingFlow = createIncomingRegisteredFlow();
+        final RegisteredFlow resultFlow = flowRegistryClient.registerFlow(clientConfigurationContext, incomingFlow);
+        assertEquals(incomingFlow.getIdentifier(), resultFlow.getIdentifier());
+        assertEquals(incomingFlow.getName(), resultFlow.getName());
+        assertEquals(incomingFlow.getBucketIdentifier(), resultFlow.getBucketIdentifier());
+        assertEquals(incomingFlow.getBucketName(), resultFlow.getBucketName());
+        assertEquals(incomingFlow.getBranch(), resultFlow.getBranch());
+
+        final ArgumentCaptor<GitHubCreateContentRequest> argumentCaptor = ArgumentCaptor.forClass(GitHubCreateContentRequest.class);
+        verify(repositoryClient).createContent(argumentCaptor.capture());
+
+        final GitHubCreateContentRequest capturedArgument = argumentCaptor.getValue();
+        assertEquals(incomingFlow.getBranch(), capturedArgument.getBranch());
+        assertEquals(GitHubFlowRegistryClient.SNAPSHOT_FILE_PATH_FORMAT.formatted(incomingFlow.getBucketIdentifier(), incomingFlow.getIdentifier()), capturedArgument.getPath());
+        assertEquals(serializedSnapshotContent, capturedArgument.getContent());
+        assertNull(capturedArgument.getExistingContentSha());
+    }
+
+    @Test
+    public void testRegisterFlowSnapshot() throws IOException, FlowRegistryException {
+        setupClientConfigurationContextWithDefaults();
+
+        final RegisteredFlow incomingFlow = createIncomingRegisteredFlow();
+
+        final RegisteredFlowSnapshotMetadata incomingMetadata = new RegisteredFlowSnapshotMetadata();
+        incomingMetadata.setBranch(incomingFlow.getBranch());
+        incomingMetadata.setBucketIdentifier(incomingFlow.getBucketIdentifier());
+        incomingMetadata.setFlowIdentifier(incomingFlow.getIdentifier());
+        incomingMetadata.setComments("Unit test");
+
+        final RegisteredFlowSnapshot incomingSnapshot = new RegisteredFlowSnapshot();
+        incomingSnapshot.setFlow(incomingFlow);
+        incomingSnapshot.setSnapshotMetadata(incomingMetadata);
+        incomingSnapshot.setFlowContents(new VersionedProcessGroup());
+
+        final String snapshotFilePath = GitHubFlowRegistryClient.SNAPSHOT_FILE_PATH_FORMAT.formatted(incomingFlow.getBucketIdentifier(), incomingFlow.getIdentifier());
+        when(repositoryClient.getContentFromBranch(snapshotFilePath, DEFAULT_REPO_BRANCH)).thenReturn(new ByteArrayInputStream(new byte[0]));
+        when(flowSnapshotSerializer.deserialize(any(InputStream.class))).thenReturn(incomingSnapshot);
+
+        final String serializedSnapshotContent = "placeholder";
+        when(flowSnapshotSerializer.serialize(any(RegisteredFlowSnapshot.class))).thenReturn(serializedSnapshotContent);
+
+        final String commitSha = "commitSha";
+        when(repositoryClient.createContent(any(GitHubCreateContentRequest.class))).thenReturn(commitSha);
+
+        final RegisteredFlowSnapshot resultSnapshot = flowRegistryClient.registerFlowSnapshot(clientConfigurationContext, incomingSnapshot, RegisterAction.COMMIT);
+        assertNotNull(resultSnapshot);
+
+        final RegisteredFlow resultFlow = resultSnapshot.getFlow();
+        assertNotNull(resultFlow);
+        assertEquals(incomingFlow.getIdentifier(), resultFlow.getIdentifier());
+        assertEquals(incomingFlow.getName(), resultFlow.getName());
+        assertEquals(incomingFlow.getBucketIdentifier(), resultFlow.getBucketIdentifier());
+        assertEquals(incomingFlow.getBucketName(), resultFlow.getBucketName());
+        assertEquals(incomingFlow.getBranch(), resultFlow.getBranch());
+
+        final RegisteredFlowSnapshotMetadata resultMetadata = resultSnapshot.getSnapshotMetadata();
+        assertNotNull(resultMetadata);
+        assertEquals(incomingMetadata.getBranch(), resultMetadata.getBranch());
+        assertEquals(incomingMetadata.getBucketIdentifier(), resultMetadata.getBucketIdentifier());
+        assertEquals(incomingMetadata.getFlowIdentifier(), resultMetadata.getFlowIdentifier());
+        assertEquals(incomingMetadata.getComments(), resultMetadata.getComments());
+
+        final FlowRegistryBucket resultBucket = resultSnapshot.getBucket();
+        assertNotNull(resultBucket);
+        assertEquals(incomingMetadata.getBucketIdentifier(), resultBucket.getIdentifier());
+        assertEquals(incomingMetadata.getBucketIdentifier(), resultBucket.getName());
+    }
+
+    private RegisteredFlow createIncomingRegisteredFlow() {
+        final RegisteredFlow incomingFlow = new RegisteredFlow();
+        incomingFlow.setIdentifier("Flow1");
+        incomingFlow.setName("Flow1");
+        incomingFlow.setBucketIdentifier("Bucket1");
+        incomingFlow.setBucketName("Bucket1");
+        incomingFlow.setBranch(DEFAULT_REPO_BRANCH);
+        return incomingFlow;
+    }
+
+    private void setupClientConfigurationContext(final String repoPath, final String branch) {
+        final PropertyValue repoPathPropertyValue = createMockPropertyValue(repoPath);
+        when(clientConfigurationContext.getProperty(GitHubFlowRegistryClient.REPOSITORY_PATH)).thenReturn(repoPathPropertyValue);
+
+        final PropertyValue branchPropertyValue = createMockPropertyValue(branch);
+        when(clientConfigurationContext.getProperty(GitHubFlowRegistryClient.REPOSITORY_BRANCH)).thenReturn(branchPropertyValue);
+    }
+
+    private void setupClientConfigurationContextWithDefaults() {
+        setupClientConfigurationContext(DEFAULT_REPO_PATH, DEFAULT_REPO_BRANCH);
+    }
+
+    private PropertyValue createMockPropertyValue(final String value) {
+        final PropertyValue propertyValue = mock(PropertyValue.class);
+        when(propertyValue.getValue()).thenReturn(value);
+        return propertyValue;
+    }
+
+    private static class TestableGitHubRepositoryClient extends GitHubFlowRegistryClient {
+        private final GitHubRepositoryClient repositoryClient;
+        private final FlowSnapshotSerializer flowSnapshotSerializer;
+
+        public TestableGitHubRepositoryClient(final GitHubRepositoryClient repositoryClient, final FlowSnapshotSerializer flowSnapshotSerializer) {
+            this.repositoryClient = repositoryClient;
+            this.flowSnapshotSerializer = flowSnapshotSerializer;
+        }
+
+        @Override
+        protected GitHubRepositoryClient createRepositoryClient(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
+            return repositoryClient;
+        }
+
+        @Override
+        protected FlowSnapshotSerializer createFlowSnapshotSerializer(final FlowRegistryClientInitializationContext initializationContext) {
+            return flowSnapshotSerializer;
+        }
+    }
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-nar/pom.xml b/nifi-extension-bundles/nifi-github-bundle/nifi-github-nar/pom.xml
new file mode 100644
index 0000000..25b10f5
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-nar/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+       Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-github-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-github-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-github-extensions</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-shared-nar</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+</project>
+
diff --git a/nifi-extension-bundles/nifi-github-bundle/pom.xml b/nifi-extension-bundles/nifi-github-bundle/pom.xml
new file mode 100644
index 0000000..d33cf43
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>nifi-standard-shared-bom</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../nifi-standard-shared-bundle/nifi-standard-shared-bom</relativePath>
+    </parent>
+
+    <artifactId>nifi-github-bundle</artifactId>
+    <packaging>pom</packaging>
+
+    <properties>
+        <github-api.version>1.321</github-api.version>
+    </properties>
+
+    <modules>
+        <module>nifi-github-extensions</module>
+        <module>nifi-github-nar</module>
+    </modules>
+</project>
diff --git a/nifi-extension-bundles/pom.xml b/nifi-extension-bundles/pom.xml
index 192bccb..c999eec 100755
--- a/nifi-extension-bundles/pom.xml
+++ b/nifi-extension-bundles/pom.xml
@@ -104,5 +104,6 @@
         <module>nifi-jolt-bundle</module>
         <module>nifi-questdb-bundle</module>
         <module>nifi-protobuf-bundle</module>
+        <module>nifi-github-bundle</module>
     </modules>
 </project>
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java
index a099a15..62d6d0f 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java
@@ -462,6 +462,11 @@
     }
 
     @Override
+    public String generateFlowId(final String flowName) throws IOException, FlowRegistryException {
+        return node.generateFlowId(flowName);
+    }
+
+    @Override
     public void setComponent(final LoggableComponent<FlowRegistryClient> component) {
         node.setComponent(component);
     }
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
index 0fbe4a3..4fed6f6 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
@@ -285,6 +285,11 @@
     }
 
     @Override
+    public String generateFlowId(final String flowName) throws IOException, FlowRegistryException {
+        return execute(() -> client.get().getComponent().generateFlowId(flowName));
+    }
+
+    @Override
     public void setComponent(final LoggableComponent<FlowRegistryClient> component) {
         client.set(component);
     }
@@ -299,7 +304,7 @@
             }
 
             final List<String> validationProblems = validationResults.stream()
-                .map(e -> e.getExplanation())
+                .map(ValidationResult::getExplanation)
                 .collect(Collectors.toList());
 
             throw new FlowRegistryInvalidException(validationProblems);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java
index bc4b584..e58457e 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java
@@ -63,5 +63,7 @@
     Set<RegisteredFlowSnapshotMetadata> getFlowVersions(FlowRegistryClientUserContext context, FlowLocation flowLocation) throws FlowRegistryException, IOException;
     Optional<String> getLatestVersion(FlowRegistryClientUserContext context, FlowLocation flowLocation) throws FlowRegistryException, IOException;
 
+    String generateFlowId(String flowName) throws IOException, FlowRegistryException;
+
     void setComponent(LoggableComponent<FlowRegistryClient> component);
 }
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index c88dc88..0244136 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -394,7 +394,6 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -3276,7 +3275,7 @@
         final FlowRegistryBranch defaultBranch = flowRegistryDAO.getDefaultBranchForUser(clientUserContext, registryClientId);
         return flowRegistryDAO.getFlowVersionsForUser(clientUserContext, registryClientId, defaultBranch.getName(), bucketId, flowId).stream()
                 .map(md -> createVersionedFlowSnapshotMetadataEntity(registryClientId, md))
-                .collect(Collectors.toSet());
+                .collect(Collectors.toCollection(LinkedHashSet::new));
     }
 
     @Override
@@ -5021,7 +5020,6 @@
         final Map<String, ParameterProviderReference> parameterProviderReferences = new HashMap<>();
         final Map<String, VersionedParameterContext> parameterContexts = createVersionedParameterContexts(processGroup, parameterProviderReferences);
 
-        final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId();
         final String registryId = requestEntity.getVersionedFlow().getRegistryId();
 
         final String selectedBranch;
@@ -5040,7 +5038,7 @@
         versionedFlow.setDescription(versionedFlowDto.getDescription());
         versionedFlow.setLastModifiedTimestamp(versionedFlow.getCreatedTimestamp());
         versionedFlow.setName(versionedFlowDto.getFlowName());
-        versionedFlow.setIdentifier(flowId);
+        versionedFlow.setIdentifier(versionedFlowDto.getFlowId());
 
         // Add the Versioned Flow and first snapshot to the Flow Registry
         final RegisteredFlowSnapshot registeredSnapshot;
@@ -5069,7 +5067,7 @@
             // then we need to capture the created versioned flow information as a partial successful result.
             if (registerNewFlow) {
                 try {
-                    final FlowLocation flowLocation = new FlowLocation(selectedBranch, versionedFlowDto.getBucketId(), flowId);
+                    final FlowLocation flowLocation = new FlowLocation(selectedBranch, versionedFlowDto.getBucketId(), registeredFlow.getIdentifier());
                     final FlowRegistryClientNode flowRegistryClientNode = flowRegistryDAO.getFlowRegistryClient(registryId);
                     flowRegistryClientNode.deregisterFlow(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), flowLocation);
                 } catch (final IOException | FlowRegistryException e2) {
@@ -5357,6 +5355,8 @@
         }
 
         try {
+            final String generatedId = registry.generateFlowId(flow.getName());
+            flow.setIdentifier(generatedId);
             return registry.registerFlow(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), flow);
         } catch (final IOException | FlowRegistryException e) {
             throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 352c976..a6d04a1 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -1247,7 +1247,7 @@
 
         final RegisteredFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
         final VersionControlInformationDTO versionControlInfo = new VersionControlInformationDTO();
-        versionControlInfo.setBranch(requestVci.getBranch());
+        versionControlInfo.setBranch(metadata.getBranch());
         versionControlInfo.setBucketId(metadata.getBucketIdentifier());
         versionControlInfo.setBucketName(bucket.getName());
         versionControlInfo.setFlowDescription(flow.getDescription());
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
index 5f4dba8..430f902 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
@@ -198,7 +198,12 @@
 
             final FlowLocation flowLocation = new FlowLocation(branch, bucketId, flowId);
             final Set<RegisteredFlowSnapshotMetadata> flowVersions = flowRegistry.getFlowVersions(context, flowLocation);
-            final Set<RegisteredFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>(Comparator.comparingLong(RegisteredFlowSnapshotMetadata::getTimestamp));
+
+            // if somehow the timestamp of two versions is exactly the same, then we use version as a secondary comparison,
+            // otherwise one of the objects won't be added to the set since compareTo returns 0 indicating it already exists
+            final Set<RegisteredFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>(
+                    Comparator.comparingLong(RegisteredFlowSnapshotMetadata::getTimestamp)
+                            .thenComparing(RegisteredFlowSnapshotMetadata::getVersion));
             sortedFlowVersions.addAll(flowVersions);
             return sortedFlowVersions;
         } catch (final IOException | FlowRegistryException ioe) {