// blob: c9dc58098cb4d884860922032ade85869c45128d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Worker loop that repeatedly takes compaction tasks from a shared queue and starts them.
 *
 * <p>The loop runs until the executing thread is interrupted. Any other failure raised while
 * handling a single task is logged and the loop continues with the next task.
 */
public class CompactionWorker implements Runnable {
  private static final Logger log = LoggerFactory.getLogger("COMPACTION");
  private final int threadId;
  private final FixedPriorityBlockingQueue<AbstractCompactionTask> compactionTaskQueue;

  /**
   * Creates a worker bound to the given task queue.
   *
   * @param threadId identifier of this worker, used only in log messages
   * @param compactionTaskQueue queue from which tasks are taken
   */
  public CompactionWorker(
      int threadId, FixedPriorityBlockingQueue<AbstractCompactionTask> compactionTaskQueue) {
    this.threadId = threadId;
    this.compactionTaskQueue = compactionTaskQueue;
  }

  /**
   * Takes tasks from the queue one at a time. For each task that passes {@code
   * checkValidAndSetMerging()}, a {@link CompactionTaskFuture} wrapping the task's summary is
   * registered with the {@code CompactionTaskManager} and then {@code task.start()} is called.
   */
  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        AbstractCompactionTask task;
        try {
          task = compactionTaskQueue.take();
        } catch (InterruptedException e) {
          log.warn("CompactionThread-{} terminates because interruption", threadId);
          // Restore the interrupt status so that whoever owns this thread can still observe it.
          Thread.currentThread().interrupt();
          return;
        }
        if (task != null) {
          // add metrics
          CompactionMetricsRecorder.recordTaskInfo(
              task, CompactionTaskStatus.POLL_FROM_QUEUE, compactionTaskQueue.size());
          if (task.checkValidAndSetMerging()) {
            CompactionTaskSummary summary = task.getSummary();
            CompactionTaskFuture future = new CompactionTaskFuture(summary);
            // The future is registered before the task is started.
            CompactionTaskManager.getInstance().recordTask(task, future);
            task.start();
          }
        }
      } catch (Throwable t) {
        // Deliberately broad: a failure while handling one task must not end the worker loop.
        log.error("CompactionWorker.run(), Exception.", t);
      }
    }
  }

  /**
   * A {@link Future} view over a {@link CompactionTaskSummary}. Completion and cancellation are
   * read from / written to the summary, and the blocking {@code get} methods poll it.
   */
  static class CompactionTaskFuture implements Future<CompactionTaskSummary> {
    /** Interval, in milliseconds, between two checks of the summary while waiting. */
    private static final long POLL_INTERVAL_MS = 100L;

    final CompactionTaskSummary summary;

    public CompactionTaskFuture(CompactionTaskSummary summary) {
      this.summary = summary;
    }

    /**
     * Calls {@code cancel()} on the summary.
     *
     * @param mayInterruptIfRunning ignored
     * @return always {@code true}
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      // NOTE(review): Future#cancel is specified to return false when the task has already
      // completed; this implementation reports true unconditionally. Confirm whether callers
      // rely on that before tightening it.
      summary.cancel();
      return true;
    }

    @Override
    public boolean isCancelled() {
      return summary.isCancel();
    }

    @Override
    public boolean isDone() {
      return summary.isFinished();
    }

    /**
     * Waits, polling every {@value #POLL_INTERVAL_MS} ms, until the summary reports finished.
     *
     * @return the summary this future wraps
     * @throws InterruptedException if the waiting thread is interrupted while sleeping
     */
    @Override
    public CompactionTaskSummary get() throws InterruptedException, ExecutionException {
      while (!summary.isFinished()) {
        TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MS);
      }
      return summary;
    }

    /**
     * Waits at most {@code timeout} (expressed in {@code unit}) for the summary to report
     * finished. The summary is checked every {@value #POLL_INTERVAL_MS} ms, or sooner when less
     * time than that remains.
     *
     * @param timeout maximum time to wait; a non-positive value means no waiting
     * @param unit unit of {@code timeout}
     * @return the summary this future wraps
     * @throws InterruptedException if the waiting thread is interrupted while sleeping
     * @throws TimeoutException if the summary is still not finished when the timeout elapses
     */
    @Override
    public CompactionTaskSummary get(long timeout, @NotNull TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
      // Work in a single unit (nanoseconds) so the poll interval stays 100 ms whatever unit
      // the caller passes. Clamp at zero so a negative timeout cannot underflow below.
      final long timeoutNanos = Math.max(0L, unit.toNanos(timeout));
      final long pollNanos = TimeUnit.MILLISECONDS.toNanos(POLL_INTERVAL_MS);
      final long startNanos = System.nanoTime();
      while (!summary.isFinished()) {
        long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
        if (remainingNanos <= 0) {
          throw new TimeoutException("Timeout when trying to get compaction task summary");
        }
        TimeUnit.NANOSECONDS.sleep(Math.min(remainingNanos, pollNanos));
      }
      return summary;
    }
  }
}