blob: 74b2393432779a011ee7d11780f338b7a6013e69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.revision.jdbc;
import org.apache.nifi.registry.revision.api.DeleteRevisionTask;
import org.apache.nifi.registry.revision.api.ExpiredRevisionClaimException;
import org.apache.nifi.registry.revision.api.InvalidRevisionException;
import org.apache.nifi.registry.revision.api.Revision;
import org.apache.nifi.registry.revision.api.RevisionClaim;
import org.apache.nifi.registry.revision.api.RevisionManager;
import org.apache.nifi.registry.revision.api.RevisionUpdate;
import org.apache.nifi.registry.revision.api.UpdateRevisionTask;
import org.apache.nifi.registry.revision.standard.RevisionComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* A database implementation of {@link RevisionManager} that use's Spring's {@link JdbcTemplate}.
*
* It is expected that the database has a table named REVISION with the following schema, but it is up to consumers
* of this library to manage the creation of this table:
*
* <pre>
* {@code
* CREATE TABLE REVISION (
* ENTITY_ID VARCHAR(50) NOT NULL,
* VERSION BIGINT NOT NULL DEFAULT(0),
* CLIENT_ID VARCHAR(100),
* CONSTRAINT PK__REVISION_ENTITY_ID PRIMARY KEY (ENTITY_ID)
* );
* }
* </pre>
*
* This implementation leverages the transactional semantics of a relational database to implement an optimistic-locking strategy.
*
* In order to function correctly, this must be used with in a transaction with an isolation level of at least READ_COMMITTED.
*/
public class JdbcRevisionManager implements RevisionManager {
private static Logger LOGGER = LoggerFactory.getLogger(JdbcRevisionManager.class);
private final JdbcTemplate jdbcTemplate;
public JdbcRevisionManager(final JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = Objects.requireNonNull(jdbcTemplate);
}
@Override
public Revision getRevision(final String entityId) {
final Revision revision = retrieveRevision(entityId);
if (revision == null) {
return createRevision(entityId);
} else {
return revision;
}
}
private Revision retrieveRevision(final String entityId) {
try {
final String selectSql = "SELECT * FROM REVISION WHERE ENTITY_ID = ?";
return jdbcTemplate.queryForObject(selectSql, new Object[] {entityId}, new RevisionRowMapper());
} catch (EmptyResultDataAccessException e) {
return null;
}
}
private Revision createRevision(final String entityId) {
final Revision revision = new Revision(0L, null, entityId);
final String insertSql = "INSERT INTO REVISION(ENTITY_ID, VERSION) VALUES (?, ?)";
jdbcTemplate.update(insertSql, revision.getEntityId(), revision.getVersion());
return revision;
}
@Override
public <T> RevisionUpdate<T> updateRevision(final RevisionClaim claim, final UpdateRevisionTask<T> task) {
LOGGER.debug("Attempting to update revision using {}", claim);
final List<Revision> revisionList = new ArrayList<>(claim.getRevisions());
revisionList.sort(new RevisionComparator());
// Update each revision which increments the version and locks the row.
// Since we are in transaction these changes won't be committed unless the entire task completes successfully.
// It is important this happens first so that the task won't execute unless the revision can be updated.
// This prevents any other changes from happening that might not be part of the database transaction.
for (final Revision incomingRevision : revisionList) {
// calling getRevision here will lazily create an initial revision
getRevision(incomingRevision.getEntityId());
updateRevision(incomingRevision);
}
// We successfully verified all revisions.
LOGGER.debug("Successfully verified Revision Claim for all revisions");
// Perform the update
final RevisionUpdate<T> updatedEntity = task.update();
LOGGER.debug("Update task completed");
return updatedEntity;
}
/*
* Issue an update that increments the version, but only if the incoming version OR client id match the existing revision.
*
* If no rows were updated, then the incoming revision is stale and an exception is thrown.
*
* If a row was updated, then the incoming revision is good and that row is no locked in the DB, and we can proceed.
*/
private void updateRevision(final Revision incomingRevision) {
final String sql =
"UPDATE REVISION SET " +
"VERSION = (VERSION + 1), " +
"CLIENT_ID = ? " +
"WHERE " +
"ENTITY_ID = ? AND (" +
"VERSION = ? OR CLIENT_ID = ? " +
")";
final String entityId = incomingRevision.getEntityId();
final String clientId = incomingRevision.getClientId();
final Long version = incomingRevision.getVersion();
final int rowsUpdated = jdbcTemplate.update(sql, clientId, entityId, version, clientId);
if (rowsUpdated <= 0) {
throw new InvalidRevisionException("Invalid Revision was given for entity with ID '" + entityId + "'");
}
}
@Override
public <T> T deleteRevision(final RevisionClaim claim, final DeleteRevisionTask<T> task)
throws ExpiredRevisionClaimException {
LOGGER.debug("Attempting to delete revision using {}", claim);
final List<Revision> revisionList = new ArrayList<>(claim.getRevisions());
revisionList.sort(new RevisionComparator());
// Issue the delete for each revision
// Since we are in transaction these changes won't be committed unless the entire task completes successfully.
// It is important this happens first so that the task won't execute unless the revision can be deleted.
// This prevents any other changes from happening that might not be part of the database transaction.
for (final Revision revision : revisionList) {
deleteRevision(revision);
}
// Perform the action provided
final T taskResult = task.performTask();
LOGGER.debug("Delete task completed");
return taskResult;
}
/*
* Issue a delete for a revision of a given entity, but only if the incoming version OR client id match the existing revision.
*
* If no rows were updated, then the incoming revision is stale and an exception is thrown.
*
* If a row was deleted, then the incoming revision is good and that row is no locked in the DB, and we can proceed.
*/
private void deleteRevision(final Revision revision) {
final String sql =
"DELETE FROM REVISION WHERE " +
"ENTITY_ID = ? AND (" +
"VERSION = ? OR CLIENT_ID = ? " +
")";
final String entityId = revision.getEntityId();
final String clientId = revision.getClientId();
final Long version = revision.getVersion();
final int rowsUpdated = jdbcTemplate.update(sql, entityId, version, clientId);
if (rowsUpdated <= 0) {
throw new ExpiredRevisionClaimException("Invalid Revision was given for entity with ID '" + entityId + "'");
}
}
@Override
public void reset(final Collection<Revision> revisions) {
// delete all revisions
jdbcTemplate.update("DELETE FROM REVISION");
// insert all the provided revisions
final String insertSql = "INSERT INTO REVISION(ENTITY_ID, VERSION, CLIENT_ID) VALUES (?, ?, ?)";
for (final Revision revision : revisions) {
jdbcTemplate.update(insertSql, revision.getEntityId(), revision.getVersion(), revision.getClientId());
}
}
@Override
public List<Revision> getAllRevisions() {
return jdbcTemplate.query("SELECT * FROM REVISION", new RevisionRowMapper());
}
@Override
public Map<String, Revision> getRevisionMap() {
final Map<String,Revision> revisionMap = new HashMap<>();
final RevisionRowMapper rowMapper = new RevisionRowMapper();
jdbcTemplate.query("SELECT * FROM REVISION", (rs) -> {
final Revision revision = rowMapper.mapRow(rs, 0);
revisionMap.put(revision.getEntityId(), revision);
});
return revisionMap;
}
}