| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.heron.metricsmgr; |
| |
| import java.net.SocketAddress; |
| import java.nio.channels.SocketChannel; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import com.google.protobuf.Message; |
| |
| import org.apache.heron.api.metric.MultiCountMetric; |
| import org.apache.heron.common.basics.Communicator; |
| import org.apache.heron.common.basics.NIOLooper; |
| import org.apache.heron.common.basics.SingletonRegistry; |
| import org.apache.heron.common.network.HeronServer; |
| import org.apache.heron.common.network.HeronSocketOptions; |
| import org.apache.heron.common.network.REQID; |
| import org.apache.heron.proto.system.Common; |
| import org.apache.heron.proto.system.Metrics; |
| import org.apache.heron.proto.tmaster.TopologyMaster; |
| import org.apache.heron.spi.metricsmgr.metrics.ExceptionInfo; |
| import org.apache.heron.spi.metricsmgr.metrics.MetricsInfo; |
| import org.apache.heron.spi.metricsmgr.metrics.MetricsRecord; |
| |
| public class MetricsManagerServer extends HeronServer { |
| private static final Logger LOG = Logger.getLogger(MetricsManagerServer.class.getName()); |
| |
| // Bean name to register the TMasterLocation object into SingletonRegistry |
| private static final String TMASTER_LOCATION_BEAN_NAME = |
| TopologyMaster.TMasterLocation.newBuilder().getDescriptorForType().getFullName(); |
| public static final String METRICSCACHE_LOCATION_BEAN_NAME = |
| TopologyMaster.MetricsCacheLocation.newBuilder().getDescriptorForType().getFullName(); |
| |
| // Metrics Counter Name |
| private static final String SERVER_CLOSE_PUBLISHER = "close-publisher"; |
| private static final String SERVER_NEW_REGISTER = "new-register-request"; |
| private static final String SERVER_METRICS_RECEIVED = "metrics-received"; |
| private static final String SERVER_EXCEPTIONS_RECEIVED = "exceptions-received"; |
| private static final String SERVER_NEW_TMASTER_LOCATION = "new-tmaster-location"; |
| private static final String SERVER_TMASTER_LOCATION_RECEIVED = "tmaster-location-received"; |
| |
| private final List<Communicator<MetricsRecord>> metricsSinkCommunicators; |
| |
| // A map from MetricPublisher's immutable SocketAddress to the MetricPublisher |
| // We would fetch SocketAddress by using SocketChannel.socket().getRemoteSocketAddress, |
| // which will continue to return the connected address after the socket is closed. |
| // So given that the SocketChannel is connected at first, it would not be null in future. |
| private final Map<SocketAddress, Metrics.MetricPublisher> publisherMap; |
| |
| // Internal MultiCountMetric Counters |
| private final MultiCountMetric serverMetricsCounters; |
| |
| /** |
| * Constructor |
| * |
| * @param s the NIOLooper bind with this socket server |
| * @param host the host of endpoint to bind with |
| * @param port the port of endpoint to bind with |
| * @param options the HeronSocketOption for HeronServer |
| * @param serverMetricsCounters The MultiCountMetric to update Metircs for MetricsManagerServer |
| */ |
| public MetricsManagerServer(NIOLooper s, String host, |
| int port, HeronSocketOptions options, |
| MultiCountMetric serverMetricsCounters) { |
| super(s, host, port, options); |
| |
| if (serverMetricsCounters == null) { |
| throw new IllegalArgumentException("Server Metrics Counters is needed."); |
| } |
| this.serverMetricsCounters = serverMetricsCounters; |
| |
| // We use CopyOnWriteArrayList to avoid throwing ConcurrentModifiedException |
| // Since we might mutate the list while iterating it |
| // Consider that the iteration vastly outnumbers mutation, |
| // it would barely hurt any performance |
| this.metricsSinkCommunicators = new CopyOnWriteArrayList<Communicator<MetricsRecord>>(); |
| |
| this.publisherMap = new HashMap<SocketAddress, Metrics.MetricPublisher>(); |
| |
| // Initialize the register |
| registerInitialization(); |
| } |
| |
| private void registerInitialization() { |
| // Register the RegisterRequest from other instances |
| registerOnRequest(Metrics.MetricPublisherRegisterRequest.newBuilder()); |
| |
| // Register the Metrics Message |
| registerOnMessage(Metrics.MetricPublisherPublishMessage.newBuilder()); |
| |
| // Register the TMasterLocationRefreshMessage, which is used by TMasterSink |
| // We do this to avoid communication between TMasterSink and Zookeeper |
| // TODO -- Reading TMasterLocationRefreshMessage from StreamMgr is more a temp solution |
| // TODO -- It adds dependencies on internal broadcast service |
| registerOnMessage(Metrics.TMasterLocationRefreshMessage.newBuilder()); |
| registerOnMessage(Metrics.MetricsCacheLocationRefreshMessage.newBuilder()); |
| } |
| |
| public void addSinkCommunicator(Communicator<MetricsRecord> communicator) { |
| LOG.info("Communicator is added: " + communicator); |
| this.metricsSinkCommunicators.add(communicator); |
| } |
| |
| public boolean removeSinkCommunicator(Communicator<MetricsRecord> communicator) { |
| LOG.info("Communicator is removed: " + communicator); |
| return this.metricsSinkCommunicators.remove(communicator); |
| } |
| |
| @Override |
| public void onConnect(SocketChannel channel) { |
| LOG.info("Metrics Manager got a new connection from host:port " |
| + channel.socket().getRemoteSocketAddress()); |
| // Nothing here. Everything happens in the register |
| } |
| |
| @Override |
| public void onRequest(REQID rid, SocketChannel channel, Message request) { |
| if (request instanceof Metrics.MetricPublisherRegisterRequest) { |
| handleRegisterRequest(rid, channel, (Metrics.MetricPublisherRegisterRequest) request); |
| } else { |
| LOG.severe("Unknown kind of request received from Metrics Manager"); |
| } |
| } |
| |
| @Override |
| public void onMessage(SocketChannel channel, Message message) { |
| // Fetch the request to append necessary info |
| Metrics.MetricPublisher request = publisherMap.get(channel.socket().getRemoteSocketAddress()); |
| if (request == null) { |
| LOG.severe("Publish message from an unknown socket: " + channel.toString()); |
| return; |
| } |
| |
| if (message instanceof Metrics.MetricPublisherPublishMessage) { |
| handlePublisherPublishMessage(request, (Metrics.MetricPublisherPublishMessage) message); |
| } else if (message instanceof Metrics.MetricsCacheLocationRefreshMessage) { |
| // LOG down where the MetricsCache Location comes from |
| LOG.info("MetricsCache Location is refresh from: " |
| + channel.socket().getRemoteSocketAddress()); |
| handleMetricsCacheLocationRefreshMessage( |
| request, (Metrics.MetricsCacheLocationRefreshMessage) message); |
| } else if (message instanceof Metrics.TMasterLocationRefreshMessage) { |
| // LOG down where the TMaster Location comes from |
| LOG.info("TMaster Location is refresh from: " + channel.socket().getRemoteSocketAddress()); |
| handleTMasterLocationRefreshMessage(request, (Metrics.TMasterLocationRefreshMessage) message); |
| } else { |
| LOG.severe("Unknown kind of message received from Metrics Manager"); |
| } |
| } |
| |
| @Override |
| public void onClose(SocketChannel channel) { |
| LOG.log(Level.SEVERE, "Got a connection close from remote socket address: {0}", |
| new Object[] {channel.socket().getRemoteSocketAddress()}); |
| |
| // Unregister the Publisher |
| Metrics.MetricPublisher request = |
| publisherMap.remove(channel.socket().getRemoteSocketAddress()); |
| if (request == null) { |
| LOG.severe("Unknown connection closed"); |
| } else { |
| LOG.log(Level.SEVERE, "Un-register publish from hostname: {0}," |
| + " component_name: {1}, port: {2}, instance_id: {3}, instance_index: {4}", |
| new Object[] {request.getHostname(), request.getComponentName(), request.getPort(), |
| request.getInstanceId(), request.getInstanceIndex()}); |
| } |
| |
| // Update Metrics |
| serverMetricsCounters.scope(SERVER_CLOSE_PUBLISHER).incr(); |
| } |
| |
| // We also allow directly send Metrics Message internally to invoke IMetricsSink |
| // This method is thread-safe, since we would push Messages into a Concurrent Queue. |
| public void onInternalMessage(Metrics.MetricPublisher request, |
| Metrics.MetricPublisherPublishMessage message) { |
| handlePublisherPublishMessage(request, message); |
| } |
| |
| private void handleRegisterRequest( |
| REQID rid, |
| SocketChannel channel, |
| Metrics.MetricPublisherRegisterRequest request) { |
| Metrics.MetricPublisher publisher = request.getPublisher(); |
| LOG.log(Level.SEVERE, "Got a new register publisher from hostname: {0}," |
| + " component_name: {1}, port: {2}, instance_id: {3}, instance_index: {4} from {5}", |
| new Object[] {publisher.getHostname(), publisher.getComponentName(), publisher.getPort(), |
| publisher.getInstanceId(), publisher.getInstanceIndex(), |
| channel.socket().getRemoteSocketAddress()}); |
| |
| // Check whether publisher has already been registered |
| Common.StatusCode responseStatusCode = Common.StatusCode.NOTOK; |
| |
| if (publisherMap.containsKey(channel.socket().getRemoteSocketAddress())) { |
| LOG.log(Level.SEVERE, "Metrics publisher already exists for hostname: {0}," |
| + " component_name: {1}, port: {2}, instance_id: {3}, instance_index: {4}", |
| new Object[] {publisher.getHostname(), publisher.getComponentName(), publisher.getPort(), |
| publisher.getInstanceId(), publisher.getInstanceIndex()}); |
| } else { |
| publisherMap.put(channel.socket().getRemoteSocketAddress(), publisher); |
| // Add it to the map |
| responseStatusCode = Common.StatusCode.OK; |
| } |
| |
| Common.Status responseStatus = Common.Status.newBuilder().setStatus(responseStatusCode).build(); |
| Metrics.MetricPublisherRegisterResponse response = |
| Metrics.MetricPublisherRegisterResponse.newBuilder().setStatus(responseStatus).build(); |
| |
| // Send the response |
| sendResponse(rid, channel, response); |
| |
| // Update the Metrics |
| serverMetricsCounters.scope(SERVER_NEW_REGISTER).incr(); |
| } |
| |
| private void handlePublisherPublishMessage(Metrics.MetricPublisher request, |
| Metrics.MetricPublisherPublishMessage message) { |
| if (message.getMetricsCount() <= 0 && message.getExceptionsCount() <= 0) { |
| LOG.log(Level.SEVERE, |
| "Publish message has no metrics nor exceptions for message from hostname: {0}," |
| + " component_name: {1}, port: {2}, instance_id: {3}, instance_index: {4}", |
| new Object[] {request.getHostname(), request.getComponentName(), request.getPort(), |
| request.getInstanceId(), request.getInstanceIndex()}); |
| return; |
| } |
| |
| // Convert the message to MetricsRecord |
| String source = MetricsUtil.createSource( |
| request.getHostname(), request.getPort(), |
| request.getComponentName(), request.getInstanceId()); |
| |
| List<MetricsInfo> metricsInfos = new ArrayList<MetricsInfo>(message.getMetricsCount()); |
| for (Metrics.MetricDatum metricDatum : message.getMetricsList()) { |
| MetricsInfo info = new MetricsInfo(metricDatum.getName(), metricDatum.getValue()); |
| metricsInfos.add(info); |
| } |
| |
| List<ExceptionInfo> exceptionInfos = new ArrayList<ExceptionInfo>(message.getExceptionsCount()); |
| for (Metrics.ExceptionData exceptionData : message.getExceptionsList()) { |
| ExceptionInfo exceptionInfo = |
| new ExceptionInfo(exceptionData.getStacktrace(), |
| exceptionData.getLasttime(), |
| exceptionData.getFirsttime(), |
| exceptionData.getCount(), |
| exceptionData.getLogging()); |
| exceptionInfos.add(exceptionInfo); |
| } |
| |
| LOG.info(String.format("%d MetricsInfo and %d ExceptionInfo to push", |
| metricsInfos.size(), exceptionInfos.size())); |
| |
| // Update the metrics |
| serverMetricsCounters.scope(SERVER_METRICS_RECEIVED).incrBy(metricsInfos.size()); |
| serverMetricsCounters.scope(SERVER_EXCEPTIONS_RECEIVED).incrBy(exceptionInfos.size()); |
| |
| |
| MetricsRecord record = new MetricsRecord(source, metricsInfos, exceptionInfos); |
| |
| // Push MetricsRecord to Communicator, which would wake up SlaveLooper bind with IMetricsSink |
| for (Communicator<MetricsRecord> c : metricsSinkCommunicators) { |
| c.offer(record); |
| } |
| } |
| |
| // TMasterLocationRefreshMessage handler |
| private void handleTMasterLocationRefreshMessage( |
| Metrics.MetricPublisher request, |
| Metrics.TMasterLocationRefreshMessage tMasterLocationRefreshMessage) { |
| TopologyMaster.TMasterLocation oldLocation = |
| (TopologyMaster.TMasterLocation) |
| SingletonRegistry.INSTANCE.getSingleton(TMASTER_LOCATION_BEAN_NAME); |
| |
| TopologyMaster.TMasterLocation newLocation = tMasterLocationRefreshMessage.getTmaster(); |
| |
| if (oldLocation == null) { |
| // The first time to get TMasterLocation |
| |
| // Register to the SingletonRegistry |
| LOG.info("We received a new TMasterLocation. Register it into SingletonRegistry"); |
| SingletonRegistry.INSTANCE.registerSingleton(TMASTER_LOCATION_BEAN_NAME, newLocation); |
| |
| // Update Metrics |
| serverMetricsCounters.scope(SERVER_NEW_TMASTER_LOCATION).incr(); |
| |
| } else if (oldLocation.equals(newLocation)) { |
| // The new one is the same as old one. |
| |
| // Just Log. Do nothing |
| LOG.info("We received a new TMasterLocation the same as the old one. Do nothing."); |
| } else { |
| // Have received TMasterLocation earlier, but it changed. |
| |
| // We need update the SingletonRegistry |
| LOG.info("We received a new TMasterLocation. Replace the old one."); |
| LOG.info("Old TMasterLocation: " + oldLocation); |
| SingletonRegistry.INSTANCE.updateSingleton(TMASTER_LOCATION_BEAN_NAME, newLocation); |
| |
| // Update Metrics |
| serverMetricsCounters.scope(SERVER_NEW_TMASTER_LOCATION).incr(); |
| } |
| |
| LOG.info("Current TMaster location: " + newLocation); |
| |
| // Update Metrics |
| serverMetricsCounters.scope(SERVER_TMASTER_LOCATION_RECEIVED).incr(); |
| } |
| |
| private void handleMetricsCacheLocationRefreshMessage( |
| Metrics.MetricPublisher request, |
| Metrics.MetricsCacheLocationRefreshMessage tMasterLocationRefreshMessage) { |
| TopologyMaster.MetricsCacheLocation oldLocation = |
| (TopologyMaster.MetricsCacheLocation) |
| SingletonRegistry.INSTANCE.getSingleton(METRICSCACHE_LOCATION_BEAN_NAME); |
| |
| TopologyMaster.MetricsCacheLocation newLocation = |
| tMasterLocationRefreshMessage.getMetricscache(); |
| |
| if (oldLocation == null) { |
| // The first time to get TMasterLocation |
| |
| // Register to the SingletonRegistry |
| LOG.info("We received a new MetricsCacheLocation. Register it into SingletonRegistry"); |
| SingletonRegistry.INSTANCE.registerSingleton(METRICSCACHE_LOCATION_BEAN_NAME, newLocation); |
| |
| // Update Metrics |
| serverMetricsCounters.scope(SERVER_NEW_TMASTER_LOCATION).incr(); |
| |
| } else if (oldLocation.equals(newLocation)) { |
| // The new one is the same as old one. |
| |
| // Just Log. Do nothing |
| LOG.info("We received a new MetricsCacheLocation the same as the old one " |
| + newLocation + " . Do nothing."); |
| } else { |
| // Have received TMasterLocation earlier, but it changed. |
| |
| // We need update the SingletonRegistry |
| LOG.info("We received a new MetricsCacheLocation " + newLocation |
| + ". Replace the old one" + oldLocation + "."); |
| SingletonRegistry.INSTANCE.updateSingleton(METRICSCACHE_LOCATION_BEAN_NAME, newLocation); |
| |
| // Update Metrics |
| serverMetricsCounters.scope(SERVER_NEW_TMASTER_LOCATION).incr(); |
| } |
| |
| // Update Metrics |
| serverMetricsCounters.scope(SERVER_TMASTER_LOCATION_RECEIVED).incr(); |
| } |
| } |