blob: e84f17b7b324d0ca210c25e17ee60f86f0299af3 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.providers.development;
import java.util.Hashtable;
import java.util.concurrent.Future;
import org.apache.edgent.console.server.HttpServer;
import org.apache.edgent.execution.Job;
import org.apache.edgent.execution.services.ControlService;
import org.apache.edgent.metrics.Metrics;
import org.apache.edgent.metrics.MetricsSetup;
import org.apache.edgent.metrics.oplets.CounterOp;
import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.runtime.jmxcontrol.JMXControlService;
import org.apache.edgent.streamscope.StreamScopeRegistry;
import org.apache.edgent.streamscope.StreamScopeSetup;
import org.apache.edgent.streamscope.mbeans.StreamScopeRegistryMXBean;
import org.apache.edgent.topology.Topology;
import com.codahale.metrics.MetricRegistry;
import com.google.gson.JsonObject;
/**
* Provider intended for development.
* This provider executes topologies using {@code DirectProvider}
* and extends it by:
* <UL>
* <LI>
* starting an embedded web-server providing the Edgent development console
* that shows live graphs for running applications.
* The console {@link HttpServer} is registered as a service with
* the provider and is accessible via {@link #getServices()}.
* The console's HTTP server starts with a random available port unless
* a port is specified via the {@code edgent.console.port} system property.
* </LI>
* <LI>
* Creating a metrics registry with metrics registered
* in the platform MBean server.
* </LI>
* <LI>
* Add a {@link ControlService} that registers control management
* beans in the platform MBean server.
* </LI>
* <LI>
* Add tuple count metrics on all the streams before submitting a topology.
* The implementation calls {@link Metrics#counter(Topology)} to insert
* {@link CounterOp} oplets into each stream.
* </LI>
* <LI>
* Instrument the topology adding {@link org.apache.edgent.streamscope.oplets.StreamScope StreamScope}
* oplets on all the streams before submitting a topology.
* See {@link StreamScopeSetup#addStreamScopes(Topology) StreamScopeSetup.addStreamscopes}.
* </LI>
* <LI>
* Add a {@link StreamScopeRegistry} runtime service and a
* {@link StreamScopeRegistryMXBean} management bean to the {@code ControlService}.
* See {@link StreamScopeSetup#register(org.apache.edgent.execution.services.ServiceContainer) StreamScopeSetup.register}.
* </LI>
* </UL>
* @see StreamScopeRegistry
*/
public class DevelopmentProvider extends DirectProvider {
/**
* JMX domains that this provider uses to register MBeans.
* Set to {@value}.
*/
public static final String JMX_DOMAIN = "org.apache.edgent.providers.development";
public DevelopmentProvider() throws Exception {
MetricsSetup.withRegistry(getServices(), new MetricRegistry()).
startJMXReporter(JMX_DOMAIN);
getServices().addService(ControlService.class,
new JMXControlService(JMX_DOMAIN, new Hashtable<>()));
StreamScopeSetup.register(getServices());
HttpServer server = HttpServer.getInstance();
getServices().addService(HttpServer.class, server);
server.startServer();
}
@Override
public Future<Job> submit(Topology topology, JsonObject config) {
Metrics.counter(topology);
StreamScopeSetup.addStreamScopes(topology);
return super.submit(topology, config);
}
}