blob: 9644e78c19be16021f519576f4dc92eafec241f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd.notice;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hugegraph.pd.common.HgAssert;
import lombok.extern.slf4j.Slf4j;
// TODO: merge/move to another package
@Slf4j
public class NoticeBroadcaster {
private final Supplier<Long> noticeSupplier;
private long noticeId;
private String durableId;
private Supplier<String> durableSupplier;
private Function<String, Boolean> removeFunction;
private int state; //0=ready; 1=notified; 2=done ack; -1=error
private int counter;
private long timestamp;
private NoticeBroadcaster(Supplier<Long> noticeSupplier) {
this.noticeSupplier = noticeSupplier;
this.timestamp = System.currentTimeMillis();
}
public static NoticeBroadcaster of(Supplier<Long> noticeSupplier) {
HgAssert.isArgumentNotNull(noticeSupplier, "noticeSupplier");
return new NoticeBroadcaster(noticeSupplier);
}
public NoticeBroadcaster setDurableSupplier(Supplier<String> durableSupplier) {
this.durableSupplier = durableSupplier;
return this;
}
public NoticeBroadcaster setRemoveFunction(Function<String, Boolean> removeFunction) {
this.removeFunction = removeFunction;
return this;
}
public NoticeBroadcaster notifying() {
if (this.state >= 2) {
log.warn("Aborted notifying as ack has done. notice: {}", this);
return this;
}
this.counter++;
if (this.durableId == null && this.durableSupplier != null) {
try {
this.durableId = this.durableSupplier.get();
} catch (Throwable t) {
log.error("Failed to invoke durableSupplier, cause by:", t);
}
}
try {
this.noticeId = this.noticeSupplier.get();
state = 1;
} catch (Throwable t) {
state = -1;
log.error("Failed to invoke noticeSupplier: {}; cause by: " +
this.noticeSupplier.toString(), t);
}
return this;
}
public boolean checkAck(long ackNoticeId) {
boolean flag = false;
if (this.noticeId == ackNoticeId) {
flag = true;
this.state = 2;
}
if (flag) {
this.doRemoveDurable();
}
return flag;
}
public boolean doRemoveDurable() {
log.info("Removing NoticeBroadcaster is stating, noticeId:{}, durableId: {}"
, this.noticeId, this.durableId);
boolean flag = false;
if (this.removeFunction == null) {
log.warn("The remove-function hasn't been set.");
return false;
}
if (this.durableId == null) {
log.warn("The durableId hasn't been set.");
return false;
}
try {
if (!(flag = this.removeFunction.apply(this.durableId))) {
log.error("Removing NoticeBroadcaster was not complete, noticeId: {}, durableId: {}"
, this.noticeId, this.durableId);
}
} catch (Throwable t) {
log.error("Failed to remove NoticeBroadcaster, noticeId: "
+ this.noticeId + ", durableId: " + this.durableId + ". Cause by:", t);
}
return flag;
}
public long getNoticeId() {
return noticeId;
}
public int getState() {
return state;
}
public int getCounter() {
return counter;
}
public String getDurableId() {
return durableId;
}
public void setDurableId(String durableId) {
if (HgAssert.isInvalid(durableId)) {
log.warn("Set an invalid durable-id to NoticeBroadcaster.");
}
this.durableId = durableId;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
@Override
public String toString() {
return "NoticeBroadcaster{" +
"noticeId=" + noticeId +
", durableId='" + durableId + '\'' +
", state=" + state +
", counter=" + counter +
", timestamp=" + timestamp +
'}';
}
}