blob: 3eecc9a5e5e57b39247dc85c30634ca40f8cbbed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.revision.jdbc;
import org.apache.nifi.registry.TestApplication;
import org.apache.nifi.registry.revision.api.DeleteRevisionTask;
import org.apache.nifi.registry.revision.api.EntityModification;
import org.apache.nifi.registry.revision.api.InvalidRevisionException;
import org.apache.nifi.registry.revision.api.Revision;
import org.apache.nifi.registry.revision.api.RevisionClaim;
import org.apache.nifi.registry.revision.api.RevisionManager;
import org.apache.nifi.registry.revision.api.RevisionUpdate;
import org.apache.nifi.registry.revision.api.UpdateRevisionTask;
import org.apache.nifi.registry.revision.standard.StandardRevisionClaim;
import org.apache.nifi.registry.revision.standard.StandardRevisionUpdate;
import org.flywaydb.core.internal.jdbc.DatabaseType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
import org.springframework.test.context.transaction.TransactionalTestExecutionListener;
import org.springframework.transaction.annotation.Transactional;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@Transactional
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestApplication.class, webEnvironment = SpringBootTest.WebEnvironment.NONE)
@TestExecutionListeners({DependencyInjectionTestExecutionListener.class, TransactionalTestExecutionListener.class})
public class TestJdbcRevisionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TestJdbcRevisionManager.class);
private static final String CREATE_TABLE_SQL_DEFAULT =
"CREATE TABLE REVISION (\n" +
" ENTITY_ID VARCHAR(50) NOT NULL,\n" +
" VERSION BIGINT NOT NULL DEFAULT (0),\n" +
" CLIENT_ID VARCHAR(100),\n" +
" CONSTRAINT PK__REVISION_ENTITY_ID PRIMARY KEY (ENTITY_ID)\n" +
")";
private static final String CREATE_TABLE_SQL_MYSQL =
"CREATE TABLE REVISION (\n" +
" ENTITY_ID VARCHAR(50) NOT NULL,\n" +
" VERSION BIGINT NOT NULL DEFAULT 0,\n" +
" CLIENT_ID VARCHAR(100),\n" +
" CONSTRAINT PK__REVISION_ENTITY_ID PRIMARY KEY (ENTITY_ID)\n" +
")";
@Autowired
private JdbcTemplate jdbcTemplate;
private RevisionManager revisionManager;
@Before
public void setup() throws SQLException {
revisionManager = new JdbcRevisionManager(jdbcTemplate);
// Create the REVISION table if it does not exist
final DataSource dataSource = jdbcTemplate.getDataSource();
LOGGER.info("#### DataSource class is {}", new Object[]{dataSource.getClass().getCanonicalName()});
try (final Connection connection = dataSource.getConnection()) {
final String createTableSql;
final DatabaseType databaseType = DatabaseType.fromJdbcConnection(connection);
if (databaseType == DatabaseType.MYSQL) {
createTableSql = CREATE_TABLE_SQL_MYSQL;
} else {
createTableSql = CREATE_TABLE_SQL_DEFAULT;
}
final DatabaseMetaData meta = connection.getMetaData();
try (final ResultSet res = meta.getTables(null, null, "REVISION", new String[]{"TABLE"})) {
if (!res.next()) {
jdbcTemplate.execute(createTableSql);
}
}
}
}
@Test
public void testGetRevisionWhenDoesNotExist() {
final String entityId = "entity1";
final Revision revision = revisionManager.getRevision(entityId);
assertNotNull(revision);
assertEquals(entityId, revision.getEntityId());
assertEquals(0L, revision.getVersion().longValue());
assertNull(revision.getClientId());
}
@Test
public void testGetRevisionWhenExists() {
final String entityId = "entity1";
final Long version = new Long(99);
createRevision(entityId, version, null);
final Revision revision = revisionManager.getRevision(entityId);
assertNotNull(revision);
assertEquals(entityId, revision.getEntityId());
assertEquals(version.longValue(), revision.getVersion().longValue());
assertNull(revision.getClientId());
}
@Test
public void testUpdateRevisionWithCurrentVersionNoClientId() {
// create the revision being sent in by the client
final String entityId = "entity-1";
final Revision revision = new Revision(99L, null, entityId);
final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
// seed the database with a matching revision
createRevision(revision.getEntityId(), revision.getVersion(), null);
// perform an update task
final RevisionUpdate<RevisableEntity> revisionUpdate = revisionManager.updateRevision(
revisionClaim, createUpdateTask(entityId));
assertNotNull(revisionUpdate);
// version should go to 100 since it was 99 before
verifyRevisionUpdate(entityId, revisionUpdate, new Long(100), null);
}
@Test(expected = InvalidRevisionException.class)
public void testUpdateRevisionWithStaleVersionNoClientId() {
// create the revision being sent in by the client
final String entityId = "entity-1";
final Revision revision = new Revision(99L, null, entityId);
final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
// seed the database with a revision that has a newer version
createRevision(revision.getEntityId(), revision.getVersion() + 1, null);
// perform an update task which should throw InvalidRevisionException
revisionManager.updateRevision(revisionClaim, createUpdateTask(entityId));
}
@Test
public void testUpdateRevisionWithStaleVersionAndSameClientId() {
// create the revision being sent in by the client
final String entityId = "entity-1";
final String clientId = "client-1";
final Revision revision = new Revision(99L, clientId, entityId);
final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
// seed the database with a revision that has a newer version
createRevision(revision.getEntityId(), revision.getVersion() + 1, clientId);
// perform an update task
final RevisionUpdate<RevisableEntity> revisionUpdate = revisionManager.updateRevision(
revisionClaim, createUpdateTask(entityId));
assertNotNull(revisionUpdate);
// client in 99 which was not latest version, but since client id was the same the update was allowed
// and the incremented version should be based on the version in the DB which was 100, so it goes to 101
verifyRevisionUpdate(entityId, revisionUpdate, new Long(101), clientId);
}
@Test
public void testUpdateRevisionWhenDoesNotExist() {
// create the revision being sent in by the client
final String entityId = "entity-1";
final String clientId = "client-new";
final Revision revision = new Revision(0L, clientId, entityId);
final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
// perform an update task
final RevisionUpdate<RevisableEntity> revisionUpdate = revisionManager.updateRevision(
revisionClaim, createUpdateTask(entityId));
assertNotNull(revisionUpdate);
// version should go to 1 and client id should be updated to client-new
verifyRevisionUpdate(entityId, revisionUpdate, new Long(1), clientId);
}
@Test
public void testUpdateRevisionWithCurrentVersionAndNewClientId() {
// create the revision being sent in by the client
final String entityId = "entity-1";
final String clientId = "client-new";
final Revision revision = new Revision(99L, clientId, entityId);
final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
// seed the database with a revision that has same version but a different client id
createRevision(revision.getEntityId(), revision.getVersion(), "client-old");
// perform an update task
final RevisionUpdate<RevisableEntity> revisionUpdate = revisionManager.updateRevision(
revisionClaim, createUpdateTask(entityId));
assertNotNull(revisionUpdate);
// version should go to 100 and client id should be updated to client-new
verifyRevisionUpdate(entityId, revisionUpdate, new Long(100), clientId);
}
@Test
public void testDeleteRevisionWithCurrentVersionAndNoClientId() {
// create the revision being sent in by the client
final String entityId = "entity-1";
final Revision revision = new Revision(99L, null, entityId);
final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
// seed the database with a matching revision
createRevision(revision.getEntityId(), revision.getVersion(), null);
// perform an update task
final RevisableEntity deletedEntity = revisionManager.deleteRevision(
revisionClaim, createDeleteTask(entityId));
assertNotNull(deletedEntity);
assertEquals(entityId, deletedEntity.getId());
}
@Test(expected = InvalidRevisionException.class)
public void testDeleteRevisionWithStaleVersionAndNoClientId() {
// create the revision being sent in by the client
final String entityId = "entity-1";
final Revision revision = new Revision(99L, null, entityId);
final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
// seed the database with a revision that has a newer version
createRevision(revision.getEntityId(), revision.getVersion() + 1, null);
// perform an update task which should throw InvalidRevisionException
revisionManager.deleteRevision(revisionClaim, createDeleteTask(entityId));
}
@Test
public void testDeleteRevisionWithStaleVersionAndSameClientId() {
// create the revision being sent in by the client
final String entityId = "entity-1";
final String clientId = "client-1";
final Revision revision = new Revision(99L, clientId, entityId);
final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
// seed the database with a revision that has a newer version
createRevision(revision.getEntityId(), revision.getVersion() + 1, clientId);
// perform the delete
final RevisableEntity deletedEntity = revisionManager.deleteRevision(
revisionClaim, createDeleteTask(entityId));
assertNotNull(deletedEntity);
assertEquals(entityId, deletedEntity.getId());
}
@Test
public void testDeleteRevisionWithCurrentVersionAndNewClientId() {
// create the revision being sent in by the client
final String entityId = "entity-1";
final String clientId = "client-new";
final Revision revision = new Revision(99L, clientId, entityId);
final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
// seed the database with a revision that has same version but a different client id
createRevision(revision.getEntityId(), revision.getVersion(), "client-old");
// perform the delete
final RevisableEntity deletedEntity = revisionManager.deleteRevision(
revisionClaim, createDeleteTask(entityId));
assertNotNull(deletedEntity);
assertEquals(entityId, deletedEntity.getId());
}
@Test
public void testGetAllAndReset() {
createRevision("entity1", new Long(1), null);
createRevision("entity2", new Long(1), null);
final List<Revision> allRevisions = revisionManager.getAllRevisions();
assertNotNull(allRevisions);
assertEquals(2, allRevisions.size());
final Revision resetRevision1 = new Revision(10L, null, "resetEntity1");
final Revision resetRevision2 = new Revision(50L, null, "resetEntity2");
final Revision resetRevision3 = new Revision(20L, "client1", "resetEntity3");
revisionManager.reset(Arrays.asList(resetRevision1, resetRevision2, resetRevision3));
final List<Revision> afterResetRevisions = revisionManager.getAllRevisions();
assertNotNull(afterResetRevisions);
assertEquals(3, afterResetRevisions.size());
assertTrue(afterResetRevisions.contains(resetRevision1));
assertTrue(afterResetRevisions.contains(resetRevision2));
assertTrue(afterResetRevisions.contains(resetRevision3));
}
@Test
public void testGetRevisionMap() {
createRevision("entity1", new Long(1), null);
createRevision("entity2", new Long(1), null);
final Map<String,Revision> revisions = revisionManager.getRevisionMap();
assertNotNull(revisions);
assertEquals(2, revisions.size());
final Revision revision1 = revisions.get("entity1");
assertNotNull(revision1);
assertEquals("entity1", revision1.getEntityId());
final Revision revision2 = revisions.get("entity2");
assertNotNull(revision2);
assertEquals("entity2", revision2.getEntityId());
}
private DeleteRevisionTask<RevisableEntity> createDeleteTask(final String entityId) {
return () -> {
// normally we would retrieve the entity from some kind of service/dao
final RevisableEntity entity = new RevisableEntity();
entity.setId(entityId);
return entity;
};
}
private UpdateRevisionTask<RevisableEntity> createUpdateTask(final String entityId) {
return () -> {
// normally we would retrieve the entity from some kind of service/dao
final RevisableEntity entity = new RevisableEntity();
entity.setId(entityId);
// get the latest revision which has already been incremented
final Revision updatedRevision = revisionManager.getRevision(entity.getId());
entity.setRevision(updatedRevision);
final EntityModification entityModification = new EntityModification(updatedRevision, "user1");
return new StandardRevisionUpdate<>(entity, entityModification);
};
}
private void verifyRevisionUpdate(final String entityId, final RevisionUpdate<RevisableEntity> revisionUpdate,
final Long expectedVersion, final String expectedClientId) {
// verify we got back the entity we expected
final RevisableEntity updatedEntity = revisionUpdate.getEntity();
assertNotNull(updatedEntity);
assertEquals(entityId, updatedEntity.getId());
// verify the revision in the entity is set and is the updated revision (i.e. version of 100, not 99)
final Revision updatedRevision = updatedEntity.getRevision();
assertNotNull(updatedRevision);
assertEquals(entityId, updatedRevision.getEntityId());
assertEquals(expectedVersion, updatedRevision.getVersion());
assertEquals(expectedClientId, updatedRevision.getClientId());
// verify the entity modification is correctly populated
final EntityModification entityModification = revisionUpdate.getLastModification();
assertNotNull(entityModification);
Assert.assertEquals("user1", entityModification.getLastModifier());
assertEquals(updatedRevision, entityModification.getRevision());
// verify the updated revisions is correctly populated and matches the updated entity revision
final Set<Revision> updatedRevisions = revisionUpdate.getUpdatedRevisions();
assertNotNull(updatedRevisions);
assertEquals(1, updatedRevisions.size());
assertEquals(updatedRevision, updatedRevisions.stream().findFirst().get());
}
private void createRevision(final String entityId, final Long version, final String clientId) {
jdbcTemplate.update("INSERT INTO REVISION(ENTITY_ID, VERSION, CLIENT_ID) VALUES(?, ?, ?)", entityId, version, clientId);
}
/**
* Test object to represent a model/entity that has a revision field.
*/
private static class RevisableEntity {
private String id;
private Revision revision;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Revision getRevision() {
return revision;
}
public void setRevision(Revision revision) {
this.revision = revision;
}
}
}