| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.writelog.recover; |
| |
| import org.apache.iotdb.db.conf.IoTDBConstant; |
| import org.apache.iotdb.db.engine.memtable.IMemTable; |
| import org.apache.iotdb.db.engine.modification.Deletion; |
| import org.apache.iotdb.db.engine.modification.ModificationFile; |
| import org.apache.iotdb.db.engine.storagegroup.TsFileResource; |
| import org.apache.iotdb.db.exception.WriteProcessException; |
| import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; |
| import org.apache.iotdb.db.exception.metadata.MetadataException; |
| import org.apache.iotdb.db.exception.metadata.PathNotExistException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.metadata.PartialPath; |
| import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; |
| import org.apache.iotdb.db.qp.physical.PhysicalPlan; |
| import org.apache.iotdb.db.qp.physical.crud.DeletePlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; |
| import org.apache.iotdb.db.service.IoTDB; |
| import org.apache.iotdb.db.writelog.io.ILogReader; |
| import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; |
| import org.apache.iotdb.db.writelog.node.WriteLogNode; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.function.Supplier; |
| |
| /** |
| * LogReplayer finds the logNode of the TsFile given by insertFilePath and logNodePrefix, reads the |
| * WALs from the logNode and redoes them into a given MemTable and ModificationFile. |
| */ |
| public class LogReplayer { |
| |
| private Logger logger = LoggerFactory.getLogger(LogReplayer.class); |
| private String logNodePrefix; |
| private String insertFilePath; |
| private ModificationFile modFile; |
| private TsFileResource currentTsFileResource; |
| private IMemTable recoverMemTable; |
| |
| // only unsequence file tolerates duplicated data |
| private boolean sequence; |
| |
| private Map<String, Long> tempStartTimeMap = new HashMap<>(); |
| private Map<String, Long> tempEndTimeMap = new HashMap<>(); |
| |
| public LogReplayer( |
| String logNodePrefix, |
| String insertFilePath, |
| ModificationFile modFile, |
| TsFileResource currentTsFileResource, |
| IMemTable memTable, |
| boolean sequence) { |
| this.logNodePrefix = logNodePrefix; |
| this.insertFilePath = insertFilePath; |
| this.modFile = modFile; |
| this.currentTsFileResource = currentTsFileResource; |
| this.recoverMemTable = memTable; |
| this.sequence = sequence; |
| } |
| |
| /** |
| * finds the logNode of the TsFile given by insertFilePath and logNodePrefix, reads the WALs from |
| * the logNode and redoes them into a given MemTable and ModificationFile. |
| */ |
| public void replayLogs(Supplier<ByteBuffer[]> supplier) { |
| WriteLogNode logNode = |
| MultiFileLogNodeManager.getInstance() |
| .getNode( |
| logNodePrefix + FSFactoryProducer.getFSFactory().getFile(insertFilePath).getName(), |
| supplier); |
| |
| ILogReader logReader = logNode.getLogReader(); |
| try { |
| while (logReader.hasNext()) { |
| try { |
| PhysicalPlan plan = logReader.next(); |
| if (plan instanceof InsertPlan) { |
| replayInsert((InsertPlan) plan); |
| } else if (plan instanceof DeletePlan) { |
| replayDelete((DeletePlan) plan); |
| } |
| } catch (PathNotExistException ignored) { |
| // can not get path because it is deleted |
| } catch (Exception e) { |
| logger.warn("recover wal of {} failed", insertFilePath, e); |
| } |
| } |
| } catch (IOException e) { |
| logger.warn("meet error when redo wal of {}", insertFilePath, e); |
| } finally { |
| logReader.close(); |
| try { |
| modFile.close(); |
| } catch (IOException e) { |
| logger.error("Cannot close the modifications file {}", modFile.getFilePath(), e); |
| } |
| } |
| tempStartTimeMap.forEach((k, v) -> currentTsFileResource.updateStartTime(k, v)); |
| tempEndTimeMap.forEach((k, v) -> currentTsFileResource.updateEndTime(k, v)); |
| } |
| |
| private void replayDelete(DeletePlan deletePlan) throws IOException, MetadataException { |
| List<PartialPath> paths = deletePlan.getPaths(); |
| for (PartialPath path : paths) { |
| for (PartialPath device : IoTDB.metaManager.getDevices(path.getDevicePath())) { |
| recoverMemTable.delete( |
| path, device, deletePlan.getDeleteStartTime(), deletePlan.getDeleteEndTime()); |
| } |
| modFile.write( |
| new Deletion( |
| path, |
| currentTsFileResource.getTsFileSize(), |
| deletePlan.getDeleteStartTime(), |
| deletePlan.getDeleteEndTime())); |
| } |
| } |
| |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| private void replayInsert(InsertPlan plan) throws WriteProcessException, QueryProcessException { |
| if (currentTsFileResource != null) { |
| long minTime, maxTime; |
| if (plan instanceof InsertRowPlan) { |
| minTime = ((InsertRowPlan) plan).getTime(); |
| maxTime = ((InsertRowPlan) plan).getTime(); |
| } else { |
| minTime = ((InsertTabletPlan) plan).getMinTime(); |
| maxTime = ((InsertTabletPlan) plan).getMaxTime(); |
| } |
| // the last chunk group may contain the same data with the logs, ignore such logs in seq file |
| long lastEndTime = currentTsFileResource.getEndTime(plan.getPrefixPath().getFullPath()); |
| if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTime && sequence) { |
| return; |
| } |
| Long startTime = tempStartTimeMap.get(plan.getPrefixPath().getFullPath()); |
| if (startTime == null || startTime > minTime) { |
| tempStartTimeMap.put(plan.getPrefixPath().getFullPath(), minTime); |
| } |
| Long endTime = tempEndTimeMap.get(plan.getPrefixPath().getFullPath()); |
| if (endTime == null || endTime < maxTime) { |
| tempEndTimeMap.put(plan.getPrefixPath().getFullPath(), maxTime); |
| } |
| } |
| IMeasurementMNode[] mNodes; |
| try { |
| mNodes = IoTDB.metaManager.getMNodes(plan.getPrefixPath(), plan.getMeasurements()); |
| } catch (MetadataException e) { |
| throw new QueryProcessException(e); |
| } |
| // set measurementMNodes, WAL already serializes the real data type, so no need to infer type |
| plan.setMeasurementMNodes(mNodes); |
| // mark failed plan manually |
| checkDataTypeAndMarkFailed(mNodes, plan); |
| if (plan instanceof InsertRowPlan) { |
| recoverMemTable.insert((InsertRowPlan) plan); |
| } else { |
| recoverMemTable.insertTablet( |
| (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount()); |
| } |
| } |
| |
| private void checkDataTypeAndMarkFailed(final IMeasurementMNode[] mNodes, InsertPlan tPlan) { |
| int columnIndex = 0; |
| for (int i = 0; i < mNodes.length; i++) { |
| if (mNodes[i] == null) { |
| tPlan.markFailedMeasurementInsertion( |
| i, |
| new PathNotExistException( |
| tPlan.getPrefixPath().getFullPath() |
| + IoTDBConstant.PATH_SEPARATOR |
| + tPlan.getMeasurements()[i])); |
| columnIndex++; |
| } else if (tPlan.isAligned()) { |
| List<TSDataType> datatypes = mNodes[i].getSchema().getValueTSDataTypeList(); |
| for (int j = 0; j < datatypes.size(); j++) { |
| if (tPlan.getDataTypes()[columnIndex] == null) { |
| tPlan.getDataTypes()[columnIndex] = datatypes.get(j); |
| } else if (datatypes.get(j) != tPlan.getDataTypes()[columnIndex]) { |
| tPlan.markFailedMeasurementInsertion( |
| i, |
| new DataTypeMismatchException( |
| mNodes[i].getSchema().getValueMeasurementIdList().get(j), |
| tPlan.getDataTypes()[columnIndex], |
| datatypes.get(j))); |
| } |
| columnIndex++; |
| } |
| } else if (mNodes[i].getSchema().getType() != tPlan.getDataTypes()[columnIndex]) { |
| tPlan.markFailedMeasurementInsertion( |
| i, |
| new DataTypeMismatchException( |
| mNodes[i].getName(), |
| tPlan.getDataTypes()[columnIndex], |
| mNodes[i].getSchema().getType())); |
| columnIndex++; |
| } else { |
| columnIndex++; |
| } |
| } |
| } |
| } |