blob: 7e2e76d14c204d5a45a6fa26b235dace539c7d68 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.management.jmx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXServiceURL;
import javax.management.remote.rmi.RMIConnectorServer;
import javax.management.remote.rmi.RMIJRMPServerImpl;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.rmi.NoSuchObjectException;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.concurrent.atomic.AtomicReference;
/**
* JMX Server implementation that JMX clients can connect to.
*
* <p>Heavily based on j256 simplejmx project
*
* <p>https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
*/
class JMXServer {
private static final Logger LOG = LoggerFactory.getLogger(JMXServer.class);
private final AtomicReference<Remote> rmiServerReference = new AtomicReference<>();
private Registry rmiRegistry;
private JMXConnectorServer connector;
private int port;
JMXServer() {}
void start(int port) throws IOException {
if (rmiRegistry != null && connector != null) {
LOG.debug("JMXServer is already running.");
return;
}
internalStart(port);
this.port = port;
}
void stop() throws IOException {
rmiServerReference.set(null);
if (connector != null) {
try {
connector.stop();
} finally {
connector = null;
}
}
if (rmiRegistry != null) {
try {
UnicastRemoteObject.unexportObject(rmiRegistry, true);
} catch (NoSuchObjectException e) {
throw new IOException("Could not un-export our RMI registry", e);
} finally {
rmiRegistry = null;
}
}
}
int getPort() {
return port;
}
private void internalStart(int port) throws IOException {
rmiServerReference.set(null);
// this allows clients to lookup the JMX service
rmiRegistry = new JmxRegistry(port, "jmxrmi", rmiServerReference);
String serviceUrl =
"service:jmx:rmi://localhost:" + port + "/jndi/rmi://localhost:" + port + "/jmxrmi";
JMXServiceURL url;
try {
url = new JMXServiceURL(serviceUrl);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Malformed service url created " + serviceUrl, e);
}
final RMIJRMPServerImpl rmiServer = new RMIJRMPServerImpl(port, null, null, null);
connector =
new RMIConnectorServer(
url, null, rmiServer, ManagementFactory.getPlatformMBeanServer());
connector.start();
// we can't pass the created stub directly to the registry since this would form a cyclic
// dependency:
// - you can only start the connector after the registry was started
// - you can only create the stub after the connector was started
// - you can only start the registry after the stub was created
rmiServerReference.set(rmiServer.toStub());
}
/** A registry that only exposes a single remote object. */
@SuppressWarnings("restriction")
private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
private final String lookupName;
private final AtomicReference<Remote> remoteServerStub;
JmxRegistry(
final int port,
final String lookupName,
final AtomicReference<Remote> remoteServerStub)
throws RemoteException {
super(port);
this.lookupName = lookupName;
this.remoteServerStub = remoteServerStub;
}
@Override
public Remote lookup(String s) throws NotBoundException {
if (lookupName.equals(s)) {
final Remote remote = remoteServerStub.get();
if (remote != null) {
return remote;
}
}
throw new NotBoundException("Not bound.");
}
@Override
public void bind(String s, Remote remote) {
// this is called from RMIConnectorServer#start; don't throw a general AccessException
}
@Override
public void unbind(String s) {
// this is called from RMIConnectorServer#stop; don't throw a general AccessException
}
@Override
public void rebind(String s, Remote remote) {
// might as well not throw an exception here given that the others don't
}
@Override
public String[] list() {
return new String[] {lookupName};
}
}
}