blob: 6a972af714a4d6440ae6568bb437905cc4e39502 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.registry.provider.flow.git;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.LsRemoteCommand;
import org.eclipse.jgit.api.PushCommand;
import org.eclipse.jgit.api.Status;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.api.errors.NoHeadException;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.RepositoryCache;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevTree;
import org.eclipse.jgit.transport.CredentialsProvider;
import org.eclipse.jgit.transport.PushResult;
import org.eclipse.jgit.transport.RemoteConfig;
import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
import org.eclipse.jgit.treewalk.TreeWalk;
import org.eclipse.jgit.util.FS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.isEmpty;
class GitFlowMetaData {
static final int CURRENT_LAYOUT_VERSION = 1;
static final String LAYOUT_VERSION = "layoutVer";
static final String BUCKET_ID = "bucketId";
static final String FLOWS = "flows";
static final String VER = "ver";
static final String FILE = "file";
static final String FLOW_NAME = "flowName";
static final String FLOW_DESC = "flowDesc";
static final String AUTHOR = "author";
static final String COMMENTS = "comments";
static final String CREATED = "created";
static final String BUCKET_FILENAME = "bucket.yml";
private static final Logger logger = LoggerFactory.getLogger(GitFlowMetaData.class);
private Repository gitRepo;
private String remoteToPush;
private CredentialsProvider credentialsProvider;
private final BlockingQueue<Long> pushQueue = new ArrayBlockingQueue<>(1);
* Bucket ID to Bucket.
private Map<String, Bucket> buckets = new HashMap<>();
public void setRemoteToPush(String remoteToPush) {
this.remoteToPush = remoteToPush;
public void setRemoteCredential(String userName, String password) {
this.credentialsProvider = new UsernamePasswordCredentialsProvider(userName, password);
* Open a Git repository using the specified directory.
* @param gitProjectRootDir a root directory of a Git project
* @return created Repository
* @throws IOException thrown when the specified directory does not exist,
* does not have read/write privilege or not containing .git directory
private Repository openRepository(final File gitProjectRootDir) throws IOException {
// Instead of using FileUtils.ensureDirectoryExistAndCanReadAndWrite, check availability manually here.
// Because the util will try to create a dir if not exist.
// The git dir should be initialized and configured by users.
if (!gitProjectRootDir.isDirectory()) {
throw new IOException(format("'%s' is not a directory or does not exist.", gitProjectRootDir));
if (!(gitProjectRootDir.canRead() && gitProjectRootDir.canWrite())) {
throw new IOException(format("Directory '%s' does not have read/write privilege.", gitProjectRootDir));
// Search .git dir but avoid searching parent directories.
final FileRepositoryBuilder builder = new FileRepositoryBuilder()
if (builder.getGitDir() == null) {
throw new IOException(format("Directory '%s' does not contain a .git directory." +
" Please init and configure the directory with 'git init' command before using it from NiFi Registry.",
private static boolean hasAtLeastOneReference(Repository repo) {"Checking references for repository {}", repo.toString());
for (Ref ref : repo.getAllRefs().values()) {
if (ref.getObjectId() == null) {
return true;
return false;
* Check if the provided local repository exists or not, provided by the 'Flow Storage Directory'
* configuration in the providers.xml.
* @param localRepo {@link File} object of the 'Flow Storage Directory' configuration
* @return true if the local repository exists, false otherwise
* @throws IOException if the .git directory of the local repository cannot be opened
public boolean localRepoExists(File localRepo) throws IOException {
if (!localRepo.isDirectory()) {"{} is not a directory or does not exist.", localRepo.getPath());
return false;
if (RepositoryCache.FileKey.isGitRepository(new File(localRepo.getPath()+"/.git"), FS.DETECTED)) {
final Git git = File(localRepo.getPath() + "/.git"));
final Repository repository = git.getRepository();"Checking for git references in {}", localRepo.getPath());
final boolean referenceExists = hasAtLeastOneReference(repository);
if (referenceExists) {"{} local repository exists with references so no need to clone remote", localRepo.getPath());
// Can be an empty repository if no references are present should we pull from remote?
return true;
return false;
* Validate if the provided 'Remote Clone Repository' configuration in the providers.xml exists or not.
* If the remote repository does not exist, an {@link IllegalArgumentException} will be thrown.
* @param remoteRepository the URI value of the 'Remote Clone Repository' configuration
* @throws IOException if creating the repository fails
public void remoteRepoExists(String remoteRepository) throws IOException {
final Git git = new Git(FileRepositoryBuilder.create(new File(remoteRepository)));
final LsRemoteCommand lsCmd = git.lsRemote();
try {
} catch (Exception e){
throw new IllegalArgumentException("InvalidRemoteRepository : Given remote repository is not valid");
* If validation of remote clone repository throws no exception then clone the repository given
* in the 'Remote Clone Repository' configuration. Currently the default branch of remote will be cloned.
* @param localRepo {@link File} object of the 'Flow Storage Directory' configuration
* @param remoteRepository the URI value of the 'Remote Clone Repository' configuration
* @throws GitAPIException if unable to call the remote repository
public void cloneRepository(File localRepo, String remoteRepository) throws GitAPIException {"Cloning the repository {} in {}", remoteRepository, localRepo.getPath());
public void loadGitRepository(File gitProjectRootDir) throws IOException, GitAPIException {
gitRepo = openRepository(gitProjectRootDir);
try (final Git git = new Git(gitRepo)) {
// Check if remote exists.
if (!isEmpty(remoteToPush)) {
final List<RemoteConfig> remotes = git.remoteList().call();
final boolean isRemoteExist = -> remote.getName().equals(remoteToPush));
if (!isRemoteExist) {
final List<String> remoteNames =;
throw new IllegalArgumentException(
format("The configured remote '%s' to push does not exist. Available remotes are %s", remoteToPush, remoteNames));
boolean isLatestCommit = true;
try {
for (RevCommit commit : git.log().call()) {
final String shortCommitId = commit.getId().abbreviate(7).name();
logger.debug("Processing a commit: {}", shortCommitId);
final RevTree tree = commit.getTree();
try (final TreeWalk treeWalk = new TreeWalk(gitRepo)) {
// Path -> ObjectId
final Map<String, ObjectId> bucketObjectIds = new HashMap<>();
final Map<String, ObjectId> flowSnapshotObjectIds = new HashMap<>();
while ( {
if (treeWalk.isSubtree()) {
} else {
final String pathString = treeWalk.getPathString();
// TODO: what is this nth?? When does it get grater than 0? Tree count seems to be always 1..
if (pathString.endsWith("/" + BUCKET_FILENAME)) {
bucketObjectIds.put(pathString, treeWalk.getObjectId(0));
} else if (pathString.endsWith(GitFlowPersistenceProvider.SNAPSHOT_EXTENSION)) {
flowSnapshotObjectIds.put(pathString, treeWalk.getObjectId(0));
if (bucketObjectIds.isEmpty()) {
// No bucket.yml means at this point, all flows are deleted. No need to scan older commits because those are already deleted.
logger.debug("Tree at commit {} does not contain any " + BUCKET_FILENAME + ". Stop loading commits here.", shortCommitId);
loadBuckets(gitRepo, commit, isLatestCommit, bucketObjectIds, flowSnapshotObjectIds);
isLatestCommit = false;
} catch (NoHeadException e) {
logger.debug("'{}' does not have any commit yet. Starting with empty buckets.", gitProjectRootDir);
void startPushThread() {
// If successfully loaded, start pushing thread if necessary.
if (isEmpty(remoteToPush)) {
final ThreadFactory threadFactory = new BasicThreadFactory.Builder()
.daemon(true).namingPattern(getClass().getSimpleName() + " Push thread").build();
// Use scheduled fixed delay to control the minimum interval between push activities.
// The necessity of executing push is controlled by offering messages to the pushQueue.
// If multiple commits are made within this time window, those are pushed by a single push execution.
final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
executorService.scheduleWithFixedDelay(() -> {
final Long offeredTimestamp;
try {
offeredTimestamp = pushQueue.take();
} catch (InterruptedException e) {
logger.warn("Waiting for push request has been interrupted due to {}", e.getMessage(), e);
logger.debug("Took a push request sent at {} to {}...", offeredTimestamp, remoteToPush);
final PushCommand pushCommand = new Git(gitRepo).push().setRemote(remoteToPush);
if (credentialsProvider != null) {
try {
final Iterable<PushResult> pushResults =;
for (PushResult pushResult : pushResults) {
} catch (GitAPIException e) {
logger.error(format("Failed to push commits to %s due to %s", remoteToPush, e), e);
}, 10, 10, TimeUnit.SECONDS);
private void loadBuckets(Repository gitRepo, RevCommit commit, boolean isLatestCommit, Map<String, ObjectId> bucketObjectIds, Map<String, ObjectId> flowSnapshotObjectIds) throws IOException {
final Yaml yaml = new Yaml();
for (String bucketFilePath : bucketObjectIds.keySet()) {
final ObjectId bucketObjectId = bucketObjectIds.get(bucketFilePath);
final Map<String, Object> bucketMeta;
try (InputStream bucketIn = gitRepo.newObjectReader().open(bucketObjectId).openStream()) {
bucketMeta = yaml.load(bucketIn);
if (!validateRequiredValue(bucketMeta, bucketFilePath, LAYOUT_VERSION, BUCKET_ID, FLOWS)) {
int layoutVersion = (int) bucketMeta.get(LAYOUT_VERSION);
if (layoutVersion > CURRENT_LAYOUT_VERSION) {
logger.warn("{} has unsupported {} {}. This Registry can only support {} or lower. Skipping it.",
final String bucketId = (String) bucketMeta.get(BUCKET_ID);
final Bucket bucket;
if (isLatestCommit) {
// If this is the latest commit, then create one.
bucket = getBucketOrCreate(bucketId);
} else {
// Otherwise non-existing bucket means it's already deleted.
final Optional<Bucket> bucketOpt = getBucket(bucketId);
if (bucketOpt.isPresent()) {
bucket = bucketOpt.get();
} else {
logger.debug("Bucket {} does not exist any longer. It may have been deleted.", bucketId);
// Since the bucketName is restored from pathname, it can be different from the original bucket name when it sanitized.
final String bucketDirName = bucketFilePath.substring(0, bucketFilePath.lastIndexOf("/"));
// Since commits are read in LIFO order, avoid old commits overriding the latest bucket name.
if (isEmpty(bucket.getBucketDirName())) {
final Map<String, Object> flows = (Map<String, Object>) bucketMeta.get(FLOWS);
loadFlows(commit, isLatestCommit, bucket, bucketFilePath, flows, flowSnapshotObjectIds);
private void loadFlows(RevCommit commit, boolean isLatestCommit, Bucket bucket, String backetFilePath, Map<String, Object> flows, Map<String, ObjectId> flowSnapshotObjectIds) {
for (String flowId : flows.keySet()) {
final Map<String, Object> flowMeta = (Map<String, Object>) flows.get(flowId);
if (!validateRequiredValue(flowMeta, backetFilePath + ":" + flowId, VER, FILE)) {
final Flow flow;
if (isLatestCommit) {
// If this is the latest commit, then create one.
flow = bucket.getFlowOrCreate(flowId);
} else {
// Otherwise non-existing flow means it's already deleted.
final Optional<Flow> flowOpt = bucket.getFlow(flowId);
if (flowOpt.isPresent()) {
flow = flowOpt.get();
} else {
logger.debug("Flow {} does not exist in bucket {}:{} any longer. It may have been deleted.", flowId, bucket.getBucketDirName(), bucket.getBucketId());
final int version = (int) flowMeta.get(VER);
final String flowSnapshotFilename = (String) flowMeta.get(FILE);
// Since commits are read in LIFO order, avoid old commits overriding the latest pointer.
if (!flow.hasVersion(version)) {
final Flow.FlowPointer pointer = new Flow.FlowPointer(flowSnapshotFilename);
final File flowSnapshotFile = new File(new File(backetFilePath).getParent(), flowSnapshotFilename);
final ObjectId objectId = flowSnapshotObjectIds.get(flowSnapshotFile.getPath());
if (objectId == null) {
logger.warn("Git object id for Flow {} version {} with path {} in bucket {}:{} was not found. Ignoring this entry.",
flowId, version, flowSnapshotFile.getPath(), bucket.getBucketDirName(), bucket.getBucketId());
if (flowMeta.containsKey(FLOW_NAME)) {
if (flowMeta.containsKey(FLOW_DESC)) {
if (flowMeta.containsKey(AUTHOR)) {
if (flowMeta.containsKey(COMMENTS)) {
if (flowMeta.containsKey(CREATED)) {
flow.putVersion(version, pointer);
private boolean validateRequiredValue(final Map map, String nameOfMap, Object ... keys) {
for (Object key : keys) {
if (!map.containsKey(key)) {
logger.warn("{} does not have {}. Skipping it.", nameOfMap, key);
return false;
return true;
public Bucket getBucketOrCreate(String bucketId) {
return buckets.computeIfAbsent(bucketId, k -> new Bucket(bucketId));
public Optional<Bucket> getBucket(String bucketId) {
return Optional.ofNullable(buckets.get(bucketId));
Map<String, Bucket> getBuckets() {
return buckets;
void saveBucket(final Bucket bucket, final File bucketDir) throws IOException {
final Yaml yaml = new Yaml();
final Map<String, Object> serializedBucket = bucket.serialize();
final File bucketFile = new File(bucketDir, GitFlowMetaData.BUCKET_FILENAME);
try (final Writer writer = new OutputStreamWriter(
new FileOutputStream(bucketFile), StandardCharsets.UTF_8)) {
yaml.dump(serializedBucket, writer);
boolean isGitDirectoryClean() throws GitAPIException {
final Status status = new Git(gitRepo).status().call();
return status.isClean() && !status.hasUncommittedChanges();
* Create a Git commit.
* @param author The name of a NiFi Registry user who created the snapshot. It will be added to the commit message.
* @param message Commit message.
* @param bucket A bucket to commit.
* @param flowPointer A flow pointer for the flow snapshot which is updated.
* After a commit is created, new commit rev id and flow snapshot file object id are set to this pointer.
* It can be null if none of flow content is modified.
void commit(String author, String message, Bucket bucket, Flow.FlowPointer flowPointer) throws GitAPIException, IOException {
try (final Git git = new Git(gitRepo)) {
// Execute add command for newly added files (if any).
// Execute add command again for deleted files (if any).
final String commitMessage = isEmpty(author) ? message
: format("%s\n\nBy NiFi Registry user: %s", message, author);
final RevCommit commit = git.commit()
if (flowPointer != null) {
final RevTree tree = commit.getTree();
final String bucketDirName = bucket.getBucketDirName();
final String flowSnapshotPath = new File(bucketDirName, flowPointer.getFileName()).getPath();
try (final TreeWalk treeWalk = new TreeWalk(gitRepo)) {
while ( {
if (treeWalk.isSubtree()) {
} else {
final String pathString = treeWalk.getPathString();
if (pathString.equals(flowSnapshotPath)) {
// Capture updated object id.
final String flowSnapshotObjectId = treeWalk.getObjectId(0).getName();
// Push if necessary.
if (!isEmpty(remoteToPush)) {
// Use different thread since it takes longer.
final long offeredTimestamp = System.currentTimeMillis();
if (pushQueue.offer(offeredTimestamp)) {
logger.debug("New push request is offered at {}.", offeredTimestamp);
byte[] getContent(String objectId) throws IOException {
final ObjectId flowSnapshotObjectId = gitRepo.resolve(objectId);
return gitRepo.newObjectReader().open(flowSnapshotObjectId).getBytes();