| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.confignode.procedure.store; |
| |
| import org.apache.iotdb.commons.conf.CommonDescriptor; |
| import org.apache.iotdb.commons.utils.TestOnly; |
| import org.apache.iotdb.confignode.procedure.Procedure; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.util.List; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| public class ProcedureStore implements IProcedureStore { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ProcedureStore.class); |
| private String procedureWalDir = |
| CommonDescriptor.getInstance().getConfig().getProcedureWalFolder(); |
| private final ConcurrentHashMap<Long, ProcedureWAL> procWALMap = new ConcurrentHashMap<>(); |
| public static final String PROCEDURE_WAL_SUFFIX = ".proc.wal"; |
| private final IProcedureFactory procedureFactory; |
| private volatile boolean isRunning = false; |
| |
| public ProcedureStore(IProcedureFactory procedureFactory) { |
| try { |
| this.procedureFactory = procedureFactory; |
| Files.createDirectories(Paths.get(procedureWalDir)); |
| } catch (IOException e) { |
| throw new RuntimeException("Create procedure wal directory failed.", e); |
| } |
| } |
| |
| @TestOnly |
| public ProcedureStore(String testWALDir, IProcedureFactory procedureFactory) { |
| this.procedureFactory = procedureFactory; |
| try { |
| Files.createDirectories(Paths.get(testWALDir)); |
| procedureWalDir = testWALDir; |
| } catch (IOException e) { |
| throw new RuntimeException("Create procedure wal directory failed.", e); |
| } |
| } |
| |
| public boolean isRunning() { |
| return this.isRunning; |
| } |
| |
| public void setRunning(boolean running) { |
| isRunning = running; |
| } |
| |
| /** |
| * Load procedure wal files into memory. |
| * |
| * @param procedureList procedureList |
| */ |
| public void load(List<Procedure> procedureList) { |
| try { |
| Files.list(Paths.get(procedureWalDir)) |
| .filter(path -> path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX)) |
| .sorted( |
| (p1, p2) -> |
| Long.compareUnsigned( |
| Long.parseLong(p1.getFileName().toString().split("\\.")[0]), |
| Long.parseLong(p2.getFileName().toString().split("\\.")[0]))) |
| .forEach( |
| path -> { |
| String fileName = path.getFileName().toString(); |
| long procId = Long.parseLong(fileName.split("\\.")[0]); |
| ProcedureWAL procedureWAL = |
| procWALMap.computeIfAbsent( |
| procId, id -> new ProcedureWAL(path, procedureFactory)); |
| procedureWAL.load(procedureList); |
| }); |
| } catch (IOException e) { |
| LOG.error("Load procedure wal failed.", e); |
| } |
| } |
| |
| /** |
| * Update procedure, roughly delete and create a new wal file. |
| * |
| * @param procedure procedure |
| */ |
| public void update(Procedure procedure) { |
| if (!procedure.needPersistance()) { |
| procWALMap.remove(procedure.getProcId()); |
| return; |
| } |
| long procId = procedure.getProcId(); |
| Path path = Paths.get(procedureWalDir, procId + ProcedureStore.PROCEDURE_WAL_SUFFIX); |
| ProcedureWAL procedureWAL = |
| procWALMap.computeIfAbsent(procId, id -> new ProcedureWAL(path, procedureFactory)); |
| try { |
| procedureWAL.save(procedure); |
| } catch (IOException e) { |
| LOG.error("Update Procedure (pid={}) wal failed", procedure.getProcId()); |
| } |
| } |
| |
| /** |
| * Batch update |
| * |
| * @param subprocs procedure array |
| */ |
| public void update(Procedure[] subprocs) { |
| for (Procedure subproc : subprocs) { |
| update(subproc); |
| } |
| } |
| |
| /** |
| * Delete procedure wal file |
| * |
| * @param procId procedure id |
| */ |
| public void delete(long procId) { |
| ProcedureWAL procedureWAL = procWALMap.get(procId); |
| if (procedureWAL != null) { |
| procedureWAL.delete(); |
| } |
| procWALMap.remove(procId); |
| } |
| |
| /** |
| * Batch delete |
| * |
| * @param childProcIds procedure id array |
| */ |
| public void delete(long[] childProcIds) { |
| for (long childProcId : childProcIds) { |
| delete(childProcId); |
| } |
| } |
| |
| /** |
| * Batch delete by index |
| * |
| * @param batchIds batchIds |
| * @param startIndex start index |
| * @param batchCount delete procedure count |
| */ |
| public void delete(long[] batchIds, int startIndex, int batchCount) { |
| for (int i = startIndex; i < batchCount; i++) { |
| delete(batchIds[i]); |
| } |
| } |
| |
| /** clean all the wal, used for unit test. */ |
| public void cleanup() { |
| try { |
| FileUtils.cleanDirectory(new File(procedureWalDir)); |
| } catch (IOException e) { |
| LOG.error("Clean wal directory failed", e); |
| } |
| } |
| |
| public void stop() { |
| isRunning = false; |
| } |
| |
| @Override |
| public void start() { |
| if (!isRunning) { |
| isRunning = true; |
| } |
| } |
| } |