| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| */ |
| package org.apache.edgent.test.metrics; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.edgent.execution.DirectSubmitter; |
| import org.apache.edgent.execution.Job; |
| import org.apache.edgent.graph.Edge; |
| import org.apache.edgent.graph.Graph; |
| import org.apache.edgent.graph.Vertex; |
| import org.apache.edgent.metrics.Metrics; |
| import org.apache.edgent.metrics.MetricsSetup; |
| import org.apache.edgent.oplet.Oplet; |
| import org.apache.edgent.oplet.OpletContext; |
| import org.apache.edgent.oplet.core.Peek; |
| import org.apache.edgent.topology.TopologyAbstractTest; |
| import org.apache.edgent.topology.TStream; |
| import org.apache.edgent.topology.Topology; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| import com.codahale.metrics.Gauge; |
| import com.codahale.metrics.Meter; |
| import com.codahale.metrics.Metric; |
| import com.codahale.metrics.MetricRegistry; |
| |
| @Ignore("abstract, provides common tests for concrete implementations") |
| public abstract class MetricsEverywhereTest extends TopologyAbstractTest { |
| |
| protected MetricRegistry metricRegistry; |
| |
| // Register Metrics service before each test. |
| @Before |
| public void createMetricRegistry() { |
| metricRegistry = new MetricRegistry(); |
| MetricsSetup.withRegistry(((DirectSubmitter<?,?>)getSubmitter()).getServices(), metricRegistry); |
| } |
| |
| /* |
| * Test that Metrics are automatically unregistered after the job is closed |
| */ |
| @Test |
| public void automaticMetricCleanup1() throws Exception { |
| // Declare org.apache.edgent.org.apache.edgent.topology with custom metric oplet |
| Topology t = newTopology(); |
| AtomicInteger n = new AtomicInteger(0); |
| TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS); |
| ints.pipe(new TestOplet<Integer>()); |
| |
| // Submit job |
| Future<? extends Job> fj = getSubmitter().submit(t); |
| Job job = fj.get(); |
| Thread.sleep(TimeUnit.MILLISECONDS.toMillis(50)); |
| |
| // At least one tuple was processed |
| int tupleCount = n.get(); |
| assertTrue("Expected more tuples than "+ tupleCount, tupleCount > 0); |
| |
| // Each test oplet registers two metrics |
| Map<String, Metric> all = metricRegistry.getMetrics(); |
| assertEquals(2, all.size()); |
| |
| // After close all metrics have been unregistered |
| job.stateChange(Job.Action.CLOSE); |
| assertEquals(0, all.size()); |
| } |
| |
| /* |
| * Test that Metrics are automatically unregistered after the job is closed |
| * in a org.apache.edgent.org.apache.edgent.topology with two oplets registering metrics. |
| */ |
| @Test |
| public void automaticMetricCleanup2() throws Exception { |
| // Declare org.apache.edgent.org.apache.edgent.topology with custom metric oplet |
| Topology t = newTopology(); |
| AtomicInteger n = new AtomicInteger(0); |
| TStream<Integer> ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS); |
| TStream<Integer> ints2 = ints.pipe(new TestOplet<Integer>()); |
| ints2.pipe(new TestOplet<Integer>()); |
| |
| // Submit job |
| Future<? extends Job> fj = getSubmitter().submit(t); |
| Job job = fj.get(); |
| Thread.sleep(TimeUnit.MILLISECONDS.toMillis(50)); |
| |
| // Each test oplet registers two metrics |
| Map<String, Metric> all = metricRegistry.getMetrics(); |
| assertEquals(4, all.size()); |
| |
| // After close all metrics have been unregistered |
| job.stateChange(Job.Action.CLOSE); |
| assertEquals(0, all.size()); |
| } |
| |
| // Apply Metrics on all streams, simple org.apache.edgent.graph |
| @Test |
| public void metricsEverywhereSimple() throws Exception { |
| |
| Topology t = newTopology(); |
| Graph g = t.graph(); |
| |
| // Source |
| TStream<Integer> d = integers(t, 1, 2, 3); |
| d.sink(tuple -> System.out.print(".")); |
| |
| // Insert counter metrics into all the org.apache.edgent.org.apache.edgent.topology streams |
| Metrics.counter(t); |
| |
| printGraph(g); |
| |
| Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices(); |
| assertEquals(3, vertices.size()); |
| |
| Collection<Edge> edges = g.getEdges(); |
| assertEquals(2, edges.size()); |
| } |
| |
| // Apply Metrics on all streams, org.apache.edgent.graph with split oplet and Metric oplet |
| // Counter oplets are inserted everywhere except after OP_2 (Counter). |
| @Test |
| public void metricsEverywhereSplit() throws Exception { |
| |
| /* -- OP_2 (Counter) --- OP_3 (Sink) |
| * / |
| * OP_0 -- OP_1(Split) ----- OP_4 (Sink) |
| * \ |
| * -- OP_5 (Sink) |
| */ |
| Topology t = newTopology(); |
| Graph g = t.graph(); |
| |
| // Source |
| TStream<Integer> d = integers(t, 1, 2, 3); |
| |
| // Split |
| List<TStream<Integer>> splits = d.split(3, tuple -> { |
| switch (tuple.intValue()) { |
| case 0: |
| return 0; |
| case 1: |
| return 1; |
| default: |
| return 2; |
| } |
| }); |
| |
| // Insert counter metric for the zeroes stream |
| Metrics.counter(splits.get(0)).sink(tuple -> System.out.print(".")); |
| |
| splits.get(1).sink(tuple -> System.out.print("#")); |
| splits.get(2).sink(tuple -> System.out.print("@")); |
| |
| // Insert counter metrics into all the org.apache.edgent.org.apache.edgent.topology streams |
| Metrics.counter(t); |
| |
| printGraph(g); |
| |
| Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices(); |
| assertEquals(10, vertices.size()); |
| |
| Collection<Edge> edges = g.getEdges(); |
| assertEquals(9, edges.size()); |
| } |
| |
| @Test |
| public void metricsEverywhereFanOut() { |
| |
| Topology t = newTopology(); |
| Graph g = t.graph(); |
| |
| /* -- OP_3 (Sink) |
| * / |
| * OP_0 -- FanOut ----- OP_4 (Sink) |
| */ |
| TStream<Integer> d = integers(t, 1, 2, 3); |
| d.sink(tuple -> System.out.print(".")); |
| d.sink(tuple -> System.out.print("@")); |
| |
| // Insert counter metrics into all the org.apache.edgent.org.apache.edgent.topology streams |
| Metrics.counter(t); |
| |
| printGraph(g); |
| |
| Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices(); |
| assertEquals(5, vertices.size()); |
| |
| Collection<Edge> edges = g.getEdges(); |
| assertEquals(4, edges.size()); |
| } |
| |
| /** |
| * Test Peek. This will only work with an embedded setup. |
| * |
| * @throws Exception on failure |
| */ |
| @Test |
| public void metricsEverywherePeek() throws Exception { |
| |
| Topology t = newTopology(); |
| Graph g = t.graph(); |
| |
| TStream<String> s = t.strings("a", "b", "c"); |
| List<String> peekedValues = new ArrayList<>(); |
| TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple)); |
| speek.sink(tuple -> System.out.print(".")); |
| |
| Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices(); |
| assertEquals(3, vertices.size()); |
| |
| Collection<Edge> edges = g.getEdges(); |
| assertEquals(2, edges.size()); |
| |
| Metrics.counter(t); |
| |
| printGraph(g); |
| |
| // One single counter inserted after the peek |
| vertices = g.getVertices(); |
| assertEquals(4, vertices.size()); |
| |
| edges = g.getEdges(); |
| assertEquals(3, edges.size()); |
| } |
| |
| @Test |
| public void metricsEverywhereMultiplePeek() throws Exception { |
| |
| Topology t = newTopology(); |
| Graph g = t.graph(); |
| |
| TStream<String> s = t.strings("a", "b", "c"); |
| List<String> peekedValues = new ArrayList<>(); |
| TStream<String> speek = s.peek(tuple -> peekedValues.add(tuple + "1st")); |
| TStream<String> speek2 = speek.peek(tuple -> peekedValues.add(tuple + "2nd")); |
| TStream<String> speek3 = speek2.peek(tuple -> peekedValues.add(tuple + "3rd")); |
| speek3.sink(tuple -> System.out.print(".")); |
| |
| Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices(); |
| assertEquals(5, vertices.size()); |
| |
| Collection<Edge> edges = g.getEdges(); |
| assertEquals(4, edges.size()); |
| |
| Metrics.counter(t); |
| |
| printGraph(g); |
| |
| // One single counter inserted after the 3rd peek |
| vertices = g.getVertices(); |
| assertEquals(6, vertices.size()); |
| |
| edges = g.getEdges(); |
| assertEquals(5, edges.size()); |
| } |
| |
| @Test |
| public void metricsEverywherePeekThenFanout() throws Exception { |
| _testFanoutWithPeek(false); |
| } |
| |
| @Test |
| public void metricsEverywhereFanoutThenPeek() throws Exception { |
| _testFanoutWithPeek(true); |
| } |
| |
| private void _testFanoutWithPeek(boolean after) throws Exception { |
| Topology t = newTopology(); |
| Graph g = t.graph(); |
| |
| /* -- Filter -- Sink(.) |
| * / |
| * Source -- Peek -- FanOut ---- Modify -- Sink(@) |
| * |
| */ |
| TStream<Integer> d = integers(t, 1, 2, 3); |
| List<Integer> peekedValues = new ArrayList<>(); |
| |
| if (!after) |
| d.peek(tuple -> peekedValues.add(tuple)); |
| |
| TStream<Integer> df = d.filter(tuple -> tuple.intValue() > 0); |
| TStream<Integer> dm = d.modify(tuple -> new Integer(tuple.intValue() + 1)); |
| |
| if (after) |
| d.peek(tuple -> peekedValues.add(tuple)); |
| |
| df.sink(tuple -> System.out.print(".")); |
| dm.sink(tuple -> System.out.print("@")); |
| |
| assertEquals(7, g.getVertices().size()); |
| assertEquals(6, g.getEdges().size()); |
| |
| // Insert counter metrics into all the streams |
| Metrics.counter(t); |
| |
| printGraph(g); |
| |
| assertEquals(10, g.getVertices().size()); |
| assertEquals(9, g.getEdges().size()); |
| } |
| |
| private <T> TStream<T> integers(Topology t, @SuppressWarnings("unchecked") T... values) { |
| return t.source(() -> Arrays.asList(values)); |
| } |
| |
| private void printGraph(Graph g) { |
| // Gson gson = new GsonBuilder().setPrettyPrinting().create(); |
| // String json = gson.toJson(new GraphType(g)); |
| // System.out.println(json); |
| } |
| |
| @SuppressWarnings("serial") |
| private static class TestOplet<T> extends Peek<T> { |
| private Meter meter; |
| private Gauge<Long> gauge; |
| |
| @Override |
| public void close() throws Exception { |
| } |
| |
| @Override |
| protected void peek(T tuple) { |
| meter.mark(); |
| } |
| |
| @Override |
| public final void initialize(OpletContext<T, T> context) { |
| super.initialize(context); |
| |
| this.meter = new Meter(); |
| this.gauge = new Gauge<Long>() { |
| @Override |
| public Long getValue() { |
| return System.currentTimeMillis(); |
| } |
| }; |
| |
| MetricRegistry registry = context.getService(MetricRegistry.class); |
| if (registry != null) { |
| registry.register(context.uniquify("testMeter"), meter); |
| registry.register(context.uniquify("testGauge"), gauge); |
| } |
| } |
| } |
| } |