| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.giraph.utils; |
| |
| import org.apache.giraph.conf.GiraphConfiguration; |
| import org.apache.giraph.conf.GiraphConstants; |
| import org.apache.giraph.io.formats.GiraphFileInputFormat; |
| import org.apache.giraph.job.GiraphJob; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableComparable; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| import org.apache.log4j.Logger; |
| import org.apache.zookeeper.server.NIOServerCnxn; |
| import org.apache.zookeeper.server.ServerConfig; |
| import org.apache.zookeeper.server.ZooKeeperServerMain; |
| import org.apache.zookeeper.server.quorum.QuorumPeerConfig; |
| |
| import com.google.common.base.Charsets; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.io.Files; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.lang.reflect.Field; |
| import java.util.Properties; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| |
| /** |
| * A base class for running internal tests on a vertex |
| * |
| * Extending classes only have to invoke the run() method to test their vertex. |
| * All data is written to a local tmp directory that is removed afterwards. |
| * A local zookeeper instance is started in an extra thread and |
| * shutdown at the end. |
| * |
| * Heavily inspired from Apache Mahout's MahoutTestCase |
| */ |
| @SuppressWarnings("unchecked") |
| public class InternalVertexRunner { |
| /** ZooKeeper port to use for tests */ |
| public static final int LOCAL_ZOOKEEPER_PORT = 22182; |
| |
| /** Logger */ |
| private static final Logger LOG = |
| Logger.getLogger(InternalVertexRunner.class); |
| |
| /** Don't construct */ |
| private InternalVertexRunner() { } |
| |
| /** |
| * Attempts to run the vertex internally in the current JVM, reading from and |
| * writing to a temporary folder on local disk. Will start its own zookeeper |
| * instance. |
| * |
| * |
| * @param conf GiraphClasses specifying which types to use |
| * @param vertexInputData linewise vertex input data |
| * @return linewise output data |
| * @throws Exception if anything goes wrong |
| */ |
| public static Iterable<String> run( |
| GiraphConfiguration conf, |
| String[] vertexInputData) throws Exception { |
| return run(conf, vertexInputData, null); |
| } |
| |
| /** |
| * Attempts to run the vertex internally in the current JVM, reading from and |
| * writing to a temporary folder on local disk. Will start its own zookeeper |
| * instance. |
| * |
| * |
| * @param conf GiraphClasses specifying which types to use |
| * @param vertexInputData linewise vertex input data |
| * @param edgeInputData linewise edge input data |
| * @return linewise output data |
| * @throws Exception if anything goes wrong |
| */ |
| public static Iterable<String> run( |
| GiraphConfiguration conf, |
| String[] vertexInputData, |
| String[] edgeInputData) throws Exception { |
| File tmpDir = null; |
| try { |
| // Prepare input file, output folder and temporary folders |
| tmpDir = FileUtils.createTestDir(conf.getVertexClass()); |
| |
| File vertexInputFile = null; |
| File edgeInputFile = null; |
| if (conf.hasVertexInputFormat()) { |
| vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt"); |
| } |
| if (conf.hasEdgeInputFormat()) { |
| edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt"); |
| } |
| |
| File outputDir = FileUtils.createTempDir(tmpDir, "output"); |
| File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper"); |
| File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir"); |
| File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints"); |
| |
| // Write input data to disk |
| if (conf.hasVertexInputFormat()) { |
| FileUtils.writeLines(vertexInputFile, vertexInputData); |
| } |
| if (conf.hasEdgeInputFormat()) { |
| FileUtils.writeLines(edgeInputFile, edgeInputData); |
| } |
| |
| conf.setWorkerConfiguration(1, 1, 100.0f); |
| GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false); |
| GiraphConstants.LOCAL_TEST_MODE.set(conf, true); |
| conf.set(GiraphConstants.ZOOKEEPER_LIST, "localhost:" + |
| String.valueOf(LOCAL_ZOOKEEPER_PORT)); |
| |
| conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString()); |
| GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, |
| zkMgrDir.toString()); |
| GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString()); |
| |
| // Create and configure the job to run the vertex |
| GiraphJob job = new GiraphJob(conf, conf.getVertexClass().getName()); |
| |
| Job internalJob = job.getInternalJob(); |
| if (conf.hasVertexInputFormat()) { |
| GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(), |
| new Path(vertexInputFile.toString())); |
| } |
| if (conf.hasEdgeInputFormat()) { |
| GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(), |
| new Path(edgeInputFile.toString())); |
| } |
| FileOutputFormat.setOutputPath(job.getInternalJob(), |
| new Path(outputDir.toString())); |
| |
| // Configure a local zookeeper instance |
| Properties zkProperties = configLocalZooKeeper(zkDir); |
| |
| QuorumPeerConfig qpConfig = new QuorumPeerConfig(); |
| qpConfig.parseProperties(zkProperties); |
| |
| // Create and run the zookeeper instance |
| final InternalZooKeeper zookeeper = new InternalZooKeeper(); |
| final ServerConfig zkConfig = new ServerConfig(); |
| zkConfig.readFrom(qpConfig); |
| |
| ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| executorService.execute(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| zookeeper.runFromConfig(zkConfig); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| }); |
| try { |
| job.run(true); |
| } finally { |
| executorService.shutdown(); |
| zookeeper.end(); |
| } |
| |
| if (conf.hasVertexOutputFormat()) { |
| return Files.readLines(new File(outputDir, "part-m-00000"), |
| Charsets.UTF_8); |
| } else { |
| return ImmutableList.of(); |
| } |
| } finally { |
| FileUtils.delete(tmpDir); |
| } |
| } |
| |
| /** |
| * Attempts to run the vertex internally in the current JVM, reading and |
| * writing to an in-memory graph. Will start its own zookeeper |
| * instance. |
| * |
| * @param <I> Vertex ID |
| * @param <V> Vertex Value |
| * @param <E> Edge Value |
| * @param <M> Message Value |
| * @param conf GiraphClasses specifying which types to use |
| * @param graph input graph |
| * @return iterable output data |
| * @throws Exception if anything goes wrong |
| */ |
| public static <I extends WritableComparable, |
| V extends Writable, |
| E extends Writable, |
| M extends Writable> TestGraph<I, V, E, M> run( |
| GiraphConfiguration conf, |
| TestGraph<I, V, E, M> graph) throws Exception { |
| File tmpDir = null; |
| try { |
| // Prepare temporary folders |
| tmpDir = FileUtils.createTestDir(conf.getVertexClass()); |
| |
| File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper"); |
| File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir"); |
| File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints"); |
| |
| conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class); |
| |
| // Create and configure the job to run the vertex |
| GiraphJob job = new GiraphJob(conf, conf.getVertexClass().getName()); |
| |
| InMemoryVertexInputFormat.setGraph(graph); |
| |
| conf.setWorkerConfiguration(1, 1, 100.0f); |
| GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false); |
| GiraphConstants.LOCAL_TEST_MODE.set(conf, true); |
| conf.set(GiraphConstants.ZOOKEEPER_LIST, "localhost:" + |
| String.valueOf(LOCAL_ZOOKEEPER_PORT)); |
| |
| conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString()); |
| GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, |
| zkMgrDir.toString()); |
| GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString()); |
| |
| // Configure a local zookeeper instance |
| Properties zkProperties = configLocalZooKeeper(zkDir); |
| |
| QuorumPeerConfig qpConfig = new QuorumPeerConfig(); |
| qpConfig.parseProperties(zkProperties); |
| |
| // Create and run the zookeeper instance |
| final InternalZooKeeper zookeeper = new InternalZooKeeper(); |
| final ServerConfig zkConfig = new ServerConfig(); |
| zkConfig.readFrom(qpConfig); |
| |
| ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| executorService.execute(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| zookeeper.runFromConfig(zkConfig); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| }); |
| try { |
| job.run(true); |
| } finally { |
| executorService.shutdown(); |
| zookeeper.end(); |
| } |
| return graph; |
| } finally { |
| FileUtils.delete(tmpDir); |
| } |
| } |
| |
| /** |
| * Configuration options for running local ZK. |
| * |
| * @param zkDir directory for ZK to hold files in. |
| * @return Properties configured for local ZK. |
| */ |
| private static Properties configLocalZooKeeper(File zkDir) { |
| Properties zkProperties = new Properties(); |
| zkProperties.setProperty("tickTime", "2000"); |
| zkProperties.setProperty("dataDir", zkDir.getAbsolutePath()); |
| zkProperties.setProperty("clientPort", |
| String.valueOf(LOCAL_ZOOKEEPER_PORT)); |
| zkProperties.setProperty("maxClientCnxns", "10000"); |
| zkProperties.setProperty("minSessionTimeout", "10000"); |
| zkProperties.setProperty("maxSessionTimeout", "100000"); |
| zkProperties.setProperty("initLimit", "10"); |
| zkProperties.setProperty("syncLimit", "5"); |
| zkProperties.setProperty("snapCount", "50000"); |
| return zkProperties; |
| } |
| |
| /** |
| * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown |
| */ |
| private static class InternalZooKeeper extends ZooKeeperServerMain { |
| /** |
| * Shutdown the ZooKeeper instance. |
| */ |
| void end() { |
| if (getCnxnFactory() != null) { |
| shutdown(); |
| } |
| } |
| |
| /** |
| * Get the ZooKeeper connection factory using reflection. |
| * @return {@link NIOServerCnxn.Factory} from ZooKeeper |
| */ |
| private NIOServerCnxn.Factory getCnxnFactory() { |
| NIOServerCnxn.Factory factory = null; |
| try { |
| Field field = ZooKeeperServerMain.class.getDeclaredField("cnxnFactory"); |
| field.setAccessible(true); |
| factory = (NIOServerCnxn.Factory) field.get(this); |
| // CHECKSTYLE: stop IllegalCatch |
| } catch (Exception e) { |
| // CHECKSTYLE: resume IllegalCatch |
| LOG.error("Couldn't get cnxn factory", e); |
| } |
| return factory; |
| } |
| } |
| } |