blob: 65e3757d4f6388047e626762a930dc5132173e99 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.metadata.store.jdbc.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.common.function.ThrowableConsumer2;
import org.apache.eagle.common.function.ThrowableFunction;
import org.apache.eagle.metadata.exceptions.EntityNotFoundException;
import org.apache.eagle.metadata.model.PolicyEntity;
import org.apache.eagle.metadata.service.PolicyEntityService;
import org.apache.eagle.metadata.store.jdbc.JDBCMetadataQueryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
public class PolicyEntityServiceJDBCImpl implements PolicyEntityService {
private static final Logger LOGGER = LoggerFactory.getLogger(PolicyEntityServiceJDBCImpl.class);
private static final String selectSql = "SELECT * FROM policy_prototype";
private static final String queryByUUID = "SELECT * FROM policy_prototype where uuid = '%s'";
private static final String deleteSqlByUUID = "DELETE FROM policy_prototype where uuid = '%s'";
private static final String updateSqlByUUID = "UPDATE policy_prototype SET name = ?, definition = ? , alertPublisherIds = ? , createdtime = ? , modifiedtime = ? where uuid = ?";
private static final String insertSql = "INSERT INTO policy_prototype (name, definition, alertPublisherIds, createdtime, modifiedtime, uuid) VALUES (?, ?, ?, ?, ?, ?)";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Inject
JDBCMetadataQueryService queryService;
@Override
public Collection<PolicyEntity> getAllPolicyProto() {
try {
return queryService.query(selectSql, policyEntityMapper);
} catch (Exception e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
@Override
public PolicyEntity getPolicyProtoByUUID(String uuid) {
Preconditions.checkNotNull(uuid, "uuid should not be null");
try {
return queryService.query(String.format(queryByUUID, uuid), policyEntityMapper).stream()
.findAny().orElseThrow(() -> new EntityNotFoundException("policyProto is not found by uuid"));
} catch (Exception e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
@Override
public boolean deletePolicyProtoByUUID(String uuid) {
String sql = String.format(deleteSqlByUUID, uuid);
try {
return queryService.execute(sql);
} catch (Exception e) {
LOGGER.error("Error to execute {}: {}", sql, e);
throw new IllegalArgumentException("SQL execution error: " + e.getMessage(), e);
}
}
@Override
public PolicyEntity update(PolicyEntity policyProto) {
Preconditions.checkNotNull(policyProto, "Entity should not be null");
Preconditions.checkNotNull(policyProto.getUuid(), "uuid should not be null");
PolicyEntity current = getPolicyProtoByUUID(policyProto.getUuid());
if (policyProto.getName() != null) {
current.setName(policyProto.getName());
}
if (policyProto.getAlertPublishmentIds() != null) {
current.setAlertPublishmentIds(policyProto.getAlertPublishmentIds());
}
if (policyProto.getDefinition() != null) {
current.setDefinition(policyProto.getDefinition());
}
current.ensureDefault();
try {
if (!queryService.execute(updateSqlByUUID, current, policyEntityWriter)) {
throw new IllegalArgumentException("Failed to update policyProto");
}
} catch (SQLException e) {
LOGGER.error("Error to execute {}: {}", updateSqlByUUID, policyProto, e);
throw new IllegalArgumentException("SQL execution error: " + e.getMessage(), e);
}
return current;
}
@Override
public PolicyEntity create(PolicyEntity entity) {
Preconditions.checkNotNull(entity, "PolicyEntity should not be null");
entity.ensureDefault();
try {
int retCode = queryService.insert(insertSql, Collections.singletonList(entity), policyEntityWriter);
if (retCode > 0) {
return entity;
} else {
throw new SQLException("Insertion result: " + retCode);
}
} catch (SQLException e) {
LOGGER.error("Error to insert entity {} (entity: {}): {}", insertSql, entity.toString(), e.getMessage(), e);
throw new IllegalArgumentException("SQL execution error:" + e.getMessage(), e);
}
}
private ThrowableFunction<ResultSet, PolicyEntity, SQLException> policyEntityMapper = resultSet -> {
PolicyEntity entity = new PolicyEntity();
entity.setName(resultSet.getString("name"));
entity.setUuid(resultSet.getString("uuid"));
String policyStr = resultSet.getString("definition");
if (policyStr != null) {
try {
PolicyDefinition policyDefinition = OBJECT_MAPPER.readValue(policyStr, PolicyDefinition.class);
entity.setDefinition(policyDefinition);
} catch (Exception e) {
throw new SQLException("Error to deserialize JSON as {}", PolicyDefinition.class.getCanonicalName(), e);
}
}
String list = resultSet.getString("alertPublisherIds");
if (list != null) {
try {
List<String> alertPublisherIds = OBJECT_MAPPER.readValue(list, List.class);
entity.setAlertPublishmentIds(alertPublisherIds);
} catch (Exception e) {
throw new SQLException("Error to deserialize JSON as AlertPublisherIds list", e);
}
}
entity.setCreatedTime(resultSet.getLong("createdtime"));
entity.setModifiedTime(resultSet.getLong("modifiedtime"));
return entity;
};
private ThrowableConsumer2<PreparedStatement, PolicyEntity, SQLException> policyEntityWriter = (statement, policyEntity) -> {
policyEntity.ensureDefault();
statement.setString(1, policyEntity.getName());
if (policyEntity.getDefinition() != null) {
try {
statement.setString(2, OBJECT_MAPPER.writeValueAsString(policyEntity.getDefinition()));
} catch (Exception e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
if (policyEntity.getAlertPublishmentIds() != null) {
try {
statement.setString(3, OBJECT_MAPPER.writeValueAsString(policyEntity.getAlertPublishmentIds()));
} catch (Exception e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
statement.setLong(4, policyEntity.getCreatedTime());
statement.setLong(5, policyEntity.getModifiedTime());
statement.setString(6, policyEntity.getUuid());
};
}