| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "TableViewImpl.h" |
| |
| #include "ClientImpl.h" |
| #include "LogUtils.h" |
| #include "ReaderImpl.h" |
| #include "TimeUtils.h" |
| |
| namespace pulsar { |
| |
| DECLARE_LOG_OBJECT() |
| |
| TableViewImpl::TableViewImpl(ClientImplPtr client, const std::string& topic, |
| const TableViewConfiguration& conf) |
| : client_(client), topic_(topic), conf_(conf) {} |
| |
| Future<Result, TableViewImplPtr> TableViewImpl::start() { |
| Promise<Result, TableViewImplPtr> promise; |
| ReaderConfiguration readerConfiguration; |
| readerConfiguration.setSchema(conf_.schemaInfo); |
| readerConfiguration.setReadCompacted(true); |
| readerConfiguration.setInternalSubscriptionName(conf_.subscriptionName); |
| |
| TableViewImplPtr self = shared_from_this(); |
| ReaderCallback readerCallback = [self, promise](Result res, Reader reader) { |
| if (res == ResultOk) { |
| self->reader_ = reader.impl_; |
| self->readAllExistingMessages(promise, TimeUtils::currentTimeMillis(), 0); |
| } else { |
| promise.setFailed(res); |
| } |
| }; |
| client_->createReaderAsync(topic_, MessageId::earliest(), readerConfiguration, readerCallback); |
| return promise.getFuture(); |
| } |
| |
| bool TableViewImpl::retrieveValue(const std::string& key, std::string& value) { |
| auto optValue = data_.remove(key); |
| if (optValue) { |
| value = optValue.value(); |
| return true; |
| } |
| return false; |
| } |
| |
| bool TableViewImpl::getValue(const std::string& key, std::string& value) const { |
| auto optValue = data_.find(key); |
| if (optValue) { |
| value = optValue.value(); |
| return true; |
| } |
| return false; |
| } |
| |
| bool TableViewImpl::containsKey(const std::string& key) const { return data_.find(key) != boost::none; } |
| |
| std::unordered_map<std::string, std::string> TableViewImpl::snapshot() { return data_.move(); } |
| |
| std::size_t TableViewImpl::size() const { return data_.size(); } |
| |
| void TableViewImpl::forEach(TableViewAction action) { data_.forEach(action); } |
| |
| void TableViewImpl::forEachAndListen(TableViewAction action) { |
| data_.forEach(action); |
| Lock lock(listenersMutex_); |
| listeners_.emplace_back(action); |
| } |
| |
| void TableViewImpl::closeAsync(ResultCallback callback) { |
| if (reader_) { |
| reader_->closeAsync([callback, this](Result result) { |
| reader_.reset(); |
| callback(result); |
| }); |
| } else { |
| callback(ResultConsumerNotInitialized); |
| } |
| } |
| |
| void TableViewImpl::handleMessage(const Message& msg) { |
| if (msg.hasPartitionKey()) { |
| auto value = msg.getDataAsString(); |
| LOG_DEBUG("Applying message from " << topic_ << " key=" << msg.getPartitionKey() |
| << " value=" << value) |
| |
| if (msg.getLength() == 0) { |
| data_.remove(msg.getPartitionKey()); |
| } else { |
| data_.emplace(msg.getPartitionKey(), value); |
| } |
| |
| Lock lock(listenersMutex_); |
| for (const auto& listener : listeners_) { |
| try { |
| listener(msg.getPartitionKey(), value); |
| } catch (const std::exception& exc) { |
| LOG_ERROR("Table view listener raised an exception: " << exc.what()); |
| } |
| } |
| } |
| } |
| |
| void TableViewImpl::readAllExistingMessages(Promise<Result, TableViewImplPtr> promise, long startTime, |
| long messagesRead) { |
| std::weak_ptr<TableViewImpl> weakSelf{shared_from_this()}; |
| reader_->hasMessageAvailableAsync( |
| [weakSelf, promise, startTime, messagesRead](Result result, bool hasMessage) { |
| auto self = weakSelf.lock(); |
| if (!self || result != ResultOk) { |
| promise.setFailed(result); |
| return; |
| } |
| if (hasMessage) { |
| Message msg; |
| auto topic = self->topic_; |
| self->reader_->readNextAsync( |
| [weakSelf, promise, startTime, messagesRead, topic](Result res, const Message& msg) { |
| auto self = weakSelf.lock(); |
| if (!self || res != ResultOk) { |
| promise.setFailed(res); |
| LOG_ERROR("Start table view failed, reader msg for " |
| << topic << " error: " << strResult(res)); |
| } else { |
| self->handleMessage(msg); |
| auto tmpMessagesRead = messagesRead + 1; |
| self->readAllExistingMessages(promise, startTime, tmpMessagesRead); |
| } |
| }); |
| } else { |
| long endTime = TimeUtils::currentTimeMillis(); |
| long durationMillis = endTime - startTime; |
| LOG_INFO("Started table view for " << self->topic_ << "Replayed: " << messagesRead |
| << " message in " << durationMillis << " millis"); |
| promise.setValue(self); |
| self->readTailMessage(); |
| } |
| }); |
| } |
| |
| void TableViewImpl::readTailMessage() { |
| auto self = shared_from_this(); |
| reader_->readNextAsync([self](Result result, const Message& msg) { |
| if (result == ResultOk) { |
| self->handleMessage(msg); |
| self->readTailMessage(); |
| } else { |
| LOG_WARN("Reader " << self->topic_ << " was interrupted: " << result); |
| } |
| }); |
| } |
| |
| } // namespace pulsar |