| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.tinkerpop.gremlin.driver.util; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.concurrent.BasicThreadFactory; |
| import org.apache.tinkerpop.gremlin.driver.Channelizer; |
| import org.apache.tinkerpop.gremlin.driver.Client; |
| import org.apache.tinkerpop.gremlin.driver.Cluster; |
| import org.apache.tinkerpop.gremlin.driver.ser.Serializers; |
| import org.apache.tinkerpop.gremlin.structure.util.ElementHelper; |
| |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.FileWriter; |
| import java.io.PrintWriter; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.IntStream; |
| |
| /** |
| * An internal application used to test out configuration parameters for Gremlin Driver. |
| * |
| * @author Stephen Mallette (http://stephen.genoprime.com) |
| */ |
| public class ProfilingApplication { |
| |
| private static final Random random = new Random(); |
| private static final String[] scripts = new String[]{ |
| "g.V()", |
| "g.V(1).out('knows')", |
| "g.V(1).out('knows').has('name','josh')", |
| "g.V(1).as(\"a\").out(\"knows\").as(\"b\").select(\"a\", \"b\")", |
| "g.V(1).as(\"a\").out(\"knows\").as(\"b\").select(\"a\", \"b\").by(\"name\")", |
| "g.V().hasLabel(\"person\").as(\"p\").map(__.bothE().label().groupCount()).as(\"r\").select(\"p\", \"r\")", |
| "g.V().choose(__.outE().count().is(0L), __.as(\"a\"), __.as(\"b\")).choose(__.select(\"a\"), __.select(\"a\"), __.select(\"b\"))", |
| "g.V().group(\"a\").by(T.label).by(outE().values(\"weight\").sum()).cap(\"a\")", |
| "g.V().repeat(__.union(__.out(\"knows\").group(\"a\").by(\"age\"), __.out(\"created\").group(\"b\").by(\"name\").by(count())).group(\"a\").by(\"name\")).times(2).cap(\"a\", \"b\")", |
| "g.V().match(\n" + |
| " as(\"a\").out(\"knows\").as(\"b\"),\n" + |
| " as(\"b\").out(\"created\").has(\"name\", \"lop\"),\n" + |
| " as(\"b\").match(\n" + |
| " as(\"b\").out(\"created\").as(\"d\"),\n" + |
| " as(\"d\").in(\"created\").as(\"c\")).select(\"c\").as(\"c\")).<Vertex>select(\"a\", \"b\", \"c\")" |
| }; |
| |
| private final Cluster cluster; |
| private final int requests; |
| private final String executionName; |
| private final String script; |
| private final int tooSlowThreshold; |
| private final boolean exercise; |
| |
| private final ExecutorService executor; |
| |
| public ProfilingApplication(final String executionName, final Cluster cluster, final int requests, final ExecutorService executor, |
| final String script, final int tooSlowThreshold, final boolean exercise) { |
| this.executionName = executionName; |
| this.cluster = cluster; |
| this.requests = requests; |
| this.executor = executor; |
| this.script = script; |
| this.tooSlowThreshold = tooSlowThreshold; |
| this.exercise = exercise; |
| } |
| |
| public long execute() throws Exception { |
| final AtomicInteger tooSlow = new AtomicInteger(0); |
| |
| final Client client = cluster.connect(); |
| final String executionId = "[" + executionName + "]"; |
| try { |
| final CountDownLatch latch = new CountDownLatch(requests); |
| client.init(); |
| |
| final long start = System.nanoTime(); |
| IntStream.range(0, requests).forEach(i -> { |
| final String s = exercise ? chooseScript() : script; |
| client.submitAsync(s).thenAcceptAsync(r -> { |
| try { |
| r.all().get(tooSlowThreshold, TimeUnit.MILLISECONDS); |
| } catch (TimeoutException ex) { |
| tooSlow.incrementAndGet(); |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| } finally { |
| latch.countDown(); |
| } |
| }, executor); |
| }); |
| |
| // finish once all requests are accounted for |
| latch.await(); |
| |
| final long end = System.nanoTime(); |
| final long total = end - start; |
| final double totalSeconds = total / 1000000000d; |
| final long requestCount = requests; |
| final long reqSec = Math.round(requestCount / totalSeconds); |
| System.out.println(String.format(StringUtils.rightPad(executionId, 10) + " requests: %s | time(s): %s | req/sec: %s | too slow: %s", requestCount, StringUtils.rightPad(String.valueOf(totalSeconds), 14), StringUtils.rightPad(String.valueOf(reqSec), 7), exercise ? "N/A" : tooSlow.get())); |
| return reqSec; |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| throw new RuntimeException(ex); |
| } finally { |
| client.close(); |
| } |
| } |
| |
| private String chooseScript() { |
| return scripts[random.nextInt(scripts.length - 1)]; |
| } |
| |
| public static void main(final String[] args) { |
| final Map<String,Object> options = ElementHelper.asMap(args); |
| final boolean noExit = Boolean.parseBoolean(options.getOrDefault("noExit", "false").toString()); |
| final int parallelism = Integer.parseInt(options.getOrDefault("parallelism", "16").toString()); |
| final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("profiler-%d").build(); |
| final ExecutorService executor = Executors.newFixedThreadPool(parallelism, threadFactory); |
| |
| final String host = options.getOrDefault("host", "localhost").toString(); |
| final int minExpectedRps = Integer.parseInt(options.getOrDefault("minExpectedRps", "1000").toString()); |
| final int timeout = Integer.parseInt(options.getOrDefault("timeout", "1200000").toString()); |
| final int warmups = Integer.parseInt(options.getOrDefault("warmups", "5").toString()); |
| final int executions = Integer.parseInt(options.getOrDefault("executions", "10").toString()); |
| final int nioPoolSize = Integer.parseInt(options.getOrDefault("nioPoolSize", "1").toString()); |
| final int requests = Integer.parseInt(options.getOrDefault("requests", "10000").toString()); |
| final int minConnectionPoolSize = Integer.parseInt(options.getOrDefault("minConnectionPoolSize", "256").toString()); |
| final int maxConnectionPoolSize = Integer.parseInt(options.getOrDefault("maxConnectionPoolSize", "256").toString()); |
| final int minSimultaneousUsagePerConnection = Integer.parseInt(options.getOrDefault("minSimultaneousUsagePerConnection", "8").toString()); |
| final int maxSimultaneousUsagePerConnection = Integer.parseInt(options.getOrDefault("maxSimultaneousUsagePerConnection", "32").toString()); |
| final int maxInProcessPerConnection = Integer.parseInt(options.getOrDefault("maxInProcessPerConnection", "64").toString()); |
| final int minInProcessPerConnection = Integer.parseInt(options.getOrDefault("minInProcessPerConnection", "16").toString()); |
| final int maxWaitForConnection = Integer.parseInt(options.getOrDefault("maxWaitForConnection", "3000").toString()); |
| final int workerPoolSize = Integer.parseInt(options.getOrDefault("workerPoolSize", "2").toString()); |
| final int tooSlowThreshold = Integer.parseInt(options.getOrDefault("tooSlowThreshold", "125").toString()); |
| final String channelizer = options.getOrDefault("channelizer", Channelizer.WebSocketChannelizer.class.getName()).toString(); |
| final String serializer = options.getOrDefault("serializer", Serializers.GRAPHBINARY_V1D0.name()).toString(); |
| |
| final boolean exercise = Boolean.parseBoolean(options.getOrDefault("exercise", "false").toString()); |
| final String script = options.getOrDefault("script", "1+1").toString(); |
| |
| final Cluster cluster = Cluster.build(host) |
| .minConnectionPoolSize(minConnectionPoolSize) |
| .maxConnectionPoolSize(maxConnectionPoolSize) |
| .minSimultaneousUsagePerConnection(minSimultaneousUsagePerConnection) |
| .maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection) |
| .minInProcessPerConnection(minInProcessPerConnection) |
| .maxInProcessPerConnection(maxInProcessPerConnection) |
| .nioPoolSize(nioPoolSize) |
| .channelizer(channelizer) |
| .maxWaitForConnection(maxWaitForConnection) |
| .serializer(Serializers.valueOf(serializer)) |
| .workerPoolSize(workerPoolSize).create(); |
| |
| try { |
| if (exercise) { |
| System.out.println("--------------------------INITIALIZATION--------------------------"); |
| final Client client = cluster.connect(); |
| client.submit("graph.clear()").all().join(); |
| System.out.println("Cleared existing 'graph'"); |
| |
| client.submit("TinkerFactory.generateModern(graph)").all().join(); |
| client.close(); |
| |
| System.out.println("Modern graph loaded"); |
| } |
| |
| final Object fileName = options.get("store"); |
| final File f = null == fileName ? null : new File(fileName.toString()); |
| if (f != null && f.length() == 0) { |
| try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) { |
| writer.println("parallelism\tnioPoolSize\tminConnectionPoolSize\tmaxConnectionPoolSize\tminSimultaneousUsagePerConnection\tmaxSimultaneousUsagePerConnection\tminInProcessPerConnection\tmaxInProcessPerConnection\tworkerPoolSize\trequestPerSecond"); |
| } |
| } |
| |
| // not much point to continuing with a line of tests if we can't get at least minExpectedRps. |
| final AtomicBoolean meetsRpsExpectation = new AtomicBoolean(true); |
| System.out.println("---------------------------WARMUP CYCLE---------------------------"); |
| for (int ix = 0; ix < warmups && meetsRpsExpectation.get(); ix++) { |
| final long averageRequestsPerSecond = new ProfilingApplication("warmup-" + (ix + 1), cluster, 1000, executor, script, tooSlowThreshold, exercise).execute(); |
| meetsRpsExpectation.set(averageRequestsPerSecond > minExpectedRps); |
| TimeUnit.SECONDS.sleep(1); // pause between executions |
| } |
| |
| final AtomicBoolean exceededTimeout = new AtomicBoolean(false); |
| long totalRequestsPerSecond = 0; |
| |
| // no need to execute this if we didn't pass the basic expectation in the warmups |
| if (exercise || meetsRpsExpectation.get()) { |
| final long start = System.nanoTime(); |
| System.out.println("----------------------------TEST CYCLE----------------------------"); |
| for (int ix = 0; ix < executions && !exceededTimeout.get(); ix++) { |
| totalRequestsPerSecond += new ProfilingApplication("test-" + (ix + 1), cluster, requests, executor, script, tooSlowThreshold, exercise).execute(); |
| exceededTimeout.set((System.nanoTime() - start) > TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)); |
| TimeUnit.SECONDS.sleep(1); // pause between executions |
| } |
| } |
| |
| final int averageRequestPerSecond = !meetsRpsExpectation.get() || exceededTimeout.get() ? 0 : Math.round(totalRequestsPerSecond / executions); |
| System.out.println(String.format("avg req/sec: %s", averageRequestPerSecond)); |
| if (f != null) { |
| try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) { |
| writer.println(String.join("\t", String.valueOf(parallelism), String.valueOf(nioPoolSize), String.valueOf(minConnectionPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(minSimultaneousUsagePerConnection), String.valueOf(maxSimultaneousUsagePerConnection), String.valueOf(minInProcessPerConnection), String.valueOf(maxInProcessPerConnection), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond))); |
| } |
| } |
| |
| if (!noExit) System.exit(0); |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| if (!noExit) System.exit(1); |
| } finally { |
| executor.shutdown(); |
| cluster.close(); |
| } |
| } |
| } |