blob: 0aff597ebdc217ebe57d7f32fb8b401df6479547 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.kv;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.DeletePolicy;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.DeletePolicyUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class CompactionService extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final CompactionStore compactionStore;
private final DefaultMessageStore defaultMessageStore;
private final CommitLog commitLog;
private final LinkedBlockingQueue<TopicPartitionOffset> compactionMsgQ = new LinkedBlockingQueue<>();
public CompactionService(CommitLog commitLog, DefaultMessageStore messageStore, CompactionStore compactionStore) {
this.commitLog = commitLog;
this.defaultMessageStore = messageStore;
this.compactionStore = compactionStore;
}
public void putRequest(DispatchRequest request) {
if (request == null) {
return;
}
String topic = request.getTopic();
Optional<TopicConfig> topicConfig = defaultMessageStore.getTopicConfig(topic);
DeletePolicy policy = DeletePolicyUtils.getDeletePolicy(topicConfig);
//check request topic flag
if (Objects.equals(policy, DeletePolicy.COMPACTION)) {
int queueId = request.getQueueId();
long physicalOffset = request.getCommitLogOffset();
TopicPartitionOffset tpo = new TopicPartitionOffset(topic, queueId, physicalOffset);
compactionMsgQ.offer(tpo);
this.wakeup();
} // else skip if message isn't compaction
}
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final int maxTotalMsgSize) {
return compactionStore.getMessage(group, topic, queueId, offset, maxMsgNums, maxTotalMsgSize);
}
@Override
public String getServiceName() {
if (defaultMessageStore != null && defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
return defaultMessageStore.getBrokerConfig().getLoggerIdentifier() + CompactionService.class.getSimpleName();
}
return CompactionService.class.getSimpleName();
}
@Override
public void run() {
while (!isStopped()) {
try {
TopicPartitionOffset tpo = compactionMsgQ.poll(1, TimeUnit.MILLISECONDS);
if (null != tpo) {
SelectMappedBufferResult smr = null;
try {
smr = commitLog.getData(tpo.physicalOffset);
if (smr != null) {
compactionStore.putMessage(tpo.topic, tpo.queueId, smr);
}
} catch (Exception e) {
log.error("putMessage into {}:{} compactionLog exception: ", tpo.topic, tpo.queueId, e);
} finally {
if (smr != null) {
smr.release();
}
}
} else {
waitForRunning(100);
}
} catch (InterruptedException e) {
log.error("poll from compaction pos queue interrupted.");
}
}
}
public boolean load(boolean exitOK) {
try {
compactionStore.load(exitOK);
return true;
} catch (Exception e) {
log.error("load compaction store error ", e);
return false;
}
}
// @Override
// public void start() {
// compactionStore.load();
// super.start();
// }
@Override
public void shutdown() {
super.shutdown();
compactionStore.shutdown();
}
public void updateMasterAddress(String addr) {
compactionStore.updateMasterAddress(addr);
}
static class TopicPartitionOffset {
String topic;
int queueId;
long physicalOffset;
public TopicPartitionOffset(final String topic, final int queueId, final long physicalOffset) {
this.topic = topic;
this.queueId = queueId;
this.physicalOffset = physicalOffset;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getQueueId() {
return queueId;
}
public void setQueueId(int queueId) {
this.queueId = queueId;
}
public long getPhysicalOffset() {
return physicalOffset;
}
public void setPhysicalOffset(long physicalOffset) {
this.physicalOffset = physicalOffset;
}
}
}