| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package graceful_shutdown |
| |
| import ( |
| "os" |
| "os/signal" |
| "runtime/debug" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| "github.com/dubbogo/gost/log/logger" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/common/constant" |
| "dubbo.apache.org/dubbo-go/v3/common/extension" |
| "dubbo.apache.org/dubbo-go/v3/config" |
| ) |
| |
| const ( |
| // todo(DMwangnima): these descriptions and defaults could be wrapped by functions of Options |
| defaultTimeout = 60 * time.Second |
| defaultStepTimeout = 3 * time.Second |
| defaultConsumerUpdateWaitTime = 3 * time.Second |
| defaultOfflineRequestWindowTimeout = 3 * time.Second |
| |
| timeoutDesc = "Timeout" |
| stepTimeoutDesc = "StepTimeout" |
| consumerUpdateWaitTimeDesc = "ConsumerUpdateWaitTime" |
| offlineRequestWindowTimeoutDesc = "OfflineRequestWindowTimeout" |
| ) |
| |
| var ( |
| initOnce sync.Once |
| compatShutdown *config.ShutdownConfig |
| |
| proMu sync.Mutex |
| protocols map[string]struct{} |
| ) |
| |
| func Init(opts ...Option) { |
| initOnce.Do(func() { |
| protocols = make(map[string]struct{}) |
| newOpts := defaultOptions() |
| for _, opt := range opts { |
| opt(newOpts) |
| } |
| compatShutdown = compatShutdownConfig(newOpts.Shutdown) |
| // retrieve ShutdownConfig for gracefulShutdownFilter |
| cGracefulShutdownFilter, existcGracefulShutdownFilter := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey) |
| if !existcGracefulShutdownFilter { |
| return |
| } |
| sGracefulShutdownFilter, existsGracefulShutdownFilter := extension.GetFilter(constant.GracefulShutdownProviderFilterKey) |
| if !existsGracefulShutdownFilter { |
| return |
| } |
| if filter, ok := cGracefulShutdownFilter.(config.Setter); ok { |
| filter.Set(constant.GracefulShutdownFilterShutdownConfig, compatShutdown) |
| } |
| |
| if filter, ok := sGracefulShutdownFilter.(config.Setter); ok { |
| filter.Set(constant.GracefulShutdownFilterShutdownConfig, compatShutdown) |
| } |
| |
| if compatShutdown.InternalSignal != nil && *compatShutdown.InternalSignal { |
| signals := make(chan os.Signal, 1) |
| signal.Notify(signals, ShutdownSignals...) |
| |
| go func() { |
| select { |
| case sig := <-signals: |
| logger.Infof("get signal %s, applicationConfig will shutdown.", sig) |
| // gracefulShutdownOnce.Do(func() { |
| time.AfterFunc(totalTimeout(), func() { |
| logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ") |
| os.Exit(0) |
| }) |
| beforeShutdown() |
| // those signals' original behavior is exit with dump ths stack, so we try to keep the behavior |
| for _, dumpSignal := range DumpHeapShutdownSignals { |
| if sig == dumpSignal { |
| debug.WriteHeapDump(os.Stdout.Fd()) |
| } |
| } |
| os.Exit(0) |
| } |
| }() |
| } |
| }) |
| } |
| |
| // RegisterProtocol registers protocol which would be destroyed before shutdown. |
| // Please make sure that Init function has been invoked before, otherwise this |
| // function would not make any sense. |
| func RegisterProtocol(name string) { |
| proMu.Lock() |
| protocols[name] = struct{}{} |
| proMu.Unlock() |
| } |
| |
| func totalTimeout() time.Duration { |
| timeout := parseDuration(compatShutdown.Timeout, timeoutDesc, defaultTimeout) |
| if timeout < defaultTimeout { |
| timeout = defaultTimeout |
| } |
| |
| return timeout |
| } |
| |
| func beforeShutdown() { |
| destroyRegistries() |
| // waiting for a short time so that the clients have enough time to get the notification that server shutdowns |
| // The value of configuration depends on how long the clients will get notification. |
| waitAndAcceptNewRequests() |
| |
| // reject sending/receiving the new request, but keeping waiting for accepting requests |
| waitForSendingAndReceivingRequests() |
| |
| // destroy all protocols |
| destroyProtocols() |
| |
| logger.Info("Graceful shutdown --- Execute the custom callbacks.") |
| customCallbacks := extension.GetAllCustomShutdownCallbacks() |
| for callback := customCallbacks.Front(); callback != nil; callback = callback.Next() { |
| callback.Value.(func())() |
| } |
| } |
| |
| // destroyRegistries destroys RegistryProtocol directly. |
| func destroyRegistries() { |
| logger.Info("Graceful shutdown --- Destroy all registriesConfig. ") |
| registryProtocol := extension.GetProtocol(constant.RegistryProtocol) |
| registryProtocol.Destroy() |
| } |
| |
| func waitAndAcceptNewRequests() { |
| logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ") |
| |
| updateWaitTime := parseDuration(compatShutdown.ConsumerUpdateWaitTime, consumerUpdateWaitTimeDesc, defaultConsumerUpdateWaitTime) |
| time.Sleep(updateWaitTime) |
| |
| stepTimeout := parseDuration(compatShutdown.StepTimeout, stepTimeoutDesc, defaultStepTimeout) |
| |
| // ignore this step |
| if stepTimeout < 0 { |
| return |
| } |
| waitingProviderProcessedTimeout(stepTimeout) |
| } |
| |
| func waitingProviderProcessedTimeout(timeout time.Duration) { |
| deadline := time.Now().Add(timeout) |
| |
| offlineRequestWindowTimeout := parseDuration(compatShutdown.OfflineRequestWindowTimeout, offlineRequestWindowTimeoutDesc, defaultOfflineRequestWindowTimeout) |
| |
| for time.Now().Before(deadline) && |
| (compatShutdown.ProviderActiveCount.Load() > 0 || time.Now().Before(compatShutdown.ProviderLastReceivedRequestTime.Load().Add(offlineRequestWindowTimeout))) { |
| // sleep 10 ms and then we check it again |
| time.Sleep(10 * time.Millisecond) |
| logger.Infof("waiting for provider active invocation count = %d, provider last received request time: %v", |
| compatShutdown.ProviderActiveCount.Load(), compatShutdown.ProviderLastReceivedRequestTime.Load()) |
| } |
| } |
| |
| // for provider. It will wait for processing receiving requests |
| func waitForSendingAndReceivingRequests() { |
| logger.Info("Graceful shutdown --- Keep waiting until sending/accepting requests finish or timeout. ") |
| compatShutdown.RejectRequest.Store(true) |
| waitingConsumerProcessedTimeout() |
| } |
| |
| func waitingConsumerProcessedTimeout() { |
| stepTimeout := parseDuration(compatShutdown.StepTimeout, stepTimeoutDesc, defaultStepTimeout) |
| |
| if stepTimeout <= 0 { |
| return |
| } |
| deadline := time.Now().Add(stepTimeout) |
| |
| for time.Now().Before(deadline) && compatShutdown.ConsumerActiveCount.Load() > 0 { |
| // sleep 10 ms and then we check it again |
| time.Sleep(10 * time.Millisecond) |
| logger.Infof("waiting for consumer active invocation count = %d", compatShutdown.ConsumerActiveCount.Load()) |
| } |
| } |
| |
| // destroyProtocols destroys protocols that have been registered. |
| func destroyProtocols() { |
| logger.Info("Graceful shutdown --- Destroy protocols. ") |
| |
| proMu.Lock() |
| // extension.GetProtocol might panic |
| defer proMu.Unlock() |
| for name := range protocols { |
| extension.GetProtocol(name).Destroy() |
| } |
| } |