| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <stddef.h> |
| #include <stdint.h> |
| #include <functional> |
| #include <iterator> |
| #include <map> |
| #include <memory> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "consensus_types.h" |
| #include "mutation_log.h" |
| #include "mutation_log_utils.h" |
| #include "replica/log_block.h" |
| #include "replica/log_file.h" |
| #include "replica/mutation.h" |
| #include "utils/autoref_ptr.h" |
| #include "utils/binary_reader.h" |
| #include "utils/blob.h" |
| #include "utils/error_code.h" |
| #include "utils/errors.h" |
| #include "utils/fail_point.h" |
| #include "utils/fmt_logging.h" |
| #include "absl/strings/string_view.h" |
| |
| namespace dsn { |
| namespace replication { |
| |
| /*static*/ error_code mutation_log::replay(log_file_ptr log, |
| replay_callback callback, |
| /*out*/ int64_t &end_offset) |
| { |
| end_offset = log->start_offset(); |
| LOG_INFO("start to replay mutation log {}, offset = [{}, {}), size = {}", |
| log->path(), |
| log->start_offset(), |
| log->end_offset(), |
| log->end_offset() - log->start_offset()); |
| |
| ::dsn::blob bb; |
| log->reset_stream(); |
| error_s err; |
| size_t start_offset = 0; |
| while (true) { |
| err = replay_block(log, callback, start_offset, end_offset); |
| if (!err.is_ok()) { |
| // Stop immediately if failed |
| break; |
| } |
| |
| start_offset = static_cast<size_t>(end_offset - log->start_offset()); |
| } |
| |
| LOG_INFO("finish to replay mutation log ({}) [err: {}]", log->path(), err); |
| return err.code(); |
| } |
| |
| /*static*/ error_s mutation_log::replay_block(log_file_ptr &log, |
| replay_callback &callback, |
| size_t start_offset, |
| int64_t &end_offset) |
| { |
| FAIL_POINT_INJECT_F("mutation_log_replay_block", [](absl::string_view) -> error_s { |
| return error_s::make(ERR_INCOMPLETE_DATA, "mutation_log_replay_block"); |
| }); |
| |
| blob bb; |
| std::unique_ptr<binary_reader> reader; |
| |
| log->reset_stream(start_offset); // start reading from given offset |
| int64_t global_start_offset = start_offset + log->start_offset(); |
| end_offset = global_start_offset; // reset end_offset to the start. |
| |
| // reads the entire block into memory |
| error_code err = log->read_next_log_block(bb); |
| if (err != ERR_OK) { |
| return error_s::make(err, "failed to read log block"); |
| } |
| |
| reader = std::make_unique<binary_reader>(bb); |
| end_offset += sizeof(log_block_header); |
| |
| // The first block is log_file_header. |
| if (global_start_offset == log->start_offset()) { |
| end_offset += log->read_file_header(*reader); |
| if (!log->is_right_header()) { |
| return error_s::make(ERR_INVALID_DATA, "failed to read log file header"); |
| } |
| // continue to parsing the data block |
| } |
| |
| while (!reader->is_eof()) { |
| auto old_size = reader->get_remaining_size(); |
| mutation_ptr mu = mutation::read_from(*reader, nullptr); |
| CHECK_NOTNULL(mu, ""); |
| mu->set_logged(); |
| |
| if (mu->data.header.log_offset != end_offset) { |
| return FMT_ERR(ERR_INVALID_DATA, |
| "offset mismatch in log entry and mutation {} vs {}", |
| end_offset, |
| mu->data.header.log_offset); |
| } |
| |
| int log_length = old_size - reader->get_remaining_size(); |
| |
| callback(log_length, mu); |
| |
| end_offset += log_length; |
| } |
| |
| return error_s::ok(); |
| } |
| |
| /*static*/ error_code mutation_log::replay(std::vector<std::string> &log_files, |
| replay_callback callback, |
| /*out*/ int64_t &end_offset) |
| { |
| log_file_map_by_index logs; |
| for (auto &fpath : log_files) { |
| error_code err; |
| log_file_ptr log = log_file::open_read(fpath.c_str(), err); |
| if (log == nullptr) { |
| if (err == ERR_HANDLE_EOF || err == ERR_INCOMPLETE_DATA || |
| err == ERR_INVALID_PARAMETERS) { |
| LOG_INFO("skip file {} during log replay", fpath); |
| continue; |
| } else { |
| return err; |
| } |
| } |
| |
| CHECK(logs.find(log->index()) == logs.end(), "invalid log index, index = {}", log->index()); |
| logs[log->index()] = log; |
| } |
| |
| return replay(logs, callback, end_offset); |
| } |
| |
| /*static*/ error_code mutation_log::replay(log_file_map_by_index &logs, |
| replay_callback callback, |
| /*out*/ int64_t &end_offset) |
| { |
| int64_t g_start_offset = 0; |
| int64_t g_end_offset = 0; |
| error_code err = ERR_OK; |
| log_file_ptr last; |
| |
| if (logs.size() > 0) { |
| g_start_offset = logs.begin()->second->start_offset(); |
| g_end_offset = logs.rbegin()->second->end_offset(); |
| } |
| |
| error_s error = log_utils::check_log_files_continuity(logs); |
| if (!error.is_ok()) { |
| LOG_ERROR("check_log_files_continuity failed: {}", error); |
| return error.code(); |
| } |
| |
| end_offset = g_start_offset; |
| |
| for (auto &kv : logs) { |
| log_file_ptr &log = kv.second; |
| |
| if (log->start_offset() != end_offset) { |
| LOG_ERROR("offset mismatch in log file offset and global offset {} vs {}", |
| log->start_offset(), |
| end_offset); |
| return ERR_INVALID_DATA; |
| } |
| |
| last = log; |
| err = mutation_log::replay(log, callback, end_offset); |
| |
| log->close(); |
| |
| if (err == ERR_OK || err == ERR_HANDLE_EOF) { |
| // do nothing |
| } else if (err == ERR_INCOMPLETE_DATA) { |
| // If the file is not corrupted, it may also return the value of ERR_INCOMPLETE_DATA. |
| // In this case, the correctness is relying on the check of start_offset. |
| LOG_WARNING("delay handling error: {}", err); |
| } else { |
| // for other errors, we should break |
| break; |
| } |
| } |
| |
| if (err == ERR_OK || err == ERR_HANDLE_EOF) { |
| // the log may still be written when used for learning |
| CHECK_LE(g_end_offset, end_offset); |
| err = ERR_OK; |
| } else if (err == ERR_INCOMPLETE_DATA) { |
| // ignore the last incomplate block |
| err = ERR_OK; |
| } else { |
| // bad error |
| LOG_ERROR("replay mutation log failed: {}", err); |
| } |
| |
| return err; |
| } |
| |
| } // namespace replication |
| } // namespace dsn |