blob: 4d32790c259e803fc1210dc35eecf9ded9de140f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.db;
import org.apache.nifi.registry.db.entity.BucketEntity;
import org.apache.nifi.registry.db.entity.BucketItemEntity;
import org.apache.nifi.registry.db.entity.BucketItemEntityType;
import org.apache.nifi.registry.db.entity.FlowEntity;
import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
import org.apache.nifi.registry.db.mapper.BucketEntityRowMapper;
import org.apache.nifi.registry.db.mapper.BucketItemEntityRowMapper;
import org.apache.nifi.registry.db.mapper.FlowEntityRowMapper;
import org.apache.nifi.registry.db.mapper.FlowSnapshotEntityRowMapper;
import org.apache.nifi.registry.service.MetadataService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Repository
public class DatabaseMetadataService implements MetadataService {
private final JdbcTemplate jdbcTemplate;
@Autowired
public DatabaseMetadataService(final JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
//----------------- Buckets ---------------------------------
@Override
public BucketEntity createBucket(final BucketEntity b) {
final String sql = "INSERT INTO bucket (ID, NAME, DESCRIPTION, CREATED) VALUES (?, ?, ?, ?)";
jdbcTemplate.update(sql, b.getId(), b.getName(), b.getDescription(), b.getCreated());
return b;
}
@Override
public BucketEntity getBucketById(final String bucketIdentifier) {
final String sql = "SELECT * FROM bucket WHERE id = ?";
try {
return jdbcTemplate.queryForObject(sql, new BucketEntityRowMapper(), bucketIdentifier);
} catch (EmptyResultDataAccessException e) {
return null;
}
}
@Override
public List<BucketEntity> getBucketsByName(final String name) {
final String sql = "SELECT * FROM bucket WHERE name = ? ORDER BY name ASC";
return jdbcTemplate.query(sql, new Object[] {name} , new BucketEntityRowMapper());
}
@Override
public BucketEntity updateBucket(final BucketEntity bucket) {
final String sql = "UPDATE bucket SET name = ?, description = ? WHERE id = ?";
jdbcTemplate.update(sql, bucket.getName(), bucket.getDescription(), bucket.getId());
return bucket;
}
@Override
public void deleteBucket(final BucketEntity bucket) {
final String snapshotDeleteSql = "DELETE FROM flow_snapshot WHERE flow_id IN ( " +
"SELECT f.id FROM flow f, bucket_item item WHERE f.id = item.id AND item.bucket_id = ?" +
")";
jdbcTemplate.update(snapshotDeleteSql, bucket.getId());
final String flowDeleteSql = "DELETE FROM flow WHERE id IN ( " +
"SELECT f.id FROM flow f, bucket_item item WHERE f.id = item.id AND item.bucket_id = ?" +
")";
jdbcTemplate.update(flowDeleteSql, bucket.getId());
final String itemDeleteSql = "DELETE FROM bucket_item WHERE bucket_id = ?";
jdbcTemplate.update(itemDeleteSql, bucket.getId());
final String sql = "DELETE FROM bucket WHERE id = ?";
jdbcTemplate.update(sql, bucket.getId());
}
@Override
public List<BucketEntity> getBuckets(final Set<String> bucketIds) {
if (bucketIds == null || bucketIds.isEmpty()) {
return Collections.emptyList();
}
final StringBuilder sqlBuilder = new StringBuilder("SELECT * FROM bucket WHERE id IN (");
for (int i=0; i < bucketIds.size(); i++) {
if (i > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append("?");
}
sqlBuilder.append(") ");
sqlBuilder.append("ORDER BY name ASC");
return jdbcTemplate.query(sqlBuilder.toString(), bucketIds.toArray(), new BucketEntityRowMapper());
}
@Override
public List<BucketEntity> getAllBuckets() {
final String sql = "SELECT * FROM bucket ORDER BY name ASC";
return jdbcTemplate.query(sql, new BucketEntityRowMapper());
}
//----------------- BucketItems ---------------------------------
@Override
public List<BucketItemEntity> getBucketItems(final String bucketIdentifier) {
final String sql =
"SELECT " +
"item.id as ID, " +
"item.name as NAME, " +
"item.description as DESCRIPTION, " +
"item.created as CREATED, " +
"item.modified as MODIFIED, " +
"item.item_type as ITEM_TYPE, " +
"b.id as BUCKET_ID, " +
"b.name as BUCKET_NAME " +
"FROM " +
"bucket_item item, bucket b " +
"WHERE " +
"item.bucket_id = b.id " +
"AND " +
"item.bucket_id = ?";
final List<BucketItemEntity> items = jdbcTemplate.query(sql, new Object[] { bucketIdentifier }, new BucketItemEntityRowMapper());
return getItemsWithCounts(items);
}
@Override
public List<BucketItemEntity> getBucketItems(final Set<String> bucketIds) {
if (bucketIds == null || bucketIds.isEmpty()) {
return Collections.emptyList();
}
final StringBuilder sqlBuilder = new StringBuilder(
"SELECT " +
"item.id as ID, " +
"item.name as NAME, " +
"item.description as DESCRIPTION, " +
"item.created as CREATED, " +
"item.modified as MODIFIED, " +
"item.item_type as ITEM_TYPE, " +
"b.id as BUCKET_ID, " +
"b.name as BUCKET_NAME " +
"FROM " +
"bucket_item item, bucket b " +
"WHERE " +
"item.bucket_id = b.id " +
"AND " +
"item.bucket_id IN (");
for (int i=0; i < bucketIds.size(); i++) {
if (i > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append("?");
}
sqlBuilder.append(")");
final List<BucketItemEntity> items = jdbcTemplate.query(sqlBuilder.toString(), bucketIds.toArray(), new BucketItemEntityRowMapper());
return getItemsWithCounts(items);
}
private List<BucketItemEntity> getItemsWithCounts(final Iterable<BucketItemEntity> items) {
final Map<String,Long> snapshotCounts = getFlowSnapshotCounts();
final List<BucketItemEntity> itemWithCounts = new ArrayList<>();
for (final BucketItemEntity item : items) {
if (item.getType() == BucketItemEntityType.FLOW) {
final Long snapshotCount = snapshotCounts.get(item.getId());
if (snapshotCount != null) {
final FlowEntity flowEntity = (FlowEntity) item;
flowEntity.setSnapshotCount(snapshotCount);
}
}
itemWithCounts.add(item);
}
return itemWithCounts;
}
private Map<String,Long> getFlowSnapshotCounts() {
final String sql = "SELECT flow_id, count(*) FROM flow_snapshot GROUP BY flow_id";
final Map<String,Long> results = new HashMap<>();
jdbcTemplate.query(sql, (rs) -> {
results.put(rs.getString(1), rs.getLong(2));
});
return results;
}
private Long getFlowSnapshotCount(final String flowIdentifier) {
final String sql = "SELECT count(*) FROM flow_snapshot WHERE flow_id = ?";
return jdbcTemplate.queryForObject(sql, new Object[] {flowIdentifier}, (rs, num) -> {
return rs.getLong(1);
});
}
//----------------- Flows ---------------------------------
@Override
public FlowEntity createFlow(final FlowEntity flow) {
final String itemSql = "INSERT INTO bucket_item (ID, NAME, DESCRIPTION, CREATED, MODIFIED, ITEM_TYPE, BUCKET_ID) VALUES (?, ?, ?, ?, ?, ?, ?)";
jdbcTemplate.update(itemSql,
flow.getId(),
flow.getName(),
flow.getDescription(),
flow.getCreated(),
flow.getModified(),
flow.getType().toString(),
flow.getBucketId());
final String flowSql = "INSERT INTO flow (ID) VALUES (?)";
jdbcTemplate.update(flowSql, flow.getId());
return flow;
}
@Override
public FlowEntity getFlowById(final String flowIdentifier) {
final String sql = "SELECT * FROM flow f, bucket_item item WHERE f.id = ? AND item.id = f.id";
try {
return jdbcTemplate.queryForObject(sql, new FlowEntityRowMapper(), flowIdentifier);
} catch (EmptyResultDataAccessException e) {
return null;
}
}
@Override
public FlowEntity getFlowByIdWithSnapshotCounts(final String flowIdentifier) {
final FlowEntity flowEntity = getFlowById(flowIdentifier);
if (flowEntity == null) {
return flowEntity;
}
final Long snapshotCount = getFlowSnapshotCount(flowIdentifier);
if (snapshotCount != null) {
flowEntity.setSnapshotCount(snapshotCount);
}
return flowEntity;
}
@Override
public List<FlowEntity> getFlowsByName(final String name) {
final String sql = "SELECT * FROM flow f, bucket_item item WHERE item.name = ? AND item.id = f.id";
return jdbcTemplate.query(sql, new Object[] {name}, new FlowEntityRowMapper());
}
@Override
public List<FlowEntity> getFlowsByName(final String bucketIdentifier, final String name) {
final String sql = "SELECT * FROM flow f, bucket_item item WHERE item.name = ? AND item.id = f.id AND item.bucket_id = ?";
return jdbcTemplate.query(sql, new Object[] {name, bucketIdentifier}, new FlowEntityRowMapper());
}
@Override
public List<FlowEntity> getFlowsByBucket(final String bucketIdentifier) {
final String sql = "SELECT * FROM flow f, bucket_item item WHERE item.bucket_id = ? AND item.id = f.id";
final List<FlowEntity> flows = jdbcTemplate.query(sql, new Object[] {bucketIdentifier}, new FlowEntityRowMapper());
final Map<String,Long> snapshotCounts = getFlowSnapshotCounts();
for (final FlowEntity flowEntity : flows) {
final Long snapshotCount = snapshotCounts.get(flowEntity.getId());
if (snapshotCount != null) {
flowEntity.setSnapshotCount(snapshotCount);
}
}
return flows;
}
@Override
public FlowEntity updateFlow(final FlowEntity flow) {
flow.setModified(new Date());
final String sql = "UPDATE bucket_item SET name = ?, description = ?, modified = ? WHERE id = ?";
jdbcTemplate.update(sql, flow.getName(), flow.getDescription(), flow.getModified(), flow.getId());
return flow;
}
@Override
public void deleteFlow(final FlowEntity flow) {
final String snapshotDeleteSql = "DELETE FROM flow_snapshot WHERE flow_id = ?";
jdbcTemplate.update(snapshotDeleteSql, flow.getId());
final String flowDeleteSql = "DELETE FROM flow WHERE id = ?";
jdbcTemplate.update(flowDeleteSql, flow.getId());
final String itemDeleteSql = "DELETE FROM bucket_item WHERE id = ?";
jdbcTemplate.update(itemDeleteSql, flow.getId());
}
//----------------- Flow Snapshots ---------------------------------
@Override
public FlowSnapshotEntity createFlowSnapshot(final FlowSnapshotEntity flowSnapshot) {
final String sql = "INSERT INTO flow_snapshot (FLOW_ID, VERSION, CREATED, CREATED_BY, COMMENTS) VALUES (?, ?, ?, ?, ?)";
jdbcTemplate.update(sql,
flowSnapshot.getFlowId(),
flowSnapshot.getVersion(),
flowSnapshot.getCreated(),
flowSnapshot.getCreatedBy(),
flowSnapshot.getComments());
return flowSnapshot;
}
@Override
public FlowSnapshotEntity getFlowSnapshot(final String flowIdentifier, final Integer version) {
final String sql =
"SELECT " +
"fs.flow_id, " +
"fs.version, " +
"fs.created, " +
"fs.created_by, " +
"fs.comments " +
"FROM " +
"flow_snapshot fs, " +
"flow f, " +
"bucket_item item " +
"WHERE " +
"item.id = f.id AND " +
"f.id = ? AND " +
"f.id = fs.flow_id AND " +
"fs.version = ?";
try {
return jdbcTemplate.queryForObject(sql, new FlowSnapshotEntityRowMapper(),
flowIdentifier, version);
} catch (EmptyResultDataAccessException e) {
return null;
}
}
@Override
public FlowSnapshotEntity getLatestSnapshot(final String flowIdentifier) {
final String sql = "SELECT * FROM flow_snapshot WHERE flow_id = ? ORDER BY version DESC LIMIT 1";
try {
return jdbcTemplate.queryForObject(sql, new FlowSnapshotEntityRowMapper(), flowIdentifier);
} catch (EmptyResultDataAccessException e) {
return null;
}
}
@Override
public List<FlowSnapshotEntity> getSnapshots(final String flowIdentifier) {
final String sql =
"SELECT " +
"fs.flow_id, " +
"fs.version, " +
"fs.created, " +
"fs.created_by, " +
"fs.comments " +
"FROM " +
"flow_snapshot fs, " +
"flow f, " +
"bucket_item item " +
"WHERE " +
"item.id = f.id AND " +
"f.id = ? AND " +
"f.id = fs.flow_id";
final Object[] args = new Object[] { flowIdentifier };
return jdbcTemplate.query(sql, args, new FlowSnapshotEntityRowMapper());
}
@Override
public void deleteFlowSnapshot(final FlowSnapshotEntity flowSnapshot) {
final String sql = "DELETE FROM flow_snapshot WHERE flow_id = ? AND version = ?";
jdbcTemplate.update(sql, flowSnapshot.getFlowId(), flowSnapshot.getVersion());
}
//----------------- BucketItems ---------------------------------
@Override
public Set<String> getBucketFields() {
final Set<String> fields = new LinkedHashSet<>();
fields.add("ID");
fields.add("NAME");
fields.add("DESCRIPTION");
fields.add("CREATED");
return fields;
}
@Override
public Set<String> getBucketItemFields() {
final Set<String> fields = new LinkedHashSet<>();
fields.add("ID");
fields.add("NAME");
fields.add("DESCRIPTION");
fields.add("CREATED");
fields.add("MODIFIED");
fields.add("ITEM_TYPE");
fields.add("BUCKET_ID");
return fields;
}
@Override
public Set<String> getFlowFields() {
final Set<String> fields = new LinkedHashSet<>();
fields.add("ID");
fields.add("NAME");
fields.add("DESCRIPTION");
fields.add("CREATED");
fields.add("MODIFIED");
fields.add("ITEM_TYPE");
fields.add("BUCKET_ID");
return fields;
}
}