registry ..
diff --git a/README.md b/README.md
index cc8612e..03110cd 100644
--- a/README.md
+++ b/README.md
@@ -81,7 +81,7 @@
When the user is managing changes in the background,
- Soul dynamically updates the cache by listening to the zookeeper node, websocket push,http longPull.
+ Soul dynamically updates the cache by listening to the zookeeper node, websocket push, http longPull.

diff --git a/soul-common/src/main/java/org/dromara/soul/common/cache/HashedWheelTimer.java b/soul-common/src/main/java/org/dromara/soul/common/cache/HashedWheelTimer.java
deleted file mode 100644
index 20a23bb..0000000
--- a/soul-common/src/main/java/org/dromara/soul/common/cache/HashedWheelTimer.java
+++ /dev/null
@@ -1,657 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.dromara.soul.common.cache;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.dromara.soul.common.utils.OsUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Loop time implementation.
- * io.netty.util.HashedWheelTimer
- *
- * @author sixh
- */
-public class HashedWheelTimer implements Timer {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(HashedWheelTimer.class);
-
- /**
- * 状态更新.
- */
- private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER;
-
-
- /**
- * 任务执行状态.
- */
- private static final int WORKER_STATE_INIT = 0;
-
- /**
- * 任务已启动.
- */
- private static final int WORKER_STATE_STARTED = 1;
-
- /**
- * 任务已结束.
- */
- private static final int WORKER_STATE_SHUTDOWN = 2;
-
- /**
- * 时间分片槽,默认512.
- */
- private final HashedWheelTimer.HashedWheelBucket[] wheel;
-
- /**
- * 执行时间分片槽的次数; {@link #wheel}长度 -1.
- */
- private final int mask;
-
- /**
- * 时间分片执行时间.
- */
- private final long tickDuration;
-
- /**
- * 执行线程.
- */
- private final Thread workerThread;
-
- /**
- * 任务执行实现.
- */
- private final HashedWheelTimer.Worker worker = new HashedWheelTimer.Worker();
-
- /**
- * 开始时间.
- */
- private volatile long startTime;
-
- /**
- * 开始时间初始化.
- */
- private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
-
-
- /**
- * 超时器.
- */
- private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<>();
-
- /**
- * 取消超时器.
- */
- private final Queue<HashedWheelTimeout> cancelledTimeouts = new ConcurrentLinkedQueue<>();
-
- static {
- WORKER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
- }
-
- /**
- * 0 - init, 1 - started, 2 - shut down.
- */
- @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
- private volatile int workerState = WORKER_STATE_INIT;
-
- /**
- * 初始化一个环形时间执行器.
- *
- * @param threadFactory 线程初始化工厂;
- */
- HashedWheelTimer(ThreadFactory threadFactory) {
- this(threadFactory, 100, TimeUnit.MILLISECONDS, 512);
- }
-
- /**
- * 初始化一个环形时间执行器.
- *
- * @param threadFactory 线程初始化工厂;
- * @param tickDuration 持续执行时间;
- * @param unit 时间单位;
- * @param ticksPerWheel 时间分片数;
- */
- private HashedWheelTimer(ThreadFactory threadFactory,
- long tickDuration, TimeUnit unit, int ticksPerWheel) {
- /*
- * 创建时间分片区;
- */
- this.wheel = createWheel(ticksPerWheel);
- //执行时间分片槽的次数;
- mask = wheel.length - 1;
- //把所有时间转为纳秒进方便操作;
- this.tickDuration = unit.toNanos(tickDuration);
- // Prevent overflow.
- if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
- throw new IllegalArgumentException(String.format(
- "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
- tickDuration, Long.MAX_VALUE / wheel.length));
- }
- workerThread = threadFactory.newThread(worker);
- }
-
- /**
- * 启动任务.
- */
- private void start() {
- switch (WORKER_STATE_UPDATER.get(this)) {
- //更改线程状态;如果当前的状态是初始,则改成启动中;
- case WORKER_STATE_INIT:
- if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
- //启动线程;
- workerThread.start();
- }
- break;
- case WORKER_STATE_STARTED:
- break;
- case WORKER_STATE_SHUTDOWN:
- throw new IllegalStateException("cannot be started once stopped");
- default:
- throw new Error("Invalid WorkerState");
- }
-
- // 等待初始化的Worker线程的startTime实始化;
- while (startTime == 0) {
- try {
- startTimeInitialized.await();
- } catch (InterruptedException ignore) {
- // Ignore - it will be ready very soon.
- }
- }
- }
-
- private static HashedWheelTimer.HashedWheelBucket[] createWheel(int ticksPerWheel) {
- int newTicksPerWheel = ticksPerWheel;
- if (newTicksPerWheel <= 0) {
- throw new IllegalArgumentException(
- "ticksPerWheel must be greater than 0: " + newTicksPerWheel);
- }
- if (newTicksPerWheel > 1073741824) {
- throw new IllegalArgumentException(
- "ticksPerWheel may not be greater than 2^30: " + newTicksPerWheel);
- }
-
- newTicksPerWheel = normalizeTicksPerWheel(newTicksPerWheel);
- HashedWheelTimer.HashedWheelBucket[] wheel = new HashedWheelTimer.HashedWheelBucket[newTicksPerWheel];
- for (int i = 0; i < wheel.length; i++) {
- wheel[i] = new HashedWheelTimer.HashedWheelBucket();
- }
- return wheel;
- }
-
- private static int normalizeTicksPerWheel(int ticksPerWheel) {
- int normalizedTicksPerWheel = 1;
- while (normalizedTicksPerWheel < ticksPerWheel) {
- normalizedTicksPerWheel <<= 1;
- }
- return normalizedTicksPerWheel;
- }
-
- @Override
- public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
- if (task == null) {
- throw new NullPointerException("task");
- }
- if (unit == null) {
- throw new NullPointerException("unit");
- }
- start();
- /*
- * 设置一下超时作务到列队中;计算出下一次的超时时间;
- */
- long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
- HashedWheelTimer.HashedWheelTimeout timeout = new HashedWheelTimer.HashedWheelTimeout(this, task, deadline);
- timeouts.add(timeout);
- return timeout;
- }
-
- @Override
- public Set<Timeout> stop() {
- if (Thread.currentThread() == workerThread) {
- throw new IllegalStateException(
- HashedWheelTimer.class.getSimpleName() +
- ".stop() cannot be called from " +
- TimerTask.class.getSimpleName());
- }
-
- if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
- // workerState can be 0 or 2 at this moment - let it always be 2.
- WORKER_STATE_UPDATER.set(this, WORKER_STATE_SHUTDOWN);
- return Collections.emptySet();
- }
-
- boolean interrupted = false;
- while (workerThread.isAlive()) {
- workerThread.interrupt();
- try {
- workerThread.join(100);
- } catch (InterruptedException ignored) {
- interrupted = true;
- }
- }
-
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
-
- return worker.unprocessedTimeouts();
- }
-
- private final class Worker implements Runnable {
- private final Set<Timeout> unprocessedTimeouts = new HashSet<>();
-
- /**
- * 到达一个槽就是++.
- */
- private long tick;
-
- @Override
- public void run() {
- //初始化启动开始时间;
- startTime = System.nanoTime();
- if (startTime == 0) {
- // 因为startTime使用0来判断是否进行初始化,所以要保证他的初始化不为0;
- startTime = 1;
- }
- //结束执行start()方法;
- startTimeInitialized.countDown();
-
- do {
- //计算下一个到达的时间片;如果没有到达则需要等待;
- final long deadline = waitForNextTick();
- if (deadline > 0) {
- int idx = (int) (tick & mask);
- processCancelledTasks();
- //取出一个时间分片槽
- HashedWheelTimer.HashedWheelBucket bucket =
- wheel[idx];
- //转换时间分片槽。并将数据放入到时间分片槽上;
- transferTimeoutsToBuckets();
- bucket.expireTimeouts(deadline);
- tick++;
- }
- } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
-
- // 如果调用Stop方法,就会填充未超时的Task将他返回给用户;
- for (HashedWheelTimer.HashedWheelBucket bucket : wheel) {
- bucket.clearTimeouts(unprocessedTimeouts);
- }
- for (; ; ) {
- HashedWheelTimer.HashedWheelTimeout timeout = timeouts.poll();
- if (timeout == null) {
- break;
- }
- if (!timeout.isCancelled()) {
- unprocessedTimeouts.add(timeout);
- }
- }
- processCancelledTasks();
- }
-
- private void transferTimeoutsToBuckets() {
- // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
- // adds new timeouts in a loop.
- for (int i = 0; i < 100000; i++) {
- //获取一个作务;
- HashedWheelTimer.HashedWheelTimeout timeout = timeouts.poll();
- if (timeout == null) {
- // all processed
- break;
- }
- if (timeout.state() == HashedWheelTimer.HashedWheelTimeout.ST_CANCELLED) {
- // Was cancelled in the meantime.
- continue;
- }
- //需要轮转多少次,就可以到达时间分片上,(超时时间/任务分片轮转时间)
- long calculated = timeout.deadline / tickDuration;
- //计算还有剩余多少轮没有执行;
- timeout.remainingRounds = (calculated - tick) / wheel.length;
- // Ensure we don't schedule for past.
- final long ticks = Math.max(calculated, tick);
- //算出任务应该插入的时间分片槽位置;
- int stopIndex = (int) (ticks & mask);
- HashedWheelTimer.HashedWheelBucket bucket = wheel[stopIndex];
- bucket.addTimeout(timeout);
- }
- }
-
- /**
- * 流程任务取消;从取消队列中获取任务。并删除.
- */
- private void processCancelledTasks() {
- for (; ; ) {
- HashedWheelTimer.HashedWheelTimeout timeout = cancelledTimeouts.poll();
- if (timeout == null) {
- // all processed
- break;
- }
- try {
- timeout.remove();
- } catch (Throwable t) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("An exception was thrown while process a cancellation task", t);
- }
- }
- }
- }
-
- /**
- * calculate goal nanoTime from startTime and current tick number,
- * then wait until that goal has been reached.
- *
- * @return Long.MIN_VALUE if received a shutdown request,
- * current time otherwise (with Long.MIN_VALUE changed by +1).
- */
- private long waitForNextTick() {
- //计算时间片;例:1000000*(1+1)
- long deadline = tickDuration * (tick + 1);
-
- for (; ; ) {
- //得到当前已经耗时;
- final long currentTime = System.nanoTime() - startTime;
- //得到休眠的时间毫秒数;
- long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
-
- if (sleepTimeMs <= 0) {
- if (currentTime == Long.MIN_VALUE) {
- return -Long.MAX_VALUE;
- } else {
- return currentTime;
- }
- }
-
- // Check if we run on windows, as if thats the case we will need
- // to round the sleepTime as workaround for a bug that only affect
- // the JVM if it runs on windows.
- //
- // See https://github.com/netty/netty/issues/356
- if (OsUtils.isWindows()) {
- sleepTimeMs = sleepTimeMs / 10 * 10;
- }
-
- try {
- TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
- } catch (InterruptedException ignored) {
- if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
- return Long.MIN_VALUE;
- }
- }
- }
- }
-
- Set<Timeout> unprocessedTimeouts() {
- return Collections.unmodifiableSet(unprocessedTimeouts);
- }
- }
-
- /**
- * 超时任务对象.
- */
- private static final class HashedWheelTimeout implements Timeout {
-
- private static final int ST_INIT = 0;
-
- private static final int ST_CANCELLED = 1;
-
- private static final int ST_EXPIRED = 2;
-
- private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER;
-
- static {
- AtomicIntegerFieldUpdater<HashedWheelTimeout> updater =
- AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.HashedWheelTimeout.class, "state");
- STATE_UPDATER = updater;
- }
-
- private final HashedWheelTimer timer;
-
- private final TimerTask task;
-
- private final long deadline;
-
- @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
- private volatile int state = ST_INIT;
-
- /**
- * remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
- * HashedWheelTimeout will be added to the correct HashedWheelBucket.
- */
- private long remainingRounds;
-
- /**
- * This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
- * As only the workerThread will act on it there is no need for synchronization / volatile.
- */
- private HashedWheelTimer.HashedWheelTimeout next;
-
- private HashedWheelTimer.HashedWheelTimeout prev;
-
- /**
- * The bucket to which the timeout was added
- */
- private HashedWheelTimer.HashedWheelBucket bucket;
-
- HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
- this.timer = timer;
- this.task = task;
- this.deadline = deadline;
- }
-
- @Override
- public Timer timer() {
- return timer;
- }
-
- @Override
- public TimerTask task() {
- return task;
- }
-
- @Override
- public long deadline() {
- return deadline;
- }
-
- @Override
- public boolean cancel() {
- // only update the state it will be removed from HashedWheelBucket on next tick.
- if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
- return false;
- }
- // If a task should be canceled we put this to another queue which will be processed on each tick.
- // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
- // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
- timer.cancelledTimeouts.add(this);
- return true;
- }
-
- void remove() {
- HashedWheelTimer.HashedWheelBucket bucket = this.bucket;
- if (bucket != null) {
- bucket.remove(this);
- }
- }
-
- boolean compareAndSetState(int expected, int state) {
- return STATE_UPDATER.compareAndSet(this, expected, state);
- }
-
- int state() {
- return state;
- }
-
- @Override
- public boolean isCancelled() {
- return state() == ST_CANCELLED;
- }
-
- @Override
- public boolean isExpired() {
- return state() == ST_EXPIRED;
- }
-
- void expire() {
- if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
- return;
- }
-
- try {
- task.run(this);
- } catch (Throwable t) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
- }
- }
- }
-
- @Override
- public boolean isDefault() {
- return false;
- }
- }
-
- /**
- * 每个分片槽封装对象,
- * 这里使用的是双向队列.
- */
- private static final class HashedWheelBucket {
- private HashedWheelTimeout head;
-
- private HashedWheelTimeout tail;
-
- /**
- * Add {@link HashedWheelTimeout} to this bucket.
- */
- void addTimeout(HashedWheelTimeout timeout) {
- assert timeout.bucket == null;
- timeout.bucket = this;
- if (head == null) {
- head = tail = timeout;
- } else {
- tail.next = timeout;
- timeout.prev = tail;
- tail = timeout;
- }
- }
-
- /**
- * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
- */
- void expireTimeouts(long deadline) {
- HashedWheelTimeout timeout = head;
- // process all timeouts
- while (timeout != null) {
- boolean remove = false;
- if (timeout.remainingRounds <= 0) {
- if (timeout.deadline <= deadline) {
- timeout.expire();
- } else {
- // The timeout was placed into a wrong slot. This should never happen.
- throw new IllegalStateException(String.format(
- "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
- }
- remove = true;
- } else if (timeout.isCancelled()) {
- remove = true;
- } else {
- timeout.remainingRounds--;
- }
- // store reference to next as we may null out timeout.next in the remove block.
- HashedWheelTimeout next = timeout.next;
- if (remove) {
- remove(timeout);
- }
- timeout = next;
- }
- }
-
- void remove(HashedWheelTimeout timeout) {
- HashedWheelTimeout next = timeout.next;
- // remove timeout that was either processed or cancelled by updating the linked-list
- if (timeout.prev != null) {
- timeout.prev.next = next;
- }
- if (timeout.next != null) {
- timeout.next.prev = timeout.prev;
- }
-
- if (timeout == head) {
- // if timeout is also the tail we need to adjust the entry too
- if (timeout == tail) {
- tail = null;
- head = null;
- } else {
- head = next;
- }
- } else if (timeout == tail) {
- // if the timeout is the tail modify the tail to be the prev node.
- tail = timeout.prev;
- }
- // null out prev, next and bucket to allow for GC.
- timeout.prev = null;
- timeout.next = null;
- timeout.bucket = null;
- }
-
- /**
- * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
- */
- void clearTimeouts(Set<Timeout> set) {
- for (; ; ) {
- HashedWheelTimeout timeout = pollTimeout();
- if (timeout == null) {
- return;
- }
- if (timeout.isExpired() || timeout.isCancelled()) {
- continue;
- }
- set.add(timeout);
- }
- }
-
- private HashedWheelTimeout pollTimeout() {
- HashedWheelTimeout head = this.head;
- if (head == null) {
- return null;
- }
- HashedWheelTimeout next = head.next;
- if (next == null) {
- tail = this.head = null;
- } else {
- this.head = next;
- next.prev = null;
- }
-
- // null out prev and next to allow for GC.
- head.next = null;
- head.prev = null;
- head.bucket = null;
- return head;
- }
- }
-
-}
diff --git a/soul-common/src/main/java/org/dromara/soul/common/cache/Timeout.java b/soul-common/src/main/java/org/dromara/soul/common/cache/Timeout.java
deleted file mode 100644
index 52fbbaf..0000000
--- a/soul-common/src/main/java/org/dromara/soul/common/cache/Timeout.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.dromara.soul.common.cache;
-
-/**
- * timeout.
- *
- * @author chenbinl
- * @see HashedWheelTimer;
- */
-public interface Timeout {
-
- /**
- * Returns the {@link Timer} that created this handle.
- */
- Timer timer();
-
- /**
- * Returns the {@link TimerTask} which is associated with this handle.
- */
- TimerTask task();
-
- /**
- * Returns {@code true} if and only if the {@link TimerTask} associated
- * with this handle has been expired.
- */
- boolean isExpired();
-
- /**
- * Returns {@code true} if and only if the {@link TimerTask} associated
- * with this handle has been cancelled.
- */
- boolean isCancelled();
-
- /**
- * Attempts to cancel the {@link TimerTask} associated with this handle.
- * If the task has been executed or cancelled already, it will return with
- * no side effect.
- *
- * @return True if the cancellation completed successfully, otherwise false
- */
- boolean cancel();
-
- /**
- * 是否已经最后执行期限.
- *
- * @return true or false.
- */
- long deadline();
-
- /**
- * 是否为自定义的一个timeout.
- *
- * @return true or false.
- */
- default boolean isDefault() {
- return true;
- }
-}
\ No newline at end of file
diff --git a/soul-common/src/main/java/org/dromara/soul/common/cache/Timer.java b/soul-common/src/main/java/org/dromara/soul/common/cache/Timer.java
deleted file mode 100644
index fbe8eda..0000000
--- a/soul-common/src/main/java/org/dromara/soul/common/cache/Timer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.dromara.soul.common.cache;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Timer.
- * @author chenbin
- */
-public interface Timer {
-
- /**
- * Schedules the specified {@link TimerTask} for one-time execution after
- * the specified delay.
- * @param task timeTask
- * @param delay delay time;
- * @param unit time unit;
- * @return a handle which is associated with the specified task
- *
- * @throws IllegalStateException if this timer has been
- * {@linkplain #stop() stopped} already
- */
- Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
-
- /**
- * Releases all resources acquired by this {@link Timer} and cancels all
- * tasks which were scheduled but not executed yet.
- *
- * @return the handles associated with the tasks which were canceled by
- * this method
- */
- Set<Timeout> stop();
-}
diff --git a/soul-common/src/main/java/org/dromara/soul/common/cache/TimerTask.java b/soul-common/src/main/java/org/dromara/soul/common/cache/TimerTask.java
deleted file mode 100644
index 7060f23..0000000
--- a/soul-common/src/main/java/org/dromara/soul/common/cache/TimerTask.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.dromara.soul.common.cache;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A task which is executed after the delay specified with
- * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
- *
- * @author chenbin
- */
-public interface TimerTask {
-
- /**
- * Executed after the delay specified with
- * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
- *
- * @param timeout a handle which is associated with this task
- */
- void run(Timeout timeout);
-}
diff --git a/soul-common/src/main/java/org/dromara/soul/common/cache/TtlCache.java b/soul-common/src/main/java/org/dromara/soul/common/cache/TtlCache.java
index b3ebac7..fb62939 100644
--- a/soul-common/src/main/java/org/dromara/soul/common/cache/TtlCache.java
+++ b/soul-common/src/main/java/org/dromara/soul/common/cache/TtlCache.java
@@ -18,8 +18,12 @@
package org.dromara.soul.common.cache;
-import java.util.concurrent.TimeUnit;
import org.dromara.soul.common.concurrent.SoulThreadFactory;
+import org.dromara.soul.common.timer.HashedWheelTimer;
+import org.dromara.soul.common.timer.Timeout;
+import org.dromara.soul.common.timer.TimerTask;
+
+import java.util.concurrent.TimeUnit;
/**
* 实现一个关于时间的过期的缓存;当缓存的对象到期后,
@@ -94,14 +98,14 @@
* @return Timeout
*/
public Timeout put(K k, V v, long expire, TimeUnit unit) {
- Node node = new Node(k, v, expire, unit);
- return timer.newTimeout(node, expire, unit);
+ TtlCacheTimerTask ttlCacheTimerTask = new TtlCacheTimerTask(k, v, expire, unit);
+ return timer.newTimeout(ttlCacheTimerTask, expire, unit);
}
/**
* 保存.
*/
- public class Node implements TimerTask {
+ public class TtlCacheTimerTask implements TimerTask {
/**
* 缓存的KEY.
*/
@@ -135,7 +139,7 @@
* @param expire 默认等待时间;
* @param unit 默认等时间的单位;
*/
- public Node(K key, V value, long expire, TimeUnit unit) {
+ TtlCacheTimerTask(K key, V value, long expire, TimeUnit unit) {
this.key = key;
this.value = value;
time = System.nanoTime();
diff --git a/soul-common/src/main/java/org/dromara/soul/common/timer/HashedWheelTimer.java b/soul-common/src/main/java/org/dromara/soul/common/timer/HashedWheelTimer.java
new file mode 100644
index 0000000..4e43b72
--- /dev/null
+++ b/soul-common/src/main/java/org/dromara/soul/common/timer/HashedWheelTimer.java
@@ -0,0 +1,829 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.dromara.soul.common.timer;
+
+
+import org.dromara.soul.common.utils.OsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A {@link Timer} optimized for approximated I/O timeout scheduling.
+ *
+ * <h3>Tick Duration</h3>
+ * <p>
+ * As described with 'approximated', this timer does not execute the scheduled
+ * {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will
+ * check if there are any {@link TimerTask}s behind the schedule and execute
+ * them.
+ * <p>
+ * You can increase or decrease the accuracy of the execution timing by
+ * specifying smaller or larger tick duration in the constructor. In most
+ * network applications, I/O timeout does not need to be accurate. Therefore,
+ * the default tick duration is 100 milliseconds and you will not need to try
+ * different configurations in most cases.
+ *
+ * <h3>Ticks per Wheel (Wheel Size)</h3>
+ * <p>
+ * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
+ * Put simply, a wheel is a hash table of {@link TimerTask}s whose hash
+ * function is 'deadline of the task'. The default number of ticks per wheel
+ * (i.e. the size of the wheel) is 512. You could specify a larger value
+ * if you are going to schedule a lot of timeouts.
+ *
+ * <h3>Do not create many instances.</h3>
+ * <p>
+ * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
+ * started. Therefore, you should make sure to create only one instance and
+ * share it across your application. One of the common mistakes, that makes
+ * your application unresponsive, is to create a new instance for every connection.
+ *
+ * <h3>Implementation Details</h3>
+ * <p>
+ * {@link HashedWheelTimer} is based on
+ * <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
+ * Tony Lauck's paper,
+ * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
+ * and Hierarchical Timing Wheels: data structures to efficiently implement a
+ * timer facility'</a>. More comprehensive slides are located
+ * <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
+ */
+public class HashedWheelTimer implements Timer {
+
+ static final Logger logger =
+ LoggerFactory.getLogger(HashedWheelTimer.class);
+
+ private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
+ private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
+ private static final int INSTANCE_COUNT_LIMIT = 64;
+ private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
+
+
+ private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
+
+ private final Worker worker = new Worker();
+ private final Thread workerThread;
+
+ public static final int WORKER_STATE_INIT = 0;
+ public static final int WORKER_STATE_STARTED = 1;
+ public static final int WORKER_STATE_SHUTDOWN = 2;
+ @SuppressWarnings({"unused", "FieldMayBeFinal"})
+ private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
+
+ private final long tickDuration;
+ private final HashedWheelBucket[] wheel;
+ private final int mask;
+ private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
+ private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<>();
+ private final Queue<HashedWheelTimeout> cancelledTimeouts = new ConcurrentLinkedDeque<>();
+ private final AtomicLong pendingTimeouts = new AtomicLong(0);
+ private final long maxPendingTimeouts;
+
+ private volatile long startTime;
+
+ /**
+ * Creates a new timer with the default thread factory
+ * ({@link Executors#defaultThreadFactory()}), default tick duration, and
+ * default number of ticks per wheel.
+ */
+ public HashedWheelTimer() {
+ this(Executors.defaultThreadFactory());
+ }
+
+ /**
+ * Creates a new timer with the default thread factory
+ * ({@link Executors#defaultThreadFactory()}) and default number of ticks
+ * per wheel.
+ *
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @throws NullPointerException if {@code unit} is {@code null}
+ * @throws IllegalArgumentException if {@code tickDuration} is <= 0
+ */
+ public HashedWheelTimer(long tickDuration, TimeUnit unit) {
+ this(Executors.defaultThreadFactory(), tickDuration, unit);
+ }
+
+ /**
+ * Creates a new timer with the default thread factory
+ * ({@link Executors#defaultThreadFactory()}).
+ *
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel
+ * @throws NullPointerException if {@code unit} is {@code null}
+ * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
+ */
+ public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
+ this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
+ }
+
+ /**
+ * Creates a new timer with the default tick duration and default number of
+ * ticks per wheel.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @throws NullPointerException if {@code threadFactory} is {@code null}
+ */
+ public HashedWheelTimer(ThreadFactory threadFactory) {
+ this(threadFactory, 100, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a new timer with the default number of ticks per wheel.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
+ * @throws IllegalArgumentException if {@code tickDuration} is <= 0
+ */
+ public HashedWheelTimer(
+ ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
+ this(threadFactory, tickDuration, unit, 512);
+ }
+
+ /**
+ * Creates a new timer.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel
+ * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
+ * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
+ */
+ public HashedWheelTimer(
+ ThreadFactory threadFactory,
+ long tickDuration, TimeUnit unit, int ticksPerWheel) {
+ this(threadFactory, tickDuration, unit, ticksPerWheel, true);
+ }
+
+ /**
+ * Creates a new timer with no limit on the number of pending timeouts.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel
+ * @param leakDetection forwarded unchanged to the six-argument constructor,
+ * whose body never reads it, so the value currently has
+ * no effect on this timer.
+ * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
+ * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
+ */
+ public HashedWheelTimer(
+ ThreadFactory threadFactory,
+ long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
+ this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
+ }
+
+ /**
+ * Creates a new timer; the worker thread is created here but only started by start() or the first newTimeout call.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick; a value below 1 ms is raised to 1 ms
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel; normalized to a power of two
+ * @param leakDetection accepted for signature compatibility only; this
+ * constructor never reads the value, so it currently has
+ * no effect.
+ * @param maxPendingTimeouts The maximum number of pending timeouts after which call to
+ * {@code newTimeout} will result in
+ * {@link java.util.concurrent.RejectedExecutionException}
+ * being thrown. No maximum pending timeouts limit is assumed if
+ * this value is 0 or negative.
+ * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
+ * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is <= 0, or the tick in nanos is too large
+ */
+ public HashedWheelTimer(
+ ThreadFactory threadFactory,
+ long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
+ long maxPendingTimeouts) {
+
+ if (threadFactory == null) {
+ throw new NullPointerException("threadFactory");
+ }
+ if (unit == null) {
+ throw new NullPointerException("unit");
+ }
+ if (tickDuration <= 0) {
+ throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
+ }
+ if (ticksPerWheel <= 0) {
+ throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
+ }
+
+ // Normalize ticksPerWheel to power of two and initialize the wheel.
+ wheel = createWheel(ticksPerWheel);
+ mask = wheel.length - 1;
+
+ // Convert tickDuration to nanos.
+ long duration = unit.toNanos(tickDuration);
+
+ // Prevent overflow: one full wheel revolution (duration * wheel.length) must fit in a long.
+ if (duration >= Long.MAX_VALUE / wheel.length) {
+ throw new IllegalArgumentException(String.format(
+ "tickDuration: %d (expected: 0 < tickDuration in nanos < %d)",
+ tickDuration, Long.MAX_VALUE / wheel.length));
+ }
+
+ if (duration < MILLISECOND_NANOS) {
+ logger.warn("Configured tickDuration {}ns smaller than {}ns, using 1ms.",
+ duration, MILLISECOND_NANOS);
+ this.tickDuration = MILLISECOND_NANOS;
+ } else {
+ this.tickDuration = duration;
+ }
+
+ workerThread = threadFactory.newThread(worker);
+
+ this.maxPendingTimeouts = maxPendingTimeouts;
+
+ if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
+ WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
+ reportTooManyInstances();
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ try {
+ super.finalize();
+ } finally {
+ // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
+ // we have not yet shutdown then we want to make sure we decrement the active instance count.
+ if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
+ INSTANCE_COUNTER.decrementAndGet();
+ }
+ }
+ }
+
+ private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
+ if (ticksPerWheel <= 0) {
+ throw new IllegalArgumentException(
+ "ticksPerWheel must be greater than 0: " + ticksPerWheel);
+ }
+ if (ticksPerWheel > 1073741824) {
+ throw new IllegalArgumentException(
+ "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
+ }
+
+ ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
+ HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
+ for (int i = 0; i < wheel.length; i++) {
+ wheel[i] = new HashedWheelBucket();
+ }
+ return wheel;
+ }
+
+ private static int normalizeTicksPerWheel(int ticksPerWheel) {
+ int normalizedTicksPerWheel = 1;
+ while (normalizedTicksPerWheel < ticksPerWheel) {
+ normalizedTicksPerWheel <<= 1;
+ }
+ return normalizedTicksPerWheel;
+ }
+
+ /**
+ * Starts the background thread explicitly. The background thread will
+ * start automatically on demand even if you did not call this method.
+ * Blocks until the worker thread has initialized {@code startTime}.
+ * @throws IllegalStateException if this timer has been
+ * {@linkplain #stop() stopped} already
+ */
+ public void start() {
+ switch (WORKER_STATE_UPDATER.get(this)) {
+ case WORKER_STATE_INIT:
+ if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
+ workerThread.start();
+ }
+ break;
+ case WORKER_STATE_STARTED:
+ break;
+ case WORKER_STATE_SHUTDOWN:
+ throw new IllegalStateException("cannot be started once stopped");
+ default:
+ throw new Error("Invalid WorkerState");
+ }
+
+ // Wait until the worker has set startTime (0 means "not initialized yet").
+ while (startTime == 0) {
+ try {
+ startTimeInitialized.await();
+ } catch (InterruptedException ignore) {
+ // Deliberately ignored: the loop re-checks startTime and waits again; the interrupt status is not restored.
+ }
+ }
+ }
+
+ @Override
+ public Set<Timeout> stop() {
+ if (Thread.currentThread() == workerThread) {
+ throw new IllegalStateException(
+ HashedWheelTimer.class.getSimpleName() +
+ ".stop() cannot be called from " +
+ TimerTask.class.getSimpleName());
+ }
+
+ if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
+ // Not STARTED, so workerState is 0 (init) or 2 (shut down): force 2 and release the instance count only once.
+ if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
+ INSTANCE_COUNTER.decrementAndGet();
+ }
+ // The timer was never started or is already shut down, so no unprocessed timeouts are reported.
+ return Collections.emptySet();
+ }
+ // Keep interrupting the worker (it may be sleeping in waitForNextTick) until its thread has exited.
+ try {
+ boolean interrupted = false;
+ while (workerThread.isAlive()) {
+ workerThread.interrupt();
+ try {
+ workerThread.join(100);
+ } catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+ }
+ // Re-assert the caller's interrupt status that was cleared when join() threw.
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ INSTANCE_COUNTER.decrementAndGet();
+ }
+ return worker.unprocessedTimeouts();
+ }
+
+ @Override
+ public boolean isStop() {
+ return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this);
+ }
+
+ @Override
+ public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
+ if (task == null) {
+ throw new NullPointerException("task");
+ }
+ if (unit == null) {
+ throw new NullPointerException("unit");
+ }
+ // Count this timeout up front; the increment is rolled back below if the limit is exceeded.
+ long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
+
+ if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
+ pendingTimeouts.decrementAndGet();
+ throw new RejectedExecutionException("Number of pending timeouts ("
+ + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ + "timeouts (" + maxPendingTimeouts + ")");
+ }
+ // Starts the worker on first use. NOTE(review): if start() throws, pendingTimeouts is not decremented.
+ start();
+
+ // The deadline is stored relative to startTime. The timeout is queued here and moved into the
+ // matching HashedWheelBucket by the worker on its next tick.
+ long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
+
+ // Guard against overflow: a positive delay must never yield a negative deadline.
+ if (delay > 0 && deadline < 0) {
+ deadline = Long.MAX_VALUE;
+ }
+ HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
+ timeouts.add(timeout);
+ return timeout;
+ }
+
+ /**
+ * Returns the number of pending timeouts of this {@link Timer}.
+ */
+ public long pendingTimeouts() {
+ return pendingTimeouts.get();
+ }
+
+ private static void reportTooManyInstances() {
+ if (logger.isErrorEnabled()) {
+ String resourceType = HashedWheelTimer.class.getSimpleName();
+ logger.error("You are creating too many " + resourceType + " instances. " +
+ resourceType + " is a shared resource that must be reused across the JVM," +
+ "so that only a few instances are created.");
+ }
+ }
+
+ private final class Worker implements Runnable {
+ private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
+
+ private long tick;
+
+ @Override
+ public void run() {
+ // Initialize the startTime.
+ startTime = System.nanoTime();
+ if (startTime == 0) {
+ // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
+ startTime = 1;
+ }
+
+ // Notify the other threads waiting for the initialization at start().
+ startTimeInitialized.countDown();
+ // One pass per tick; waitForNextTick() returns Long.MIN_VALUE on shutdown, which the deadline > 0 check skips.
+ do {
+ final long deadline = waitForNextTick();
+ if (deadline > 0) {
+ int idx = (int) (tick & mask);
+ processCancelledTasks();
+ HashedWheelBucket bucket =
+ wheel[idx];
+ transferTimeoutsToBuckets();
+ bucket.expireTimeouts(deadline);
+ tick++;
+ }
+ } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
+
+ // The loop has ended: collect what is left in the wheel and in the queue so stop() can return it.
+ for (HashedWheelBucket bucket : wheel) {
+ bucket.clearTimeouts(unprocessedTimeouts);
+ }
+ for (; ; ) {
+ HashedWheelTimeout timeout = timeouts.poll();
+ if (timeout == null) {
+ break;
+ }
+ if (!timeout.isCancelled()) {
+ unprocessedTimeouts.add(timeout);
+ }
+ }
+ processCancelledTasks();
+ }
+
+ private void transferTimeoutsToBuckets() {
+ // Transfer at most 100000 timeouts per tick so that a thread which keeps adding new timeouts
+ // in a loop cannot stall the workerThread.
+ for (int i = 0; i < 100000; i++) {
+ HashedWheelTimeout timeout = timeouts.poll();
+ if (timeout == null) {
+ // all processed
+ break;
+ }
+ if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
+ // Was cancelled in the meantime.
+ continue;
+ }
+ // calculated = tick number the deadline falls on; remainingRounds = full wheel turns still to wait.
+ long calculated = timeout.deadline / tickDuration;
+ timeout.remainingRounds = (calculated - tick) / wheel.length;
+
+ final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
+ int stopIndex = (int) (ticks & mask);
+
+ HashedWheelBucket bucket = wheel[stopIndex];
+ bucket.addTimeout(timeout);
+ }
+ }
+
+ private void processCancelledTasks() {
+ for (; ; ) {
+ HashedWheelTimeout timeout = cancelledTimeouts.poll();
+ if (timeout == null) {
+ // all processed
+ break;
+ }
+ try {
+ timeout.remove();
+ } catch (Throwable t) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("An exception was thrown while process a cancellation task", t);
+ }
+ }
+ }
+ }
+
+ /**
+ * Calculates the goal time of the next tick from tickDuration and the current tick number,
+ * then sleeps until that goal has been reached.
+ *
+ * @return Long.MIN_VALUE if received a shutdown request,
+ * nanoseconds elapsed since startTime otherwise (with Long.MIN_VALUE changed by +1)
+ */
+ private long waitForNextTick() {
+ long deadline = tickDuration * (tick + 1);
+
+ for (; ; ) {
+ final long currentTime = System.nanoTime() - startTime;
+ long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
+ // Adding 999999 before dividing rounds the remaining nanoseconds up to whole milliseconds.
+ if (sleepTimeMs <= 0) {
+ if (currentTime == Long.MIN_VALUE) {
+ return -Long.MAX_VALUE;
+ } else {
+ return currentTime;
+ }
+ }
+
+ // Check whether we run on Windows; if that's the case the sleep time is rounded
+ // down to a multiple of 10 ms as a workaround for a bug that only affects
+ // the JVM when it runs on Windows.
+ //
+ // See https://github.com/netty/netty/issues/356
+ if (OsUtils.isWindows()) {
+ sleepTimeMs = sleepTimeMs / 10 * 10;
+ }
+
+ try {
+ Thread.sleep(sleepTimeMs);
+ } catch (InterruptedException ignored) {
+ if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
+ return Long.MIN_VALUE;
+ }
+ }
+ }
+ }
+
+ public Set<Timeout> unprocessedTimeouts() {
+ return Collections.unmodifiableSet(unprocessedTimeouts);
+ }
+ }
+
+ private static final class HashedWheelTimeout implements Timeout {
+
+ private static final int ST_INIT = 0;
+ private static final int ST_CANCELLED = 1;
+ private static final int ST_EXPIRED = 2;
+ private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
+
+ private final HashedWheelTimer timer;
+ private final TimerTask task;
+ private final long deadline;
+
+ @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
+ private volatile int state = ST_INIT;
+
+ // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
+ // HashedWheelTimeout will be added to the correct HashedWheelBucket.
+ long remainingRounds;
+
+ // These are used to chain timeouts in a HashedWheelBucket via a doubly-linked list.
+ // As only the workerThread will act on it there is no need for synchronization / volatile.
+ HashedWheelTimeout next;
+ HashedWheelTimeout prev;
+
+ // The bucket to which the timeout was added
+ HashedWheelBucket bucket;
+
+ HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
+ this.timer = timer;
+ this.task = task;
+ this.deadline = deadline;
+ }
+
+ @Override
+ public Timer timer() {
+ return timer;
+ }
+
+ @Override
+ public TimerTask task() {
+ return task;
+ }
+
+ @Override
+ public boolean cancel() {
+ // Only update the state here; removal from the HashedWheelBucket happens on the worker's next tick.
+ if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
+ return false;
+ }
+ // A cancelled timeout is put on a separate queue which the worker drains on each tick, so it
+ // lingers for at most about one tick duration, which is good enough. The queue is the timer's
+ // concurrent cancelledTimeouts collection, so this method itself takes no lock.
+ timer.cancelledTimeouts.add(this);
+ return true;
+ }
+
+ void remove() {
+ HashedWheelBucket bucket = this.bucket;
+ if (bucket != null) {
+ bucket.remove(this);
+ } else {
+ timer.pendingTimeouts.decrementAndGet();
+ }
+ }
+
+ public boolean compareAndSetState(int expected, int state) {
+ return STATE_UPDATER.compareAndSet(this, expected, state);
+ }
+
+ public int state() {
+ return state;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return state() == ST_CANCELLED;
+ }
+
+ @Override
+ public boolean isExpired() {
+ return state() == ST_EXPIRED;
+ }
+
+ public void expire() {
+ if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
+ return;
+ }
+
+ try {
+ task.run(this);
+ } catch (Throwable t) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ final long currentTime = System.nanoTime();
+ long remaining = deadline - currentTime + timer.startTime;
+
+ StringBuilder buf = new StringBuilder(192)
+ .append(this.getClass().getSimpleName())
+ .append('(')
+ .append("deadline: ");
+ if (remaining > 0) {
+ buf.append(remaining)
+ .append(" ns later");
+ } else if (remaining < 0) {
+ buf.append(-remaining)
+ .append(" ns ago");
+ } else {
+ buf.append("now");
+ }
+
+ if (isCancelled()) {
+ buf.append(", cancelled");
+ }
+
+ return buf.append(", task: ")
+ .append(task())
+ .append(')')
+ .toString();
+ }
+
+ }
+
+ /**
+ * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like data structure to allow easy
+ * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeouts act as nodes themselves and so no
+ * extra object creation is needed.
+ */
+ private static final class HashedWheelBucket {
+ // Used for the linked-list datastructure
+ private HashedWheelTimeout head;
+ private HashedWheelTimeout tail;
+
+ /**
+ * Add {@link HashedWheelTimeout} to this bucket.
+ */
+ public void addTimeout(HashedWheelTimeout timeout) {
+ assert timeout.bucket == null;
+ timeout.bucket = this;
+ if (head == null) {
+ head = tail = timeout;
+ } else {
+ tail.next = timeout;
+ timeout.prev = tail;
+ tail = timeout;
+ }
+ }
+
+ /**
+ * Walks this bucket: runs timeouts with no rounds left, drops cancelled ones, and decrements the rounds of the rest.
+ */
+ public void expireTimeouts(long deadline) {
+ HashedWheelTimeout timeout = head;
+
+ // Process all timeouts; next is captured first because remove() clears timeout.next.
+ while (timeout != null) {
+ HashedWheelTimeout next = timeout.next;
+ if (timeout.remainingRounds <= 0) {
+ next = remove(timeout);
+ if (timeout.deadline <= deadline) {
+ timeout.expire();
+ } else {
+ // The timeout was placed into a wrong slot. This should never happen.
+ throw new IllegalStateException(String.format(
+ "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
+ }
+ } else if (timeout.isCancelled()) {
+ next = remove(timeout);
+ } else {
+ timeout.remainingRounds--;
+ }
+ timeout = next;
+ }
+ }
+
+ public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
+ HashedWheelTimeout next = timeout.next;
+ // remove timeout that was either processed or cancelled by updating the linked-list
+ if (timeout.prev != null) {
+ timeout.prev.next = next;
+ }
+ if (timeout.next != null) {
+ timeout.next.prev = timeout.prev;
+ }
+
+ if (timeout == head) {
+ // if timeout is also the tail we need to adjust the entry too
+ if (timeout == tail) {
+ tail = null;
+ head = null;
+ } else {
+ head = next;
+ }
+ } else if (timeout == tail) {
+ // if the timeout is the tail modify the tail to be the prev node.
+ tail = timeout.prev;
+ }
+ // null out prev, next and bucket to allow for GC.
+ timeout.prev = null;
+ timeout.next = null;
+ timeout.bucket = null;
+ timeout.timer.pendingTimeouts.decrementAndGet();
+ return next;
+ }
+
+ /**
+ * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
+ */
+ public void clearTimeouts(Set<Timeout> set) {
+ for (; ; ) {
+ HashedWheelTimeout timeout = pollTimeout();
+ if (timeout == null) {
+ return;
+ }
+ if (timeout.isExpired() || timeout.isCancelled()) {
+ continue;
+ }
+ set.add(timeout);
+ }
+ }
+
+ private HashedWheelTimeout pollTimeout() {
+ HashedWheelTimeout head = this.head;
+ if (head == null) {
+ return null;
+ }
+ HashedWheelTimeout next = head.next;
+ if (next == null) {
+ tail = this.head = null;
+ } else {
+ this.head = next;
+ next.prev = null;
+ }
+
+ // null out prev and next to allow for GC.
+ head.next = null;
+ head.prev = null;
+ head.bucket = null;
+ return head;
+ }
+ }
+}
diff --git a/soul-common/src/main/java/org/dromara/soul/common/timer/Timeout.java b/soul-common/src/main/java/org/dromara/soul/common/timer/Timeout.java
new file mode 100644
index 0000000..02ef6e4
--- /dev/null
+++ b/soul-common/src/main/java/org/dromara/soul/common/timer/Timeout.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dromara.soul.common.timer;
+
+/**
+ * A handle associated with a {@link TimerTask} that is returned by a
+ * {@link Timer}.
+ *
+ * @author netty.
+ */
+public interface Timeout {
+
+ /**
+ * Returns the {@link Timer} that created this handle.
+ */
+ Timer timer();
+
+ /**
+ * Returns the {@link TimerTask} which is associated with this handle.
+ */
+ TimerTask task();
+
+ /**
+ * Returns {@code true} if and only if the {@link TimerTask} associated
+ * with this handle has been expired.
+ */
+ boolean isExpired();
+
+ /**
+ * Returns {@code true} if and only if the {@link TimerTask} associated
+ * with this handle has been cancelled.
+ */
+ boolean isCancelled();
+
+ /**
+ * Attempts to cancel the {@link TimerTask} associated with this handle.
+ * If the task has been executed or cancelled already, it will return with
+ * no side effect.
+ *
+ * @return True if the cancellation completed successfully, otherwise false
+ */
+ boolean cancel();
+}
\ No newline at end of file
diff --git a/soul-common/src/main/java/org/dromara/soul/common/timer/Timer.java b/soul-common/src/main/java/org/dromara/soul/common/timer/Timer.java
new file mode 100644
index 0000000..a26cc6a
--- /dev/null
+++ b/soul-common/src/main/java/org/dromara/soul/common/timer/Timer.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.dromara.soul.common.timer;
+
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Schedules {@link TimerTask}s for one-time future execution in a background
+ * thread.
+ *
+ * @author netty.
+ */
+public interface Timer {
+
+ /**
+ * Schedules the specified {@link TimerTask} for one-time execution after
+ * the specified delay.
+ *
+ * @return a handle which is associated with the specified task
+ * @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
+ * @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
+ * can cause instability in the system.
+ */
+ Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
+
+ /**
+ * Releases all resources acquired by this {@link Timer} and cancels all
+ * tasks which were scheduled but not executed yet.
+ *
+ * @return the handles associated with the tasks which were canceled by
+ * this method
+ */
+ Set<Timeout> stop();
+
+ /**
+ * Reports whether this timer has been stopped.
+ * @return {@code true} if this timer is stopped, {@code false} otherwise.
+ */
+ boolean isStop();
+}
diff --git a/soul-common/src/main/java/org/dromara/soul/common/timer/TimerTask.java b/soul-common/src/main/java/org/dromara/soul/common/timer/TimerTask.java
new file mode 100644
index 0000000..c26526a
--- /dev/null
+++ b/soul-common/src/main/java/org/dromara/soul/common/timer/TimerTask.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dromara.soul.common.timer;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A task which is executed after the delay specified with
+ * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
+ * See also {@code io.netty.util.TimerTask}.
+ *
+ * @author netty.
+ */
+public interface TimerTask {
+
+ /**
+ * Executed after the delay specified with
+ * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
+ *
+ * @param timeout a handle which is associated with this task
+ */
+ void run(Timeout timeout);
+}
diff --git a/soul-register/soul-register-api/src/main/java/org/dromara/soul/register/api/AbstractRegistry.java b/soul-register/soul-register-api/src/main/java/org/dromara/soul/register/api/AbstractRegistry.java
index 09e3aef..57d2d3f 100644
--- a/soul-register/soul-register-api/src/main/java/org/dromara/soul/register/api/AbstractRegistry.java
+++ b/soul-register/soul-register-api/src/main/java/org/dromara/soul/register/api/AbstractRegistry.java
@@ -17,11 +17,16 @@
package org.dromara.soul.register.api;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.Sets;
import org.dromara.soul.common.exception.SoulException;
import org.dromara.soul.common.http.URL;
+import org.dromara.soul.register.api.exception.RegisterException;
/**
* AbstractRegistry.
@@ -41,12 +46,12 @@
/**
* The notifyUrls.
*/
- protected final Map<URL, RegisterNotifyListener> subscribed = new ConcurrentHashMap<>();
+ private final Map<URL, RegisterNotifyListener> subscribed = new ConcurrentHashMap<>();
/**
* The Registered.
*/
- protected final Map<URL, Object> registered = new ConcurrentHashMap<>();
+ private final Set<URL> registered = Sets.newConcurrentHashSet();
/**
* Instantiates a new Abstract registry.
@@ -69,10 +74,21 @@
}
/**
- * Retry.
+ * get subscribed.
+ *
+ * @return map.
*/
- public void retry() {
- //todo:重试.
+ Map<URL, RegisterNotifyListener> getSubscribed() {
+ return Collections.unmodifiableMap(subscribed);
+ }
+
+ /**
+ * get registered.
+ *
+ * @return set.
+ */
+ Set<URL> getRegistered() {
+ return Collections.unmodifiableSet(registered);
}
/**
@@ -89,7 +105,7 @@
*
* @return the remote url
*/
- public URL getRemoteUrl() {
+ protected URL getRemoteUrl() {
return remoteUrl;
}
@@ -103,22 +119,38 @@
@Override
public void register(URL url) {
-
+ if (url == null) {
+ throw new RegisterException("url is null || url == null");
+ }
+ registered.add(url);
}
+
@Override
public void unregister(URL url) {
-
+ if (url == null) {
+ throw new RegisterException("url is null || url == null");
+ }
+ registered.remove(url);
}
@Override
public void subscribe(URL url, RegisterNotifyListener listener) {
-
+ if (url == null) {
+ throw new RegisterException("url is null || url == null");
+ }
+ if (listener == null) {
+ throw new RegisterException("listener is null || listener == null");
+ }
+ subscribed.putIfAbsent(url, listener);
}
@Override
public void unsubscribe(URL url) {
-
+ if (url == null) {
+ throw new RegisterException("url is null || url == null");
+ }
+ subscribed.remove(url);
}
@Override
diff --git a/soul-register/soul-register-api/src/main/java/org/dromara/soul/register/api/FailbackRegistry.java b/soul-register/soul-register-api/src/main/java/org/dromara/soul/register/api/FailbackRegistry.java
new file mode 100644
index 0000000..d861987
--- /dev/null
+++ b/soul-register/soul-register-api/src/main/java/org/dromara/soul/register/api/FailbackRegistry.java
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dromara.soul.register.api;
+
+import com.google.common.collect.Maps;
+import org.dromara.soul.common.concurrent.SoulThreadFactory;
+import org.dromara.soul.common.http.URL;
+import org.dromara.soul.common.timer.HashedWheelTimer;
+import org.dromara.soul.common.timer.Timeout;
+import org.dromara.soul.common.timer.Timer;
+import org.dromara.soul.common.timer.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * FailbackRegistry .
+ *
+ * @author sixh
+ */
+public abstract class FailbackRegistry extends AbstractRegistry {
+
+ private Logger logger = LoggerFactory.getLogger(FailbackRegistry.class);
+
+ private final Map<URL, FailbackTask> failbackRegister = Maps.newConcurrentMap();
+
+ private final Map<URL, FailbackTask> failbackSubscribe = Maps.newConcurrentMap();
+
+ private static final HashedWheelTimer TIMER = new HashedWheelTimer(SoulThreadFactory.create("soul.failback.register"));
+ /**
+ * retry exec time.
+ */
+ private final Long retryTime = TimeUnit.SECONDS.toMillis(5);
+
+    /**
+     * Instantiates a new Failback registry.
+     *
+     * @param url the url, passed unchanged to the {@link AbstractRegistry} constructor.
+     */
+    public FailbackRegistry(URL url) {
+        super(url);
+    }
+
+    @Override
+    public void register(URL url) {
+        removeFailbackRegister(url); // cancel any retry still pending for this url
+        try {
+            super.register(url); // AbstractRegistry validates the url and records it
+            doRegister(url);
+        } catch (Throwable t) { // NOTE(review): t is neither logged nor rethrown
+            addFailbackRegister(url, new FailbackTask(url, url1 -> { // retry via TIMER
+                doRegister(url1);
+                failbackRegister.remove(url1); // reached only if the retry succeeded
+            }, retryTime));
+        }
+    }
+
+    @Override
+    public void unregister(URL url) {
+        removeFailbackRegister(url); // cancel any retry still pending for this url
+        try {
+            super.unregister(url); // AbstractRegistry drops the url from its registered set
+            doUnRegister(url);
+        } catch (Throwable t) { // NOTE(review): t is neither logged nor rethrown
+            addFailbackRegister(url, new FailbackTask(url, url1 -> { // retry via TIMER
+                doUnRegister(url1);
+                failbackRegister.remove(url1); // reached only if the retry succeeded
+            }, retryTime));
+        }
+    }
+
+    @Override
+    public void subscribe(URL url, RegisterNotifyListener listener) {
+        removeFailbackSubscribe(url); // cancel any retry still pending for this url
+        try {
+            super.subscribe(url, listener); // AbstractRegistry stores the listener if none is set
+            doSubscribe(url);
+        } catch (Throwable t) { // NOTE(review): t is neither logged nor rethrown
+            addFailbackSubscribe(url, new FailbackTask(url, url1 -> { // retry via TIMER
+                doSubscribe(url1);
+                failbackSubscribe.remove(url1); // reached only if the retry succeeded
+            }, retryTime));
+        }
+    }
+
+    @Override
+    public void unsubscribe(URL url) {
+        removeFailbackSubscribe(url); // cancel any retry still pending for this url
+        try {
+            super.unsubscribe(url); // AbstractRegistry drops the listener kept for this url
+            doUnSubscribe(url);
+        } catch (Throwable t) { // NOTE(review): t is neither logged nor rethrown
+            addFailbackSubscribe(url, new FailbackTask(url, url1 -> { // retry via TIMER
+                doUnSubscribe(url1);
+                failbackSubscribe.remove(url1); // reached only if the retry succeeded
+            }, retryTime));
+        }
+    }
+
+    private void addFailbackRegister(URL url, FailbackTask task) {
+        // putIfAbsent is atomic: only the caller that actually stored the task schedules
+        // it, so a url never gets an untracked (hence uncancellable) retry on the timer.
+        if (failbackRegister.putIfAbsent(url, task) != null) {
+            return;
+        }
+        TIMER.newTimeout(task, retryTime, TimeUnit.MILLISECONDS);
+    }
+
+    private void removeFailbackRegister(URL url) {
+        final FailbackTask pending = failbackRegister.remove(url);
+        if (null != pending) {
+            pending.cancel(); // flagged tasks return early when the timer fires them
+        }
+    }
+
+    private void addFailbackSubscribe(URL url, FailbackTask task) {
+        // Track subscribe retries in failbackSubscribe (not failbackRegister); putIfAbsent
+        // is atomic, so only the caller that stored the task schedules it on the timer.
+        if (failbackSubscribe.putIfAbsent(url, task) != null) {
+            return;
+        }
+        TIMER.newTimeout(task, retryTime, TimeUnit.MILLISECONDS);
+    }
+
+    private void removeFailbackSubscribe(URL url) {
+        final FailbackTask pending = failbackSubscribe.remove(url);
+        if (null != pending) {
+            pending.cancel(); // flagged tasks return early when the timer fires them
+        }
+    }
+
+    /**
+     * Re-schedules every registered url and every subscription as a failback task,
+     * so subclasses can replay their state, e.g. after the connection was re-established.
+     */
+    protected void recover() {
+        Set<URL> registered = getRegistered();
+        if (!registered.isEmpty()) {
+            logger.info("registered : {}", registered);
+            // Registrations go to the register failback queue, not the subscribe one.
+            registered.forEach(e -> addFailbackRegister(e, new FailbackTask(e, url -> {
+                doRegister(url);
+                failbackRegister.remove(url);
+            }, retryTime)));
+        }
+        Map<URL, RegisterNotifyListener> subscribed = getSubscribed();
+        if (!subscribed.isEmpty()) {
+            logger.info("subscribed: {}", subscribed);
+            subscribed.forEach((k, v) -> addFailbackSubscribe(k, new FailbackTask(k, url -> {
+                doSubscribe(url);
+                failbackSubscribe.remove(url);
+            }, retryTime)));
+        }
+    }
+
+    /**
+     * Performs the actual registration; a thrown exception makes the caller schedule a retry.
+     *
+     * @param url the url to register.
+     */
+    protected abstract void doRegister(URL url);
+
+    /**
+     * Performs the actual un-registration; a thrown exception makes the caller schedule a retry.
+     *
+     * @param url the url to remove
+     */
+    protected abstract void doUnRegister(URL url);
+
+    /**
+     * Performs the actual subscription; a thrown exception makes the caller schedule a retry.
+     *
+     * @param url the url to subscribe to
+     */
+    protected abstract void doSubscribe(URL url);
+
+    /**
+     * Performs the actual un-subscription; a thrown exception makes the caller schedule a retry.
+     *
+     * @param url the url to stop subscribing to
+     */
+    protected abstract void doUnSubscribe(URL url);
+
+    /**
+     * Timer task that runs a {@link FailbackRunner} and re-arms itself after each failure.
+     */
+    static class FailbackTask implements TimerTask {
+
+        private final FailbackRunner runner;
+
+        private final URL url;
+
+        private final long tick;
+
+        private volatile boolean cancel = false;
+
+        /**
+         * Instantiates a new Failback task.
+         *
+         * @param url the url handed to the runner on every attempt
+         * @param runner the operation to retry
+         * @param tick the delay in milliseconds before the next attempt; must not be null
+         */
+        FailbackTask(URL url, FailbackRunner runner, Long tick) {
+            this.runner = runner;
+            this.url = url;
+            this.tick = tick;
+        }
+
+        /**
+         * Marks this task as cancelled so that it neither runs nor re-arms again.
+         */
+        void cancel() {
+            this.cancel = true;
+        }
+
+        /**
+         * Tells whether {@link #cancel()} has been called.
+         *
+         * @return true once the task has been cancelled
+         */
+        private boolean isCancel() {
+            return cancel;
+        }
+
+        @Override
+        public void run(Timeout timeout) {
+            if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
+                // another thread cancelled this timeout/task or stopped the timer.
+                return;
+            }
+            try {
+                runner.apply(url);
+            } catch (Throwable e) {
+                replay(timeout, tick); // attempt failed: try again after another tick
+            }
+        }
+
+        private void replay(Timeout timeout, long tick) {
+            Timer timer = timeout.timer();
+            if (timer.isStop() || timeout.isCancelled() || isCancel()) {
+                return;
+            }
+            timer.newTimeout(this, tick, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Operation executed by a {@link FailbackTask}; throwing makes the task try again later.
+     */
+    @FunctionalInterface
+    interface FailbackRunner {
+        /**
+         * Runs the operation once.
+         *
+         * @param url the url the operation applies to
+         */
+        void apply(URL url);
+    }
+}
diff --git a/soul-register/soul-register-api/src/main/java/org/dromara/soul/register/api/exception/RegisterException.java b/soul-register/soul-register-api/src/main/java/org/dromara/soul/register/api/exception/RegisterException.java
new file mode 100644
index 0000000..d6e1fe2
--- /dev/null
+++ b/soul-register/soul-register-api/src/main/java/org/dromara/soul/register/api/exception/RegisterException.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dromara.soul.register.api.exception;
+
+import org.dromara.soul.common.exception.SoulException;
+
+/**
+ * Exception of the register module, e.g. thrown for a null url or listener.
+ *
+ * @author sixh
+ */
+public class RegisterException extends SoulException {
+    public RegisterException(Throwable e) {
+        super(e);
+    }
+
+    public RegisterException(String message) {
+        super(message);
+    }
+
+    public RegisterException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+}
diff --git a/soul-register/soul-register-zookeeper/src/main/java/ZookeeperRegistry.java b/soul-register/soul-register-zookeeper/src/main/java/ZookeeperRegistry.java
index 7f55846..f977a59 100644
--- a/soul-register/soul-register-zookeeper/src/main/java/ZookeeperRegistry.java
+++ b/soul-register/soul-register-zookeeper/src/main/java/ZookeeperRegistry.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-import java.util.Optional;
import org.dromara.soul.common.exception.SoulException;
import org.dromara.soul.common.extension.Join;
import org.dromara.soul.common.http.URL;
-import org.dromara.soul.register.api.AbstractRegistry;
+import org.dromara.soul.register.api.FailbackRegistry;
import org.dromara.soul.register.api.RegisterConst;
import org.dromara.soul.remoting.zookeeper.ZookeeperClient;
import org.dromara.soul.remoting.zookeeper.ZookeeperClientCache;
import org.dromara.soul.remoting.zookeeper.ZookeeperStatusCallback;
+import java.util.Optional;
+
import static org.dromara.soul.register.api.RegisterConst.BASE_URL_PATH_KEY;
/**
@@ -34,7 +35,7 @@
* @author sixh
*/
@Join
-public class ZookeeperRegistry extends AbstractRegistry {
+public class ZookeeperRegistry extends FailbackRegistry {
private final ZookeeperClient client;
@@ -43,7 +44,7 @@
client = ZookeeperClientCache.getClient(url);
client.statusChange(connectionState -> {
if (connectionState.equals(ZookeeperStatusCallback.RECONNECTED)) {
- retry();
+ recover();
}
});
}
@@ -64,15 +65,6 @@
return path + url.encode();
}
- @Override
- public void register(URL url) {
- try {
- String path = toPath(url);
- client.create(path, isEphemeral(url));
- } catch (Throwable e) {
- throw new SoulException("register failed " + url + "to zookeeper " + getRemoteUrl());
- }
- }
private boolean isEphemeral(URL url) {
String parameter = url.getParameter(RegisterConst.EPHEMERAL_KEY);
@@ -82,11 +74,31 @@
}
@Override
- public void unregister(URL url) {
+ protected void doRegister(URL url) {
+ try {
+ String path = toPath(url);
+ client.create(path, isEphemeral(url));
+ } catch (Throwable e) { // keep e as the cause so the underlying failure stays visible
+ throw new SoulException("register failed " + url + " to zookeeper " + getRemoteUrl(), e);
+ }
+ }
+
+ @Override
+ protected void doUnRegister(URL url) {
try {
client.delete(toPath(url));
} catch (Throwable e) {
throw new SoulException("unregister failed " + url + "to zookeeper " + getRemoteUrl());
}
}
+
+ @Override
+ protected void doSubscribe(URL url) {
+ // NOTE(review): empty body - nothing is done against zookeeper on subscribe; confirm intended.
+ }
+
+ @Override
+ protected void doUnSubscribe(URL url) {
+ // NOTE(review): empty body - nothing is done against zookeeper on unsubscribe; confirm intended.
+ }
}
diff --git a/soul-remoting/soul-remoting-api/src/main/java/org/dromara/soul/remoting/api/ChannelCache.java b/soul-remoting/soul-remoting-api/src/main/java/org/dromara/soul/remoting/api/ChannelCache.java
index 138a9800f..8434ae3 100644
--- a/soul-remoting/soul-remoting-api/src/main/java/org/dromara/soul/remoting/api/ChannelCache.java
+++ b/soul-remoting/soul-remoting-api/src/main/java/org/dromara/soul/remoting/api/ChannelCache.java
@@ -107,10 +107,10 @@
Channel get(String key) {
Timeout timeout = CACHES.get(key);
if (timeout != null && !timeout.isDefault()) {
- Node node = (Node) timeout.task();
- String sKey = node.getKey();
+ TtlCacheTimerTask ttlCacheTimerTask = (TtlCacheTimerTask) timeout.task();
+ String sKey = ttlCacheTimerTask.getKey();
if (sKey.equals(key)) {
- return node.getValue();
+ return ttlCacheTimerTask.getValue();
}
}
return null;
@@ -154,6 +154,6 @@
@SuppressWarnings("unchecked")
public Collection<Channel> getAll() {
- return CACHES.values().stream().map(e -> ((Node) e.task()).getValue()).collect(Collectors.toList());
+ return CACHES.values().stream().map(e -> ((TtlCacheTimerTask) e.task()).getValue()).collect(Collectors.toList());
}
}