| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.inlong.dataproxy.source; |
| |
| import org.apache.inlong.common.heartbeat.ReportResourceType; |
| import org.apache.inlong.common.metric.MetricRegister; |
| import org.apache.inlong.dataproxy.admin.ProxyServiceMBean; |
| import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor; |
| import org.apache.inlong.dataproxy.config.CommonConfigHolder; |
| import org.apache.inlong.dataproxy.config.ConfigManager; |
| import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback; |
| import org.apache.inlong.dataproxy.consts.AttrConstants; |
| import org.apache.inlong.dataproxy.consts.SourceConstants; |
| import org.apache.inlong.dataproxy.consts.StatConstants; |
| import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; |
| import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; |
| import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; |
| import org.apache.inlong.dataproxy.metrics.stats.MonitorIndex; |
| import org.apache.inlong.dataproxy.metrics.stats.MonitorStats; |
| import org.apache.inlong.dataproxy.source.httpMsg.HttpMessageHandler; |
| import org.apache.inlong.dataproxy.utils.AddressUtils; |
| import org.apache.inlong.dataproxy.utils.ConfStringUtils; |
| import org.apache.inlong.dataproxy.utils.DateTimeUtils; |
| import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder; |
| import org.apache.inlong.sdk.commons.admin.AdminServiceRegister; |
| |
| import com.google.common.base.Preconditions; |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelFuture; |
| import io.netty.channel.ChannelInitializer; |
| import io.netty.channel.EventLoopGroup; |
| import io.netty.channel.group.ChannelGroup; |
| import io.netty.channel.group.DefaultChannelGroup; |
| import io.netty.util.concurrent.GlobalEventExecutor; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.flume.ChannelSelector; |
| import org.apache.flume.Context; |
| import org.apache.flume.Event; |
| import org.apache.flume.EventDrivenSource; |
| import org.apache.flume.FlumeException; |
| import org.apache.flume.channel.ChannelProcessor; |
| import org.apache.flume.conf.Configurable; |
| import org.apache.flume.source.AbstractSource; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.lang.reflect.Constructor; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.Map; |
| |
| /** |
| * Base class for DataProxy sources: loads the common source settings and manages the source lifecycle and metrics. |
| */ |
| public abstract class BaseSource |
| extends |
| AbstractSource |
| implements |
| ConfigUpdateCallback, |
| ProxyServiceMBean, |
| EventDrivenSource, |
| Configurable { |
| |
| private static final Logger logger = LoggerFactory.getLogger(BaseSource.class); |
| |
| protected Context context; |
| // whether the source is currently rejecting service (toggled by stopService/recoverService) |
| protected volatile boolean isRejectService = false; |
| protected String cachedSrcName; |
| protected ChannelProcessor cachedChProcessor; |
| // host IP the source serves on (from conf or system env, otherwise the default host value) |
| protected String srcHost; |
| // port the source serves on |
| protected int srcPort; |
| protected String strPort; |
| // report source type: the upper-cased logic type read from the source config |
| protected String rptSrcType; |
| // message factory class name, loaded reflectively as the ChannelInitializer |
| protected String msgFactoryName; |
| // message handler class name |
| protected String messageHandlerName; |
| // allowed max message length |
| protected int maxMsgLength; |
| // whether to compress messages (read from the source config) |
| protected boolean isCompressed; |
| // whether to filter out empty messages |
| protected boolean filterEmptyMsg; |
| // whether to replace the channel processor with a FailoverChannelProcessor on start |
| protected boolean customProcessor; |
| // max netty worker threads |
| protected int maxWorkerThreads; |
| // max netty accept threads |
| protected int maxAcceptThreads; |
| // max read idle time, in milliseconds |
| protected long maxReadIdleTimeMs; |
| // max connection count |
| protected int maxConnections; |
| // whether to reuse address |
| protected boolean reuseAddress; |
| // connect backlog |
| protected int conBacklog; |
| // connect linger; stays -1 unless a non-negative value is configured |
| protected int conLinger = -1; |
| // netty parameters |
| protected EventLoopGroup acceptorGroup; |
| protected EventLoopGroup workerGroup; |
| protected ChannelGroup allChannels; |
| protected ChannelFuture channelFuture; |
| // receive buffer size |
| protected int maxRcvBufferSize; |
| // send buffer size |
| protected int maxSendBufferSize; |
| // file metric statistics; only created in start() when enableFileMetric is true |
| private MonitorIndex monitorIndex = null; |
| private MonitorStats monitorStats = null; |
| // metric item set, created and registered in start() |
| private DataProxyMetricItemSet metricItemSet; |
| // whether file metrics are enabled (read from CommonConfigHolder in the constructor) |
| protected boolean enableFileMetric; |
| |
| public BaseSource() { |
| super(); |
| allChannels = new DefaultChannelGroup("DefaultChannelGroup", GlobalEventExecutor.INSTANCE); |
| this.enableFileMetric = CommonConfigHolder.getInstance().isEnableFileMetric(); |
| } |
| |
| @Override |
| public void configure(Context context) { |
| this.cachedSrcName = getName(); |
| logger.info("{} start to configure context:{}.", this.cachedSrcName, context.toString()); |
| this.context = context; |
| this.srcHost = getHostIp(context); |
| this.srcPort = getHostPort(context); |
| this.strPort = String.valueOf(this.srcPort); |
| // get source logic type |
| String tmpVal = context.getString( |
| SourceConstants.SRCCXT_LOGIC_TYPE_NAME, ReportResourceType.INLONG); |
| Preconditions.checkArgument(StringUtils.isNotBlank(tmpVal), |
| SourceConstants.SRCCXT_LOGIC_TYPE_NAME + " config is blank"); |
| this.rptSrcType = tmpVal.trim().toUpperCase(); |
| // get message factory |
| tmpVal = context.getString(SourceConstants.SRCCXT_MSG_FACTORY_NAME, |
| ServerMessageFactory.class.getName()).trim(); |
| Preconditions.checkArgument(StringUtils.isNotBlank(tmpVal), |
| SourceConstants.SRCCXT_MSG_FACTORY_NAME + " config is blank"); |
| this.msgFactoryName = tmpVal.trim(); |
| // get message handler |
| tmpVal = context.getString(SourceConstants.SRCCXT_MESSAGE_HANDLER_NAME); |
| if (StringUtils.isBlank(tmpVal)) { |
| tmpVal = SourceConstants.SRC_PROTOCOL_TYPE_HTTP.equalsIgnoreCase(getProtocolName()) |
| ? HttpMessageHandler.class.getName() |
| : ServerMessageHandler.class.getName(); |
| } |
| Preconditions.checkArgument(StringUtils.isNotBlank(tmpVal), |
| SourceConstants.SRCCXT_MESSAGE_HANDLER_NAME + " config is blank"); |
| this.messageHandlerName = tmpVal; |
| // get allowed max message length |
| this.maxMsgLength = ConfStringUtils.getIntValue(context, |
| SourceConstants.SRCCXT_MAX_MSG_LENGTH, SourceConstants.VAL_DEF_MAX_MSG_LENGTH); |
| Preconditions.checkArgument((this.maxMsgLength >= SourceConstants.VAL_MIN_MAX_MSG_LENGTH |
| && this.maxMsgLength <= SourceConstants.VAL_MAX_MAX_MSG_LENGTH), |
| SourceConstants.SRCCXT_MAX_MSG_LENGTH + " must be in [" |
| + SourceConstants.VAL_MIN_MAX_MSG_LENGTH + ", " |
| + SourceConstants.VAL_MAX_MAX_MSG_LENGTH + "]"); |
| // get whether compress message |
| this.isCompressed = context.getBoolean(SourceConstants.SRCCXT_MSG_COMPRESSED, |
| SourceConstants.VAL_DEF_MSG_COMPRESSED); |
| // get whether filter empty message |
| this.filterEmptyMsg = context.getBoolean(SourceConstants.SRCCXT_FILTER_EMPTY_MSG, |
| SourceConstants.VAL_DEF_FILTER_EMPTY_MSG); |
| // get whether custom channel processor |
| this.customProcessor = context.getBoolean(SourceConstants.SRCCXT_CUSTOM_CHANNEL_PROCESSOR, |
| SourceConstants.VAL_DEF_CUSTOM_CH_PROCESSOR); |
| // get max accept threads |
| this.maxAcceptThreads = ConfStringUtils.getIntValue(context, |
| SourceConstants.SRCCXT_MAX_ACCEPT_THREADS, SourceConstants.VAL_DEF_NET_ACCEPT_THREADS); |
| Preconditions.checkArgument((this.maxAcceptThreads >= SourceConstants.VAL_MIN_ACCEPT_THREADS |
| && this.maxAcceptThreads <= SourceConstants.VAL_MAX_ACCEPT_THREADS), |
| SourceConstants.SRCCXT_MAX_ACCEPT_THREADS + " must be in [" |
| + SourceConstants.VAL_MIN_ACCEPT_THREADS + ", " |
| + SourceConstants.VAL_MAX_ACCEPT_THREADS + "]"); |
| // get max worker threads |
| this.maxWorkerThreads = ConfStringUtils.getIntValue(context, |
| SourceConstants.SRCCXT_MAX_WORKER_THREADS, SourceConstants.VAL_DEF_WORKER_THREADS); |
| Preconditions.checkArgument((this.maxWorkerThreads >= SourceConstants.VAL_MIN_WORKER_THREADS), |
| SourceConstants.SRCCXT_MAX_WORKER_THREADS + " must be >= " |
| + SourceConstants.VAL_MIN_WORKER_THREADS); |
| // get max read idle time |
| this.maxReadIdleTimeMs = ConfStringUtils.getLongValue(context, |
| SourceConstants.SRCCXT_MAX_READ_IDLE_TIME_MS, SourceConstants.VAL_DEF_READ_IDLE_TIME_MS); |
| Preconditions.checkArgument((this.maxReadIdleTimeMs >= SourceConstants.VAL_MIN_READ_IDLE_TIME_MS |
| && this.maxReadIdleTimeMs <= SourceConstants.VAL_MAX_READ_IDLE_TIME_MS), |
| SourceConstants.SRCCXT_MAX_READ_IDLE_TIME_MS + " must be in [" |
| + SourceConstants.VAL_MIN_READ_IDLE_TIME_MS + ", " |
| + SourceConstants.VAL_MAX_READ_IDLE_TIME_MS + "]"); |
| // get max connect count |
| this.maxConnections = ConfStringUtils.getIntValue(context, |
| SourceConstants.SRCCXT_MAX_CONNECTION_CNT, SourceConstants.VAL_DEF_MAX_CONNECTION_CNT); |
| Preconditions.checkArgument(this.maxConnections >= SourceConstants.VAL_MIN_CONNECTION_CNT, |
| SourceConstants.SRCCXT_MAX_CONNECTION_CNT + " must be >= " |
| + SourceConstants.VAL_MIN_CONNECTION_CNT); |
| // get connect backlog |
| this.conBacklog = ConfStringUtils.getIntValue(context, |
| SourceConstants.SRCCXT_CONN_BACKLOG, SourceConstants.VAL_DEF_CONN_BACKLOG); |
| Preconditions.checkArgument(this.conBacklog >= SourceConstants.VAL_MIN_CONN_BACKLOG, |
| SourceConstants.SRCCXT_CONN_BACKLOG + " must be >= " |
| + SourceConstants.VAL_MIN_CONN_BACKLOG); |
| // get connect linger |
| Integer tmpValue = context.getInteger(SourceConstants.SRCCXT_CONN_LINGER); |
| if (tmpValue != null && tmpValue >= 0) { |
| this.conLinger = tmpValue; |
| } |
| // get whether reuse address |
| this.reuseAddress = context.getBoolean(SourceConstants.SRCCXT_REUSE_ADDRESS, |
| SourceConstants.VAL_DEF_REUSE_ADDRESS); |
| |
| // get whether custom channel processor |
| this.customProcessor = context.getBoolean(SourceConstants.SRCCXT_CUSTOM_CHANNEL_PROCESSOR, |
| SourceConstants.VAL_DEF_CUSTOM_CH_PROCESSOR); |
| // get max receive buffer size |
| this.maxRcvBufferSize = ConfStringUtils.getIntValue(context, |
| SourceConstants.SRCCXT_RECEIVE_BUFFER_SIZE, SourceConstants.VAL_DEF_RECEIVE_BUFFER_SIZE); |
| Preconditions.checkArgument(this.maxRcvBufferSize >= SourceConstants.VAL_MIN_RECEIVE_BUFFER_SIZE, |
| SourceConstants.SRCCXT_RECEIVE_BUFFER_SIZE + " must be >= " |
| + SourceConstants.VAL_MIN_RECEIVE_BUFFER_SIZE); |
| // get max send buffer size |
| this.maxSendBufferSize = ConfStringUtils.getIntValue(context, |
| SourceConstants.SRCCXT_SEND_BUFFER_SIZE, SourceConstants.VAL_DEF_SEND_BUFFER_SIZE); |
| Preconditions.checkArgument(this.maxSendBufferSize >= SourceConstants.VAL_MIN_SEND_BUFFER_SIZE, |
| SourceConstants.SRCCXT_SEND_BUFFER_SIZE + " must be >= " |
| + SourceConstants.VAL_MIN_SEND_BUFFER_SIZE); |
| } |
| |
| @Override |
| public synchronized void start() { |
| if (customProcessor) { |
| ChannelSelector selector = getChannelProcessor().getSelector(); |
| FailoverChannelProcessor newProcessor = new FailoverChannelProcessor(selector); |
| newProcessor.configure(this.context); |
| setChannelProcessor(newProcessor); |
| FailoverChannelProcessorHolder.setChannelProcessor(newProcessor); |
| } |
| super.start(); |
| this.cachedChProcessor = getChannelProcessor(); |
| // initial metric item set |
| this.metricItemSet = new DataProxyMetricItemSet( |
| CommonConfigHolder.getInstance().getClusterName(), this.cachedSrcName, String.valueOf(srcPort)); |
| MetricRegister.register(metricItemSet); |
| // init monitor logic |
| if (enableFileMetric) { |
| this.monitorIndex = new MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSourceOutName(), |
| CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L, |
| CommonConfigHolder.getInstance().getFileMetricStatCacheCnt()); |
| this.monitorIndex.start(); |
| this.monitorStats = new MonitorStats( |
| CommonConfigHolder.getInstance().getFileMetricEventOutName() |
| + AttrConstants.SEP_HASHTAG + this.cachedSrcName, |
| CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L, |
| CommonConfigHolder.getInstance().getFileMetricStatCacheCnt()); |
| this.monitorStats.start(); |
| } |
| startSource(); |
| // register |
| AdminServiceRegister.register(ProxyServiceMBean.MBEAN_TYPE, this.cachedSrcName, this); |
| } |
| |
| @Override |
| public synchronized void stop() { |
| logger.info("[STOP {} SOURCE]{} stopping...", this.getProtocolName(), this.cachedSrcName); |
| // close channels |
| if (!allChannels.isEmpty()) { |
| try { |
| allChannels.close().awaitUninterruptibly(); |
| } catch (Exception e) { |
| logger.warn("Close {} netty channels throw exception", this.cachedSrcName, e); |
| } finally { |
| allChannels.clear(); |
| } |
| } |
| // close channel future |
| if (channelFuture != null) { |
| try { |
| channelFuture.channel().closeFuture().sync(); |
| } catch (InterruptedException e) { |
| logger.warn("Close {} channel future throw exception", this.cachedSrcName, e); |
| } |
| } |
| // stop super class |
| super.stop(); |
| // stop workers |
| if (this.acceptorGroup != null) { |
| this.acceptorGroup.shutdownGracefully(); |
| } |
| if (this.workerGroup != null) { |
| this.workerGroup.shutdownGracefully(); |
| } |
| // stop file statistic index |
| if (enableFileMetric) { |
| if (monitorIndex != null) { |
| monitorIndex.stop(); |
| } |
| if (monitorStats != null) { |
| monitorStats.stop(); |
| } |
| } |
| logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), this.cachedSrcName); |
| } |
| |
| @Override |
| public void update() { |
| // check current all links |
| if (ConfigManager.getInstance().needChkIllegalIP()) { |
| int cnt = 0; |
| Channel channel; |
| String strRemoteIP; |
| long startTime = System.currentTimeMillis(); |
| Iterator<Channel> iterator = allChannels.iterator(); |
| while (iterator.hasNext()) { |
| channel = iterator.next(); |
| strRemoteIP = AddressUtils.getChannelRemoteIP(channel); |
| if (strRemoteIP == null) { |
| continue; |
| } |
| if (ConfigManager.getInstance().isIllegalIP(strRemoteIP)) { |
| channel.disconnect(); |
| channel.close(); |
| allChannels.remove(channel); |
| cnt++; |
| logger.error(strRemoteIP + " is Illegal IP, so disconnect it !"); |
| } |
| } |
| logger.info("Source {} channel check, disconnects {} Illegal channels, waist {} ms", |
| this.cachedSrcName, cnt, (System.currentTimeMillis() - startTime)); |
| } |
| } |
| |
| /** |
| * get metricItemSet |
| * |
| * @return the metricItemSet |
| */ |
| public DataProxyMetricItemSet getMetricItemSet() { |
| return metricItemSet; |
| } |
| |
| public Context getContext() { |
| return context; |
| } |
| |
| public String getSrcHost() { |
| return srcHost; |
| } |
| |
| public int getSrcPort() { |
| return srcPort; |
| } |
| |
| public String getStrPort() { |
| return strPort; |
| } |
| |
| public int getMaxMsgLength() { |
| return maxMsgLength; |
| } |
| |
| public boolean isCompressed() { |
| return isCompressed; |
| } |
| |
| public boolean isFilterEmptyMsg() { |
| return filterEmptyMsg; |
| } |
| |
| public boolean isCustomProcessor() { |
| return customProcessor; |
| } |
| |
| public int getMaxConnections() { |
| return maxConnections; |
| } |
| |
| public ChannelGroup getAllChannels() { |
| return allChannels; |
| } |
| |
| public long getMaxReadIdleTimeMs() { |
| return maxReadIdleTimeMs; |
| } |
| |
| public String getMessageHandlerName() { |
| return messageHandlerName; |
| } |
| |
| public int getMaxWorkerThreads() { |
| return maxWorkerThreads; |
| } |
| |
| public void fileMetricIncSumStats(String eventKey) { |
| if (enableFileMetric) { |
| monitorStats.incSumStats(eventKey); |
| } |
| } |
| |
| public void fileMetricIncWithDetailStats(String eventKey, String detailInfoKey) { |
| if (enableFileMetric) { |
| monitorStats.incSumStats(eventKey); |
| monitorStats.incDetailStats(eventKey + "#" + detailInfoKey); |
| } |
| } |
| |
| public void fileMetricAddSuccStats(StringBuilder strBuff, String groupId, String streamId, |
| String topicName, String clientIP, String msgProcType, |
| long dt, long pkgTime, int cnt, int packCnt, long packSize) { |
| fileMetricIncStats(strBuff, true, groupId, streamId, topicName, |
| clientIP, msgProcType, dt, pkgTime, cnt, packCnt, packSize, 0); |
| } |
| |
| public void fileMetricAddFailStats(StringBuilder strBuff, String groupId, String streamId, |
| String topicName, String clientIP, String msgProcType, long dt, long pkgTime, int failCnt) { |
| fileMetricIncStats(strBuff, false, groupId, streamId, topicName, |
| clientIP, msgProcType, dt, pkgTime, 0, 0, 0, failCnt); |
| } |
| |
| private void fileMetricIncStats(StringBuilder strBuff, boolean isSucc, String groupId, |
| String streamId, String topicName, String clientIP, String msgProcType, |
| long dt, long pkgTime, int cnt, int packCnt, long packSize, int failCnt) { |
| if (!enableFileMetric) { |
| return; |
| } |
| String tenMinsDt = DateTimeUtils.ms2yyyyMMddHHmmTenMins(dt); |
| strBuff.append(this.cachedSrcName).append(AttrConstants.SEP_HASHTAG) |
| .append(groupId).append(AttrConstants.SEP_HASHTAG) |
| .append(streamId).append(AttrConstants.SEP_HASHTAG) |
| .append(topicName).append(AttrConstants.SEP_HASHTAG) |
| .append(msgProcType).append(AttrConstants.SEP_HASHTAG) |
| .append(srcHost).append(AttrConstants.SEP_HASHTAG) |
| .append(clientIP).append(AttrConstants.SEP_HASHTAG) |
| .append(tenMinsDt).append(AttrConstants.SEP_HASHTAG) |
| .append(DateTimeUtils.ms2yyyyMMddHHmm(pkgTime)); |
| if (isSucc) { |
| monitorStats.incSumStats(StatConstants.EVENT_MSG_V0_POST_SUCCESS); |
| monitorIndex.addSuccStats(strBuff.toString(), cnt, packCnt, packSize); |
| } else { |
| monitorIndex.addFailStats(strBuff.toString(), failCnt); |
| monitorStats.incSumStats(StatConstants.EVENT_MSG_V0_POST_FAILURE); |
| } |
| strBuff.delete(0, strBuff.length()); |
| } |
| |
| /** |
| * addMetric |
| * |
| * @param result |
| * @param size |
| * @param event |
| */ |
| public void addMetric(boolean result, long size, Event event) { |
| Map<String, String> dimensions = new HashMap<>(); |
| dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, CommonConfigHolder.getInstance().getClusterName()); |
| dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, this.cachedSrcName); |
| dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, getStrPort()); |
| DataProxyMetricItem.fillInlongId(event, dimensions); |
| DataProxyMetricItem.fillAuditFormatTime(event, dimensions); |
| DataProxyMetricItem metricItem = metricItemSet.findMetricItem(dimensions); |
| if (result) { |
| metricItem.readSuccessCount.incrementAndGet(); |
| metricItem.readSuccessSize.addAndGet(size); |
| AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event); |
| } else { |
| metricItem.readFailCount.incrementAndGet(); |
| metricItem.readFailSize.addAndGet(size); |
| } |
| } |
| |
| /** |
| * channel factory |
| * |
| * @return |
| */ |
| public ChannelInitializer getChannelInitializerFactory() { |
| ChannelInitializer fac = null; |
| logger.info(this.cachedSrcName + " load msgFactory=" + msgFactoryName); |
| try { |
| Class<? extends ChannelInitializer> clazz = |
| (Class<? extends ChannelInitializer>) Class.forName(msgFactoryName); |
| Constructor ctor = clazz.getConstructor(BaseSource.class); |
| logger.info("Using channel processor:{}", getChannelProcessor().getClass().getName()); |
| fac = (ChannelInitializer) ctor.newInstance(this); |
| } catch (Exception e) { |
| logger.error("{} start error, fail to construct ChannelPipelineFactory with name {}", |
| this.cachedSrcName, msgFactoryName, e); |
| stop(); |
| throw new FlumeException(e.getMessage()); |
| } |
| return fac; |
| } |
| |
| public abstract String getProtocolName(); |
| |
| public abstract void startSource(); |
| |
| /** |
| * stopService |
| */ |
| @Override |
| public void stopService() { |
| this.isRejectService = true; |
| } |
| |
| /** |
| * recoverService |
| */ |
| @Override |
| public void recoverService() { |
| this.isRejectService = false; |
| } |
| |
| /** |
| * isRejectService |
| * |
| * @return |
| */ |
| public boolean isRejectService() { |
| return isRejectService; |
| } |
| |
| public String getCachedSrcName() { |
| return cachedSrcName; |
| } |
| |
| public ChannelProcessor getCachedChProcessor() { |
| return cachedChProcessor; |
| } |
| |
| /** |
| * getHostIp |
| * |
| * @param context |
| * @return |
| */ |
| private String getHostIp(Context context) { |
| String result = null; |
| // first get host ip from dataProxy.conf |
| String tmpVal = context.getString(SourceConstants.SRCCXT_CONFIG_HOST); |
| if (StringUtils.isNotBlank(tmpVal)) { |
| tmpVal = tmpVal.trim(); |
| Preconditions.checkArgument(ConfStringUtils.isValidIp(tmpVal), |
| SourceConstants.SRCCXT_CONFIG_HOST + "(" + tmpVal + ") config in conf not valid"); |
| result = tmpVal; |
| } |
| // second get host ip from system env |
| Map<String, String> envMap = System.getenv(); |
| if (envMap.containsKey(SourceConstants.SYSENV_HOST_IP)) { |
| tmpVal = envMap.get(SourceConstants.SYSENV_HOST_IP); |
| Preconditions.checkArgument(ConfStringUtils.isValidIp(tmpVal), |
| SourceConstants.SYSENV_HOST_IP + "(" + tmpVal + ") config in system env not valid"); |
| result = tmpVal.trim(); |
| } |
| if (StringUtils.isBlank(result)) { |
| result = SourceConstants.VAL_DEF_HOST_VALUE; |
| } |
| return result; |
| } |
| |
| /** |
| * getHostPort |
| * |
| * @param context |
| * @return |
| */ |
| private int getHostPort(Context context) { |
| Integer result = null; |
| // first get host port from dataProxy.conf |
| String tmpVal = context.getString(SourceConstants.SRCCXT_CONFIG_PORT); |
| if (StringUtils.isNotBlank(tmpVal)) { |
| tmpVal = tmpVal.trim(); |
| try { |
| result = Integer.parseInt(tmpVal); |
| } catch (Throwable e) { |
| throw new IllegalArgumentException( |
| SourceConstants.SYSENV_HOST_PORT + "(" + tmpVal + ") config in conf not integer"); |
| } |
| } |
| if (result != null) { |
| Preconditions.checkArgument(ConfStringUtils.isValidPort(result), |
| SourceConstants.SRCCXT_CONFIG_PORT + "(" + result + ") config in conf not valid"); |
| } |
| // second get host port from system env |
| Map<String, String> envMap = System.getenv(); |
| if (envMap.containsKey(SourceConstants.SYSENV_HOST_PORT)) { |
| tmpVal = envMap.get(SourceConstants.SYSENV_HOST_PORT); |
| if (StringUtils.isNotBlank(tmpVal)) { |
| tmpVal = tmpVal.trim(); |
| try { |
| result = Integer.parseInt(tmpVal); |
| } catch (Throwable e) { |
| throw new IllegalArgumentException( |
| SourceConstants.SYSENV_HOST_PORT + "(" + tmpVal + ") config in system env not integer"); |
| } |
| Preconditions.checkArgument(ConfStringUtils.isValidPort(result), |
| SourceConstants.SYSENV_HOST_PORT + "(" + tmpVal + ") config in system env not valid"); |
| } |
| } |
| if (result == null) { |
| throw new IllegalArgumentException("Required parameter " + |
| SourceConstants.SRCCXT_CONFIG_PORT + " must exist and may not be null"); |
| } |
| return result; |
| } |
| |
| } |