| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package golang |
| |
| import ( |
| "context" |
| "sync" |
| "time" |
| |
| "github.com/apache/rocketmq-clients/golang/pkg/ticker" |
| "github.com/apache/rocketmq-clients/golang/pkg/utils" |
| v2 "github.com/apache/rocketmq-clients/golang/protocol/v2" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
| // TODO: no global clientManager |
| type ClientManager interface { |
| RegisterClient(client Client) |
| UnRegisterClient(client Client) |
| QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error) |
| HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error) |
| SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error) |
| Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error) |
| EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error) |
| NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error) |
| ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error) |
| AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error) |
| ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error) |
| } |
| |
| type clientManagerOptions struct { |
| RPC_CLIENT_MAX_IDLE_DURATION time.Duration |
| |
| RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY time.Duration |
| RPC_CLIENT_IDLE_CHECK_PERIOD time.Duration |
| |
| HEART_BEAT_INITIAL_DELAY time.Duration |
| HEART_BEAT_PERIOD time.Duration |
| |
| LOG_STATS_INITIAL_DELAY time.Duration |
| LOG_STATS_PERIOD time.Duration |
| |
| SYNC_SETTINGS_DELAY time.Duration |
| SYNC_SETTINGS_PERIOD time.Duration |
| } |
| |
| var defaultClientManagerOptions = clientManagerOptions{ |
| RPC_CLIENT_MAX_IDLE_DURATION: time.Minute * 30, |
| |
| RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY: time.Second * 5, |
| RPC_CLIENT_IDLE_CHECK_PERIOD: time.Minute * 1, |
| |
| HEART_BEAT_INITIAL_DELAY: time.Second * 1, |
| HEART_BEAT_PERIOD: time.Second * 10, |
| |
| LOG_STATS_INITIAL_DELAY: time.Second * 60, |
| LOG_STATS_PERIOD: time.Second * 60, |
| |
| SYNC_SETTINGS_DELAY: time.Second * 1, |
| SYNC_SETTINGS_PERIOD: time.Minute * 5, |
| } |
| |
| type defaultClientManager struct { |
| rpcClientTable map[string]RpcClient |
| rpcClientTableLock sync.RWMutex |
| clientTable sync.Map |
| done chan struct{} |
| opts clientManagerOptions |
| } |
| |
| var _ = ClientManager(&defaultClientManager{}) |
| |
| var NewDefaultClientManager = func() *defaultClientManager { |
| return &defaultClientManager{ |
| rpcClientTable: make(map[string]RpcClient), |
| done: make(chan struct{}), |
| opts: defaultClientManagerOptions, |
| } |
| } |
| |
| func (cm *defaultClientManager) RegisterClient(client Client) { |
| cm.clientTable.Store(client.GetClientID(), client) |
| } |
| |
| func (cm *defaultClientManager) UnRegisterClient(client Client) { |
| cm.clientTable.Delete(client.GetClientID()) |
| } |
| |
| func (cm *defaultClientManager) startUp() { |
| sugarBaseLogger.Info("begin to start the client manager") |
| |
| go func() { |
| time.Sleep(cm.opts.RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY) |
| cm.clearIdleRpcClients() |
| ticker.Tick(cm.clearIdleRpcClients, (cm.opts.RPC_CLIENT_IDLE_CHECK_PERIOD), cm.done) |
| }() |
| |
| go func() { |
| time.Sleep(cm.opts.HEART_BEAT_INITIAL_DELAY) |
| cm.doHeartbeat() |
| ticker.Tick(cm.doHeartbeat, (cm.opts.HEART_BEAT_PERIOD), cm.done) |
| }() |
| |
| go func() { |
| time.Sleep(cm.opts.LOG_STATS_INITIAL_DELAY) |
| cm.doStats() |
| ticker.Tick(cm.doStats, (cm.opts.LOG_STATS_PERIOD), cm.done) |
| }() |
| |
| go func() { |
| time.Sleep(cm.opts.SYNC_SETTINGS_DELAY) |
| cm.syncSettings() |
| ticker.Tick(cm.syncSettings, (cm.opts.SYNC_SETTINGS_PERIOD), cm.done) |
| }() |
| |
| sugarBaseLogger.Info("the client manager starts successfully") |
| } |
| func (cm *defaultClientManager) deleteRpcClient(rpcClient RpcClient) { |
| delete(cm.rpcClientTable, rpcClient.GetTarget()) |
| rpcClient.GracefulStop() |
| } |
| |
| func (cm *defaultClientManager) clearIdleRpcClients() { |
| sugarBaseLogger.Info("clientManager start clearIdleRpcClients") |
| cm.rpcClientTableLock.Lock() |
| defer cm.rpcClientTableLock.Unlock() |
| for target, rpcClient := range cm.rpcClientTable { |
| idleDuration := rpcClient.idleDuration() |
| if idleDuration > cm.opts.RPC_CLIENT_MAX_IDLE_DURATION { |
| cm.deleteRpcClient(rpcClient) |
| sugarBaseLogger.Warnf("rpc client has been idle for a long time, target=%s, idleDuration=%d, rpcClientMaxIdleDuration=%d\n", target, idleDuration, cm.opts.RPC_CLIENT_MAX_IDLE_DURATION) |
| } |
| } |
| } |
| func (cm *defaultClientManager) doHeartbeat() { |
| sugarBaseLogger.Info("clientManager start doHeartbeat") |
| cm.clientTable.Range(func(_, v interface{}) bool { |
| client := v.(*defaultClient) |
| client.Heartbeat() |
| return true |
| }) |
| } |
| func (cm *defaultClientManager) doStats() { |
| // TODO |
| } |
| func (cm *defaultClientManager) syncSettings() { |
| sugarBaseLogger.Info("clientManager start syncSettings") |
| cm.clientTable.Range(func(_, v interface{}) bool { |
| client := v.(*defaultClient) |
| client.trySyncSettings() |
| return true |
| }) |
| } |
| func (cm *defaultClientManager) shutdown() { |
| sugarBaseLogger.Info("begin to shutdown the client manager") |
| cm.done <- struct{}{} |
| close(cm.done) |
| cm.cleanRpcClient() |
| sugarBaseLogger.Info("shutdown the client manager successfully") |
| } |
| func (cm *defaultClientManager) cleanRpcClient() { |
| sugarBaseLogger.Info("clientManager start cleanRpcClient") |
| cm.rpcClientTableLock.Lock() |
| defer cm.rpcClientTableLock.Unlock() |
| for _, rpcClient := range cm.rpcClientTable { |
| cm.deleteRpcClient(rpcClient) |
| } |
| } |
| func (cm *defaultClientManager) getRpcClient(endpoints *v2.Endpoints) (RpcClient, error) { |
| target := utils.ParseAddress(utils.SelectAnAddress(endpoints)) |
| |
| cm.rpcClientTableLock.RLock() |
| item, ok := cm.rpcClientTable[target] |
| cm.rpcClientTableLock.RUnlock() |
| if ok { |
| if ret, ok := item.(*rpcClient); ok { |
| return ret, nil |
| } |
| } |
| |
| cm.rpcClientTableLock.Lock() |
| defer cm.rpcClientTableLock.Unlock() |
| |
| // double check |
| item, ok = cm.rpcClientTable[target] |
| if ok { |
| if ret, ok := item.(*rpcClient); ok { |
| return ret, nil |
| } |
| } |
| rpcClient, err := NewRpcClient(target) |
| if err != nil { |
| return nil, err |
| } |
| cm.rpcClientTable[target] = rpcClient |
| return rpcClient, nil |
| } |
| func (cm *defaultClientManager) handleGrpcError(rpcClient RpcClient, err error) { |
| if err != nil { |
| if e, ok := status.FromError(err); ok { |
| if e.Code() == codes.Unavailable { |
| sugarBaseLogger.Errorf("happened unavailable err=%w, close rpcClient=%s", err, rpcClient.GetTarget()) |
| cm.rpcClientTableLock.Lock() |
| defer cm.rpcClientTableLock.Unlock() |
| cm.deleteRpcClient(rpcClient) |
| } |
| } |
| } |
| } |
| func (cm *defaultClientManager) QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error) { |
| ctx, _ = context.WithTimeout(ctx, duration) |
| rpcClient, err := cm.getRpcClient(endpoints) |
| if err != nil { |
| return nil, err |
| } |
| ret, err := rpcClient.QueryRoute(ctx, request) |
| cm.handleGrpcError(rpcClient, err) |
| return ret, err |
| } |
| |
| func (cm *defaultClientManager) SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error) { |
| ctx, _ = context.WithTimeout(ctx, duration) |
| rpcClient, err := cm.getRpcClient(endpoints) |
| if err != nil { |
| return nil, err |
| } |
| ret, err := rpcClient.SendMessage(ctx, request) |
| cm.handleGrpcError(rpcClient, err) |
| return ret, err |
| } |
| |
| func (cm *defaultClientManager) Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error) { |
| ctx, _ = context.WithTimeout(ctx, duration) |
| rpcClient, err := cm.getRpcClient(endpoints) |
| if err != nil { |
| return nil, err |
| } |
| ret, err := rpcClient.Telemetry(ctx) |
| cm.handleGrpcError(rpcClient, err) |
| return ret, err |
| } |
| |
| func (cm *defaultClientManager) EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error) { |
| ctx, _ = context.WithTimeout(ctx, duration) |
| rpcClient, err := cm.getRpcClient(endpoints) |
| if err != nil { |
| return nil, err |
| } |
| ret, err := rpcClient.EndTransaction(ctx, request) |
| cm.handleGrpcError(rpcClient, err) |
| return ret, err |
| } |
| |
| func (cm *defaultClientManager) HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error) { |
| ctx, _ = context.WithTimeout(ctx, duration) |
| rpcClient, err := cm.getRpcClient(endpoints) |
| if err != nil { |
| return nil, err |
| } |
| |
| ret, err := rpcClient.HeartBeat(ctx, request) |
| cm.handleGrpcError(rpcClient, err) |
| return ret, err |
| } |
| |
| func (cm *defaultClientManager) NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error) { |
| ctx, _ = context.WithTimeout(ctx, duration) |
| rpcClient, err := cm.getRpcClient(endpoints) |
| if err != nil { |
| return nil, err |
| } |
| ret, err := rpcClient.NotifyClientTermination(ctx, request) |
| cm.handleGrpcError(rpcClient, err) |
| return ret, err |
| } |
| |
| func (cm *defaultClientManager) ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error) { |
| rpcClient, err := cm.getRpcClient(endpoints) |
| if err != nil { |
| return nil, err |
| } |
| ret, err := rpcClient.ReceiveMessage(ctx, request) |
| cm.handleGrpcError(rpcClient, err) |
| return ret, err |
| } |
| |
| func (cm *defaultClientManager) AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error) { |
| ctx, _ = context.WithTimeout(ctx, duration) |
| rpcClient, err := cm.getRpcClient(endpoints) |
| if err != nil { |
| return nil, err |
| } |
| ret, err := rpcClient.AckMessage(ctx, request) |
| cm.handleGrpcError(rpcClient, err) |
| return ret, err |
| } |
| |
| func (cm *defaultClientManager) ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error) { |
| ctx, _ = context.WithTimeout(ctx, duration) |
| rpcClient, err := cm.getRpcClient(endpoints) |
| if err != nil { |
| return nil, err |
| } |
| ret, err := rpcClient.ChangeInvisibleDuration(ctx, request) |
| cm.handleGrpcError(rpcClient, err) |
| return ret, err |
| } |