blob: 22a74e01c0a1bb04c84aa208f261efc046b2e0bb [file] [log] [blame]
// Copyright 2016 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author: cheesy@google.com (Steve Hill)
#include "pagespeed/controller/popularity_contest_schedule_rewrite_controller.h"
#include <unordered_map>
#include <utility>
#include "base/logging.h"
#include "pagespeed/controller/priority_queue.h"
#include "pagespeed/kernel/base/function.h"
#include "pagespeed/kernel/base/statistics.h"
#include "pagespeed/kernel/base/string.h"
#include "pagespeed/kernel/base/scoped_ptr.h"
#include "pagespeed/kernel/base/thread_system.h"
namespace net_instaweb {
const char PopularityContestScheduleRewriteController::kNumRewritesRequested[] =
"popularity-contest-num-rewrites-requested";
const char PopularityContestScheduleRewriteController::kNumRewritesSucceeded[] =
"popularity-contest-num-rewrites-succeeded";
const char PopularityContestScheduleRewriteController::kNumRewritesFailed[] =
"popularity-contest-num-rewrites-failed";
const char PopularityContestScheduleRewriteController::
kNumRewritesRejectedQueueSize[] =
"popularity-contest-num-rewrites-rejected-queue-full";
const char PopularityContestScheduleRewriteController::
kNumRewritesRejectedInProgress[] =
"popularity-contest-num-rewrites-rejected-already-running";
const char PopularityContestScheduleRewriteController::kRewriteQueueSize[] =
"popularity-contest-queue-size";
const char PopularityContestScheduleRewriteController::kNumRewritesRunning[] =
"popularity-contest-num-rewrites-running";
const char
PopularityContestScheduleRewriteController::kNumRewritesAwaitingRetry[] =
"popularity-contest-num-rewrites-awaiting-retry";
PopularityContestScheduleRewriteController::
PopularityContestScheduleRewriteController(ThreadSystem* thread_system,
Statistics* stats, Timer* timer,
int max_running_rewrites,
int max_queued_rewrites)
: mutex_(thread_system->NewMutex()),
timer_(timer),
running_rewrites_(0),
max_running_rewrites_(max_running_rewrites),
max_queued_rewrites_(max_queued_rewrites),
num_rewrite_requests_(stats->GetTimedVariable(kNumRewritesRequested)),
num_rewrites_succeeded_(stats->GetTimedVariable(kNumRewritesSucceeded)),
num_rewrites_failed_(stats->GetTimedVariable(kNumRewritesFailed)),
num_rewrites_rejected_queue_size_(
stats->GetTimedVariable(kNumRewritesRejectedQueueSize)),
num_rewrites_rejected_in_progress_(
stats->GetTimedVariable(kNumRewritesRejectedInProgress)),
queue_size_(stats->GetUpDownCounter(kRewriteQueueSize)),
num_rewrites_running_(stats->GetUpDownCounter(kNumRewritesRunning)),
num_rewrites_awaiting_retry_(
stats->GetUpDownCounter(kNumRewritesAwaitingRetry)) {
// Technically the code should work with these *at* zero, but then what's the
// point?
CHECK_GT(max_running_rewrites_, 0);
CHECK_GT(max_queued_rewrites_, 0);
}
void PopularityContestScheduleRewriteController::InitStats(Statistics* stats) {
stats->AddTimedVariable(kNumRewritesRequested, Statistics::kDefaultGroup);
stats->AddTimedVariable(kNumRewritesSucceeded, Statistics::kDefaultGroup);
stats->AddTimedVariable(kNumRewritesFailed, Statistics::kDefaultGroup);
stats->AddTimedVariable(kNumRewritesRejectedQueueSize,
Statistics::kDefaultGroup);
stats->AddTimedVariable(kNumRewritesRejectedInProgress,
Statistics::kDefaultGroup);
stats->AddUpDownCounter(kRewriteQueueSize);
stats->AddUpDownCounter(kNumRewritesRunning);
stats->AddUpDownCounter(kNumRewritesAwaitingRetry);
}
PopularityContestScheduleRewriteController::
~PopularityContestScheduleRewriteController() {
// TODO(cheesy): I think this might not be cleaned up properly in a
// multi-process server, since I doubt we have ordering guarantees about
// workers dying before the supervisor process. I'd like to keep an eye on
// that, so leaving this here.
DCHECK(queue_.Empty());
// Even if queue_ is empty, we may still have leftover AWAITING_RETRY rewrites
// which must be freed.
for (const auto& key_and_rewrite : all_rewrites_) {
Rewrite* rewrite = key_and_rewrite.second;
// This should always be true for AWAITING_RETRY rewrites.
DCHECK(rewrite->callback == nullptr);
if (rewrite->callback != nullptr) {
// This might be scary if the client has already quit. The only other
// option would be to delete it.
rewrite->callback->CallCancel();
}
delete rewrite;
}
}
void PopularityContestScheduleRewriteController::ScheduleRewrite(
const GoogleString& key, Function* callback) {
ScopedMutex lock(mutex_.get());
num_rewrite_requests_->IncBy(1);
CHECK(callback != nullptr);
Rewrite* rewrite = GetRewrite(key);
if (rewrite == nullptr) {
// Too many queued rewrites.
num_rewrites_rejected_queue_size_->IncBy(1);
lock.Release();
callback->CallCancel();
return;
}
if (rewrite->state == RUNNING) {
// The key is already being processed by another worker, so cancel this
// request.
++rewrite->saved_priority;
num_rewrites_rejected_in_progress_->IncBy(1);
lock.Release();
callback->CallCancel();
return;
}
Function* old_callback_to_cancel = nullptr;
if (rewrite->callback != nullptr) {
// There's already another rewrite queued for this rewrite, so cancel
// the old request. We always prefer to hold onto the most recent request
// since workers are not expected to live forever.
old_callback_to_cancel = rewrite->callback;
rewrite->callback = nullptr;
}
int priority = 1;
if (rewrite->state == AWAITING_RETRY) {
// saved_priority is what was left over from the previous failed attempt. It
// may be zero.
priority += rewrite->saved_priority;
rewrite->saved_priority = 0;
retry_queue_.Remove(rewrite);
num_rewrites_awaiting_retry_->Add(-1);
}
rewrite->state = QUEUED;
rewrite->callback = callback;
queue_.IncreasePriority(rewrite, priority);
Function* callback_to_start = AttemptStartRewrite();
// Release the lock and run any oustanding callbacks.
lock.Release();
if (old_callback_to_cancel != nullptr) {
old_callback_to_cancel->CallCancel();
}
if (callback_to_start != nullptr) {
callback_to_start->CallRun();
}
}
void PopularityContestScheduleRewriteController::NotifyRewriteComplete(
const GoogleString& key) {
ScopedMutex lock(mutex_.get());
num_rewrites_succeeded_->IncBy(1);
Rewrite* rewrite = GetRewrite(key);
CHECK(rewrite != nullptr) << "NotifyRewriteComplete called for unknown key: "
<< key;
CHECK_EQ(rewrite->state, RUNNING) << "NotifyRewriteComplete called for key '"
<< key << "' that isn't currently running";
StopRewrite(rewrite);
DeleteRewrite(rewrite);
Function* run_callback = AttemptStartRewrite();
lock.Release();
if (run_callback != nullptr) {
run_callback->CallRun();
}
}
void PopularityContestScheduleRewriteController::NotifyRewriteFailed(
const GoogleString& key) {
ScopedMutex lock(mutex_.get());
num_rewrites_failed_->IncBy(1);
Rewrite* rewrite = GetRewrite(key);
CHECK(rewrite != nullptr) << "NotifyRewriteFailed called for unknown key: "
<< key;
CHECK_EQ(rewrite->state, RUNNING) << "NotifyRewriteFailed called for key '"
<< key << "' that isn't currently running";
// Mark the rewrite as stopped but don't delete it. This ensures
// saved_priority will be set on subsequent retries.
StopRewrite(rewrite);
SaveRewriteForRetry(rewrite);
Function* run_callback = AttemptStartRewrite();
lock.Release();
if (run_callback != nullptr) {
run_callback->CallRun();
}
}
Function* PopularityContestScheduleRewriteController::AttemptStartRewrite() {
if (running_rewrites_ >= max_running_rewrites_ || queue_.Empty()) {
return nullptr;
}
const std::pair<Rewrite* const*, int64>& queue_top = queue_.Top();
Rewrite* rewrite = *queue_top.first;
rewrite->saved_priority = queue_top.second;
DCHECK_EQ(rewrite->state, QUEUED);
queue_.Pop();
return StartRewrite(rewrite);
}
Function* PopularityContestScheduleRewriteController::StartRewrite(
Rewrite* rewrite) {
Function* callback = nullptr;
DCHECK_LT(running_rewrites_, max_running_rewrites_);
DCHECK_NE(rewrite->state, RUNNING);
DCHECK(rewrite->callback != nullptr);
if (rewrite->callback != nullptr) {
rewrite->state = RUNNING;
++running_rewrites_;
num_rewrites_running_->Add(1);
callback = rewrite->callback;
rewrite->callback = nullptr;
} else {
rewrite->state = STOPPED;
}
return callback;
}
void PopularityContestScheduleRewriteController::StopRewrite(
Rewrite* rewrite) {
DCHECK_EQ(rewrite->state, RUNNING);
rewrite->state = STOPPED;
--running_rewrites_;
num_rewrites_running_->Add(-1);
}
void PopularityContestScheduleRewriteController::SaveRewriteForRetry(
Rewrite* rewrite) {
DCHECK_EQ(rewrite->state, STOPPED);
rewrite->state = AWAITING_RETRY;
// Insert the item into retry_queue_ with a priority of "negative now".
// This will cause the queue to be ordered by "oldest first".
int64 priority = -timer_->NowMs();
retry_queue_.IncreasePriority(rewrite, priority);
num_rewrites_awaiting_retry_->Add(1);
}
void PopularityContestScheduleRewriteController::DeleteRewrite(
const Rewrite* rewrite) {
RewriteMap::iterator i = all_rewrites_.find(&rewrite->key);
DCHECK(i != all_rewrites_.end());
if (i != all_rewrites_.end()) {
CHECK_EQ(i->second, rewrite);
all_rewrites_.erase(i);
queue_size_->Add(-1);
}
CHECK_NE(rewrite->state, RUNNING);
DCHECK(rewrite->callback == nullptr);
// If rewrite->callback isn't null, we just leak it since we'd need to drop
// the lock to invoke Cancel.
delete rewrite;
}
PopularityContestScheduleRewriteController::Rewrite*
PopularityContestScheduleRewriteController::GetRewrite(
const GoogleString& key) {
RewriteMap::iterator i = all_rewrites_.find(&key);
if (i == all_rewrites_.end()) {
// This rewrite isn't already queued. Do we have an available queue slot?
ConsiderDroppingRetry();
if (all_rewrites_.size() >= static_cast<size_t>(max_queued_rewrites_)) {
return nullptr;
}
Rewrite* rewrite = new Rewrite(key);
std::pair<RewriteMap::iterator, bool> insert_result =
all_rewrites_.emplace(&rewrite->key, rewrite);
bool was_inserted = insert_result.second;
CHECK(was_inserted);
queue_size_->Add(1);
i = insert_result.first;
}
return i->second;
}
void PopularityContestScheduleRewriteController::ConsiderDroppingRetry() {
// Pop rewrites out of the retry_queue_ until either the retry queue is empty
// or there is an available rewrite slot.
while (all_rewrites_.size() >= static_cast<size_t>(max_queued_rewrites_) &&
!retry_queue_.Empty()) {
Rewrite* rewrite = *retry_queue_.Top().first;
retry_queue_.Pop();
num_rewrites_awaiting_retry_->Add(-1);
rewrite->state = STOPPED;
DeleteRewrite(rewrite);
}
}
void PopularityContestScheduleRewriteController::SetMaxQueueSizeForTesting(
int size) {
ScopedMutex lock(mutex_.get());
max_queued_rewrites_ = size;
}
} // namespace net_instaweb