blob: 370b8cc6b5ac43ea79c11c2df20a5dfc440fe249 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/twitter-demo/insert_consumer.h"
#include <boost/thread/locks.hpp>
#include <glog/logging.h>
#include <string>
#include <time.h>
#include <vector>
#include "kudu/common/wire_protocol.h"
#include "kudu/common/row.h"
#include "kudu/common/schema.h"
#include "kudu/client/client.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/twitter-demo/parser.h"
#include "kudu/twitter-demo/twitter-schema.h"
#include "kudu/util/status.h"
namespace kudu {
namespace twitter_demo {
using tserver::TabletServerServiceProxy;
using tserver::WriteRequestPB;
using tserver::WriteResponsePB;
using rpc::RpcController;
using kudu::client::KuduInsert;
using kudu::client::KuduClient;
using kudu::client::KuduSession;
using kudu::client::KuduStatusCallback;
using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
FlushCB::FlushCB(InsertConsumer* consumer)
: consumer_(consumer) {
}
FlushCB::~FlushCB() {
}
void FlushCB::Run(const Status& status) {
consumer_->BatchFinished(status);
}
InsertConsumer::InsertConsumer(const client::sp::shared_ptr<KuduClient> &client)
: initted_(false),
schema_(CreateTwitterSchema()),
flush_cb_(this),
client_(client),
request_pending_(false) {
}
Status InsertConsumer::Init() {
const char *kTableName = "twitter";
Status s = client_->OpenTable(kTableName, &table_);
if (s.IsNotFound()) {
gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
RETURN_NOT_OK_PREPEND(table_creator->table_name(kTableName)
.schema(&schema_)
.Create(),
"Couldn't create twitter table");
s = client_->OpenTable(kTableName, &table_);
}
RETURN_NOT_OK_PREPEND(s, "Couldn't open twitter table");
session_ = client_->NewSession();
session_->SetTimeoutMillis(1000);
CHECK_OK(session_->SetFlushMode(KuduSession::MANUAL_FLUSH));
initted_ = true;
return Status::OK();
}
InsertConsumer::~InsertConsumer() {
// TODO: to be safe, we probably need to cancel any current RPC,
// or else the callback will get called on the destroyed object.
// Given this is just demo code, cutting this corner.
CHECK(!request_pending_);
}
void InsertConsumer::BatchFinished(const Status& s) {
boost::lock_guard<simple_spinlock> l(lock_);
request_pending_ = false;
if (!s.ok()) {
bool overflow;
vector<client::KuduError*> errors;
ElementDeleter d(&errors);
session_->GetPendingErrors(&errors, &overflow);
for (const client::KuduError* error : errors) {
LOG(WARNING) << "Failed to insert row " << error->failed_op().ToString()
<< ": " << error->status().ToString();
}
}
}
void InsertConsumer::ConsumeJSON(const Slice& json_slice) {
CHECK(initted_);
string json = json_slice.ToString();
Status s = parser_.Parse(json, &event_);
if (!s.ok()) {
LOG(WARNING) << "Unable to parse JSON string: " << json << ": " << s.ToString();
return;
}
if (event_.type == DELETE_TWEET) {
// Not currently supported.
return;
}
string created_at = TwitterEventParser::ReformatTime(event_.tweet_event.created_at);
gscoped_ptr<KuduInsert> ins(table_->NewInsert());
KuduPartialRow* r = ins->mutable_row();
CHECK_OK(r->SetInt64("tweet_id", event_.tweet_event.tweet_id));
CHECK_OK(r->SetStringCopy("text", event_.tweet_event.text));
CHECK_OK(r->SetStringCopy("source", event_.tweet_event.source));
CHECK_OK(r->SetStringCopy("created_at", created_at));
CHECK_OK(r->SetInt64("user_id", event_.tweet_event.user_id));
CHECK_OK(r->SetStringCopy("user_name", event_.tweet_event.user_name));
CHECK_OK(r->SetStringCopy("user_description", event_.tweet_event.user_description));
CHECK_OK(r->SetStringCopy("user_location", event_.tweet_event.user_location));
CHECK_OK(r->SetInt32("user_followers_count", event_.tweet_event.user_followers_count));
CHECK_OK(r->SetInt32("user_friends_count", event_.tweet_event.user_friends_count));
CHECK_OK(r->SetStringCopy("user_image_url", event_.tweet_event.user_image_url));
CHECK_OK(session_->Apply(ins.release()));
// TODO: once the auto-flush mode is implemented, switch to using that
// instead of the manual batching here
bool do_flush = false;
{
boost::lock_guard<simple_spinlock> l(lock_);
if (!request_pending_) {
request_pending_ = true;
do_flush = true;
}
}
if (do_flush) {
VLOG(1) << "Sending batch of " << session_->CountBufferedOperations();
session_->FlushAsync(&flush_cb_);
}
}
} // namespace twitter_demo
} // namespace kudu