| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.management.jmx; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.management.remote.JMXConnectorServer; |
| import javax.management.remote.JMXServiceURL; |
| import javax.management.remote.rmi.RMIConnectorServer; |
| import javax.management.remote.rmi.RMIJRMPServerImpl; |
| |
| import java.io.IOException; |
| import java.lang.management.ManagementFactory; |
| import java.net.MalformedURLException; |
| import java.rmi.NoSuchObjectException; |
| import java.rmi.NotBoundException; |
| import java.rmi.Remote; |
| import java.rmi.RemoteException; |
| import java.rmi.registry.Registry; |
| import java.rmi.server.UnicastRemoteObject; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| * JMX Server implementation that JMX clients can connect to. |
| * |
| * <p>Heavily based on j256 simplejmx project |
| * |
| * <p>https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java |
| */ |
| class JMXServer { |
| private static final Logger LOG = LoggerFactory.getLogger(JMXServer.class); |
| |
| private final AtomicReference<Remote> rmiServerReference = new AtomicReference<>(); |
| |
| private Registry rmiRegistry; |
| private JMXConnectorServer connector; |
| private int port; |
| |
| JMXServer() {} |
| |
| void start(int port) throws IOException { |
| if (rmiRegistry != null && connector != null) { |
| LOG.debug("JMXServer is already running."); |
| return; |
| } |
| internalStart(port); |
| this.port = port; |
| } |
| |
| void stop() throws IOException { |
| rmiServerReference.set(null); |
| if (connector != null) { |
| try { |
| connector.stop(); |
| } finally { |
| connector = null; |
| } |
| } |
| if (rmiRegistry != null) { |
| try { |
| UnicastRemoteObject.unexportObject(rmiRegistry, true); |
| } catch (NoSuchObjectException e) { |
| throw new IOException("Could not un-export our RMI registry", e); |
| } finally { |
| rmiRegistry = null; |
| } |
| } |
| } |
| |
| int getPort() { |
| return port; |
| } |
| |
| private void internalStart(int port) throws IOException { |
| rmiServerReference.set(null); |
| |
| // this allows clients to lookup the JMX service |
| rmiRegistry = new JmxRegistry(port, "jmxrmi", rmiServerReference); |
| |
| String serviceUrl = |
| "service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi"; |
| JMXServiceURL url; |
| try { |
| url = new JMXServiceURL(serviceUrl); |
| } catch (MalformedURLException e) { |
| throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e); |
| } |
| |
| final RMIJRMPServerImpl rmiServer = new RMIJRMPServerImpl(port, null, null, null); |
| |
| connector = |
| new RMIConnectorServer( |
| url, null, rmiServer, ManagementFactory.getPlatformMBeanServer()); |
| connector.start(); |
| |
| // we can't pass the created stub directly to the registry since this would form a cyclic |
| // dependency: |
| // - you can only start the connector after the registry was started |
| // - you can only create the stub after the connector was started |
| // - you can only start the registry after the stub was created |
| rmiServerReference.set(rmiServer.toStub()); |
| } |
| |
| /** A registry that only exposes a single remote object. */ |
| @SuppressWarnings("restriction") |
| private static class JmxRegistry extends sun.rmi.registry.RegistryImpl { |
| private final String lookupName; |
| private final AtomicReference<Remote> remoteServerStub; |
| |
| JmxRegistry( |
| final int port, |
| final String lookupName, |
| final AtomicReference<Remote> remoteServerStub) |
| throws RemoteException { |
| super(port); |
| this.lookupName = lookupName; |
| this.remoteServerStub = remoteServerStub; |
| } |
| |
| @Override |
| public Remote lookup(String s) throws NotBoundException { |
| if (lookupName.equals(s)) { |
| final Remote remote = remoteServerStub.get(); |
| if (remote != null) { |
| return remote; |
| } |
| } |
| throw new NotBoundException("Not bound."); |
| } |
| |
| @Override |
| public void bind(String s, Remote remote) { |
| // this is called from RMIConnectorServer#start; don't throw a general AccessException |
| } |
| |
| @Override |
| public void unbind(String s) { |
| // this is called from RMIConnectorServer#stop; don't throw a general AccessException |
| } |
| |
| @Override |
| public void rebind(String s, Remote remote) { |
| // might as well not throw an exception here given that the others don't |
| } |
| |
| @Override |
| public String[] list() { |
| return new String[] {lookupName}; |
| } |
| } |
| } |