blob: bdd6b8226a777211fe379237909d519b4dca0c64 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.persistence.cq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cq.CQState;
import org.apache.iotdb.commons.cq.TimeoutPolicy;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@ThreadSafe
public class CQInfo implements SnapshotProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(CQInfo.class);
private static final String SNAPSHOT_FILENAME = "cq_info.snapshot";
private static final String CQ_NOT_EXIST_FORMAT = "CQ %s doesn't exist.";
private static final String MD5_NOT_MATCH_FORMAT = "MD5 of CQ %s doesn't match";
private final Map<String, CQEntry> cqMap;
private final ReadWriteLock lock;
public CQInfo() {
this.cqMap = new HashMap<>();
this.lock = new ReentrantReadWriteLock();
}
/**
* Add a new CQ only if there was no mapping for <tt>this cqId</tt>, otherwise ignore this
* operation.
*
* @return SUCCESS_STATUS if there was no mapping for <tt>this cqId</tt>, otherwise
* CQ_AlREADY_EXIST
*/
public TSStatus addCQ(AddCQPlan plan) {
TSStatus res = new TSStatus();
String cqId = plan.getReq().cqId;
lock.writeLock().lock();
try {
if (cqMap.containsKey(cqId)) {
res.code = TSStatusCode.CQ_ALREADY_EXIST.getStatusCode();
res.message = String.format("CQ %s has already been created.", cqId);
} else {
CQEntry cqEntry =
new CQEntry(
plan.getReq(),
plan.getMd5(),
plan.getFirstExecutionTime() - plan.getReq().everyInterval);
cqMap.put(cqId, cqEntry);
res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
return res;
} finally {
lock.writeLock().unlock();
}
}
/**
* Drop the CQ whose ID is same as <tt>cqId</tt> in plan.
*
* @return SUCCESS_STATUS if there is CQ whose ID and md5 is same as <tt>cqId</tt> in plan,
* otherwise NO_SUCH_CQ.
*/
public TSStatus dropCQ(DropCQPlan plan) {
TSStatus res = new TSStatus();
String cqId = plan.getCqId();
Optional<String> md5 = plan.getMd5();
lock.writeLock().lock();
try {
CQEntry cqEntry = cqMap.get(cqId);
if (cqEntry == null) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
LOGGER.warn("Drop CQ {} failed, because it doesn't exist.", cqId);
} else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
LOGGER.warn("Drop CQ {} failed, because its MD5 doesn't match.", cqId);
} else {
cqMap.remove(cqId);
res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
LOGGER.info("Drop CQ {} successfully.", cqId);
}
return res;
} finally {
lock.writeLock().unlock();
}
}
public ShowCQResp showCQ() {
lock.readLock().lock();
try {
return new ShowCQResp(
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList()));
} finally {
lock.readLock().unlock();
}
}
/**
* Change the state of cq to ACTIVE.
*
* @return Optional.empty() if there is no such cq, otherwise previous state of this cq.
*/
public TSStatus activeCQ(ActiveCQPlan plan) {
TSStatus res = new TSStatus();
String cqId = plan.getCqId();
String md5 = plan.getMd5();
lock.writeLock().lock();
try {
CQEntry cqEntry = cqMap.get(cqId);
if (cqEntry == null) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
} else if (!md5.equals(cqEntry.md5)) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
} else if (cqEntry.state == CQState.ACTIVE) {
res.code = TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode();
res.message = String.format("CQ %s has already been active", cqId);
} else {
cqEntry.state = CQState.ACTIVE;
res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
return res;
} finally {
lock.writeLock().unlock();
}
}
/**
* Update the last execution time of the corresponding CQ.
*
* @return SUCCESS_STATUS if successfully updated, or NO_SUCH_CQ if 1. the CQ doesn't exist; or 2.
* md5 is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original lastExecutionTime >=
* current lastExecutionTime;
*/
public TSStatus updateCQLastExecutionTime(UpdateCQLastExecTimePlan plan) {
TSStatus res = new TSStatus();
String cqId = plan.getCqId();
String md5 = plan.getMd5();
lock.writeLock().lock();
try {
CQEntry cqEntry = cqMap.get(cqId);
if (cqEntry == null) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
} else if (!md5.equals(cqEntry.md5)) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
} else if (cqEntry.lastExecutionTime >= plan.getExecutionTime()) {
res.code = TSStatusCode.CQ_UPDATE_LAST_EXEC_TIME_ERROR.getStatusCode();
res.message =
String.format(
"Update last execution time of CQ %s failed because its original last execution time(%d) is larger than the updated one(%d).",
cqId, cqEntry.lastExecutionTime, plan.getExecutionTime());
} else {
cqEntry.lastExecutionTime = plan.getExecutionTime();
res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
return res;
} finally {
lock.writeLock().unlock();
}
}
@Override
public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
if (snapshotFile.exists() && snapshotFile.isFile()) {
LOGGER.error(
"Failed to take snapshot of CQInfo, because snapshot file [{}] is already exist.",
snapshotFile.getAbsolutePath());
return false;
}
lock.readLock().lock();
try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
serialize(fileOutputStream);
fileOutputStream.getFD().sync();
return true;
} finally {
lock.readLock().unlock();
}
}
private void serialize(OutputStream stream) throws IOException {
ReadWriteIOUtils.write(cqMap.size(), stream);
for (CQEntry entry : cqMap.values()) {
entry.serialize(stream);
}
}
private void deserialize(InputStream stream) throws IOException {
int size = ReadWriteIOUtils.readInt(stream);
for (int i = 0; i < size; i++) {
CQEntry cqEntry = CQEntry.deserialize(stream);
cqMap.put(cqEntry.cqId, cqEntry);
}
}
@Override
public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
if (!snapshotFile.exists() || !snapshotFile.isFile()) {
LOGGER.error(
"Failed to load snapshot of CQInfo, snapshot file [{}] does not exist.",
snapshotFile.getAbsolutePath());
return;
}
lock.writeLock().lock();
try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
clear();
deserialize(fileInputStream);
} finally {
lock.writeLock().unlock();
}
}
private void clear() {
cqMap.clear();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CQInfo cqInfo = (CQInfo) o;
return Objects.equals(cqMap, cqInfo.cqMap);
}
@Override
public int hashCode() {
return Objects.hash(cqMap);
}
public static class CQEntry {
private final String cqId;
private final long everyInterval;
private final long boundaryTime;
private final long startTimeOffset;
private final long endTimeOffset;
private final TimeoutPolicy timeoutPolicy;
private final String queryBody;
private final String sql;
private final String md5;
private final String zoneId;
private final String username;
private CQState state;
private long lastExecutionTime;
private CQEntry(TCreateCQReq req, String md5, long lastExecutionTime) {
this(
req.cqId,
req.everyInterval,
req.boundaryTime,
req.startTimeOffset,
req.endTimeOffset,
TimeoutPolicy.deserialize(req.timeoutPolicy),
req.queryBody,
req.sql,
md5,
req.zoneId,
req.username,
CQState.INACTIVE,
lastExecutionTime);
}
private CQEntry(CQEntry other) {
this(
other.cqId,
other.everyInterval,
other.boundaryTime,
other.startTimeOffset,
other.endTimeOffset,
other.timeoutPolicy,
other.queryBody,
other.sql,
other.md5,
other.zoneId,
other.username,
other.state,
other.lastExecutionTime);
}
@SuppressWarnings("squid:S107")
private CQEntry(
String cqId,
long everyInterval,
long boundaryTime,
long startTimeOffset,
long endTimeOffset,
TimeoutPolicy timeoutPolicy,
String queryBody,
String sql,
String md5,
String zoneId,
String username,
CQState state,
long lastExecutionTime) {
this.cqId = cqId;
this.everyInterval = everyInterval;
this.boundaryTime = boundaryTime;
this.startTimeOffset = startTimeOffset;
this.endTimeOffset = endTimeOffset;
this.timeoutPolicy = timeoutPolicy;
this.queryBody = queryBody;
this.sql = sql;
this.md5 = md5;
this.zoneId = zoneId;
this.username = username;
this.state = state;
this.lastExecutionTime = lastExecutionTime;
}
private void serialize(OutputStream stream) throws IOException {
ReadWriteIOUtils.write(cqId, stream);
ReadWriteIOUtils.write(everyInterval, stream);
ReadWriteIOUtils.write(boundaryTime, stream);
ReadWriteIOUtils.write(startTimeOffset, stream);
ReadWriteIOUtils.write(endTimeOffset, stream);
ReadWriteIOUtils.write(timeoutPolicy.getType(), stream);
ReadWriteIOUtils.write(queryBody, stream);
ReadWriteIOUtils.write(sql, stream);
ReadWriteIOUtils.write(md5, stream);
ReadWriteIOUtils.write(zoneId, stream);
ReadWriteIOUtils.write(username, stream);
ReadWriteIOUtils.write(state.getType(), stream);
ReadWriteIOUtils.write(lastExecutionTime, stream);
}
private static CQEntry deserialize(InputStream stream) throws IOException {
String cqId = ReadWriteIOUtils.readString(stream);
long everyInterval = ReadWriteIOUtils.readLong(stream);
long boundaryTime = ReadWriteIOUtils.readLong(stream);
long startTimeOffset = ReadWriteIOUtils.readLong(stream);
long endTimeOffset = ReadWriteIOUtils.readLong(stream);
TimeoutPolicy timeoutPolicy = TimeoutPolicy.deserialize(ReadWriteIOUtils.readByte(stream));
String queryBody = ReadWriteIOUtils.readString(stream);
String sql = ReadWriteIOUtils.readString(stream);
String md5 = ReadWriteIOUtils.readString(stream);
String zoneId = ReadWriteIOUtils.readString(stream);
String username = ReadWriteIOUtils.readString(stream);
CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream));
long lastExecutionTime = ReadWriteIOUtils.readLong(stream);
return new CQEntry(
cqId,
everyInterval,
boundaryTime,
startTimeOffset,
endTimeOffset,
timeoutPolicy,
queryBody,
sql,
md5,
zoneId,
username,
state,
lastExecutionTime);
}
public String getCqId() {
return cqId;
}
public long getEveryInterval() {
return everyInterval;
}
public long getBoundaryTime() {
return boundaryTime;
}
public long getStartTimeOffset() {
return startTimeOffset;
}
public long getEndTimeOffset() {
return endTimeOffset;
}
public TimeoutPolicy getTimeoutPolicy() {
return timeoutPolicy;
}
public String getQueryBody() {
return queryBody;
}
public String getSql() {
return sql;
}
public String getMd5() {
return md5;
}
public CQState getState() {
return state;
}
public long getLastExecutionTime() {
return lastExecutionTime;
}
public String getZoneId() {
return zoneId;
}
public String getUsername() {
return username;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CQEntry cqEntry = (CQEntry) o;
return everyInterval == cqEntry.everyInterval
&& boundaryTime == cqEntry.boundaryTime
&& startTimeOffset == cqEntry.startTimeOffset
&& endTimeOffset == cqEntry.endTimeOffset
&& lastExecutionTime == cqEntry.lastExecutionTime
&& Objects.equals(cqId, cqEntry.cqId)
&& timeoutPolicy == cqEntry.timeoutPolicy
&& Objects.equals(queryBody, cqEntry.queryBody)
&& Objects.equals(sql, cqEntry.sql)
&& Objects.equals(md5, cqEntry.md5)
&& Objects.equals(zoneId, cqEntry.zoneId)
&& Objects.equals(username, cqEntry.username)
&& state == cqEntry.state;
}
@Override
public int hashCode() {
return Objects.hash(
cqId,
everyInterval,
boundaryTime,
startTimeOffset,
endTimeOffset,
timeoutPolicy,
queryBody,
sql,
md5,
zoneId,
username,
state,
lastExecutionTime);
}
}
}