| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ignite.internal; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.OutputStreamWriter; |
| import java.io.Writer; |
| import java.nio.channels.FileChannel; |
| import java.nio.channels.FileLock; |
| import java.nio.channels.OverlappingFileLockException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.locks.Lock; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.internal.util.GridStripedLock; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.marshaller.MarshallerContext; |
| |
| /** |
| * File-based persistence provider for {@link MarshallerContextImpl}. |
| * |
| * Saves mappings in format <b>{typeId}.classname{platformId}</b>, e.g. 123.classname0. |
| * |
| * It writes new mapping when it is accepted by all grid members and reads mapping |
| * when a classname is requested but is not presented in local cache of {@link MarshallerContextImpl}. |
| */ |
| final class MarshallerMappingFileStore { |
| /** File lock timeout in milliseconds. */ |
| private static final int FILE_LOCK_TIMEOUT_MS = 5000; |
| |
| /** */ |
| private static final GridStripedLock fileLock = new GridStripedLock(32); |
| |
| /** */ |
| private final IgniteLogger log; |
| |
| /** Marshaller mapping directory */ |
| private final File workDir; |
| |
| /** */ |
| private final String FILE_EXTENSION = ".classname"; |
| |
| /** |
| * @param igniteWorkDir Ignite work directory |
| * @param log Logger. |
| */ |
| MarshallerMappingFileStore(String igniteWorkDir, IgniteLogger log) throws IgniteCheckedException { |
| workDir = U.resolveWorkDirectory(igniteWorkDir, "marshaller", false); |
| this.log = log; |
| } |
| |
| /** |
| * Creates marshaller mapping file store with custom predefined work directory |
| * @param log logger. |
| * @param marshallerMappingFileStoreDir custom marshaller work directory |
| */ |
| MarshallerMappingFileStore(final IgniteLogger log, final File marshallerMappingFileStoreDir) { |
| this.workDir = marshallerMappingFileStoreDir; |
| this.log = log; |
| } |
| |
| /** |
| * @param platformId Platform id. |
| * @param typeId Type id. |
| * @param typeName Type name. |
| */ |
| void writeMapping(byte platformId, int typeId, String typeName) { |
| String fileName = getFileName(platformId, typeId); |
| |
| Lock lock = fileLock(fileName); |
| |
| lock.lock(); |
| |
| try { |
| File file = new File(workDir, fileName); |
| |
| try (FileOutputStream out = new FileOutputStream(file)) { |
| try (Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) { |
| try (FileLock ignored = fileLock(out.getChannel(), false)) { |
| writer.write(typeName); |
| |
| writer.flush(); |
| } |
| } |
| } |
| catch (IOException e) { |
| U.error(log, "Failed to write class name to file [platformId=" + platformId + "id=" + typeId + |
| ", clsName=" + typeName + ", file=" + file.getAbsolutePath() + ']', e); |
| } |
| catch(OverlappingFileLockException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("File already locked (will ignore): " + file.getAbsolutePath()); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| U.error(log, "Interrupted while waiting for acquiring file lock: " + file, e); |
| } |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param fileName File name. |
| */ |
| private String readMapping(String fileName) throws IgniteCheckedException { |
| ThreadLocalRandom rnd = null; |
| |
| Lock lock = fileLock(fileName); |
| |
| lock.lock(); |
| |
| try { |
| File file = new File(workDir, fileName); |
| |
| long time = 0; |
| |
| while (true) { |
| try (FileInputStream in = new FileInputStream(file)) { |
| try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) { |
| try (FileLock ignored = fileLock(in.getChannel(), true)) { |
| if (file.length() > 0) |
| return reader.readLine(); |
| |
| if (rnd == null) |
| rnd = ThreadLocalRandom.current(); |
| |
| if (time == 0) |
| time = U.currentTimeMillis(); |
| else if ((U.currentTimeMillis() - time) >= FILE_LOCK_TIMEOUT_MS) |
| return null; |
| |
| U.sleep(rnd.nextLong(50)); |
| } |
| } |
| } |
| catch (IOException ignored) { |
| return null; |
| } |
| } |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param platformId Platform id. |
| * @param typeId Type id. |
| */ |
| String readMapping(byte platformId, int typeId) throws IgniteCheckedException { |
| return readMapping(getFileName(platformId, typeId)); |
| } |
| |
| /** |
| * Restores all mappings available in file system to marshaller context. |
| * This method should be used only on node startup. |
| * |
| * @param marshCtx Marshaller context to register mappings. |
| */ |
| void restoreMappings(MarshallerContext marshCtx) throws IgniteCheckedException { |
| for (File file : workDir.listFiles()) { |
| String name = file.getName(); |
| |
| byte platformId = getPlatformId(name); |
| |
| int typeId = getTypeId(name); |
| |
| String clsName = readMapping(name); |
| |
| if (clsName == null) { |
| throw new IgniteCheckedException("Class name is null for [platformId=" + platformId + |
| ", typeId=" + typeId + "], marshaller mappings storage is broken. " + |
| "Clean up marshaller directory (<work_dir>/marshaller) and restart the node. File name: " + name + |
| ", FileSize: " + file.length()); |
| } |
| |
| marshCtx.registerClassNameLocally(platformId, typeId, clsName); |
| } |
| } |
| |
| /** |
| * Checks if marshaller mapping for given [platformId, typeId] pair is already presented on disk. |
| * If so verifies that it is the same (if no {@link IgniteCheckedException} is thrown). |
| * If there is not such mapping writes it. |
| * |
| * @param platformId Platform id. |
| * @param typeId Type id. |
| * @param typeName Type name. |
| */ |
| void mergeAndWriteMapping(byte platformId, int typeId, String typeName) throws IgniteCheckedException { |
| String existingTypeName = readMapping(platformId, typeId); |
| |
| if (existingTypeName != null) { |
| if (!existingTypeName.equals(typeName)) |
| throw new IgniteCheckedException("Failed to merge new and existing marshaller mappings." + |
| " For [platformId=" + platformId + ", typeId=" + typeId + "]" + |
| " new typeName=" + typeName + ", existing typeName=" + existingTypeName + "." + |
| " Consider cleaning up persisted mappings from <workDir>/marshaller directory."); |
| } |
| else |
| writeMapping(platformId, typeId, typeName); |
| } |
| |
| /** |
| * @param fileName Name of file with marshaller mapping information. |
| * @throws IgniteCheckedException If file name format is broken. |
| */ |
| private byte getPlatformId(String fileName) throws IgniteCheckedException { |
| String lastSymbol = fileName.substring(fileName.length() - 1); |
| |
| byte platformId; |
| |
| try { |
| platformId = Byte.parseByte(lastSymbol); |
| } |
| catch (NumberFormatException e) { |
| throw new IgniteCheckedException("Reading marshaller mapping from file " |
| + fileName |
| + " failed; last symbol of file name is expected to be numeric.", e); |
| } |
| |
| return platformId; |
| } |
| |
| /** |
| * @param fileName Name of file with marshaller mapping information. |
| * @throws IgniteCheckedException If file name format is broken. |
| */ |
| private int getTypeId(String fileName) throws IgniteCheckedException { |
| int typeId; |
| |
| try { |
| typeId = Integer.parseInt(fileName.substring(0, fileName.indexOf(FILE_EXTENSION))); |
| } |
| catch (NumberFormatException e) { |
| throw new IgniteCheckedException("Reading marshaller mapping from file " |
| + fileName |
| + " failed; type ID is expected to be numeric.", e); |
| } |
| |
| return typeId; |
| } |
| |
| /** |
| * @param platformId Platform id. |
| * @param typeId Type id. |
| */ |
| private String getFileName(byte platformId, int typeId) { |
| return typeId + FILE_EXTENSION + platformId; |
| } |
| |
| /** |
| * @param fileName File name. |
| * @return Lock instance. |
| */ |
| private static Lock fileLock(String fileName) { |
| return fileLock.getLock(fileName.hashCode()); |
| } |
| |
| /** |
| * @param ch File channel. |
| * @param shared Shared. |
| */ |
| private static FileLock fileLock( |
| FileChannel ch, |
| boolean shared |
| ) throws IOException, IgniteInterruptedCheckedException { |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| while (true) { |
| FileLock fileLock = ch.tryLock(0L, Long.MAX_VALUE, shared); |
| |
| if (fileLock != null) |
| return fileLock; |
| |
| U.sleep(rnd.nextLong(50)); |
| } |
| } |
| } |