blob: 48c7edd2c292b81ed3748960bbfa953661998cb6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.manager.pipe.extractor;
import org.apache.iotdb.commons.auth.user.LocalFileUserAccessor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.pipe.datastructure.queue.listening.AbstractPipeListeningQueue;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
import org.apache.iotdb.commons.pipe.event.SerializableEvent;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.UnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionSnapshotEvent;
import org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionWritePlanEvent;
import org.apache.iotdb.confignode.manager.pipe.event.PipeConfigSerializableEventType;
import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class ConfigRegionListeningQueue extends AbstractPipeListeningQueue
implements SnapshotProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRegionListeningQueue.class);
private static final String SNAPSHOT_FILE_NAME = "pipe_config_region_listening_queue.bin";
/////////////////////////////// Function ///////////////////////////////
public synchronized void tryListenToPlan(ConfigPhysicalPlan plan, boolean isGeneratedByPipe) {
if (ConfigRegionListeningFilter.shouldPlanBeListened(plan)) {
PipeConfigRegionWritePlanEvent event;
switch (plan.getType()) {
case PipeEnriched:
tryListenToPlan(((PipeEnrichedPlan) plan).getInnerPlan(), true);
return;
case UnsetTemplate:
// Different clusters have different template ids, so we need to
// convert template id to template name, in order to make the event
// in receiver side executable.
try {
event =
new PipeConfigRegionWritePlanEvent(
new PipeUnsetSchemaTemplatePlan(
ConfigNode.getInstance()
.getConfigManager()
.getClusterSchemaManager()
.getTemplate(((UnsetSchemaTemplatePlan) plan).getTemplateId())
.getName(),
((UnsetSchemaTemplatePlan) plan).getPath().getFullPath()),
isGeneratedByPipe);
} catch (MetadataException e) {
LOGGER.warn("Failed to collect UnsetTemplatePlan", e);
return;
}
break;
default:
event = new PipeConfigRegionWritePlanEvent(plan, isGeneratedByPipe);
}
tryListen(event);
}
}
public synchronized void tryListenToSnapshots(
List<Pair<Pair<Path, Path>, CNSnapshotFileType>> snapshotPathInfoList) {
List<PipeSnapshotEvent> events = new ArrayList<>();
for (Pair<Pair<Path, Path>, CNSnapshotFileType> snapshotPathInfo : snapshotPathInfoList) {
final Path snapshotPath = snapshotPathInfo.getLeft().getLeft();
final CNSnapshotFileType type = snapshotPathInfo.getRight();
// Filter empty and superuser snapshots
if (snapshotPath.toFile().length() == 0
|| type == CNSnapshotFileType.USER
&& snapshotPath
.toFile()
.getName()
.equals(AuthorityChecker.SUPER_USER + IoTDBConstant.PROFILE_SUFFIX)
|| type == CNSnapshotFileType.USER_ROLE
&& snapshotPath
.toFile()
.getName()
.equals(
AuthorityChecker.SUPER_USER
+ LocalFileUserAccessor.ROLE_SUFFIX
+ IoTDBConstant.PROFILE_SUFFIX)) {
continue;
}
final Path templateFilePath = snapshotPathInfo.getLeft().getRight();
events.add(
new PipeConfigRegionSnapshotEvent(
snapshotPath.toString(),
Objects.nonNull(templateFilePath) && templateFilePath.toFile().length() > 0
? templateFilePath.toString()
: null,
snapshotPathInfo.getRight()));
}
tryListen(events);
}
/////////////////////////////// Element Ser / De Method ////////////////////////////////
@Override
protected ByteBuffer serializeToByteBuffer(Event event) {
return ((SerializableEvent) event).serializeToByteBuffer();
}
@Override
protected Event deserializeFromByteBuffer(ByteBuffer byteBuffer) {
try {
SerializableEvent result = PipeConfigSerializableEventType.deserialize(byteBuffer);
// We assume the caller of this method will put the deserialize result into a queue,
// so we increase the reference count here.
((EnrichedEvent) result).increaseReferenceCount(ConfigRegionListeningQueue.class.getName());
return result;
} catch (IOException e) {
LOGGER.error("Failed to load snapshot from byteBuffer {}.", byteBuffer);
}
return null;
}
/////////////////////////////// Snapshot ///////////////////////////////
@Override
public synchronized boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
return super.serializeToFile(new File(snapshotDir, SNAPSHOT_FILE_NAME));
}
@Override
public synchronized void processLoadSnapshot(File snapshotDir) throws TException, IOException {
super.deserializeFromFile(new File(snapshotDir, SNAPSHOT_FILE_NAME));
}
}