blob: 32c2057ed8463ba182661104eaa9189b11bb577a [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.streamscope;
import org.apache.edgent.execution.services.ControlService;
import org.apache.edgent.execution.services.ServiceContainer;
import org.apache.edgent.function.Functions;
import org.apache.edgent.streamscope.mbeans.StreamScopeRegistryMXBean;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.TopologyProvider;
/**
* Utility helpers for StreamScope setup and management.
* <p>
* This class is not thread safe.
*/
public class StreamScopeSetup {
private final StreamScopeRegistry rgy;
private StreamScopeRegistryBean rgyBean;
@SuppressWarnings("unused")
private String rgyBeanControlId;
// Hackery is needed to work around all of the issues cropping
// up with per-provider StreamScopeRegistry and complications
// caused by JMXControlService and SteamsScopeRegistryMXBean registrations.
//
// There are probably a couple of possible long term solutions.
// So for now minimize changing anything already created - i.e.,
// keep lazy StreamScopeBean creation via use of StreamScopeRegistryBean.
//
// Avoid rgy / rgyBean mismatches or rgyBean "already registered"
// conditions by mirroring the static-ness of using the platform JMX instance
// with a singleton StreamScopeSetup instance... which will result in
// singleton rgy and rgyBean instances.
//
// More background on issues:
// Nothing is unregistering the registry control bean from the JMX...
// and attempting to register by the next new provider instance throws.
// Note, there's no cs.unregister(type,alias) (Really? ugh) so we'd
// have to remember the rgyBean control controlId for use with cs.unregister(controlId).
//
// There's a general issue of missing hooks to make unregistering
// controls for jobs or non-job/oplet related entities possible.
// ServiceContainer.addCleaner() provides a hook for job oplet shutdown.
// But there are no hooks available to be able to unregister JobMXBean,
// AppService bean, or StreamScopeRegistry etc beans.
//
// While there's a storage leak for Job beans regardless of
// which ControlService is used, since job beans
// register with a jobId that's unique across successive provider instances
// in a JVM (the Etiao id generator is static), an "already registered"
// condition is avoided. StreamScopeRegistry bean uses the same control
// alias so it encounters this problem.
// AppService bean uses the same alias ("edgent") but it's only being
// used with the JsonControlService so it doesn't encounter "already
// registered" on successive provider instances in a JVM (each of which
// allocates a new control service instance).
// the singleton StreamScopeSetup workaround.
private static StreamScopeSetup setup = new StreamScopeSetup(new StreamScopeRegistry());
/**
* Perform the registrations needed to use the streamscope package.
* <P>
* Typically called during {@link TopologyProvider} construction.
* </P>
* <UL>
* <LI>register the StreamScopeRegistry service</LI>
* <LI>register a cleaner to remove job oplet StreamScope registrations</LI>
* <LI>register a StreamScopeRegistryMXBean with the registered ControlService</LI>
* </UL>
* @param services ServiceContainer to register with.
*/
public static void register(ServiceContainer services) {
setup.registerPvt(services);
}
/**
* Create a new {@link StreamScopeSetup} for setup.
*
* @param registry the registry to use
*/
private StreamScopeSetup(StreamScopeRegistry registry) {
this.rgy = registry;
}
/**
* Perform the necessary registrations.
* <P>
* <UL>
* <LI>register the StreamScopeRegistry service</LI>
* <LI>register a cleaner to remove job oplet StreamScope registrations</LI>
* <LI>register a StreamScopeRegistryMXBean with the registered ControlService</LI>
* </UL>
* </P>
* @param services ServiceContainer to register with.
*/
private void registerPvt(ServiceContainer services) {
services.addService(StreamScopeRegistry.class, rgy);
services.addCleaner(
(jobId, opletId) -> {
rgy.unregister(jobId, opletId);
if (rgyBean != null)
rgyBean.unregister(jobId, opletId);
});
registerRegistryBean(services);
}
/**
* Register a {@link StreamScopeRegistryMXBean} with the registered
* {@link ControlService} (for use by the Edgent Console).
*/
private void registerRegistryBean(ServiceContainer services) {
ControlService cs = services.getService(ControlService.class);
if (cs == null || rgy == null)
throw new IllegalStateException();
{
// more workaround...
//
// If a rgyBean control is already registered, then don't reregister
// (this will/should be the JMXControlService case) so as to avoid
// an "already registered" exception from the ControlService.
// The rgyBean is gotta be for the matching rgy with this
// singleton StreamScopeSetup instance scheme so everything is OK.
StreamScopeRegistryMXBean mbean = cs.getControl(StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.class);
if (mbean != null) {
return;
}
}
if (rgyBean == null)
rgyBean = new StreamScopeRegistryBean(rgy, cs);
rgyBeanControlId = cs.registerControl(StreamScopeRegistryMXBean.TYPE,
StreamScopeRegistryMXBean.TYPE+"_0", StreamScopeRegistryMXBean.TYPE,
StreamScopeRegistryMXBean.class, rgyBean);
}
// until there's a hook available to be able to unregister things
//
// /**
// * Unregister the StreamScopeRegistry service and the StreamScopeRegistryMXBean
// * control.
// * @param services
// */
// public void unregister(ServiceContainer services) {
// services.removeService(StreamScopeRegistry.class);
// unregisterRegistryBean(services);
// }
//
// private void unregisterRegistryBean(ServiceContainer services) {
// if (rgyBean != null) {
// ControlService cs = services.getService(ControlService.class);
// if (cs != null && rgyBeanControlId != null) {
// cs.unregister(rgyBeanControlId);
// }
// }
// }
/**
* Add StreamScope instances to the topology
* <P>
* Instrument the topology by adding StreamScope peekers to the
* topology's streams. At topology submission time, StreamScopes
* register themselves with the topology's
* {@link StreamScopeRegistry} runtime service.
* </P>
*
* @param t the Topology. The operation is a no-op if the topology
* does not have a StreamScopeRegistry service.
* @see #register(ServiceContainer) register
* @see org.apache.edgent.graph.Graph#peekAll(org.apache.edgent.function.Supplier, org.apache.edgent.function.Predicate) Graph.peekAll()
*/
public static void addStreamScopes(Topology t) {
StreamScopeRegistry rgy = (StreamScopeRegistry)
t.getRuntimeServiceSupplier().get()
.getService(StreamScopeRegistry.class);
if (rgy == null)
return;
// N.B. at runtime, the Console will need to be able to lookup
// StreamScopeMXBean be streamId (see StreamScopeRegistryMXBean).
// Nominally, that streamId should be the jobId/opletId/oport of
// the stream that the StreamScope was created for - what I'll
// call the "origin stream". i.e., the Console shouldn't be
// looking up a streamId for the StreamScopeOplet's opletId.
//
// Registration is left to the StreamScope oplet initialization processing
// since the jobId isn't known until that time.
//
// As noted above we really want the StreamScope's streamId registration
// to be for the "origin stream". Today the API (Graph,Vertex,Oplet)
// doesn't provide that information so we can't capture that as part
// of the StreamScope oplet's info. The net is that for the time being
// a StreamScope will end up having to register itself with its opletId,
// not the origin oplet's id and that has implications for the Console.
//
// We could create a peekAllFn that takes a BiFunction that receives
// the Vertex+oport the StreamScope is being created for but that
// Vertex/Oplet's opletId still isn't accessible today.
// (The Etaio provider maintains the opletId in its Instance object fwiw).
//
// TODO straighten this all out
t.graph().peekAll(
() -> { return new org.apache.edgent.streamscope.oplets.StreamScope<>(new StreamScope<>()); },
Functions.alwaysTrue());
}
}