| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.cassandra.db.commitlog; |
| |
| import java.io.DataInput; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Collections; |
| import java.util.Map; |
| import java.util.TreeMap; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.zip.CRC32; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Objects; |
| |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.config.ParameterizedClass; |
| import org.apache.cassandra.exceptions.ConfigurationException; |
| import org.apache.cassandra.io.FSReadError; |
| import org.apache.cassandra.io.util.File; |
| import org.apache.cassandra.io.util.FileInputStreamPlus; |
| import org.apache.cassandra.net.MessagingService; |
| import org.apache.cassandra.security.EncryptionContext; |
| import org.apache.cassandra.utils.JsonUtils; |
| |
| import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; |
| |
| public class CommitLogDescriptor |
| { |
| private static final String SEPARATOR = "-"; |
| private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR; |
| private static final String FILENAME_EXTENSION = ".log"; |
| private static final String INDEX_FILENAME_SUFFIX = "_cdc.idx"; |
| // match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log. |
| private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION); |
| |
| static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters"; |
| static final String COMPRESSION_CLASS_KEY = "compressionClass"; |
| |
| // We don't support anything pre-3.0 |
| public static final int VERSION_30 = 6; |
| public static final int VERSION_40 = 7; |
| |
| /** |
| * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes. |
| * Note: make sure to handle {@link #getMessagingVersion()} |
| */ |
| @VisibleForTesting |
| public static final int current_version = VERSION_40; |
| |
| final int version; |
| public final long id; |
| public final ParameterizedClass compression; |
| private final EncryptionContext encryptionContext; |
| |
| public CommitLogDescriptor(int version, long id, ParameterizedClass compression, EncryptionContext encryptionContext) |
| { |
| this.version = version; |
| this.id = id; |
| this.compression = compression; |
| this.encryptionContext = encryptionContext; |
| } |
| |
| public CommitLogDescriptor(long id, ParameterizedClass compression, EncryptionContext encryptionContext) |
| { |
| this(current_version, id, compression, encryptionContext); |
| } |
| |
| public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor) |
| { |
| writeHeader(out, descriptor, Collections.<String, String>emptyMap()); |
| } |
| |
| /** |
| * @param additionalHeaders Allow segments to pass custom header data |
| */ |
| public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor, Map<String, String> additionalHeaders) |
| { |
| CRC32 crc = new CRC32(); |
| out.putInt(descriptor.version); |
| updateChecksumInt(crc, descriptor.version); |
| out.putLong(descriptor.id); |
| updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL)); |
| updateChecksumInt(crc, (int) (descriptor.id >>> 32)); |
| String parametersString = constructParametersString(descriptor.compression, descriptor.encryptionContext, additionalHeaders); |
| byte[] parametersBytes = parametersString.getBytes(StandardCharsets.UTF_8); |
| if (parametersBytes.length != (((short) parametersBytes.length) & 0xFFFF)) |
| throw new ConfigurationException(String.format("Compression parameters too long, length %d cannot be above 65535.", |
| parametersBytes.length)); |
| out.putShort((short) parametersBytes.length); |
| updateChecksumInt(crc, parametersBytes.length); |
| out.put(parametersBytes); |
| crc.update(parametersBytes, 0, parametersBytes.length); |
| out.putInt((int) crc.getValue()); |
| } |
| |
| @VisibleForTesting |
| static String constructParametersString(ParameterizedClass compression, EncryptionContext encryptionContext, Map<String, String> additionalHeaders) |
| { |
| Map<String, Object> params = new TreeMap<>(); |
| if (compression != null) |
| { |
| params.put(COMPRESSION_PARAMETERS_KEY, compression.parameters); |
| params.put(COMPRESSION_CLASS_KEY, compression.class_name); |
| } |
| if (encryptionContext != null) |
| params.putAll(encryptionContext.toHeaderParameters()); |
| params.putAll(additionalHeaders); |
| return JsonUtils.writeAsJsonString(params); |
| } |
| |
| public static CommitLogDescriptor fromHeader(File file, EncryptionContext encryptionContext) |
| { |
| try (FileInputStreamPlus raf = new FileInputStreamPlus(file)) |
| { |
| return readHeader(raf, encryptionContext); |
| } |
| catch (EOFException e) |
| { |
| throw new RuntimeException(e); |
| } |
| catch (IOException e) |
| { |
| throw new FSReadError(e, file); |
| } |
| } |
| |
| public static CommitLogDescriptor readHeader(DataInput input, EncryptionContext encryptionContext) throws IOException |
| { |
| CRC32 checkcrc = new CRC32(); |
| int version = input.readInt(); |
| if (version < VERSION_30) |
| throw new IllegalArgumentException("Unsupported pre-3.0 commit log found; cannot read."); |
| |
| updateChecksumInt(checkcrc, version); |
| long id = input.readLong(); |
| updateChecksumInt(checkcrc, (int) (id & 0xFFFFFFFFL)); |
| updateChecksumInt(checkcrc, (int) (id >>> 32)); |
| int parametersLength = input.readShort() & 0xFFFF; |
| updateChecksumInt(checkcrc, parametersLength); |
| // This should always succeed as parametersLength cannot be too long even for a |
| // corrupt segment file. |
| byte[] parametersBytes = new byte[parametersLength]; |
| input.readFully(parametersBytes); |
| checkcrc.update(parametersBytes, 0, parametersBytes.length); |
| int crc = input.readInt(); |
| |
| if (crc == (int) checkcrc.getValue()) |
| { |
| Map<?, ?> map = (Map<?, ?>) JsonUtils.decodeJson(parametersBytes); |
| return new CommitLogDescriptor(version, id, parseCompression(map), EncryptionContext.createFromMap(map, encryptionContext)); |
| } |
| return null; |
| } |
| |
| @SuppressWarnings("unchecked") |
| @VisibleForTesting |
| static ParameterizedClass parseCompression(Map<?, ?> params) |
| { |
| if (params == null || params.isEmpty()) |
| return null; |
| String className = (String) params.get(COMPRESSION_CLASS_KEY); |
| if (className == null) |
| return null; |
| |
| Map<String, String> cparams = (Map<String, String>) params.get(COMPRESSION_PARAMETERS_KEY); |
| return new ParameterizedClass(className, cparams); |
| } |
| |
| public static CommitLogDescriptor fromFileName(String name) |
| { |
| Matcher matcher = extactFromFileName(name); |
| long id = Long.parseLong(matcher.group(3).split(SEPARATOR)[1]); |
| return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id, null, new EncryptionContext()); |
| } |
| |
| public static long idFromFileName(String name) |
| { |
| Matcher matcher = extactFromFileName(name); |
| return Long.parseLong(matcher.group(3).split(SEPARATOR)[1]); |
| } |
| |
| private static Matcher extactFromFileName(String name) |
| { |
| Matcher matcher = COMMIT_LOG_FILE_PATTERN.matcher(name); |
| if (!matcher.matches()) |
| throw new RuntimeException("Cannot parse the version of the file: " + name); |
| |
| if (matcher.group(3) == null) |
| throw new UnsupportedOperationException("Commitlog segment is too old to open; upgrade to 1.2.5+ first"); |
| |
| return matcher; |
| } |
| |
| public int getMessagingVersion() |
| { |
| switch (version) |
| { |
| case VERSION_30: |
| return MessagingService.VERSION_30; |
| case VERSION_40: |
| return MessagingService.VERSION_40; |
| default: |
| throw new IllegalStateException("Unknown commitlog version " + version); |
| } |
| } |
| |
| public String fileName() |
| { |
| return FILENAME_PREFIX + version + SEPARATOR + id + FILENAME_EXTENSION; |
| } |
| |
| public String cdcIndexFileName() |
| { |
| return FILENAME_PREFIX + version + SEPARATOR + id + INDEX_FILENAME_SUFFIX; |
| } |
| |
| /** |
| * Infer the corresponding cdc index file using its cdc commitlog file |
| * @param cdcCommitLogSegment |
| * @return cdc index file or null if the cdc index file cannot be inferred. |
| */ |
| public static File inferCdcIndexFile(File cdcCommitLogSegment) |
| { |
| if (!isValid(cdcCommitLogSegment.name())) |
| return null; |
| String cdcFileName = cdcCommitLogSegment.name(); |
| String indexFileName = cdcFileName.substring(0, cdcFileName.length() - FILENAME_EXTENSION.length()) + INDEX_FILENAME_SUFFIX; |
| return new File(DatabaseDescriptor.getCDCLogLocation(), indexFileName); |
| } |
| |
| /** |
| * @param filename the filename to check |
| * @return true if filename could be a commit log based on it's filename |
| */ |
| public static boolean isValid(String filename) |
| { |
| return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches(); |
| } |
| |
| public EncryptionContext getEncryptionContext() |
| { |
| return encryptionContext; |
| } |
| |
| public String toString() |
| { |
| return "(" + version + "," + id + (compression != null ? "," + compression : "") + ")"; |
| } |
| |
| public boolean equals(Object that) |
| { |
| return that instanceof CommitLogDescriptor && equals((CommitLogDescriptor) that); |
| } |
| |
| public boolean equalsIgnoringCompression(CommitLogDescriptor that) |
| { |
| return this.version == that.version && this.id == that.id; |
| } |
| |
| public boolean equals(CommitLogDescriptor that) |
| { |
| return equalsIgnoringCompression(that) && Objects.equal(this.compression, that.compression) |
| && Objects.equal(encryptionContext, that.encryptionContext); |
| } |
| } |