blob: a6dd017cfe8e6f8ea943246cec47388941130d66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.Calendar;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import org.apache.jackrabbit.util.ISO8601;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RateLimitUtils {
private final static Logger LOG = LoggerFactory.getLogger(RateLimitUtils.class);
// when getOldestAsyncIndexUpdate was called the last time
private static Instant lastJmxBeanCall = Instant.now();
// the return value of the last call to getOldestAsyncIndexUpdate
private static Instant cachedOldestAsyncUpdate = Instant.now();
// the last time the rateLimitWrites was called
private static Instant lastRateLimitCall = Instant.now();
private RateLimitUtils() {
}
/**
* Rate limit writes in case indexes are lagging behind too far. The method
* returns immediately if all async indexes are up-to-date (updated in the last
* 30 seconds).
*
* If indexing lanes are lagging behind, however, the method will wait (using
* Thread.sleep) for at most 1 minute. If the method is called more than once
* per minute, it will sleep for at most the time that passed until the last
* call; that is, an application that is calling it a lot will be paused for up
* to 50%. This assumes indexes will be able to catch up in this situation.
*
* @return the number of milliseconds the call was sleeping
*/
public static long rateLimitWrites() {
Instant now = Instant.now();
Duration delay = Duration.between(getOldestAsyncIndexUpdate(now), now);
return rateLimit(now, delay, true);
}
static long rateLimit(Instant now, Duration delay, boolean callThreadSleep) {
long sleep;
if (delay.getSeconds() < 30) {
// less than 30 seconds: no need to wait
sleep = 0;
} else {
if (delay.toMinutes() > 60) {
LOG.warn("Indexing is delayed for {} minutes", delay.toMinutes());
}
// sleep for as long as the duration between the last call and this call
sleep = Duration.between(lastRateLimitCall, now).toMillis();
// sleep at most 1 minute
sleep = Math.min(sleep, Duration.ofMinutes(1).toMillis());
// ensure we don't try to sleep negative values (it would throw IllegalArgumentException)
sleep = Math.max(0, sleep);
if (callThreadSleep) {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
// ignore
}
}
now = now.plus(Duration.ofMillis(sleep));
}
lastRateLimitCall = now;
return sleep;
}
/**
* Get the timestamp of the oldest indexing lane. In the normal case, this is at
* most 5 seconds ago, if indexing lanes are updated every 5 seconds. If
* indexing is delayed, or paused, this can be in the past.
*
* The method can be called often without issues. If the last call was less than
* one second ago, the last returned value is returned. This is to avoid
* unnecessary calls to the JMX beans.
*
* @param now the current time
* @return the timestamp of the oldest indexing lane
*/
private static Instant getOldestAsyncIndexUpdate(Instant now) {
if (Duration.between(lastJmxBeanCall, now).getSeconds() < 1) {
return cachedOldestAsyncUpdate;
}
lastJmxBeanCall = now;
Instant oldestAsyncUpdate = now;
MBeanServerConnection server = ManagementFactory.getPlatformMBeanServer();
try {
Set<ObjectName> beanNames;
beanNames = server.queryNames(new ObjectName(
"org.apache.jackrabbit.oak:type=IndexStats,*"), null);
for (ObjectName objectName : beanNames) {
Object time = server.getAttribute(objectName, "Done");
if (time == null || time.toString().isEmpty()) {
time = server.getAttribute(objectName, "LastIndexedTime");
}
if (time != null && !time.toString().isEmpty()) {
Calendar cal = ISO8601.parse(time.toString());
Instant doneInstant = Instant.ofEpochMilli(cal.getTimeInMillis());
if (doneInstant.isBefore(oldestAsyncUpdate)) {
oldestAsyncUpdate = doneInstant;
}
}
}
} catch (Exception e) {
LOG.warn("Could not retrieve the async lane state", e);
}
cachedOldestAsyncUpdate = oldestAsyncUpdate;
return oldestAsyncUpdate;
}
}