blob: bc814391d5345f787ad2ea85ab8c064a1ce75123 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.QueryId;
import javax.annotation.concurrent.GuardedBy;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
public class QueryIdGenerator {
private static final DateTimeFormatter TIMESTAMP_FORMAT =
DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss").withZone(UTC);
private static final long BASE_SYSTEM_TIME_MILLIS = System.currentTimeMillis();
private static final long BASE_NANO_TIME = System.nanoTime();
private final String dataNodeId;
@GuardedBy("this")
private long lastTimeInDays;
@GuardedBy("this")
private long lastTimeInSeconds;
@GuardedBy("this")
private String lastTimestamp;
@GuardedBy("this")
private int counter;
public QueryIdGenerator() {
int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
checkArgument(dataNodeId != -1, "DataNodeId should be init first!");
this.dataNodeId = String.valueOf(dataNodeId);
}
public String getCoordinatorId() {
return dataNodeId;
}
/**
* Generate next queryId using the following format: {@code YYYYMMDD_hhmmss_index_dataNodeId}
*
* <p>{@code index} rolls at the start of every day or when it is close to reaching {@code
* 99,999}. {@code dataNodeId} is a unique id generated by config node when this data node is
* created.
*/
public synchronized QueryId createNextQueryId() {
long now = nowInMillis();
// restart counter if it is at the limit
if (counter > 99_999) {
// Wait for the timestamp to change to reset the counter to avoid duplicates
// in the very unlikely case we generated all 100,000 in the previous second
while (MILLISECONDS.toSeconds(now) == lastTimeInSeconds) {
long delta = SECONDS.toMillis(lastTimeInSeconds + 1) - now;
sleepUninterruptibly(delta, MILLISECONDS);
now = nowInMillis();
}
counter = 0;
}
// if the second changed since the last ID was generated, generate a new timestamp
if (MILLISECONDS.toSeconds(now) != lastTimeInSeconds) {
// generate new timestamp
lastTimeInSeconds = MILLISECONDS.toSeconds(now);
lastTimestamp = formatEpochMilli(now);
// if the day has rolled over, restart the counter
if (MILLISECONDS.toDays(now) != lastTimeInDays) {
lastTimeInDays = MILLISECONDS.toDays(now);
counter = 0;
}
// restart the counter if it is close to the limit, to avoid sleep on rollover
if (counter >= 90_000) {
counter = 0;
}
}
long index = counter;
counter++;
return new QueryId(format("%s_%05d_%s", lastTimestamp, index, dataNodeId));
}
private static String formatEpochMilli(long milli) {
return TIMESTAMP_FORMAT.format(Instant.ofEpochMilli(milli));
}
protected long nowInMillis() {
// avoid problems with the clock moving backwards
return BASE_SYSTEM_TIME_MILLIS + NANOSECONDS.toMillis(System.nanoTime() - BASE_NANO_TIME);
}
}