Third round of updates: adds the blur-indexer module (pom and assembly descriptor) with the ClusterDriver that continuously schedules incremental index-update MapReduce jobs per table, the FasterDriver that merges new data into existing indexes in MR-only, MR-with-lookup, or automatic mode, and the supporting mapper, reducer, and utility classes.
diff --git a/blur-indexer/pom.xml b/blur-indexer/pom.xml
new file mode 100644
index 0000000..c7c1753
--- /dev/null
+++ b/blur-indexer/pom.xml
@@ -0,0 +1,58 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.blur</groupId>
+ <artifactId>blur-indexer</artifactId>
+ <version>0.2.8</version>
+ <name>blur-indexer</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <blur.version>0.3.0.incubating.2.5.0.cdh5.3.3-SNAPSHOT</blur.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.blur</groupId>
+ <artifactId>blur-mapred</artifactId>
+ <version>${blur.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.9</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptor>src/main/assemble/bin.xml</descriptor>
+ <finalName>blur-indexer-${project.version}</finalName>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/blur-indexer/src/main/assemble/bin.xml b/blur-indexer/src/main/assemble/bin.xml
new file mode 100644
index 0000000..5fddd56
--- /dev/null
+++ b/blur-indexer/src/main/assemble/bin.xml
@@ -0,0 +1,45 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <dependencySets>
+ <dependencySet>
+ <useProjectArtifact>true</useProjectArtifact>
+ <outputDirectory>blur-indexer-${project.version}/lib</outputDirectory>
+ <unpack>false</unpack>
+ <includes>
+ <include>org.apache.blur:blur-indexer</include>
+ <include>org.apache.blur:*</include>
+ <include>org.apache.zookeeper:zookeeper</include>
+ <include>org.slf4j:slf4j-api</include>
+ <include>org.slf4j:slf4j-log4j12</include>
+ <include>org.json:json</include>
+ <include>log4j:log4j</include>
+ <include>com.yammer.metrics:*</include>
+ <include>com.google.guava:guava</include>
+ <include>org.apache.httpcomponents:*</include>
+ <include>org.apache.lucene:*</include>
+ <include>com.spatial4j:spatial4j</include>
+ <include>commons-cli:commons-cli</include>
+ <include>org.eclipse.jetty:*</include>
+ <include>com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru</include>
+ <include>jline:jline</include>
+ <include>com.fasterxml.jackson.core:*</include>
+ </includes>
+ </dependencySet>
+ </dependencySets>
+
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.scriptSourceDirectory}</directory>
+ <outputDirectory>blur-indexer-${project.version}</outputDirectory>
+ <excludes>
+ <exclude>**/.empty</exclude>
+ </excludes>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
new file mode 100644
index 0000000..a9caabb
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
@@ -0,0 +1,17 @@
+package org.apache.blur.mapreduce.lib.update;
+
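+/**
+ * Counters reported by the bulk update jobs: record counts from the new data,
+ * row id counts gathered from the existing index and from the new data, and
+ * which path handled the existing records (input-format scan vs index lookup).
+ */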
+public enum BlurIndexCounter {
+
+ NEW_RECORDS,
+ ROW_IDS_FROM_INDEX,
+ ROW_IDS_TO_UPDATE_FROM_NEW_DATA,
+ ROW_IDS_FROM_NEW_DATA,
+
+ INPUT_FORMAT_MAPPER,
+ INPUT_FORMAT_EXISTING_RECORDS,
+
+ LOOKUP_MAPPER,
+ LOOKUP_MAPPER_EXISTING_RECORDS,
+ LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT
+
+}
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
new file mode 100644
index 0000000..d44adf1
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
@@ -0,0 +1,362 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.xml.DOMConfigurator;
+
+public class ClusterDriver extends Configured implements Tool {
+
+ private static final String BLUR_ENV = "blur.env";
+ private static final Log LOG = LogFactory.getLog(ClusterDriver.class);
+ private static final String _SEP = "_";
+ private static final String IMPORT = "import";
+
+ public static void main(String[] args) throws Exception {
+ String logFilePath = System.getenv("BLUR_INDEXER_LOG_FILE");
+ System.out.println("Log file path [" + logFilePath + "]");
+ if (logFilePath != null) {
+ System.setProperty("BLUR_INDEXER_LOG_FILE", logFilePath);
+ }
+ URL url = ClusterDriver.class.getResource("/program-log4j.xml");
+ if (url != null) {
+ LOG.info("Resetting log4j config from classpath resource [{0}]", url);
+ LogManager.resetConfiguration();
+ DOMConfigurator.configure(url);
+ }
+ int res = ToolRunner.run(new Configuration(), new ClusterDriver(), args);
+ System.exit(res);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ int c = 0;
+ final String blurEnv = args[c++];
+ final String blurZkConnection = args[c++];
+ final String extraConfig = args[c++];
+ final int reducerMultiplier = Integer.parseInt(args[c++]);
+ final Configuration conf = getConf();
+
+ final ExecutorService service = Executors.newCachedThreadPool();
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ // Load configs for all filesystems.
+ Path path = new Path(extraConfig);
+ Configuration mergeHdfsConfigs = HdfsConfigurationNamespaceMerge.mergeHdfsConfigs(path.getFileSystem(conf), path);
+ conf.addResource(mergeHdfsConfigs);
+ conf.set(BlurConstants.BLUR_ZOOKEEPER_CONNECTION, blurZkConnection);
+ conf.set(BLUR_ENV, blurEnv);
+
+ final Iface client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
+
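+ // Recover from any previous run before scheduling new work: kill leftover
+ // MR jobs for this env, remove failed imports, requeue in-progress input,
+ // and release any stale mrupdate snapshots.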
+ stopAllExistingMRJobs(blurEnv, conf);
+ cleanUpOldImportDirs(client, conf);
+ moveInprogressDirsBackToNew(client, conf);
+ unlockLockedTables(client);
+
+ Map<String, Future<Void>> futures = new HashMap<String, Future<Void>>();
+ while (running.get()) {
+ LOG.debug("Starting index update check for blur cluster [" + blurZkConnection + "].");
+ try {
+ List<String> tableList = client.tableList();
+ startMissingIndexerThreads(tableList, service, futures, blurZkConnection, conf, client, reducerMultiplier);
+ } catch (TException t) {
+ LOG.error("Unknown Blur Thrift Error, Retrying...", t);
+ }
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+ }
+ return 0;
+ }
+
+ private void unlockLockedTables(Iface client) throws BlurException, TException {
+ List<String> tableList = client.tableList();
+ for (String table : tableList) {
+ TableDescriptor tableDescriptor = client.describe(table);
+ if (tableDescriptor.isEnabled()) {
+ unlockLockedTables(client, table);
+ }
+ }
+ }
+
+ private void unlockLockedTables(Iface client, String table) throws BlurException, TException {
+ Map<String, List<String>> listSnapshots = client.listSnapshots(table);
+ for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
+ List<String> value = e.getValue();
+ if (value.contains(FasterDriver.MRUPDATE_SNAPSHOT)) {
+ LOG.info("Unlocking table [{0}]", table);
+ client.removeSnapshot(table, FasterDriver.MRUPDATE_SNAPSHOT);
+ return;
+ }
+ }
+ }
+
+ private void moveInprogressDirsBackToNew(Iface client, Configuration conf) throws BlurException, TException,
+ IOException {
+ List<String> tableList = client.tableList();
+ for (String table : tableList) {
+ String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table);
+ Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
+ Path newData = new Path(mrIncWorkingPath, FasterDriver.NEW);
+ Path inprogressData = new Path(mrIncWorkingPath, FasterDriver.INPROGRESS);
+ FileSystem fileSystem = inprogressData.getFileSystem(conf);
+ FileStatus[] listStatus = fileSystem.listStatus(inprogressData);
+ for (FileStatus fileStatus : listStatus) {
+ Path src = fileStatus.getPath();
+ Path dst = new Path(newData, src.getName());
+ if (fileSystem.rename(src, dst)) {
+ LOG.info("Moved [{0}] to [{1}] to be reprocessed.", src, dst);
+ } else {
+ LOG.error("Could not move [{0}] to [{1}] to be reprocessed.", src, dst);
+ }
+ }
+ }
+ }
+
+ private void cleanUpOldImportDirs(Iface client, Configuration conf) throws BlurException, TException, IOException {
+ List<String> tableList = client.tableList();
+ for (String table : tableList) {
+ cleanUpOldImportDirs(client, conf, table);
+ }
+ }
+
+ private void cleanUpOldImportDirs(Iface client, Configuration conf, String table) throws BlurException, TException,
+ IOException {
+ TableDescriptor descriptor = client.describe(table);
+ String tableUri = descriptor.getTableUri();
+ Path tablePath = new Path(tableUri);
+ FileSystem fileSystem = tablePath.getFileSystem(getConf());
+ Path importPath = new Path(tablePath, IMPORT);
+ if (fileSystem.exists(importPath)) {
+ for (FileStatus fileStatus : fileSystem.listStatus(importPath)) {
+ Path path = fileStatus.getPath();
+ LOG.info("Removing failed import [{0}]", path);
+ fileSystem.delete(path, true);
+ }
+ }
+ }
+
+ private void stopAllExistingMRJobs(String blurEnv, Configuration conf) throws YarnException, IOException,
+ InterruptedException {
+ Cluster cluster = new Cluster(conf);
+ JobStatus[] allJobStatuses = cluster.getAllJobStatuses();
+ for (JobStatus jobStatus : allJobStatuses) {
+ if (jobStatus.isJobComplete()) {
+ continue;
+ }
+ String jobFile = jobStatus.getJobFile();
+ JobID jobID = jobStatus.getJobID();
+ Job job = cluster.getJob(jobID);
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ Configuration configuration = new Configuration(false);
+ Path path = new Path(jobFile);
+ Path makeQualified = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+ if (hasReadAccess(fileSystem, makeQualified)) {
+ try (FSDataInputStream in = fileSystem.open(makeQualified)) {
+ configuration.addResource(copy(in));
+ }
+ String jobBlurEnv = configuration.get(BLUR_ENV);
+ LOG.info("Checking job [{0}]: job env [{1}], current env [{2}]", jobID, jobBlurEnv, blurEnv);
+ if (blurEnv.equals(jobBlurEnv)) {
+ LOG.info("Killing running job [{0}]", jobID);
+ job.killJob();
+ }
+ }
+ }
+ }
+
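+ // Buffers the job file into memory so the HDFS stream can be closed before
+ // the Configuration reads the resource.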
+ private static InputStream copy(FSDataInputStream input) throws IOException {
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ IOUtils.copy(input, outputStream);
+ return new ByteArrayInputStream(outputStream.toByteArray());
+ }
+ }
+
+ private static boolean hasReadAccess(FileSystem fileSystem, Path p) {
+ try {
+ fileSystem.access(p, FsAction.READ);
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ private Callable<Void> getCallable(final String blurZkConnection, final Configuration conf, final Iface client,
+ final String table, final int reducerMultiplier) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ String originalThreadName = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName(table);
+ if (!isEnabled(client, table)) {
+ LOG.info("Table [" + table + "] is not enabled.");
+ return null;
+ }
+ waitForDataToLoad(client, table);
+ LOG.debug("Starting index update for table [" + table + "].");
+ final String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table);
+ final String outputPathStr = getOutputPathStr(client, table);
+ Path path = new Path(outputPathStr);
+ FileSystem fileSystem = path.getFileSystem(getConf());
+
+ Configuration configuration = new Configuration(conf);
+ BlurInputFormat.setMaxNumberOfMaps(configuration, 10000);
+
+ FasterDriver driver = new FasterDriver();
+ driver.setConf(configuration);
+ try {
+ driver.run(new String[] { table, mrIncWorkingPathStr, outputPathStr, blurZkConnection,
+ Integer.toString(reducerMultiplier) });
+ } finally {
+ if (fileSystem.exists(path)) {
+ fileSystem.delete(path, true);
+ }
+ }
+ return null;
+ } finally {
+ Thread.currentThread().setName(originalThreadName);
+ }
+ }
+ };
+ }
+
+ private void startMissingIndexerThreads(List<String> tableList, ExecutorService service,
+ Map<String, Future<Void>> futures, final String blurZkConnection, final Configuration conf, final Iface client,
+ int reducerMultiplier) throws BlurException, TException {
+ Set<String> tables = new HashSet<String>(tableList);
+
+ // remove futures that are complete
+ for (String table : tables) {
+ Future<Void> future = futures.get(table);
+ if (future != null) {
+ if (future.isDone()) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ LOG.error("Unknown error while processing table [" + table + "].", e);
+ } catch (ExecutionException e) {
+ LOG.error("Unknown error while processing table [" + table + "].", e.getCause());
+ }
+ futures.remove(table);
+ } else {
+ LOG.info("Update for table [" + table + "] still running.");
+ }
+ }
+ }
+
+ // start missing tables
+ for (String table : tables) {
+ if (!futures.containsKey(table)) {
+ if (isEnabled(client, table)) {
+ Future<Void> future = service.submit(getCallable(blurZkConnection, conf, client, table, reducerMultiplier));
+ futures.put(table, future);
+ }
+ }
+ }
+ }
+
+ public static void waitForDataToLoad(Iface client, String table) throws BlurException, TException,
+ InterruptedException {
+ if (isFullyLoaded(client.tableStats(table))) {
+ return;
+ }
+ while (true) {
+ TableStats tableStats = client.tableStats(table);
+ if (isFullyLoaded(tableStats)) {
+ LOG.info("Data load complete in table [" + table + "] [" + tableStats + "]");
+ return;
+ }
+ LOG.info("Waiting for data to load in table [" + table + "] [" + tableStats + "]");
+ Thread.sleep(5000);
+ }
+ }
+
+ private static boolean isFullyLoaded(TableStats tableStats) {
+ return tableStats.getSegmentImportInProgressCount() == 0 && tableStats.getSegmentImportPendingCount() == 0;
+ }
+
+ private boolean isEnabled(Iface client, String table) throws BlurException, TException {
+ TableDescriptor tableDescriptor = client.describe(table);
+ return tableDescriptor.isEnabled();
+ }
+
+ private void mkdirs(FileSystem fileSystem, Path path) throws IOException {
+ if (fileSystem.exists(path)) {
+ return;
+ }
+ LOG.info("Creating path [" + path + "].");
+ if (!fileSystem.mkdirs(path)) {
+ LOG.error("Path [" + path + "] could not be created.");
+ }
+ }
+
+ public static String getMRIncWorkingPathStr(Iface client, String table) throws BlurException, TException, IOException {
+ TableDescriptor descriptor = client.describe(table);
+ Map<String, String> tableProperties = descriptor.getTableProperties();
+ if (tableProperties != null) {
+ String workingPath = tableProperties.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
+ if (workingPath != null) {
+ return workingPath;
+ }
+ }
+ throw new IOException("Table [" + table + "] does not have the property ["
+ + BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH + "] setup correctly.");
+ }
+
+ private String getOutputPathStr(Iface client, String table) throws BlurException, TException, IOException {
+ TableDescriptor descriptor = client.describe(table);
+ String tableUri = descriptor.getTableUri();
+ Path tablePath = new Path(tableUri);
+ FileSystem fileSystem = tablePath.getFileSystem(getConf());
+ Path importPath = new Path(tablePath, IMPORT);
+ mkdirs(fileSystem, importPath);
+ return new Path(importPath, IMPORT + _SEP + System.currentTimeMillis() + _SEP + UUID.randomUUID().toString())
+ .toString();
+ }
+
+}
\ No newline at end of file
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java
new file mode 100644
index 0000000..f43cba5
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class FasterDriver extends Configured implements Tool {
+
+ public static final String BLUR_UPDATE_ID = "blur.update.id";
+ private static final String BLUR_EXEC_TYPE = "blur.exec.type";
+ public static final String TMP = "tmp";
+
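+ /**
+ * How existing rows are brought into the update job: MR_ONLY streams every
+ * existing record through MapReduce, MR_WITH_LOOKUP resolves existing rows
+ * through per-row index lookups, and AUTOMATIC chooses between the two per
+ * shard based on the partitioned input counts.
+ */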
+ public enum EXEC {
+ MR_ONLY, MR_WITH_LOOKUP, AUTOMATIC
+ }
+
+ public static final String MRUPDATE_SNAPSHOT = "mrupdate-snapshot";
+ public static final String CACHE = "cache";
+ public static final String COMPLETE = "complete";
+ public static final String INPROGRESS = "inprogress";
+ public static final String NEW = "new";
+ private static final Log LOG = LogFactory.getLog(FasterDriver.class);
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new FasterDriver(), args);
+ System.exit(res);
+ }
+
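+ // Result of the partitioning job: the partitioned input path plus per-shard
+ // row id counts extracted from the reduce task counters.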
+ static class PartitionedInputResult {
+ final Path _partitionedInputData;
+ final Counters _counters;
+ final long[] _rowIdsFromNewData;
+ final long[] _rowIdsToUpdateFromNewData;
+ final long[] _rowIdsFromIndex;
+
+ PartitionedInputResult(Path partitionedInputData, Counters counters, int shards, TaskReport[] taskReports) {
+ _partitionedInputData = partitionedInputData;
+ _counters = counters;
+ _rowIdsFromNewData = new long[shards];
+ _rowIdsToUpdateFromNewData = new long[shards];
+ _rowIdsFromIndex = new long[shards];
+ for (TaskReport tr : taskReports) {
+ int id = tr.getTaskID().getId();
+ Counters taskCounters = tr.getTaskCounters();
+ Counter total = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
+ _rowIdsFromNewData[id] = total.getValue();
+ Counter update = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
+ _rowIdsToUpdateFromNewData[id] = update.getValue();
+ Counter index = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
+ _rowIdsFromIndex[id] = index.getValue();
+ }
+ }
+
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ int c = 0;
+ if (args.length < 5) {
+      System.err
+          .println("Usage: Driver <table> <mr inc working path> <output path> <zk connection> <reducer multiplier> <extra config files...>");
+ return 1;
+ }
+ String table = args[c++];
+ String mrIncWorkingPathStr = args[c++];
+ String outputPathStr = args[c++];
+ String blurZkConnection = args[c++];
+ int reducerMultipler = Integer.parseInt(args[c++]);
+ for (; c < args.length; c++) {
+ String externalConfigFileToAdd = args[c];
+ getConf().addResource(new Path(externalConfigFileToAdd));
+ }
+
+ Path outputPath = new Path(outputPathStr);
+ Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
+ FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());
+
+ Path newData = new Path(mrIncWorkingPath, NEW);
+ Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
+ Path completeData = new Path(mrIncWorkingPath, COMPLETE);
+ Path fileCache = new Path(mrIncWorkingPath, CACHE);
+ Path tmpPathDontDelete = new Path(mrIncWorkingPath, TMP);
+
+ Path tmpPath = new Path(tmpPathDontDelete, UUID.randomUUID().toString());
+
+ fileSystem.mkdirs(newData);
+ fileSystem.mkdirs(inprogressData);
+ fileSystem.mkdirs(completeData);
+ fileSystem.mkdirs(fileCache);
+
+ List<Path> srcPathList = new ArrayList<Path>();
+ for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
+ srcPathList.add(fileStatus.getPath());
+ }
+ if (srcPathList.isEmpty()) {
+ return 0;
+ }
+
+ List<Path> inprogressPathList = new ArrayList<Path>();
+ boolean success = false;
+ Iface client = null;
+
+ EXEC exec = EXEC.valueOf(getConf().get(BLUR_EXEC_TYPE, EXEC.AUTOMATIC.name()).toUpperCase());
+
+ String uuid = UUID.randomUUID().toString();
+
+ try {
+ client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
+ TableDescriptor descriptor = client.describe(table);
+ Map<String, String> tableProperties = descriptor.getTableProperties();
+ String fastDir = tableProperties.get("blur.table.disable.fast.dir");
+ if (!"true".equals(fastDir)) {
+ LOG.error("Table [{0}] does not have blur.table.disable.fast.dir enabled, which is required for the fast MR update.", table);
+ return 1;
+ }
+
+ waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
+ client.createSnapshot(table, MRUPDATE_SNAPSHOT);
+ TableStats tableStats = client.tableStats(table);
+
+ inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);
+
+ switch (exec) {
+ case MR_ONLY:
+ success = runMrOnly(descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler);
+ break;
+ case MR_WITH_LOOKUP:
+ success = runMrWithLookup(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler,
+ tmpPath, tableStats, MRUPDATE_SNAPSHOT);
+ break;
+ case AUTOMATIC:
+ success = runAutomatic(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler,
+ tmpPath, tableStats, MRUPDATE_SNAPSHOT);
+ break;
+ default:
+ throw new RuntimeException("Exec type [" + exec + "] not supported.");
+ }
+ } finally {
+ if (success) {
+ LOG.info("Associate lookup cache with new data!");
+ associateLookupCache(uuid, fileCache, outputPath);
+ LOG.info("Indexing job succeeded!");
+ client.loadData(table, outputPathStr);
+ LOG.info("Load data called");
+ movePathList(fileSystem, completeData, inprogressPathList);
+ LOG.info("Input data moved to complete");
+ ClusterDriver.waitForDataToLoad(client, table);
+ LOG.info("Data loaded");
+ } else {
+ LOG.error("Indexing job failed!");
+ movePathList(fileSystem, newData, inprogressPathList);
+ }
+ fileSystem.delete(tmpPath, true);
+ if (client != null) {
+ client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
+ }
+ }
+
+ if (success) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+
+ private void associateLookupCache(String uuid, Path fileCache, Path outputPath) throws IOException {
+ FileSystem fileSystem = fileCache.getFileSystem(getConf());
+ cleanupExtraFileFromSpecX(fileSystem, uuid, fileCache);
+ associateLookupCache(fileSystem, uuid, fileSystem.getFileStatus(fileCache), outputPath);
+ }
+
+ private void cleanupExtraFileFromSpecX(FileSystem fileSystem, String uuid, Path fileCache) throws IOException {
+ FileStatus[] listStatus = fileSystem.listStatus(fileCache);
+ List<FileStatus> uuidPaths = new ArrayList<FileStatus>();
+ for (FileStatus fs : listStatus) {
+ Path path = fs.getPath();
+ if (fs.isDirectory()) {
+ cleanupExtraFileFromSpecX(fileSystem, uuid, path);
+ } else if (path.getName().startsWith(uuid)) {
+ uuidPaths.add(fs);
+ }
+ }
+ if (uuidPaths.size() > 1) {
+ deleteIncomplete(fileSystem, uuidPaths);
+ }
+ }
+
+ private void deleteIncomplete(FileSystem fileSystem, List<FileStatus> uuidPaths) throws IOException {
+ long max = 0;
+ FileStatus keeper = null;
+ for (FileStatus fs : uuidPaths) {
+ long len = fs.getLen();
+ if (len > max) {
+ keeper = fs;
+ max = len;
+ }
+ }
+ for (FileStatus fs : uuidPaths) {
+ if (fs != keeper) {
+ LOG.info("Deleting incomplete cache file [{0}]", fs.getPath());
+ fileSystem.delete(fs.getPath(), false);
+ }
+ }
+ }
+
+ private void associateLookupCache(FileSystem fileSystem, String uuid, FileStatus fileCache, Path outputPath)
+ throws IOException {
+ Path path = fileCache.getPath();
+ if (fileCache.isDirectory()) {
+ FileStatus[] listStatus = fileSystem.listStatus(path);
+ for (FileStatus fs : listStatus) {
+ associateLookupCache(fileSystem, uuid, fs, outputPath);
+ }
+ } else if (path.getName().startsWith(uuid)) {
+ Path parent = path.getParent();
+ String shardName = parent.getName();
+ Path indexPath = findOutputDirPath(outputPath, shardName);
+ LOG.info("Path found for shard [{0}] outputPath [{1}]", shardName, outputPath);
+ String id = MergeSortRowIdMatcher.getIdForSingleSegmentIndex(getConf(), indexPath);
+ Path file = new Path(path.getParent(), id + ".seq");
+ MergeSortRowIdMatcher.commitWriter(getConf(), file, path);
+ }
+ }
+
+ private Path findOutputDirPath(Path outputPath, String shardName) throws IOException {
+ FileSystem fileSystem = outputPath.getFileSystem(getConf());
+ Path shardPath = new Path(outputPath, shardName);
+ if (!fileSystem.exists(shardPath)) {
+ throw new IOException("Shard path [" + shardPath + "] does not exist.");
+ }
+ FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().endsWith(".commit");
+ }
+ });
+ if (listStatus.length == 1) {
+ FileStatus fs = listStatus[0];
+ return fs.getPath();
+ } else {
+ throw new IOException("Expected exactly one commit point in [" + shardPath + "] but found [" + listStatus.length + "]");
+ }
+ }
+
+ private boolean runAutomatic(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
+ Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
+ throws ClassNotFoundException, IOException, InterruptedException {
+ PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
+ fileCache);
+
+ Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
+
+ InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(job, table, result._rowIdsFromNewData);
+ InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(job, table, result._rowIdsToUpdateFromNewData);
+ InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(job, table, result._rowIdsFromIndex);
+ InputSplitPruneUtil.setTable(job, table);
+
+ BlurInputFormat.setLocalCachePath(job, fileCache);
+
+ // Existing data - this input copies the existing data files, then opens and
+ // streams through all of the documents.
+ {
+ Path tablePath = new Path(descriptor.getTableUri());
+ BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
+ MultipleInputs.addInputPath(job, tablePath, PrunedBlurInputFormat.class, MapperForExistingDataMod.class);
+ }
+
+ // Existing data - This adds the row id lookup
+ {
+ MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT);
+ FileInputFormat.addInputPath(job, result._partitionedInputData);
+ MultipleInputs.addInputPath(job, result._partitionedInputData, PrunedSequenceFileInputFormat.class,
+ MapperForExistingDataWithIndexLookup.class);
+ }
+
+ // New Data
+ for (Path p : inprogressPathList) {
+ FileInputFormat.addInputPath(job, p);
+ MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
+ }
+
+ BlurOutputFormat.setOutputPath(job, outputPath);
+ BlurOutputFormat.setupJob(job, descriptor);
+
+ job.setReducerClass(UpdateReducer.class);
+ job.setMapOutputKeyClass(IndexKey.class);
+ job.setMapOutputValueClass(IndexValue.class);
+ job.setPartitionerClass(IndexKeyPartitioner.class);
+ job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
+
+ BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
+
+ boolean success = job.waitForCompletion(true);
+ Counters counters = job.getCounters();
+ LOG.info("Counters [" + counters + "]");
+ return success;
+ }
+
+ private boolean runMrWithLookup(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
+ Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
+ throws ClassNotFoundException, IOException, InterruptedException {
+ PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
+ fileCache);
+
+ Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
+
+ MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT);
+ FileInputFormat.addInputPath(job, result._partitionedInputData);
+ MultipleInputs.addInputPath(job, result._partitionedInputData, SequenceFileInputFormat.class,
+ MapperForExistingDataWithIndexLookup.class);
+
+ for (Path p : inprogressPathList) {
+ FileInputFormat.addInputPath(job, p);
+ MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
+ }
+
+ BlurOutputFormat.setOutputPath(job, outputPath);
+ BlurOutputFormat.setupJob(job, descriptor);
+
+ job.setReducerClass(UpdateReducer.class);
+ job.setMapOutputKeyClass(IndexKey.class);
+ job.setMapOutputValueClass(IndexValue.class);
+ job.setPartitionerClass(IndexKeyPartitioner.class);
+ job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
+
+ BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
+
+ boolean success = job.waitForCompletion(true);
+ Counters counters = job.getCounters();
+ LOG.info("Counters [" + counters + "]");
+ return success;
+ }
+
+ private boolean runMrOnly(TableDescriptor descriptor, List<Path> inprogressPathList, String table, Path fileCache,
+ Path outputPath, int reducerMultipler) throws IOException, ClassNotFoundException, InterruptedException {
+ Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
+ Path tablePath = new Path(descriptor.getTableUri());
+ BlurInputFormat.setLocalCachePath(job, fileCache);
+ BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
+ MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingDataMod.class);
+
+ for (Path p : inprogressPathList) {
+ FileInputFormat.addInputPath(job, p);
+ MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
+ }
+
+ BlurOutputFormat.setOutputPath(job, outputPath);
+ BlurOutputFormat.setupJob(job, descriptor);
+
+ job.setReducerClass(UpdateReducer.class);
+ job.setMapOutputKeyClass(IndexKey.class);
+ job.setMapOutputValueClass(IndexValue.class);
+ job.setPartitionerClass(IndexKeyPartitioner.class);
+ job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
+
+ BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
+
+ boolean success = job.waitForCompletion(true);
+ Counters counters = job.getCounters();
+ LOG.info("Counters [" + counters + "]");
+ return success;
+ }
+
+ private PartitionedInputResult buildPartitionedInputData(String uuid, Path tmpPath, TableDescriptor descriptor,
+ List<Path> inprogressPathList, String snapshot, Path fileCachePath) throws IOException, ClassNotFoundException,
+ InterruptedException {
+ Job job = Job.getInstance(getConf(), "Partitioning data for table [" + descriptor.getName() + "]");
+ job.getConfiguration().set(BLUR_UPDATE_ID, uuid);
+
+ // Needed for the bloom filter path information.
+ BlurOutputFormat.setTableDescriptor(job, descriptor);
+ BlurInputFormat.setLocalCachePath(job, fileCachePath);
+ MapperForExistingDataWithIndexLookup.setSnapshot(job, snapshot);
+
+ for (Path p : inprogressPathList) {
+ FileInputFormat.addInputPath(job, p);
+ }
+ Path outputPath = new Path(tmpPath, UUID.randomUUID().toString());
+ job.setJarByClass(getClass());
+ job.setMapperClass(LookupBuilderMapper.class);
+ job.setReducerClass(LookupBuilderReducer.class);
+
+ int shardCount = descriptor.getShardCount();
+ job.setNumReduceTasks(shardCount);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(BooleanWritable.class);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ if (job.waitForCompletion(true)) {
+ return new PartitionedInputResult(outputPath, job.getCounters(), shardCount, job.getTaskReports(TaskType.REDUCE));
+ } else {
+ throw new IOException("Partitioning failed!");
+ }
+ }
+
+ private void waitForOtherSnapshotsToBeRemoved(Iface client, String table, String snapshot) throws BlurException,
+ TException, InterruptedException {
+ while (true) {
+ Map<String, List<String>> listSnapshots = client.listSnapshots(table);
+ boolean mrupdateSnapshots = false;
+ for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
+ List<String> value = e.getValue();
+ if (value.contains(snapshot)) {
+ mrupdateSnapshots = true;
+ }
+ }
+ if (!mrupdateSnapshots) {
+ return;
+ } else {
+ LOG.info("Snapshot [{0}] for table [{1}] already exists, waiting for it to be removed.", snapshot, table);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+ LOG.info("Retrying");
+ }
+ }
+ }
+
+ private List<Path> movePathList(FileSystem fileSystem, Path dstDir, List<Path> lst) throws IOException {
+ List<Path> result = new ArrayList<Path>();
+ for (Path src : lst) {
+ Path dst = new Path(dstDir, src.getName());
+ if (fileSystem.rename(src, dst)) {
+ LOG.info("Moved [{0}] to [{1}]", src, dst);
+ result.add(dst);
+ } else {
+ LOG.error("Could not move [{0}] to [{1}]", src, dst);
+ }
+ }
+ return result;
+ }
+
+}
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java
new file mode 100644
index 0000000..34d3e99
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java
@@ -0,0 +1,115 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class HdfsConfigurationNamespaceMerge {
+
+ private static final String DFS_NAMESERVICES = "dfs.nameservices";
+ private static final Log LOG = LogFactory.getLog(HdfsConfigurationNamespaceMerge.class);
+
+ public static void main(String[] args) throws IOException {
+ Path p = new Path("./src/main/scripts/conf/hdfs");
+
+ Configuration configuration = mergeHdfsConfigs(p.getFileSystem(new Configuration()), p);
+
+ // configuration.writeXml(System.out);
+
+ Collection<String> nameServices = configuration.getStringCollection(DFS_NAMESERVICES);
+ for (String name : nameServices) {
+ Path path = new Path("hdfs://" + name + "/");
+ FileSystem fileSystem = path.getFileSystem(configuration);
+ FileStatus[] listStatus = fileSystem.listStatus(path);
+ for (FileStatus fileStatus : listStatus) {
+ System.out.println(fileStatus.getPath());
+ }
+ }
+ }
+
+ private static boolean checkHostName(String host) {
+ try {
+ InetAddress.getAllByName(host);
+ return true;
+ } catch (UnknownHostException e) {
+ LOG.warn("Host not found [" + host + "]");
+ return false;
+ }
+ }
+
+ public static Configuration mergeHdfsConfigs(FileSystem fs, Path p) throws IOException {
+ List<Configuration> configList = new ArrayList<Configuration>();
+ gatherConfigs(fs, p, configList);
+ return merge(configList);
+ }
+
+ public static Configuration merge(List<Configuration> configList) throws IOException {
+ Configuration merge = new Configuration(false);
+ Set<String> nameServices = new HashSet<String>();
+ for (Configuration configuration : configList) {
+ String nameService = configuration.get(DFS_NAMESERVICES);
+ if (nameServices.contains(nameService)) {
+ throw new IOException("Multiple confs define nameservice [" + nameService + "]");
+ }
+ nameServices.add(nameService);
+ if (shouldAdd(configuration, nameService)) {
+ for (Entry<String, String> e : configuration) {
+ String key = e.getKey();
+ if (key.contains(nameService)) {
+ String value = e.getValue();
+ merge.set(key, value);
+ }
+ }
+ }
+ }
+ merge.set(DFS_NAMESERVICES, StringUtils.join(nameServices, ","));
+ return merge;
+ }
+
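+ // Only merge a nameservice whose namenode RPC host resolves in DNS; configs
+ // for unreachable clusters are skipped.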
+ private static boolean shouldAdd(Configuration configuration, String nameService) {
+ for (Entry<String, String> e : configuration) {
+ String key = e.getKey();
+ if (key.contains(nameService) && key.startsWith("dfs.namenode.rpc-address.")) {
+ return checkHostName(getHost(e.getValue()));
+ }
+ }
+ return false;
+ }
+
+ private static String getHost(String host) {
+ int index = host.indexOf(":");
+ return index < 0 ? host : host.substring(0, index);
+ }
+
+ public static void gatherConfigs(FileSystem fs, Path p, List<Configuration> configList) throws IOException {
+ if (fs.isFile(p)) {
+ if (p.getName().endsWith(".xml")) {
+ LOG.info("Loading file [" + p + "]");
+ Configuration configuration = new Configuration(false);
+ configuration.addResource(p);
+ configList.add(configuration);
+ } else {
+ LOG.info("Skipping file [" + p + "]");
+ }
+ } else {
+ FileStatus[] listStatus = fs.listStatus(p);
+ for (FileStatus fileStatus : listStatus) {
+ gatherConfigs(fs, fileStatus.getPath(), configList);
+ }
+ }
+ }
+
+}
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java
new file mode 100644
index 0000000..e295073
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java
@@ -0,0 +1,133 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
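+/**
+ * Stores the per-shard row id counts in the job configuration so that input
+ * splits can be pruned to only the shards that need lookup or MR work.
+ */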
+public class InputSplitPruneUtil {
+
+ private static final String BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.update.from.new.data.count.";
+ private static final String BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.from.new.data.count.";
+ private static final String BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX = "blur.lookup.rowid.from.index.count.";
+
+ private static final String BLUR_LOOKUP_TABLE = "blur.lookup.table";
+ private static final String BLUR_LOOKUP_RATIO_PER_SHARD = "blur.lookup.ratio.per.shard";
+ private static final String BLUR_LOOKUP_MAX_TOTAL_PER_SHARD = "blur.lookup.max.total.per.shard";
+
+ private static final double DEFAULT_LOOKUP_RATIO = 0.5;
+ private static final long DEFAULT_LOOKUP_MAX_TOTAL = Long.MAX_VALUE;
+
+ public static boolean shouldLookupExecuteOnShard(Configuration configuration, String table, int shard) {
+ double lookupRatio = getLookupRatio(configuration);
+ long maxLookupCount = getMaxLookupCount(configuration);
+ long rowIdFromNewDataCount = getBlurLookupRowIdFromNewDataCount(configuration, table, shard);
+ long rowIdUpdateFromNewDataCount = getBlurLookupRowIdUpdateFromNewDataCount(configuration, table, shard);
+ long rowIdFromIndexCount = getBlurLookupRowIdFromIndexCount(configuration, table, shard);
+ return shouldLookupRun(rowIdFromIndexCount, rowIdFromNewDataCount, rowIdUpdateFromNewDataCount, lookupRatio,
+ maxLookupCount);
+ }
+
+ private static boolean shouldLookupRun(long rowIdFromIndexCount, long rowIdFromNewDataCount,
+ long rowIdUpdateFromNewDataCount, double lookupRatio, long maxLookupCount) {
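+ // Lookups pay off only when the rows to update are few relative to the
+ // rows already in the index; otherwise a full MR pass is cheaper.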
+ if (rowIdUpdateFromNewDataCount > maxLookupCount) {
+ return false;
+ }
+ if (rowIdFromIndexCount == 0) {
+ return false;
+ }
+ double ratio = (double) rowIdUpdateFromNewDataCount / (double) rowIdFromIndexCount;
+ return ratio <= lookupRatio;
+ }
+
+ public static double getLookupRatio(Configuration configuration) {
+ return configuration.getDouble(BLUR_LOOKUP_RATIO_PER_SHARD, DEFAULT_LOOKUP_RATIO);
+ }
+
+ private static long getMaxLookupCount(Configuration configuration) {
+ return configuration.getLong(BLUR_LOOKUP_MAX_TOTAL_PER_SHARD, DEFAULT_LOOKUP_MAX_TOTAL);
+ }
+
+ public static void setTable(Job job, String table) {
+ setTable(job.getConfiguration(), table);
+ }
+
+ public static void setTable(Configuration configuration, String table) {
+ configuration.set(BLUR_LOOKUP_TABLE, table);
+ }
+
+ public static String getTable(Configuration configuration) {
+ return configuration.get(BLUR_LOOKUP_TABLE);
+ }
+
+ public static String getBlurLookupRowIdFromIndexCountName(String table) {
+ return BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX + table;
+ }
+
+ public static String getBlurLookupRowIdFromNewDataCountName(String table) {
+ return BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX + table;
+ }
+
+ public static String getBlurLookupRowIdUpdateFromNewDataCountName(String table) {
+ return BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX + table;
+ }
+
+ public static long getBlurLookupRowIdUpdateFromNewDataCount(Configuration configuration, String table, int shard) {
+ String[] strings = configuration.getStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table));
+ return getCount(strings, shard);
+ }
+
+ public static long getBlurLookupRowIdFromNewDataCount(Configuration configuration, String table, int shard) {
+ String[] strings = configuration.getStrings(getBlurLookupRowIdFromNewDataCountName(table));
+ return getCount(strings, shard);
+ }
+
+ public static long getBlurLookupRowIdFromIndexCount(Configuration configuration, String table, int shard) {
+ String[] strings = configuration.getStrings(getBlurLookupRowIdFromIndexCountName(table));
+ return getCount(strings, shard);
+ }
+
+ public static void setBlurLookupRowIdFromNewDataCounts(Job job, String table, long[] counts) {
+ setBlurLookupRowIdFromNewDataCounts(job.getConfiguration(), table, counts);
+ }
+
+ public static void setBlurLookupRowIdFromNewDataCounts(Configuration configuration, String table, long[] counts) {
+ configuration.setStrings(getBlurLookupRowIdFromNewDataCountName(table), toStrings(counts));
+ }
+
+ public static void setBlurLookupRowIdUpdateFromNewDataCounts(Job job, String table, long[] counts) {
+ setBlurLookupRowIdUpdateFromNewDataCounts(job.getConfiguration(), table, counts);
+ }
+
+ public static void setBlurLookupRowIdUpdateFromNewDataCounts(Configuration configuration, String table, long[] counts) {
+ configuration.setStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table), toStrings(counts));
+ }
+
+ public static void setBlurLookupRowIdFromIndexCounts(Job job, String table, long[] counts) {
+ setBlurLookupRowIdFromIndexCounts(job.getConfiguration(), table, counts);
+ }
+
+ public static void setBlurLookupRowIdFromIndexCounts(Configuration configuration, String table, long[] counts) {
+ configuration.setStrings(getBlurLookupRowIdFromIndexCountName(table), toStrings(counts));
+ }
+
+ public static long getCount(String[] strings, int shard) {
+ return Long.parseLong(strings[shard]);
+ }
+
+ public static int getShardFromDirectoryPath(Path path) {
+ return ShardUtil.getShardIndex(path.getName());
+ }
+
+ public static String[] toStrings(long[] counts) {
+ if (counts == null) {
+ return null;
+ }
+ String[] strs = new String[counts.length];
+ for (int i = 0; i < counts.length; i++) {
+ strs[i] = Long.toString(counts[i]);
+ }
+ return strs;
+ }
+
+}
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java
new file mode 100644
index 0000000..ac0d91f
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java
@@ -0,0 +1,18 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
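+/**
+ * Emits the row id of every new record so the reducer can match it against the
+ * existing index and build the row id lookup cache.
+ */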
+public class LookupBuilderMapper extends Mapper<Text, BlurRecord, Text, NullWritable> {
+
+ private final Text _rowId = new Text();
+
+ @Override
+ protected void map(Text key, BlurRecord value, Mapper<Text, BlurRecord, Text, NullWritable>.Context context)
+ throws IOException, InterruptedException {
+ // Reuse the output key to avoid allocating a new Text for every record.
+ _rowId.set(value.getRowId());
+ context.write(_rowId, NullWritable.get());
+ }
+
+}
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java
new file mode 100644
index 0000000..1983cae
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java
@@ -0,0 +1,165 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.mapreduce.lib.update.MergeSortRowIdMatcher.Action;
+import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.store.Directory;
+
+import com.google.common.io.Closer;
+
+public class LookupBuilderReducer extends Reducer<Text, NullWritable, Text, BooleanWritable> {
+
+ public static final String BLUR_CACHE_DIR_TOTAL_BYTES = "blur.cache.dir.total.bytes";
+ private Counter _rowIds;
+ private Counter _rowIdsToUpdate;
+
+ private MergeSortRowIdMatcher _matcher;
+ private int _numberOfShardsInTable;
+ private Configuration _configuration;
+ private String _snapshot;
+ private Path _tablePath;
+ private Counter _rowIdsFromIndex;
+ private long _totalNumberOfBytes;
+ private Action _action;
+ private Closer _closer;
+ private Path _cachePath;
+ private String _table;
+ private Writer _writer;
+
+ @Override
+ protected void setup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
+ InterruptedException {
+ _configuration = context.getConfiguration();
+ _rowIds = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
+ _rowIdsToUpdate = context.getCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
+ _rowIdsFromIndex = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
+ TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+ _numberOfShardsInTable = tableDescriptor.getShardCount();
+ _tablePath = new Path(tableDescriptor.getTableUri());
+ _snapshot = MapperForExistingDataWithIndexLookup.getSnapshot(_configuration);
+ _totalNumberOfBytes = _configuration.getLong(BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
+ _cachePath = BlurInputFormat.getLocalCachePath(_configuration);
+ _table = tableDescriptor.getName();
+ _closer = Closer.create();
+ }
+
+ @Override
+ protected void reduce(Text rowId, Iterable<NullWritable> nothing,
+ Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException, InterruptedException {
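+ // The matcher and cache writer are created lazily from the first row id;
+ // with one reducer per shard, every key in this task maps to the same shard.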
+ if (_matcher == null) {
+ _matcher = getMergeSortRowIdMatcher(rowId, context);
+ }
+ if (_writer == null) {
+ _writer = getRowIdWriter(rowId, context);
+ }
+ _writer.append(rowId, NullWritable.get());
+ _rowIds.increment(1);
+ if (_action == null) {
+ _action = new Action() {
+ @Override
+ public void found(Text rowId) throws IOException {
+ _rowIdsToUpdate.increment(1);
+ try {
+ context.write(rowId, new BooleanWritable(true));
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ };
+ }
+ _matcher.lookup(rowId, _action);
+ }
+
+ private Writer getRowIdWriter(Text rowId, Reducer<Text, NullWritable, Text, BooleanWritable>.Context context)
+ throws IOException {
+ BlurPartitioner blurPartitioner = new BlurPartitioner();
+ int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+ String shardName = ShardUtil.getShardName(shard);
+ Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
+ Configuration configuration = context.getConfiguration();
+ String uuid = configuration.get(FasterDriver.BLUR_UPDATE_ID);
+ Path tmpPath = new Path(cachePath, uuid + "_" + getAttemptString(context));
+ return _closer.register(MergeSortRowIdMatcher.createWriter(_configuration, tmpPath));
+ }
+
+ private String getAttemptString(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) {
+ TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+ return taskAttemptID.toString();
+ }
+
+ @Override
+ protected void cleanup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
+ InterruptedException {
+ _closer.close();
+ }
+
+ private MergeSortRowIdMatcher getMergeSortRowIdMatcher(Text rowId,
+ Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException {
+ BlurPartitioner blurPartitioner = new BlurPartitioner();
+ int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+ String shardName = ShardUtil.getShardName(shard);
+
+ Path shardPath = new Path(_tablePath, shardName);
+ HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
+ SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
+ SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
+ Long generation = policy.getGeneration(_snapshot);
+ if (generation == null) {
+ hdfsDirectory.close();
+ throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
+ }
+
+ BlurConfiguration bc = new BlurConfiguration();
+ BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
+ _totalNumberOfBytes);
+ _closer.register(blockCacheDirectoryFactoryV2);
+ Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
+ List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
+ IndexCommit indexCommit = MapperForExistingDataWithIndexLookup.findIndexCommit(listCommits, generation, shardPath);
+ DirectoryReader reader = DirectoryReader.open(indexCommit);
+ _rowIdsFromIndex.setValue(getTotalNumberOfRowIds(reader));
+
+ Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
+ return new MergeSortRowIdMatcher(dir, generation, _configuration, cachePath, context);
+ }
+
+ private long getTotalNumberOfRowIds(DirectoryReader reader) throws IOException {
+ long total = 0;
+ List<AtomicReaderContext> leaves = reader.leaves();
+ for (AtomicReaderContext context : leaves) {
+ AtomicReader atomicReader = context.reader();
+ Terms terms = atomicReader.terms(BlurConstants.ROW_ID);
+ if (terms == null) {
+ continue;
+ }
+ long expectedInsertions = terms.size();
+ if (expectedInsertions < 0) {
+ return -1;
+ }
+ total += expectedInsertions;
+ }
+ return total;
+ }
+}
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
new file mode 100644
index 0000000..bf86e19
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.TableBlurRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+
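+/**
+ * Re-emits every existing record as OLD data keyed by row and record id so the
+ * update reducer can merge it with incoming new data.
+ */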
+public class MapperForExistingDataMod extends Mapper<Text, TableBlurRecord, IndexKey, IndexValue> {
+
+ private Counter _existingRecords;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ Counter counter = context.getCounter(BlurIndexCounter.INPUT_FORMAT_MAPPER);
+ counter.increment(1);
+ _existingRecords = context.getCounter(BlurIndexCounter.INPUT_FORMAT_EXISTING_RECORDS);
+ }
+
+ @Override
+ protected void map(Text key, TableBlurRecord value, Context context) throws IOException, InterruptedException {
+ BlurRecord blurRecord = value.getBlurRecord();
+ IndexKey oldDataKey = IndexKey.oldData(blurRecord.getRowId(), blurRecord.getRecordId());
+ context.write(oldDataKey, new IndexValue(blurRecord));
+ _existingRecords.increment(1L);
+ }
+
+}
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
new file mode 100644
index 0000000..0e2fe66
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.FetchRecordResult;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import com.google.common.io.Closer;
+
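+/**
+ * Fetches existing rows straight from a snapshot of the shard index and
+ * re-emits each matching record as OLD data for the update reducer.
+ */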
+public class MapperForExistingDataWithIndexLookup extends Mapper<Text, BooleanWritable, IndexKey, IndexValue> {
+
+ private static final Log LOG = LogFactory.getLog(MapperForExistingDataWithIndexLookup.class);
+
+ private static final String BLUR_SNAPSHOT = "blur.snapshot";
+ private Counter _existingRecords;
+ private Counter _rowLookup;
+ private BlurPartitioner _blurPartitioner;
+ private Path _tablePath;
+ private int _numberOfShardsInTable;
+ private Configuration _configuration;
+ private String _snapshot;
+
+ private int _indexShard = -1;
+ private DirectoryReader _reader;
+ private IndexSearcher _indexSearcher;
+ private long _totalNumberOfBytes;
+ private Closer _closer;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ Counter counter = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER);
+ counter.increment(1);
+
+ _configuration = context.getConfiguration();
+ _existingRecords = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_EXISTING_RECORDS);
+ _rowLookup = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT);
+ _blurPartitioner = new BlurPartitioner();
+ TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+ _numberOfShardsInTable = tableDescriptor.getShardCount();
+ _tablePath = new Path(tableDescriptor.getTableUri());
+ _snapshot = getSnapshot(_configuration);
+ _totalNumberOfBytes = _configuration.getLong(LookupBuilderReducer.BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
+ _closer = Closer.create();
+ }
+
+ @Override
+ protected void map(Text key, BooleanWritable value, Context context) throws IOException, InterruptedException {
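+ // Only row ids flagged true by the lookup builder require a fetch from the
+ // existing index.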
+ if (value.get()) {
+ String rowId = key.toString();
+ LOG.debug("Looking up rowid [" + rowId + "]");
+ _rowLookup.increment(1);
+ IndexSearcher indexSearcher = getIndexSearcher(rowId);
+ Term term = new Term(BlurConstants.ROW_ID, rowId);
+ RowCollector collector = getCollector(context);
+ indexSearcher.search(new TermQuery(term), collector);
+ LOG.debug("Looking for rowid [" + rowId + "] has [" + collector.records + "] records");
+ }
+ }
+
+ @Override
+ protected void cleanup(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) throws IOException,
+ InterruptedException {
+ _closer.close();
+ }
+
+ static class RowCollector extends Collector {
+
+ private AtomicReader reader;
+ private Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context _context;
+ private Counter _existingRecords;
+ int records;
+
+ RowCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context, Counter existingRecords) {
+ _context = context;
+ _existingRecords = existingRecords;
+ }
+
+ @Override
+ public void setScorer(Scorer scorer) throws IOException {
+
+ }
+
+ @Override
+ public void setNextReader(AtomicReaderContext context) throws IOException {
+ reader = context.reader();
+ }
+
+ @Override
+ public void collect(int doc) throws IOException {
+ Document document = reader.document(doc);
+ FetchRecordResult result = RowDocumentUtil.getRecord(document);
+ String rowid = result.getRowid();
+ Record record = result.getRecord();
+ String recordId = record.getRecordId();
+ IndexKey oldDataKey = IndexKey.oldData(rowid, recordId);
+ try {
+ _context.write(oldDataKey, new IndexValue(toBlurRecord(rowid, record)));
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ _existingRecords.increment(1L);
+ records++;
+ }
+
+ private BlurRecord toBlurRecord(String rowId, Record record) {
+ BlurRecord blurRecord = new BlurRecord();
+ blurRecord.setRowId(rowId);
+ blurRecord.setRecordId(record.getRecordId());
+ blurRecord.setFamily(record.getFamily());
+ List<Column> columns = record.getColumns();
+ for (Column column : columns) {
+ blurRecord.addColumn(column.getName(), column.getValue());
+ }
+ return blurRecord;
+ }
+
+ @Override
+ public boolean acceptsDocsOutOfOrder() {
+ return false;
+ }
+ }
+
+ private RowCollector getCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) {
+ return new RowCollector(context, _existingRecords);
+ }
+
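+ /**
+ * Opens the searcher for this task's shard on first use; every row id a
+ * single mapper sees must hash to the same shard, otherwise the input data
+ * was partitioned incorrectly.
+ */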
+ private IndexSearcher getIndexSearcher(String rowId) throws IOException {
+ int shard = _blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+ if (_indexSearcher != null) {
+ if (shard != _indexShard) {
+ throw new IOException("Input data is not partitioned correctly.");
+ }
+ return _indexSearcher;
+ } else {
+ _indexShard = shard;
+ Path shardPath = new Path(_tablePath, ShardUtil.getShardName(_indexShard));
+ HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
+ SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
+ SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
+ Long generation = policy.getGeneration(_snapshot);
+ if (generation == null) {
+ hdfsDirectory.close();
+ throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
+ }
+
+ BlurConfiguration bc = new BlurConfiguration();
+ BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
+ _totalNumberOfBytes);
+ _closer.register(blockCacheDirectoryFactoryV2);
+ Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
+
+ List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
+ IndexCommit indexCommit = findIndexCommit(listCommits, generation, shardPath);
+ _reader = DirectoryReader.open(indexCommit);
+ return _indexSearcher = new IndexSearcher(_reader);
+ }
+ }
+
+ public static IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation, Path shardDir)
+ throws IOException {
+ for (IndexCommit commit : listCommits) {
+ if (commit.getGeneration() == generation) {
+ return commit;
+ }
+ }
+ throw new IOException("Generation [" + generation + "] not found in shard [" + shardDir + "]");
+ }
+
+ public static void setSnapshot(Job job, String snapshot) {
+ setSnapshot(job.getConfiguration(), snapshot);
+ }
+
+ public static void setSnapshot(Configuration configuration, String snapshot) {
+ configuration.set(BLUR_SNAPSHOT, snapshot);
+ }
+
+ public static String getSnapshot(Configuration configuration) {
+ return configuration.get(BLUR_SNAPSHOT);
+ }
+}
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
new file mode 100644
index 0000000..d91d1f5
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+public class MapperForNewDataMod extends Mapper<Text, BlurRecord, IndexKey, IndexValue> {
+
+ private static final IndexValue EMPTY_RECORD = new IndexValue();
+ private long _timestamp;
+ private Counter _newRecords;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ InputSplit inputSplit = context.getInputSplit();
+ FileSplit fileSplit = getFileSplit(inputSplit);
+ Path path = fileSplit.getPath();
+ FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
+ FileStatus fileStatus = fileSystem.getFileStatus(path);
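+ // The input file's modification time versions every new record read from
+ // this split.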
+ _timestamp = fileStatus.getModificationTime();
+ _newRecords = context.getCounter(BlurIndexCounter.NEW_RECORDS);
+ }
+
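+ /**
+ * Unwraps the underlying FileSplit; with MultipleInputs Hadoop hands the
+ * mapper a package-private TaggedInputSplit, so reflection is the only way
+ * to reach the wrapped split.
+ */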
+ private FileSplit getFileSplit(InputSplit inputSplit) throws IOException {
+ if (inputSplit instanceof FileSplit) {
+ return (FileSplit) inputSplit;
+ }
+ if (inputSplit.getClass().getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
+ try {
+ Field declaredField = inputSplit.getClass().getDeclaredField("inputSplit");
+ declaredField.setAccessible(true);
+ return getFileSplit((InputSplit) declaredField.get(inputSplit));
+ } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ } else {
+ throw new IOException("Unknown input split type [" + inputSplit + "] [" + inputSplit.getClass() + "]");
+ }
+ }
+
+ @Override
+ protected void map(Text key, BlurRecord blurRecord, Context context) throws IOException, InterruptedException {
+ IndexKey newDataKey = IndexKey.newData(blurRecord.getRowId(), blurRecord.getRecordId(), _timestamp);
+ context.write(newDataKey, new IndexValue(blurRecord));
+ _newRecords.increment(1L);
+
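+ // The marker signals that this row has incoming new data, so any existing
+ // records for the row must be looked up and merged.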
+ IndexKey newDataMarker = IndexKey.newDataMarker(blurRecord.getRowId());
+ context.write(newDataMarker, EMPTY_RECORD);
+ }
+
+}
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MergeSortRowIdMatcher.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MergeSortRowIdMatcher.java
new file mode 100644
index 0000000..f376274
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MergeSortRowIdMatcher.java
@@ -0,0 +1,397 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.index.AtomicReaderUtil;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.store.hdfs.DirectoryDecorator;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+
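+/**
+ * Streams the sorted row ids of every segment in a shard snapshot into
+ * SequenceFile caches and merge-sorts across them, so callers can test in
+ * ascending row id order whether a row already exists in the shard.
+ */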
+public class MergeSortRowIdMatcher {
+
+ private static final String DEL = ".del";
+ private static final Log LOG = LogFactory.getLog(MergeSortRowIdMatcher.class);
+ private static final Progressable NO_OP = new Progressable() {
+ @Override
+ public void progress() {
+
+ }
+ };
+ private static final long _10_SECONDS = TimeUnit.SECONDS.toNanos(10);
+
+ public interface Action {
+ void found(Text rowId) throws IOException;
+ }
+
+ private final MyReader[] _readers;
+ private final Configuration _configuration;
+ private final Path _cachePath;
+ private final IndexCommit _indexCommit;
+ private final Directory _directory;
+ private final Progressable _progressable;
+
+ private DirectoryReader _reader;
+
+ public MergeSortRowIdMatcher(Directory directory, long generation, Configuration configuration, Path cachePath)
+ throws IOException {
+ this(directory, generation, configuration, cachePath, null);
+ }
+
+ public MergeSortRowIdMatcher(Directory directory, long generation, Configuration configuration, Path cachePath,
+ Progressable progressable) throws IOException {
+ List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
+ _indexCommit = findIndexCommit(listCommits, generation);
+ _configuration = configuration;
+ _cachePath = cachePath;
+ _directory = directory;
+ _progressable = progressable == null ? NO_OP : progressable;
+ _readers = openReaders();
+ }
+
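+ /**
+ * Row ids must be presented in ascending order; the per-segment readers only
+ * advance forward.
+ */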
+ public void lookup(Text rowId, Action action) throws IOException {
+ if (lookup(rowId)) {
+ action.found(rowId);
+ }
+ }
+
+ private boolean lookup(Text rowId) throws IOException {
+ advanceReadersIfNeeded(rowId);
+ sortReaders();
+ return checkReaders(rowId);
+ }
+
+ private boolean checkReaders(Text rowId) {
+ for (MyReader reader : _readers) {
+ int compareTo = reader.getCurrentRowId().compareTo(rowId);
+ if (compareTo == 0) {
+ return true;
+ } else if (compareTo > 0) {
+ return false;
+ }
+ }
+ return false;
+ }
+
+ private void advanceReadersIfNeeded(Text rowId) throws IOException {
+ _progressable.progress();
+ for (MyReader reader : _readers) {
+ if (rowId.compareTo(reader.getCurrentRowId()) > 0) {
+ advanceReader(reader, rowId);
+ }
+ }
+ }
+
+ private void advanceReader(MyReader reader, Text rowId) throws IOException {
+ while (reader.next()) {
+ if (rowId.compareTo(reader.getCurrentRowId()) <= 0) {
+ return;
+ }
+ }
+ }
+
+ private static final Comparator<MyReader> COMP = new Comparator<MyReader>() {
+ @Override
+ public int compare(MyReader o1, MyReader o2) {
+ return o1.getCurrentRowId().compareTo(o2.getCurrentRowId());
+ }
+ };
+
+ private void sortReaders() {
+ Arrays.sort(_readers, COMP);
+ }
+
+ private MyReader[] openReaders() throws IOException {
+ Collection<SegmentKey> segmentKeys = getSegmentKeys();
+ MyReader[] readers = new MyReader[segmentKeys.size()];
+ int i = 0;
+ for (SegmentKey segmentKey : segmentKeys) {
+ readers[i++] = openReader(segmentKey);
+ }
+ return readers;
+ }
+
+ private MyReader openReader(SegmentKey segmentKey) throws IOException {
+ Path file = getCacheFilePath(segmentKey);
+ FileSystem fileSystem = _cachePath.getFileSystem(_configuration);
+ if (!fileSystem.exists(file)) {
+ createCacheFile(file, segmentKey);
+ }
+ Reader reader = new SequenceFile.Reader(_configuration, SequenceFile.Reader.file(file));
+ return new MyReader(reader);
+ }
+
+ private void createCacheFile(Path file, SegmentKey segmentKey) throws IOException {
+ LOG.info("Building cache for segment [{0}] to [{1}]", segmentKey, file);
+ Path tmpPath = getTmpWriterPath(file.getParent());
+ try (Writer writer = createWriter(_configuration, tmpPath)) {
+ DirectoryReader reader = getReader();
+ for (AtomicReaderContext context : reader.leaves()) {
+ SegmentReader segmentReader = AtomicReaderUtil.getSegmentReader(context.reader());
+ if (segmentReader.getSegmentName().equals(segmentKey.getSegmentName())) {
+ writeRowIds(writer, segmentReader);
+ break;
+ }
+ }
+ }
+ commitWriter(_configuration, file, tmpPath);
+ }
+
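+ /**
+ * A failed rename typically means another task already committed an
+ * identical cache file for this segment, so it is only logged as a warning.
+ */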
+ public static void commitWriter(Configuration configuration, Path file, Path tmpPath) throws IOException {
+ FileSystem fileSystem = tmpPath.getFileSystem(configuration);
+ LOG.info("Commit tmp [{0}] to file [{1}]", tmpPath, file);
+ if (!fileSystem.rename(tmpPath, file)) {
+ LOG.warn("Could not commit tmp file [{0}] to file [{1}]", tmpPath, file);
+ }
+ }
+
+ public static Path getTmpWriterPath(Path dir) {
+ return new Path(dir, UUID.randomUUID().toString() + ".tmp");
+ }
+
+ public static Writer createWriter(Configuration configuration, Path tmpPath) throws IOException {
+ return SequenceFile.createWriter(configuration, SequenceFile.Writer.file(tmpPath),
+ SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(NullWritable.class),
+ SequenceFile.Writer.compression(CompressionType.BLOCK, getCodec(configuration)));
+ }
+
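+ // Gzip is only chosen when the native zlib library is loaded; otherwise the
+ // pure-Java DeflateCodec keeps block compression working everywhere.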
+ private static CompressionCodec getCodec(Configuration configuration) {
+ if (ZlibFactory.isNativeZlibLoaded(configuration)) {
+ return new GzipCodec();
+ }
+ return new DeflateCodec();
+ }
+
+ private void writeRowIds(Writer writer, SegmentReader segmentReader) throws IOException {
+ Terms terms = segmentReader.terms(BlurConstants.ROW_ID);
+ if (terms == null) {
+ return;
+ }
+ TermsEnum termsEnum = terms.iterator(null);
+ BytesRef rowId;
+ long s = System.nanoTime();
+ while ((rowId = termsEnum.next()) != null) {
+ long n = System.nanoTime();
+ // Report progress at most once every ~10 seconds while streaming terms.
+ if (n - s > _10_SECONDS) {
+ _progressable.progress();
+ s = n;
+ }
+ writer.append(new Text(rowId.utf8ToString()), NullWritable.get());
+ }
+ }
+
+ private IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation) throws IOException {
+ for (IndexCommit commit : listCommits) {
+ if (commit.getGeneration() == generation) {
+ return commit;
+ }
+ }
+ throw new IOException("Generation [" + generation + "] not found.");
+ }
+
+ static class SegmentKey {
+
+ final String _segmentName;
+ final String _id;
+
+ SegmentKey(String segmentName, String id) throws IOException {
+ _segmentName = segmentName;
+ _id = id;
+ }
+
+ String getSegmentName() {
+ return _segmentName;
+ }
+
+ @Override
+ public String toString() {
+ return _id;
+ }
+ }
+
+ private DirectoryReader getReader() throws IOException {
+ if (_reader == null) {
+ _reader = DirectoryReader.open(_indexCommit);
+ }
+ return _reader;
+ }
+
+ private Collection<SegmentKey> getSegmentKeys() throws IOException {
+ List<SegmentKey> keys = new ArrayList<SegmentKey>();
+ SegmentInfos segmentInfos = new SegmentInfos();
+ segmentInfos.read(_directory, _indexCommit.getSegmentsFileName());
+ for (SegmentInfoPerCommit segmentInfoPerCommit : segmentInfos) {
+ String name = segmentInfoPerCommit.info.name;
+ String id = getId(segmentInfoPerCommit.info);
+ keys.add(new SegmentKey(name, id));
+ }
+ return keys;
+ }
+
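+ /**
+ * A segment is identified by the first HDFS block id of its newest non-.del
+ * file; the id is stable for the life of the segment and changes whenever
+ * the segment is rewritten, which keeps the row id caches consistent.
+ */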
+ private String getId(SegmentInfo si) throws IOException {
+ HdfsDirectory dir = getHdfsDirectory(si.dir);
+ Set<String> files = new TreeSet<String>(si.files());
+ return getId(_configuration, dir, files);
+ }
+
+ private static String getId(Configuration configuration, HdfsDirectory dir, Set<String> files) throws IOException {
+ long ts = 0;
+ String file = null;
+ for (String f : files) {
+ if (f.endsWith(DEL)) {
+ continue;
+ }
+ long fileModified = dir.getFileModified(f);
+ if (fileModified > ts) {
+ ts = fileModified;
+ file = f;
+ }
+ }
+
+ if (file == null) {
+ // Only .del files were present; there is no stable data file to hash.
+ throw new IOException("No data files found in segment for dir [" + dir.getPath() + "].");
+ }
+ Path path = dir.getPath();
+ FileSystem fileSystem = path.getFileSystem(configuration);
+ Path realFile = new Path(path, file);
+ if (!fileSystem.exists(realFile)) {
+ realFile = dir.getRealFilePathFromSymlink(file);
+ if (!fileSystem.exists(realFile)) {
+ throw new IOException("Lucene file [" + file + "] for dir [" + path + "] can not be found.");
+ }
+ }
+ return getFirstBlockId(fileSystem, realFile);
+ }
+
+ public static String getIdForSingleSegmentIndex(Configuration configuration, Path indexPath) throws IOException {
+ HdfsDirectory dir = new HdfsDirectory(configuration, indexPath);
+ Set<String> files = new TreeSet<String>(Arrays.asList(dir.listAll()));
+ return getId(configuration, dir, files);
+ }
+
+ private static String getFirstBlockId(FileSystem fileSystem, Path realFile) throws IOException {
+ FileStatus fileStatus = fileSystem.getFileStatus(realFile);
+ BlockLocation[] locations = fileSystem.getFileBlockLocations(fileStatus, 0, 1);
+ HdfsBlockLocation location = (HdfsBlockLocation) locations[0];
+ LocatedBlock locatedBlock = location.getLocatedBlock();
+ ExtendedBlock block = locatedBlock.getBlock();
+ return toNiceString(block.getBlockId());
+ }
+
+ private static String toNiceString(long blockId) {
+ return "b" + blockId;
+ }
+
+ private static HdfsDirectory getHdfsDirectory(Directory dir) {
+ if (dir instanceof HdfsDirectory) {
+ return (HdfsDirectory) dir;
+ } else if (dir instanceof DirectoryDecorator) {
+ DirectoryDecorator dd = (DirectoryDecorator) dir;
+ return getHdfsDirectory(dd.getOriginalDirectory());
+ } else {
+ throw new RuntimeException("Unknown directory type.");
+ }
+ }
+
+ private Path getCacheFilePath(SegmentKey segmentKey) {
+ return new Path(_cachePath, segmentKey + ".seq");
+ }
+
+ static class MyReader {
+
+ final Reader _reader;
+ final Text _rowId = new Text();
+ boolean _finished = false;
+
+ public MyReader(Reader reader) {
+ _reader = reader;
+ }
+
+ public Text getCurrentRowId() {
+ return _rowId;
+ }
+
+ public boolean next() throws IOException {
+ if (_finished) {
+ return false;
+ }
+ if (_reader.next(_rowId)) {
+ return true;
+ }
+ _finished = true;
+ return false;
+ }
+
+ public boolean isFinished() {
+ return _finished;
+ }
+ }
+
+ public static Path getCachePath(Path cachePath, String table, String shardName) {
+ return new Path(new Path(cachePath, table), shardName);
+ }
+}
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedBlurInputFormat.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedBlurInputFormat.java
new file mode 100644
index 0000000..6ec2877
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedBlurInputFormat.java
@@ -0,0 +1,61 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
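+/**
+ * Drops the full-scan Blur input splits for any shard whose updates will be
+ * handled by direct index lookup, or that has no overlapping row ids at all.
+ */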
+public class PrunedBlurInputFormat extends BlurInputFormat {
+
+ private static final Log LOG = LogFactory.getLog(PrunedBlurInputFormat.class);
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException {
+ Path[] dirs = getInputPaths(context);
+ Configuration configuration = context.getConfiguration();
+ List<BlurInputSplit> splits = getSplits(configuration, dirs);
+ Map<Path, List<BlurInputSplit>> splitMap = new TreeMap<Path, List<BlurInputSplit>>();
+ for (BlurInputSplit split : splits) {
+ Path path = split.getDir();
+ String table = split.getTable().toString();
+ int shard = InputSplitPruneUtil.getShardFromDirectoryPath(path);
+ long rowIdUpdateFromNewDataCount = InputSplitPruneUtil.getBlurLookupRowIdUpdateFromNewDataCount(configuration,
+ table, shard);
+ long indexCount = InputSplitPruneUtil.getBlurLookupRowIdFromIndexCount(configuration, table, shard);
+ if (rowIdUpdateFromNewDataCount == 0 || indexCount == 0) {
+ LOG.info("Pruning id lookup input path [" + path + "] no overlapping ids.");
+ } else if (InputSplitPruneUtil.shouldLookupExecuteOnShard(configuration, table, shard)) {
+ LOG.info("Pruning blur input path [" + split.getDir() + "]");
+ } else {
+ LOG.debug("Keeping blur input path [" + split.getDir() + "]");
+ List<BlurInputSplit> list = splitMap.get(path);
+ if (list == null) {
+ splitMap.put(path, list = new ArrayList<BlurInputSplit>());
+ }
+ list.add(split);
+ }
+ }
+ List<InputSplit> result = new ArrayList<InputSplit>();
+ for (List<BlurInputSplit> lst : splitMap.values()) {
+ BlurInputSplitColletion blurInputSplitColletion = new BlurInputSplitColletion();
+ for (BlurInputSplit blurInputSplit : lst) {
+ blurInputSplitColletion.add(blurInputSplit);
+ }
+ result.add(blurInputSplitColletion);
+ }
+ return result;
+ }
+}
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedSequenceFileInputFormat.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedSequenceFileInputFormat.java
new file mode 100644
index 0000000..becebbd
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/PrunedSequenceFileInputFormat.java
@@ -0,0 +1,63 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import com.google.common.base.Splitter;
+
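+/**
+ * Keeps the id-lookup splits only for shards where a direct index lookup is
+ * cheaper than re-reading the whole shard, pruning the rest.
+ */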
+public class PrunedSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {
+
+ private static final Log LOG = LogFactory.getLog(PrunedSequenceFileInputFormat.class);
+
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ List<InputSplit> splits = super.getSplits(job);
+ List<InputSplit> results = new ArrayList<InputSplit>();
+ Configuration configuration = job.getConfiguration();
+ String table = InputSplitPruneUtil.getTable(configuration);
+ for (InputSplit inputSplit : splits) {
+ FileSplit fileSplit = (FileSplit) inputSplit;
+ Path path = fileSplit.getPath();
+ LOG.debug("Getting shard index from path [" + path + "]");
+ String name = path.getName();
+ int shard = getShardIndex(name);
+ long rowIdUpdateFromNewDataCount = InputSplitPruneUtil.getBlurLookupRowIdUpdateFromNewDataCount(configuration,
+ table, shard);
+ long indexCount = InputSplitPruneUtil.getBlurLookupRowIdFromIndexCount(configuration, table, shard);
+ if (rowIdUpdateFromNewDataCount == 0 || indexCount == 0) {
+ LOG.info("Pruning id lookup input path [" + path + "] no overlapping ids.");
+ } else if (InputSplitPruneUtil.shouldLookupExecuteOnShard(configuration, table, shard)) {
+ LOG.debug("Keeping id lookup input path [" + path + "]");
+ results.add(inputSplit);
+ } else {
+ LOG.info("Pruning id lookup input path [" + path + "]");
+ }
+ }
+ return results;
+ }
+
+ private int getShardIndex(String name) {
+ // based on file format of "part-r-00000", etc
+ Iterable<String> split = Splitter.on('-').split(name);
+ List<String> parts = new ArrayList<String>();
+ for (String s : split) {
+ parts.add(s);
+ }
+ return Integer.parseInt(parts.get(2));
+ }
+
+}
diff --git a/blur-indexer/src/main/resources/blur-site.properties b/blur-indexer/src/main/resources/blur-site.properties
new file mode 100644
index 0000000..6b28452
--- /dev/null
+++ b/blur-indexer/src/main/resources/blur-site.properties
@@ -0,0 +1 @@
+blur.thrift.max.frame.size=131072000
\ No newline at end of file
diff --git a/blur-indexer/src/main/resources/program-log4j.xml b/blur-indexer/src/main/resources/program-log4j.xml
new file mode 100644
index 0000000..30c132b
--- /dev/null
+++ b/blur-indexer/src/main/resources/program-log4j.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="${BLUR_INDEXER_LOG_FILE}" />
+ <param name="DatePattern" value="'.'yyyyMMdd" />
+ <param name="Append" value="true" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%-5p %d{yyyyMMdd_HH:mm:ss:SSS_z} [%t] %c{2}: %m%n" />
+ </layout>
+ </appender>
+
+ <root>
+ <priority value="INFO" />
+ <appender-ref ref="FILE" />
+ </root>
+</log4j:configuration>
\ No newline at end of file
diff --git a/blur-indexer/src/main/resources/test-log4j.xml b/blur-indexer/src/main/resources/test-log4j.xml
new file mode 100644
index 0000000..bf705ca
--- /dev/null
+++ b/blur-indexer/src/main/resources/test-log4j.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%-5p %d{yyyyMMdd_HH:mm:ss:SSS_z} [%t] %c{2}: %m%n" />
+ </layout>
+ </appender>
+ <logger name="org.apache.hadoop">
+ <level value="ERROR" />
+ <appender-ref ref="console"/>
+ </logger>
+ <logger name="REQUEST_LOG" additivity="false">
+ <!-- Make value = "INFO"to enable -->
+ <level value="ERROR" />
+ <appender-ref ref="console"/>
+ </logger>
+
+ <logger name="RESPONSE_LOG" additivity="false">
+ <!-- Make value = "INFO"to enable -->
+ <level value="ERROR" />
+ <appender-ref ref="console"/>
+ </logger>
+
+ <logger name="LUCENE_WRITER_INFO_STREAM" additivity="false">
+ <!-- Make value = "INFO"to enable -->
+ <level value="ERROR" />
+ <appender-ref ref="console"/>
+ </logger>
+ <root>
+ <priority value="info" />
+ <appender-ref ref="console" />
+ </root>
+</log4j:configuration>
\ No newline at end of file