blob: 231461328ab4de0f1c4952fdc164f1bcc71c281e [file] [log] [blame]
// Copyright 2013 Cloudera, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <boost/foreach.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <iostream>
#include <vector>
#include "kudu/common/row_operations.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log_index.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/util/env.h"
#include "kudu/util/flags.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
DEFINE_bool(print_headers, true, "print the log segment headers/footers");
DEFINE_string(print_entries, "decoded",
"How to print entries:\n"
" false|0|no = don't print\n"
" true|1|yes|decoded = print them decoded\n"
" pb = print the raw protobuf\n"
" id = print only their ids");
DEFINE_int32(truncate_data, 100,
"Truncate the data fields to the given number of bytes "
"before printing. Set to 0 to disable");
namespace kudu {
namespace log {
using consensus::CommitMsg;
using consensus::OperationType;
using consensus::ReplicateMsg;
using tserver::WriteRequestPB;
using std::string;
using std::vector;
using std::cout;
using std::endl;
enum PrintEntryType {
DONT_PRINT,
PRINT_PB,
PRINT_DECODED,
PRINT_ID
};
static PrintEntryType ParsePrintType() {
if (ParseLeadingBoolValue(FLAGS_print_entries.c_str(), true) == false) {
return DONT_PRINT;
} else if (ParseLeadingBoolValue(FLAGS_print_entries.c_str(), false) == true ||
FLAGS_print_entries == "decoded") {
return PRINT_DECODED;
} else if (FLAGS_print_entries == "pb") {
return PRINT_PB;
} else if (FLAGS_print_entries == "id") {
return PRINT_ID;
} else {
LOG(FATAL) << "Unknown value for --print_entries: " << FLAGS_print_entries;
}
}
void PrintIdOnly(const LogEntryPB& entry) {
switch (entry.type()) {
case log::REPLICATE:
{
cout << entry.replicate().id().term() << "." << entry.replicate().id().index()
<< "@" << entry.replicate().timestamp() << "\t";
cout << "REPLICATE "
<< OperationType_Name(entry.replicate().op_type());
break;
}
case log::COMMIT:
{
cout << "COMMIT " << entry.commit().commited_op_id().term()
<< "." << entry.commit().commited_op_id().index();
break;
}
default:
cout << "UNKNOWN: " << entry.ShortDebugString();
}
cout << endl;
}
Status PrintDecodedWriteRequestPB(const string& indent,
const Schema& tablet_schema,
const WriteRequestPB& write) {
Schema request_schema;
RETURN_NOT_OK(SchemaFromPB(write.schema(), &request_schema));
Arena arena(32 * 1024, 1024 * 1024);
RowOperationsPBDecoder dec(&write.row_operations(), &request_schema, &tablet_schema, &arena);
vector<DecodedRowOperation> ops;
RETURN_NOT_OK(dec.DecodeOperations(&ops));
cout << indent << "Tablet: " << write.tablet_id() << endl;
cout << indent << "Consistency: "
<< ExternalConsistencyMode_Name(write.external_consistency_mode()) << endl;
if (write.has_propagated_timestamp()) {
cout << indent << "Propagated TS: " << write.propagated_timestamp() << endl;
}
int i = 0;
BOOST_FOREACH(const DecodedRowOperation& op, ops) {
// TODO (KUDU-515): Handle the case when a tablet's schema changes
// mid-segment.
cout << indent << "op " << (i++) << ": " << op.ToString(tablet_schema) << endl;
}
return Status::OK();
}
Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) {
PrintIdOnly(entry);
const string indent = "\t";
if (entry.has_replicate()) {
// We can actually decode REPLICATE messages.
const ReplicateMsg& replicate = entry.replicate();
if (replicate.op_type() == consensus::WRITE_OP) {
RETURN_NOT_OK(PrintDecodedWriteRequestPB(indent, tablet_schema, replicate.write_request()));
} else {
cout << indent << replicate.ShortDebugString() << endl;
}
} else if (entry.has_commit()) {
// For COMMIT we'll just dump the PB
cout << indent << entry.commit().ShortDebugString() << endl;
}
return Status::OK();
}
Status PrintSegment(const scoped_refptr<ReadableLogSegment>& segment) {
PrintEntryType print_type = ParsePrintType();
if (FLAGS_print_headers) {
cout << "Header:\n" << segment->header().DebugString();
}
vector<LogEntryPB*> entries;
RETURN_NOT_OK(segment->ReadEntries(&entries));
if (print_type == DONT_PRINT) return Status::OK();
Schema tablet_schema;
RETURN_NOT_OK(SchemaFromPB(segment->header().schema(), &tablet_schema));
BOOST_FOREACH(LogEntryPB* entry, entries) {
if (print_type == PRINT_PB) {
if (FLAGS_truncate_data > 0) {
pb_util::TruncateFields(entry, FLAGS_truncate_data);
}
cout << "Entry:\n" << entry->DebugString();
} else if (print_type == PRINT_DECODED) {
RETURN_NOT_OK(PrintDecoded(*entry, tablet_schema));
} else if (print_type == PRINT_ID) {
PrintIdOnly(*entry);
}
}
if (FLAGS_print_headers && segment->HasFooter()) {
cout << "Footer:\n" << segment->footer().DebugString();
}
return Status::OK();
}
Status DumpLog(const string& tablet_oid) {
Env *env = Env::Default();
gscoped_ptr<LogReader> reader;
FsManagerOpts fs_opts;
fs_opts.read_only = true;
FsManager fs_manager(env, fs_opts);
RETURN_NOT_OK(fs_manager.Open());
RETURN_NOT_OK(LogReader::Open(&fs_manager,
scoped_refptr<LogIndex>(),
tablet_oid,
scoped_refptr<MetricEntity>(),
&reader));
SegmentSequence segments;
RETURN_NOT_OK(reader->GetSegmentsSnapshot(&segments));
BOOST_FOREACH(const scoped_refptr<ReadableLogSegment>& segment, segments) {
RETURN_NOT_OK(PrintSegment(segment));
}
return Status::OK();
}
Status DumpSegment(const string &segment_path) {
Env *env = Env::Default();
scoped_refptr<ReadableLogSegment> segment;
RETURN_NOT_OK(ReadableLogSegment::Open(env, segment_path, &segment));
RETURN_NOT_OK(PrintSegment(segment));
return Status::OK();
}
} // namespace log
} // namespace kudu
int main(int argc, char **argv) {
kudu::ParseCommandLineFlags(&argc, &argv, true);
if (argc != 2) {
std::cerr << "usage: " << argv[0]
<< " -fs_wal_dir <dir> -fs_data_dirs <dirs>"
<< " <tablet_name> | <log segment path>"
<< std::endl;
return 1;
}
kudu::InitGoogleLoggingSafe(argv[0]);
kudu::Status s = kudu::log::DumpSegment(argv[1]);
if (s.ok()) {
return 0;
} else if (s.IsNotFound()) {
s = kudu::log::DumpLog(argv[1]);
}
if (!s.ok()) {
std::cerr << "Error: " << s.ToString() << std::endl;
return 1;
}
return 0;
}