Merge branch '1.4' into 1.5
diff --git a/.gitignore b/.gitignore
index f0e4507..1121465 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,6 @@
+*~
.classpath
.project
.settings
+/lib
/target
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..7f58156
--- /dev/null
+++ b/README.md
@@ -0,0 +1,90 @@
+Build the JAR (note: you will need to download the Accumulo source, build it, and install it into your local Maven repository before this will work)
+
+ mvn package
+
+Download the JARs needed by Pig
+
+ mvn dependency:copy-dependencies -DoutputDirectory=lib -DincludeArtifactIds=zookeeper,libthrift,accumulo-core,cloudtrace
+
+Print the register statements we will need in Pig
+
+ for JAR in lib/*.jar target/accumulo-pig-1.4.0.jar ;
+ do
+ echo register `pwd`/$JAR;
+ done
+
+Example output
+
+ register /home/developer/workspace/accumulo-pig/lib/accumulo-core-1.4.0.jar
+ register /home/developer/workspace/accumulo-pig/lib/cloudtrace-1.4.0.jar
+ register /home/developer/workspace/accumulo-pig/lib/libthrift-0.6.1.jar
+ register /home/developer/workspace/accumulo-pig/lib/zookeeper-3.3.1.jar
+ register /home/developer/workspace/accumulo-pig/target/accumulo-pig-1.4.0.jar
+
+Run Pig, copy the register statements above, and paste them into the Pig terminal. Then you can LOAD from and STORE into Accumulo.
+
+ $ pig
+ 2012-03-02 08:15:25,808 [main] INFO org.apache.pig.Main - Logging error messages to: /home/developer/workspace/accumulo-pig/pig_1330694125807.log
+ 2012-03-02 08:15:25,937 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://127.0.0.1/
+ 2012-03-02 08:15:26,032 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: 127.0.0.1:9001
+ grunt> register /home/developer/workspace/accumulo-pig/lib/accumulo-core-1.4.0.jar
+ grunt> register /home/developer/workspace/accumulo-pig/lib/cloudtrace-1.4.0.jar
+ grunt> register /home/developer/workspace/accumulo-pig/lib/libthrift-0.6.1.jar
+ grunt> register /home/developer/workspace/accumulo-pig/lib/zookeeper-3.3.1.jar
+ grunt> register /home/developer/workspace/accumulo-pig/target/accumulo-pig-1.4.0.jar
+ grunt>
+ grunt> DATA = LOAD 'accumulo://webpage?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181&columns=f:cnt'
+ >> using org.apache.accumulo.pig.AccumuloStorage() AS (row, cf, cq, cv, ts, val);
+ grunt>
+ grunt> DATA2 = FOREACH DATA GENERATE row, cf, cq, cv, val;
+ grunt>
+ grunt> STORE DATA2 into 'accumulo://webpage_content?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181' using org.apache.accumulo.pig.AccumuloStorage();
+ 2012-03-02 08:18:44,090 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN
+ 2012-03-02 08:18:44,093 [main] INFO org.apache.pig.newplan.logical.rules.ColumnPruneVisitor - Columns pruned for DATA: $4
+ 2012-03-02 08:18:44,108 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
+ 2012-03-02 08:18:44,110 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
+ 2012-03-02 08:18:44,110 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
+ 2012-03-02 08:18:44,117 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
+ 2012-03-02 08:18:44,118 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
+ 2012-03-02 08:18:44,120 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job7611629033341757288.jar
+ 2012-03-02 08:18:46,282 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job7611629033341757288.jar created
+ 2012-03-02 08:18:46,286 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
+ 2012-03-02 08:18:46,375 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
+ 2012-03-02 08:18:46,876 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
+ 2012-03-02 08:18:46,878 [Thread-17] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
+ 2012-03-02 08:18:47,887 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201203020643_0001
+ 2012-03-02 08:18:47,887 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://127.0.0.1:50030/jobdetails.jsp?jobid=job_201203020643_0001
+ 2012-03-02 08:18:54,434 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete
+ 2012-03-02 08:18:57,484 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
+ 2012-03-02 08:18:57,485 [main] INFO org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics:
+
+ HadoopVersion PigVersion UserId StartedAt FinishedAt Features
+ 0.20.2 0.9.2 developer 2012-03-02 08:18:44 2012-03-02 08:18:57 UNKNOWN
+
+ Success!
+
+ Job Stats (time in seconds):
+ JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MaxReduceTime MinReduceTime AvgReduceTime Alias Feature Outputs
+ job_201203020643_0001 1 0 3 3 3 0 0 0 DATA,DATA2 MAP_ONLY accumulo://webpage_content?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181,
+
+ Input(s):
+ Successfully read 288 records from: "accumulo://webpage?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181&columns=f:cnt"
+
+ Output(s):
+ Successfully stored 288 records in: "accumulo://webpage_content?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181"
+
+ Counters:
+ Total records written : 288
+ Total bytes written : 0
+ Spillable Memory Manager spill count : 0
+ Total bags proactively spilled: 0
+ Total records proactively spilled: 0
+
+ Job DAG:
+ job_201203020643_0001
+
+
+ 2012-03-02 08:18:57,492 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
+ grunt>
+
+
diff --git a/pom.xml b/pom.xml
index 249dcce..37f2841 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-pig</artifactId>
- <version>1.4.4-SNAPSHOT</version>
+ <version>1.5.0-SNAPSHOT</version>
<dependencies>
<dependency>
@@ -32,20 +32,77 @@
<version>1.0.4</version>
</dependency>
<dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.16</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
- <dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-core</artifactId>
- <version>1.4.4</version>
- </dependency>
+ <!-- Needed by Apache Pig -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>1.6</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <version>1.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.9.0</version>
+ </dependency>
</dependencies>
-
+
+ <profiles>
+ <profile>
+ <id>assemble</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass />
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
</project>
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index d26cf40..cf152a7 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -21,9 +21,14 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.lib.util.ConfiguratorBase;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
@@ -185,26 +190,30 @@
conf = job.getConfiguration();
setLocationFromUri(location);
- if (!conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured", false)) {
- AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
- AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
- if (columnFamilyColumnQualifierPairs.size() > 0) {
- LOG.info("columns: " + columnFamilyColumnQualifierPairs);
- AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
+ if (!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf)) {
+ AccumuloInputFormat.setInputTableName(job, table);
+ AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+ AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
+
+ try {
+ AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
}
- AccumuloInputFormat.setRanges(conf, Collections.singleton(new Range(start, end)));
+ if (columnFamilyColumnQualifierPairs.size() > 0) {
+ LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+ AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
+ }
+
+ AccumuloInputFormat.setRanges(job, Collections.singleton(new Range(start, end)));
configureInputFormat(conf);
}
}
- protected void configureInputFormat(Configuration conf) {
-
- }
+ protected void configureInputFormat(Configuration conf) {}
- protected void configureOutputFormat(Configuration conf) {
-
- }
+ protected void configureOutputFormat(Configuration conf) {}
@Override
public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
@@ -212,14 +221,10 @@
}
@Override
- public void setUDFContextSignature(String signature) {
-
- }
+ public void setUDFContextSignature(String signature) {}
/* StoreFunc methods */
- public void setStoreFuncUDFContextSignature(String signature) {
-
- }
+ public void setStoreFuncUDFContextSignature(String signature) {}
public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
return relativeToAbsolutePath(location, curDir);
@@ -229,13 +234,28 @@
conf = job.getConfiguration();
setLocationFromUri(location);
- if (!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured", false)) {
- AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
- AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
- AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
- AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
- AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
- configureOutputFormat(conf);
+ try {
+ if (!ConfiguratorBase.isConnectorInfoSet(AccumuloOutputFormat.class, conf)) {
+ BatchWriterConfig bwConfig = new BatchWriterConfig();
+ bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
+ bwConfig.setMaxMemory(maxMutationBufferSize);
+ bwConfig.setMaxWriteThreads(maxWriteThreads);
+ AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+
+ AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
+ AccumuloOutputFormat.setDefaultTableName(job, table);
+ AccumuloOutputFormat.setCreateTables(job, true);
+
+ try {
+ AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
+
+ configureOutputFormat(conf);
+ }
+ } catch (java.lang.IllegalStateException e1) {
+ e1.printStackTrace();
}
}
@@ -267,6 +287,7 @@
}
public void cleanupOnFailure(String failure, Job job) {}
-
- public void cleanupOnSuccess(String location, Job job) {}
+
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {}
}
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 15b1c47..4de1618 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -61,23 +61,29 @@
@Override
public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
- Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
- Text cf = Utils.objToText(tuple.get(1));
- Text cq = Utils.objToText(tuple.get(2));
- if (tuple.size() > 4) {
- Text cv = Utils.objToText(tuple.get(3));
- Value val = new Value(Utils.objToBytes(tuple.get(4)));
- if (cv.getLength() == 0) {
- mut.put(cf, cq, val);
+ try {
+ Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
+ Text cf = Utils.objToText(tuple.get(1));
+ Text cq = Utils.objToText(tuple.get(2));
+
+ if (tuple.size() > 4) {
+ Text cv = Utils.objToText(tuple.get(3));
+ Value val = new Value(Utils.objToBytes(tuple.get(4)));
+ if (cv.getLength() == 0) {
+ mut.put(cf, cq, val);
+ } else {
+ mut.put(cf, cq, new ColumnVisibility(cv), val);
+ }
} else {
- mut.put(cf, cq, new ColumnVisibility(cv), val);
+ Value val = new Value(Utils.objToBytes(tuple.get(3)));
+ mut.put(cf, cq, val);
}
- } else {
- Value val = new Value(Utils.objToBytes(tuple.get(3)));
- mut.put(cf, cq, val);
+
+ return Collections.singleton(mut);
+ } catch (IOException e) {
+ System.err.println("Error on Tuple: " + tuple);
+ throw e;
}
-
- return Collections.singleton(mut);
}
}
diff --git a/src/main/java/org/apache/accumulo/pig/Utils.java b/src/main/java/org/apache/accumulo/pig/Utils.java
index 199778a..4fecd99 100644
--- a/src/main/java/org/apache/accumulo/pig/Utils.java
+++ b/src/main/java/org/apache/accumulo/pig/Utils.java
@@ -25,6 +25,10 @@
}
public static byte[] objToBytes(Object o) {
+ if (o == null) {
+ return new byte[0];
+ }
+
if (o instanceof String) {
String str = (String) o;
return str.getBytes();