/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.cassandra.db.lifecycle; |
| |
| import java.io.File; |
| import java.util.Collection; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.utils.Throwables; |
| |
| /** |
| * A set of log replicas. This class mostly iterates over replicas when writing or reading, |
| * ensuring consistency among them and hiding replication details from LogFile. |
| * |
| * @see LogReplica |
| * @see LogFile |
| */ |
| public class LogReplicaSet |
| { |
| private static final Logger logger = LoggerFactory.getLogger(LogReplicaSet.class); |
| |
| private final Map<File, LogReplica> replicasByFile = new LinkedHashMap<>(); |
| |
| private Collection<LogReplica> replicas() |
| { |
| return replicasByFile.values(); |
| } |
| |
| void addReplicas(List<File> replicas) |
| { |
| replicas.forEach(this::addReplica); |
| } |
| |
| void addReplica(File file) |
| { |
| File directory = file.getParentFile(); |
| assert !replicasByFile.containsKey(directory); |
| replicasByFile.put(directory, LogReplica.open(file)); |
| |
| if (logger.isTraceEnabled()) |
| logger.trace("Added log file replica {} ", file); |
| } |
| |
| void maybeCreateReplica(File directory, String fileName, Set<LogRecord> records) |
| { |
| if (replicasByFile.containsKey(directory)) |
| return; |
| |
| final LogReplica replica = LogReplica.create(directory, fileName); |
| |
| records.forEach(replica::append); |
| replicasByFile.put(directory, replica); |
| |
| if (logger.isTraceEnabled()) |
| logger.trace("Created new file replica {}", replica); |
| } |
| |
| Throwable syncDirectory(Throwable accumulate) |
| { |
| return Throwables.perform(accumulate, replicas().stream().map(s -> s::syncDirectory)); |
| } |
| |
| Throwable delete(Throwable accumulate) |
| { |
| return Throwables.perform(accumulate, replicas().stream().map(s -> s::delete)); |
| } |
| |
| private static boolean isPrefixMatch(String first, String second) |
| { |
| return first.length() >= second.length() ? |
| first.startsWith(second) : |
| second.startsWith(first); |
| } |
| |
| boolean readRecords(Set<LogRecord> records) |
| { |
| Map<LogReplica, List<String>> linesByReplica = replicas().stream() |
| .collect(Collectors.toMap(Function.<LogReplica>identity(), |
| LogReplica::readLines, |
| (k, v) -> {throw new IllegalStateException("Duplicated key: " + k);}, |
| LinkedHashMap::new)); |
| |
| int maxNumLines = linesByReplica.values().stream().map(List::size).reduce(0, Integer::max); |
| for (int i = 0; i < maxNumLines; i++) |
| { |
| String firstLine = null; |
| boolean partial = false; |
| for (Map.Entry<LogReplica, List<String>> entry : linesByReplica.entrySet()) |
| { |
| List<String> currentLines = entry.getValue(); |
| if (i >= currentLines.size()) |
| continue; |
| |
| String currentLine = currentLines.get(i); |
| if (firstLine == null) |
| { |
| firstLine = currentLine; |
| continue; |
| } |
| |
| if (!isPrefixMatch(firstLine, currentLine)) |
| { // not a prefix match |
| logger.error("Mismatched line in file {}: got '{}' expected '{}', giving up", |
| entry.getKey().getFileName(), |
| currentLine, |
| firstLine); |
| entry.getKey().setError(currentLine, String.format("Does not match <%s> in first replica file", firstLine)); |
| return false; |
| } |
| |
| if (!firstLine.equals(currentLine)) |
| { |
| if (i == currentLines.size() - 1) |
| { // last record, just set record as invalid and move on |
| logger.warn("Mismatched last line in file {}: '{}' not the same as '{}'", |
| entry.getKey().getFileName(), |
| currentLine, |
| firstLine); |
| |
| if (currentLine.length() > firstLine.length()) |
| firstLine = currentLine; |
| |
| partial = true; |
| } |
| else |
| { // mismatched entry file has more lines, giving up |
| logger.error("Mismatched line in file {}: got '{}' expected '{}', giving up", |
| entry.getKey().getFileName(), |
| currentLine, |
| firstLine); |
| entry.getKey().setError(currentLine, String.format("Does not match <%s> in first replica file", firstLine)); |
| return false; |
| } |
| } |
| } |
| |
| LogRecord record = LogRecord.make(firstLine); |
| if (records.contains(record)) |
| { // duplicate records |
| logger.error("Found duplicate record {} for {}, giving up", record, record.fileName()); |
| setError(record, "Duplicated record"); |
| return false; |
| } |
| |
| if (partial) |
| record.setPartial(); |
| |
| records.add(record); |
| |
| if (record.isFinal() && i != (maxNumLines - 1)) |
| { // too many final records |
| logger.error("Found too many lines for {}, giving up", record.fileName()); |
| setError(record, "This record should have been the last one in all replicas"); |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| void setError(LogRecord record, String error) |
| { |
| record.setError(error); |
| setErrorInReplicas(record); |
| } |
| |
| void setErrorInReplicas(LogRecord record) |
| { |
| replicas().forEach(r -> r.setError(record.raw, record.error())); |
| } |
| |
| void printContentsWithAnyErrors(StringBuilder str) |
| { |
| replicas().forEach(r -> r.printContentsWithAnyErrors(str)); |
| } |
| |
| /** |
| * Add the record to all the replicas: if it is a final record then we throw only if we fail to write it |
| * to all, otherwise we throw if we fail to write it to any file, see CASSANDRA-10421 for details |
| */ |
| void append(LogRecord record) |
| { |
| Throwable err = Throwables.perform(null, replicas().stream().map(r -> () -> r.append(record))); |
| if (err != null) |
| { |
| if (!record.isFinal() || err.getSuppressed().length == replicas().size() -1) |
| Throwables.maybeFail(err); |
| |
| logger.error("Failed to add record '{}' to some replicas '{}'", record, this); |
| } |
| } |
| |
| boolean exists() |
| { |
| Optional<Boolean> ret = replicas().stream().map(LogReplica::exists).reduce(Boolean::logicalAnd); |
| return ret.isPresent() ? |
| ret.get() |
| : false; |
| } |
| |
| void close() |
| { |
| Throwables.maybeFail(Throwables.perform(null, replicas().stream().map(r -> r::close))); |
| } |
| |
| @Override |
| public String toString() |
| { |
| Optional<String> ret = replicas().stream().map(LogReplica::toString).reduce(String::concat); |
| return ret.isPresent() ? |
| ret.get() |
| : "[-]"; |
| } |
| |
| String getDirectories() |
| { |
| return String.join(", ", replicas().stream().map(LogReplica::getDirectory).collect(Collectors.toList())); |
| } |
| |
| @VisibleForTesting |
| List<File> getFiles() |
| { |
| return replicas().stream().map(LogReplica::file).collect(Collectors.toList()); |
| } |
| |
| @VisibleForTesting |
| List<String> getFilePaths() |
| { |
| return replicas().stream().map(LogReplica::file).map(File::getPath).collect(Collectors.toList()); |
| } |
| } |