blob: 27a28a1e992c536e71d3d1bc67f7aacedcb4d1f4 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.accumulo.examples.wikisearch.ingest;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.SimpleAnalyzer;
public class WikipediaConfiguration {
public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name";
public final static String USER = "wikipedia.accumulo.user";
public final static String PASSWORD = "wikipedia.accumulo.password";
public final static String TABLE_NAME = "wikipedia.accumulo.table";
public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers";
public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename";
public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename";
public final static String WORKING_DIRECTORY = "wikipedia.ingest.working";
public final static String ANALYZER = "wikipedia.index.analyzer";
public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
public final static String NUM_GROUPS = "wikipedia.ingest.groups";
public final static String PARTITIONED_ARTICLES_DIRECTORY = "";
public final static String RUN_PARTITIONER = "";
public final static String RUN_INGEST = "";
public final static String BULK_INGEST = "wikipedia.bulk.ingest";
public final static String BULK_INGEST_DIR = "wikipedia.bulk.ingest.dir";
public final static String BULK_INGEST_FAILURE_DIR = "wikipedia.bulk.ingest.failure.dir";
public final static String BULK_INGEST_BUFFER_SIZE = "wikipedia.bulk.ingest.buffer.size";
public final static String PARTITIONED_INPUT_MIN_SPLIT_SIZE = "wikipedia.min.input.split.size";
public static String getUser(Configuration conf) {
return conf.get(USER);
public static byte[] getPassword(Configuration conf) {
String pass = conf.get(PASSWORD);
if (pass == null) {
return null;
return pass.getBytes();
public static String getTableName(Configuration conf) {
String tablename = conf.get(TABLE_NAME);
if (tablename == null) {
throw new RuntimeException("No data table name specified in " + TABLE_NAME);
return tablename;
public static String getInstanceName(Configuration conf) {
return conf.get(INSTANCE_NAME);
public static String getZookeepers(Configuration conf) {
String zookeepers = conf.get(ZOOKEEPERS);
if (zookeepers == null) {
throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS);
return zookeepers;
public static Path getNamespacesFile(Configuration conf) {
String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString());
return new Path(filename);
public static Path getLanguagesFile(Configuration conf) {
String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString());
return new Path(filename);
public static Path getWorkingDirectory(Configuration conf) {
String filename = conf.get(WORKING_DIRECTORY);
return new Path(filename);
public static Analyzer getAnalyzer(Configuration conf) throws IOException {
Class<? extends Analyzer> analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class);
return ReflectionUtils.newInstance(analyzerClass, conf);
public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
return getInstance(conf).getConnector(getUser(conf), getPassword(conf));
public static Instance getInstance(Configuration conf) {
return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf));
public static int getNumPartitions(Configuration conf) {
return conf.getInt(NUM_PARTITIONS, 25);
public static int getNumGroups(Configuration conf) {
return conf.getInt(NUM_GROUPS, 1);
public static Path getPartitionedArticlesPath(Configuration conf) {
return new Path(conf.get(PARTITIONED_ARTICLES_DIRECTORY));
public static long getMinInputSplitSize(Configuration conf) {
return conf.getLong(PARTITIONED_INPUT_MIN_SPLIT_SIZE, 1l << 27);
public static boolean runPartitioner(Configuration conf) {
return conf.getBoolean(RUN_PARTITIONER, false);
public static boolean runIngest(Configuration conf) {
return conf.getBoolean(RUN_INGEST, true);
public static boolean bulkIngest(Configuration conf) {
return conf.getBoolean(BULK_INGEST, true);
public static String bulkIngestDir(Configuration conf) {
return conf.get(BULK_INGEST_DIR);
public static String bulkIngestFailureDir(Configuration conf) {
return conf.get(BULK_INGEST_FAILURE_DIR);
public static long bulkIngestBufferSize(Configuration conf) {
return conf.getLong(BULK_INGEST_BUFFER_SIZE,1l<<28);
* Helper method to get properties from Hadoop configuration
* @param <T>
* @param conf
* @param propertyName
* @param resultClass
* @throws IllegalArgumentException
* if property is not defined, null, or empty. Or if resultClass is not handled.
* @return value of property
public static <T> T isNull(Configuration conf, String propertyName, Class<T> resultClass) {
String p = conf.get(propertyName);
if (StringUtils.isEmpty(p))
throw new IllegalArgumentException(propertyName + " must be specified");
if (resultClass.equals(String.class))
return (T) p;
else if (resultClass.equals(String[].class))
return (T) conf.getStrings(propertyName);
else if (resultClass.equals(Boolean.class))
return (T) Boolean.valueOf(p);
else if (resultClass.equals(Long.class))
return (T) Long.valueOf(p);
else if (resultClass.equals(Integer.class))
return (T) Integer.valueOf(p);
else if (resultClass.equals(Float.class))
return (T) Float.valueOf(p);
else if (resultClass.equals(Double.class))
return (T) Double.valueOf(p);
throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");