blob: 401715c9738fcdad41975331188f405582ff1fc4 [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.instance;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.logging.Logger;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.basics.NIOLooper;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.common.network.HeronSocketOptions;
import com.twitter.heron.common.utils.logging.ErrorReportLoggingHandler;
import com.twitter.heron.common.utils.metrics.JVMMetrics;
import com.twitter.heron.common.utils.metrics.MetricsCollector;
import com.twitter.heron.common.utils.misc.ThreadNames;
import com.twitter.heron.metrics.GatewayMetrics;
import com.twitter.heron.network.MetricsManagerClient;
import com.twitter.heron.network.StreamManagerClient;
import com.twitter.heron.proto.system.HeronTuples;
import com.twitter.heron.proto.system.Metrics;
import com.twitter.heron.proto.system.PhysicalPlans;
/**
* Gateway is a Runnable and will be executed in a thread.
* It will new the streamManagerClient and metricsManagerClient in constructor and
* ask them to connect with corresponding socket endpoint in run().
*/
public class Gateway implements Runnable, AutoCloseable {
private static final Logger LOG = Logger.getLogger(Gateway.class.getName());
// Some pre-defined value
private static final String STREAM_MGR_HOST = "127.0.0.1";
private static final String METRICS_MGR_HOST = "127.0.0.1";
// MetricsManagerClient will communicate with Metrics Manager
private final MetricsManagerClient metricsManagerClient;
// StreamManagerClient will communicate with Stream Manager
private final StreamManagerClient streamManagerClient;
private final NIOLooper gatewayLooper;
private final MetricsCollector gatewayMetricsCollector;
private final JVMMetrics jvmMetrics;
private final GatewayMetrics gatewayMetrics;
private final SystemConfig systemConfig;
/**
* Construct a Gateway basing on given arguments
*/
public Gateway(String topologyName, String topologyId, PhysicalPlans.Instance instance,
int streamPort, int metricsPort, final NIOLooper gatewayLooper,
final Communicator<HeronTuples.HeronTupleSet> inStreamQueue,
final Communicator<HeronTuples.HeronTupleSet> outStreamQueue,
final Communicator<InstanceControlMsg> inControlQueue,
final List<Communicator<Metrics.MetricPublisherPublishMessage>> outMetricsQueues)
throws IOException {
systemConfig =
(SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
// New the client
this.gatewayLooper = gatewayLooper;
this.gatewayMetricsCollector = new MetricsCollector(gatewayLooper, outMetricsQueues.get(0));
// JVM Metrics are auto-sample metrics so we do not have to insert it inside singleton
// since it should not be used in other places
jvmMetrics = new JVMMetrics();
jvmMetrics.registerMetrics(gatewayMetricsCollector);
// since we need to call its methods in a lot of classes
gatewayMetrics = new GatewayMetrics();
gatewayMetrics.registerMetrics(gatewayMetricsCollector);
// Init the ErrorReportHandler
ErrorReportLoggingHandler.init(gatewayMetricsCollector,
systemConfig.getHeronMetricsExportInterval(),
systemConfig.getHeronMetricsMaxExceptionsPerMessageCount());
// Initialize the corresponding 2 socket clients with corresponding socket options
HeronSocketOptions socketOptions = new HeronSocketOptions(
systemConfig.getInstanceNetworkWriteBatchSize(),
systemConfig.getInstanceNetworkWriteBatchTime(),
systemConfig.getInstanceNetworkReadBatchSize(),
systemConfig.getInstanceNetworkReadBatchTime(),
systemConfig.getInstanceNetworkOptionsSocketSendBufferSize(),
systemConfig.getInstanceNetworkOptionsSocketReceivedBufferSize()
);
this.streamManagerClient =
new StreamManagerClient(gatewayLooper, STREAM_MGR_HOST, streamPort,
topologyName, topologyId, instance,
inStreamQueue, outStreamQueue, inControlQueue,
socketOptions, gatewayMetrics);
this.metricsManagerClient = new MetricsManagerClient(gatewayLooper, METRICS_MGR_HOST,
metricsPort, instance, outMetricsQueues, socketOptions, gatewayMetrics);
// Attach sample Runnable to gatewayMetricsCollector
gatewayMetricsCollector.registerMetricSampleRunnable(jvmMetrics.getJVMSampleRunnable(),
systemConfig.getInstanceMetricsSystemSampleInterval());
Runnable sampleStreamQueuesSize = new Runnable() {
@Override
public void run() {
gatewayMetrics.setInStreamQueueSize(inStreamQueue.size());
gatewayMetrics.setOutStreamQueueSize(outStreamQueue.size());
gatewayMetrics.setInStreamQueueExpectedCapacity(
inStreamQueue.getExpectedAvailableCapacity());
gatewayMetrics.setOutStreamQueueExpectedCapacity(
outStreamQueue.getExpectedAvailableCapacity());
}
};
gatewayMetricsCollector.registerMetricSampleRunnable(sampleStreamQueuesSize,
systemConfig.getInstanceMetricsSystemSampleInterval());
final Duration instanceTuningInterval = systemConfig.getInstanceTuningInterval();
// Attache Runnable to update the expected stream's expected available capacity
Runnable tuningStreamQueueSize = new Runnable() {
@Override
public void run() {
inStreamQueue.updateExpectedAvailableCapacity();
outStreamQueue.updateExpectedAvailableCapacity();
gatewayLooper.registerTimerEvent(instanceTuningInterval, this);
}
};
gatewayLooper.registerTimerEvent(
systemConfig.getInstanceMetricsSystemSampleInterval(),
tuningStreamQueueSize);
}
@Override
public void run() {
Thread.currentThread().setName(ThreadNames.THREAD_GATEWAY_NAME);
streamManagerClient.start();
metricsManagerClient.start();
gatewayLooper.loop();
}
public void close() {
LOG.info("Closing the Gateway thread");
this.gatewayMetricsCollector.forceGatherAllMetrics();
this.metricsManagerClient.sendAllMessage();
this.streamManagerClient.sendAllMessage();
this.metricsManagerClient.stop();
this.streamManagerClient.stop();
}
}