/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.rocketmq.connect.runtime.utils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;

| public class ConnectUtil { |
| |
| public static String createGroupName(String prefix) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append(prefix).append("-"); |
| sb.append(RemotingUtil.getLocalAddress()).append("-"); |
| sb.append(UtilAll.getPid()).append("-"); |
| sb.append(System.nanoTime()); |
| return sb.toString().replace(".", "-"); |
| } |
| |
| public static String createGroupName(String prefix, String postfix) { |
| return new StringBuilder().append(prefix).append("-").append(postfix).toString(); |
| } |
| |
| public static String createInstance(String servers) { |
| String[] serversArray = servers.split(";"); |
| List<String> serversList = new ArrayList<String>(); |
| for (String server : serversArray) { |
| if (!serversList.contains(server)) { |
| serversList.add(server); |
| } |
| } |
| Collections.sort(serversList); |
| return String.valueOf(serversList.toString().hashCode()); |
| } |
| |
| public static String createUniqInstance(String prefix) { |
| return new StringBuffer(prefix).append("-").append(UUID.randomUUID().toString()).toString(); |
| } |
| |
| public static AllocateConnAndTaskStrategy initAllocateConnAndTaskStrategy(ConnectConfig connectConfig) { |
| try { |
| return (AllocateConnAndTaskStrategy) Thread.currentThread().getContextClassLoader().loadClass(connectConfig.getAllocTaskStrategy()).newInstance(); |
| } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { |
| throw new IllegalArgumentException(e); |
| } |
| } |
| |
| public static DefaultMQProducer initDefaultMQProducer(ConnectConfig connectConfig) { |
| RPCHook rpcHook = null; |
| if (connectConfig.getAclEnable()) { |
| rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey())); |
| } |
| DefaultMQProducer producer = new DefaultMQProducer(rpcHook); |
| producer.setNamesrvAddr(connectConfig.getNamesrvAddr()); |
| producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr())); |
| producer.setProducerGroup(connectConfig.getRmqProducerGroup()); |
| producer.setSendMsgTimeout(connectConfig.getOperationTimeout()); |
| producer.setMaxMessageSize(RuntimeConfigDefine.MAX_MESSAGE_SIZE); |
| producer.setLanguage(LanguageCode.JAVA); |
| return producer; |
| } |
| |
| public static DefaultMQPullConsumer initDefaultMQPullConsumer(ConnectConfig connectConfig) { |
| RPCHook rpcHook = null; |
| if (connectConfig.getAclEnable()) { |
| rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey())); |
| } |
| DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook); |
| consumer.setNamesrvAddr(connectConfig.getNamesrvAddr()); |
| consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr())); |
| consumer.setConsumerGroup(connectConfig.getRmqConsumerGroup()); |
| consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes()); |
| consumer.setBrokerSuspendMaxTimeMillis(connectConfig.getBrokerSuspendMaxTimeMillis()); |
| consumer.setConsumerPullTimeoutMillis((long) connectConfig.getRmqMessageConsumeTimeout()); |
| consumer.setLanguage(LanguageCode.JAVA); |
| return consumer; |
| } |
| |
| public static DefaultMQPushConsumer initDefaultMQPushConsumer(ConnectConfig connectConfig) { |
| RPCHook rpcHook = null; |
| if (connectConfig.getAclEnable()) { |
| rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey())); |
| } |
| DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rpcHook); |
| consumer.setNamesrvAddr(connectConfig.getNamesrvAddr()); |
| consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr())); |
| consumer.setConsumerGroup(createGroupName(connectConfig.getRmqConsumerGroup())); |
| consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes()); |
| consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout()); |
| consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums()); |
| consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); |
| consumer.setLanguage(LanguageCode.JAVA); |
| return consumer; |
| } |
| |
| public static DefaultMQAdminExt startMQAdminTool(ConnectConfig connectConfig) throws MQClientException { |
| RPCHook rpcHook = null; |
| if (connectConfig.getAclEnable()) { |
| rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey())); |
| } |
| DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); |
| defaultMQAdminExt.setNamesrvAddr(connectConfig.getNamesrvAddr()); |
| defaultMQAdminExt.setAdminExtGroup(connectConfig.getAdminExtGroup()); |
| defaultMQAdminExt.setInstanceName(ConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr())); |
| defaultMQAdminExt.start(); |
| return defaultMQAdminExt; |
| } |
| |
| public static String createSubGroup(ConnectConfig connectConfig, String subGroup) { |
| DefaultMQAdminExt defaultMQAdminExt = null; |
| try { |
| defaultMQAdminExt = startMQAdminTool(connectConfig); |
| SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig(); |
| initConfig.setGroupName(subGroup); |
| |
| Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, connectConfig.getClusterName()); |
| for (String addr : masterSet) { |
| defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig); |
| } |
| } catch (Exception e) { |
| throw new IllegalArgumentException("create subGroup: " + subGroup + " failed", e); |
| } finally { |
| if (defaultMQAdminExt != null) { |
| defaultMQAdminExt.shutdown(); |
| } |
| } |
| return subGroup; |
| } |
| } |