blob: 1996b76ed777ed4e99733d88e302c7221be541e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "result_writer.h"
#include <dsn/utility/flags.h>
namespace pegasus {
namespace server {
// Flag `capacity_unit_saving_ttl_days`, defined under "pegasus.collector", default 90.
// result_writer::set_result() multiplies this value by 3600 * 24 and passes the product
// as the last argument of pegasus_client::async_set(), so the flag is expressed in days.
// Per the description string below, 0 means the written data carries no TTL.
DSN_DEFINE_int32("pegasus.collector",
capacity_unit_saving_ttl_days,
90,
"the ttl of the CU data, 0 if no ttl");
// Task code under which result_writer::set_result() enqueues its delayed retry task.
// Declared with common priority on the default thread pool.
DEFINE_TASK_CODE(LPC_WRITE_RESULT, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
// Remembers the client pointer; set_result() issues all of its writes through it.
// NOTE(review): nothing in this file releases the pointer, so it is presumably
// non-owning and must outlive this writer - confirm against the header.
result_writer::result_writer(pegasus_client *client)
    : _client(client)
{
}
void result_writer::set_result(const std::string &hash_key,
const std::string &sort_key,
const std::string &value,
int try_count)
{
auto async_set_callback = [=](int err, pegasus_client::internal_info &&info) {
if (err != PERR_OK) {
int new_try_count = try_count - 1;
if (new_try_count > 0) {
dwarn("set_result fail, hash_key = %s, sort_key = %s, value = %s, "
"error = %s, left_try_count = %d, try again after 1 minute",
hash_key.c_str(),
sort_key.c_str(),
value.c_str(),
_client->get_error_string(err),
new_try_count);
::dsn::tasking::enqueue(
LPC_WRITE_RESULT,
&_tracker,
[=]() { set_result(hash_key, sort_key, value, new_try_count); },
0,
std::chrono::minutes(1));
} else {
derror("set_result fail, hash_key = %s, sort_key = %s, value = %s, "
"error = %s, left_try_count = %d, do not try again",
hash_key.c_str(),
sort_key.c_str(),
value.c_str(),
_client->get_error_string(err),
new_try_count);
}
} else {
dinfo("set_result succeed, hash_key = %s, sort_key = %s, value = %s",
hash_key.c_str(),
sort_key.c_str(),
value.c_str());
}
};
_client->async_set(hash_key,
sort_key,
value,
std::move(async_set_callback),
5000,
FLAGS_capacity_unit_saving_ttl_days * 3600 * 24);
}
} // namespace server
} // namespace pegasus