| // Copyright (c) 2017-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| |
| #include "format.h" |
| |
| #include <algorithm> |
| #include <map> |
| #include <memory> |
| |
| #include "utilities/cassandra/serialize.h" |
| |
| namespace rocksdb { |
| namespace cassandra { |
namespace {
// Row-level deletion fields given to a RowValue that is built from columns:
// the largest int32_t for the local deletion time and the smallest int64_t
// for the marked-for-delete timestamp.
constexpr int32_t kDefaultLocalDeletionTime =
    std::numeric_limits<int32_t>::max();
constexpr int64_t kDefaultMarkedForDeleteAt =
    std::numeric_limits<int64_t>::min();
}  // namespace
| |
| ColumnBase::ColumnBase(int8_t mask, int8_t index) |
| : mask_(mask), index_(index) {} |
| |
| std::size_t ColumnBase::Size() const { |
| return sizeof(mask_) + sizeof(index_); |
| } |
| |
| int8_t ColumnBase::Mask() const { |
| return mask_; |
| } |
| |
| int8_t ColumnBase::Index() const { |
| return index_; |
| } |
| |
| void ColumnBase::Serialize(std::string* dest) const { |
| rocksdb::cassandra::Serialize<int8_t>(mask_, dest); |
| rocksdb::cassandra::Serialize<int8_t>(index_, dest); |
| } |
| |
| std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src, |
| std::size_t offset) { |
| int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset); |
| if ((mask & ColumnTypeMask::DELETION_MASK) != 0) { |
| return Tombstone::Deserialize(src, offset); |
| } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) { |
| return ExpiringColumn::Deserialize(src, offset); |
| } else { |
| return Column::Deserialize(src, offset); |
| } |
| } |
| |
| Column::Column( |
| int8_t mask, |
| int8_t index, |
| int64_t timestamp, |
| int32_t value_size, |
| const char* value |
| ) : ColumnBase(mask, index), timestamp_(timestamp), |
| value_size_(value_size), value_(value) {} |
| |
| int64_t Column::Timestamp() const { |
| return timestamp_; |
| } |
| |
| std::size_t Column::Size() const { |
| return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_) |
| + value_size_; |
| } |
| |
| void Column::Serialize(std::string* dest) const { |
| ColumnBase::Serialize(dest); |
| rocksdb::cassandra::Serialize<int64_t>(timestamp_, dest); |
| rocksdb::cassandra::Serialize<int32_t>(value_size_, dest); |
| dest->append(value_, value_size_); |
| } |
| |
| std::shared_ptr<Column> Column::Deserialize(const char *src, |
| std::size_t offset) { |
| int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset); |
| offset += sizeof(mask); |
| int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset); |
| offset += sizeof(index); |
| int64_t timestamp = rocksdb::cassandra::Deserialize<int64_t>(src, offset); |
| offset += sizeof(timestamp); |
| int32_t value_size = rocksdb::cassandra::Deserialize<int32_t>(src, offset); |
| offset += sizeof(value_size); |
| return std::make_shared<Column>( |
| mask, index, timestamp, value_size, src + offset); |
| } |
| |
| ExpiringColumn::ExpiringColumn( |
| int8_t mask, |
| int8_t index, |
| int64_t timestamp, |
| int32_t value_size, |
| const char* value, |
| int32_t ttl |
| ) : Column(mask, index, timestamp, value_size, value), |
| ttl_(ttl) {} |
| |
| std::size_t ExpiringColumn::Size() const { |
| return Column::Size() + sizeof(ttl_); |
| } |
| |
| void ExpiringColumn::Serialize(std::string* dest) const { |
| Column::Serialize(dest); |
| rocksdb::cassandra::Serialize<int32_t>(ttl_, dest); |
| } |
| |
| std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint() const { |
| return std::chrono::time_point<std::chrono::system_clock>(std::chrono::microseconds(Timestamp())); |
| } |
| |
| std::chrono::seconds ExpiringColumn::Ttl() const { |
| return std::chrono::seconds(ttl_); |
| } |
| |
| bool ExpiringColumn::Expired() const { |
| return TimePoint() + Ttl() < std::chrono::system_clock::now(); |
| } |
| |
| std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const { |
| auto expired_at = (TimePoint() + Ttl()).time_since_epoch(); |
| int32_t local_deletion_time = static_cast<int32_t>( |
| std::chrono::duration_cast<std::chrono::seconds>(expired_at).count()); |
| int64_t marked_for_delete_at = |
| std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count(); |
| return std::make_shared<Tombstone>( |
| ColumnTypeMask::DELETION_MASK, |
| Index(), |
| local_deletion_time, |
| marked_for_delete_at); |
| } |
| |
| std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize( |
| const char *src, |
| std::size_t offset) { |
| int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset); |
| offset += sizeof(mask); |
| int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset); |
| offset += sizeof(index); |
| int64_t timestamp = rocksdb::cassandra::Deserialize<int64_t>(src, offset); |
| offset += sizeof(timestamp); |
| int32_t value_size = rocksdb::cassandra::Deserialize<int32_t>(src, offset); |
| offset += sizeof(value_size); |
| const char* value = src + offset; |
| offset += value_size; |
| int32_t ttl = rocksdb::cassandra::Deserialize<int32_t>(src, offset); |
| return std::make_shared<ExpiringColumn>( |
| mask, index, timestamp, value_size, value, ttl); |
| } |
| |
| Tombstone::Tombstone( |
| int8_t mask, |
| int8_t index, |
| int32_t local_deletion_time, |
| int64_t marked_for_delete_at |
| ) : ColumnBase(mask, index), local_deletion_time_(local_deletion_time), |
| marked_for_delete_at_(marked_for_delete_at) {} |
| |
| int64_t Tombstone::Timestamp() const { |
| return marked_for_delete_at_; |
| } |
| |
| std::size_t Tombstone::Size() const { |
| return ColumnBase::Size() + sizeof(local_deletion_time_) |
| + sizeof(marked_for_delete_at_); |
| } |
| |
| void Tombstone::Serialize(std::string* dest) const { |
| ColumnBase::Serialize(dest); |
| rocksdb::cassandra::Serialize<int32_t>(local_deletion_time_, dest); |
| rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest); |
| } |
| |
| std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src, |
| std::size_t offset) { |
| int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset); |
| offset += sizeof(mask); |
| int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset); |
| offset += sizeof(index); |
| int32_t local_deletion_time = |
| rocksdb::cassandra::Deserialize<int32_t>(src, offset); |
| offset += sizeof(int32_t); |
| int64_t marked_for_delete_at = |
| rocksdb::cassandra::Deserialize<int64_t>(src, offset); |
| return std::make_shared<Tombstone>( |
| mask, index, local_deletion_time, marked_for_delete_at); |
| } |
| |
| RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at) |
| : local_deletion_time_(local_deletion_time), |
| marked_for_delete_at_(marked_for_delete_at), columns_(), |
| last_modified_time_(0) {} |
| |
| RowValue::RowValue(Columns columns, |
| int64_t last_modified_time) |
| : local_deletion_time_(kDefaultLocalDeletionTime), |
| marked_for_delete_at_(kDefaultMarkedForDeleteAt), |
| columns_(std::move(columns)), last_modified_time_(last_modified_time) {} |
| |
| std::size_t RowValue::Size() const { |
| std::size_t size = sizeof(local_deletion_time_) |
| + sizeof(marked_for_delete_at_); |
| for (const auto& column : columns_) { |
| size += column -> Size(); |
| } |
| return size; |
| } |
| |
| int64_t RowValue::LastModifiedTime() const { |
| if (IsTombstone()) { |
| return marked_for_delete_at_; |
| } else { |
| return last_modified_time_; |
| } |
| } |
| |
| bool RowValue::IsTombstone() const { |
| return marked_for_delete_at_ > kDefaultMarkedForDeleteAt; |
| } |
| |
| void RowValue::Serialize(std::string* dest) const { |
| rocksdb::cassandra::Serialize<int32_t>(local_deletion_time_, dest); |
| rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest); |
| for (const auto& column : columns_) { |
| column -> Serialize(dest); |
| } |
| } |
| |
| RowValue RowValue::PurgeTtl(bool* changed) const { |
| *changed = false; |
| Columns new_columns; |
| for (auto& column : columns_) { |
| if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) { |
| std::shared_ptr<ExpiringColumn> expiring_column = |
| std::static_pointer_cast<ExpiringColumn>(column); |
| |
| if(expiring_column->Expired()){ |
| *changed = true; |
| continue; |
| } |
| } |
| |
| new_columns.push_back(column); |
| } |
| return RowValue(std::move(new_columns), last_modified_time_); |
| } |
| |
| RowValue RowValue::ExpireTtl(bool* changed) const { |
| *changed = false; |
| Columns new_columns; |
| for (auto& column : columns_) { |
| if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) { |
| std::shared_ptr<ExpiringColumn> expiring_column = |
| std::static_pointer_cast<ExpiringColumn>(column); |
| |
| if(expiring_column->Expired()) { |
| shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone(); |
| new_columns.push_back(tombstone); |
| *changed = true; |
| continue; |
| } |
| } |
| new_columns.push_back(column); |
| } |
| return RowValue(std::move(new_columns), last_modified_time_); |
| } |
| |
| bool RowValue::Empty() const { |
| return columns_.empty(); |
| } |
| |
| RowValue RowValue::Deserialize(const char *src, std::size_t size) { |
| std::size_t offset = 0; |
| assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_)); |
| int32_t local_deletion_time = |
| rocksdb::cassandra::Deserialize<int32_t>(src, offset); |
| offset += sizeof(int32_t); |
| int64_t marked_for_delete_at = |
| rocksdb::cassandra::Deserialize<int64_t>(src, offset); |
| offset += sizeof(int64_t); |
| if (offset == size) { |
| return RowValue(local_deletion_time, marked_for_delete_at); |
| } |
| |
| assert(local_deletion_time == kDefaultLocalDeletionTime); |
| assert(marked_for_delete_at == kDefaultMarkedForDeleteAt); |
| Columns columns; |
| int64_t last_modified_time = 0; |
| while (offset < size) { |
| auto c = ColumnBase::Deserialize(src, offset); |
| offset += c -> Size(); |
| assert(offset <= size); |
| last_modified_time = std::max(last_modified_time, c -> Timestamp()); |
| columns.push_back(std::move(c)); |
| } |
| |
| return RowValue(std::move(columns), last_modified_time); |
| } |
| |
| // Merge multiple row values into one. |
| // For each column in rows with same index, we pick the one with latest |
| // timestamp. And we also take row tombstone into consideration, by iterating |
| // each row from reverse timestamp order, and stop once we hit the first |
| // row tombstone. |
| RowValue RowValue::Merge(std::vector<RowValue>&& values) { |
| assert(values.size() > 0); |
| if (values.size() == 1) { |
| return std::move(values[0]); |
| } |
| |
| // Merge columns by their last modified time, and skip once we hit |
| // a row tombstone. |
| std::sort(values.begin(), values.end(), |
| [](const RowValue& r1, const RowValue& r2) { |
| return r1.LastModifiedTime() > r2.LastModifiedTime(); |
| }); |
| |
| std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns; |
| int64_t tombstone_timestamp = 0; |
| |
| for (auto& value : values) { |
| if (value.IsTombstone()) { |
| if (merged_columns.size() == 0) { |
| return std::move(value); |
| } |
| tombstone_timestamp = value.LastModifiedTime(); |
| break; |
| } |
| for (auto& column : value.columns_) { |
| int8_t index = column->Index(); |
| if (merged_columns.find(index) == merged_columns.end()) { |
| merged_columns[index] = column; |
| } else { |
| if (column->Timestamp() > merged_columns[index]->Timestamp()) { |
| merged_columns[index] = column; |
| } |
| } |
| } |
| } |
| |
| int64_t last_modified_time = 0; |
| Columns columns; |
| for (auto& pair: merged_columns) { |
| // For some row, its last_modified_time > row tombstone_timestamp, but |
| // it might have rows whose timestamp is ealier than tombstone, so we |
| // ned to filter these rows. |
| if (pair.second->Timestamp() <= tombstone_timestamp) { |
| continue; |
| } |
| last_modified_time = std::max(last_modified_time, pair.second->Timestamp()); |
| columns.push_back(std::move(pair.second)); |
| } |
| return RowValue(std::move(columns), last_modified_time); |
| } |
| |
}  // namespace cassandra
| } // namespace rocksdb |