package org.apache.rya.accumulo.pig;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.util.ConfiguratorBase;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.util.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.LoadFunc;
import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
/**
* A LoadStoreFunc for retrieving data from and storing data to Accumulo.
* <p/>
* Each Key/Value pair is returned as a tuple: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray; the timestamp is a long.
* <p/>
* Tuples can be written in two forms:
* (key, colfam, colqual, colvis, value)
* OR
* (key, colfam, colqual, value)
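* <p/>
* Example Pig Latin usage (illustrative only; the table name and connection values are placeholders):
* <pre>
* raw = LOAD 'accumulo://mytable?instance=myinstance&user=root&password=secret&zookeepers=localhost:2181&range=a|z'
* USING org.apache.rya.accumulo.pig.AccumuloStorage();
* </pre>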
*/
public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, OrderedLoadFunc {
private static final Log logger = LogFactory.getLog(AccumuloStorage.class);
protected Configuration conf;
protected RecordReader<Key, Value> reader;
protected RecordWriter<Text, Mutation> writer;
protected String inst;
protected String zookeepers;
protected String user;
protected String userP;
protected String table;
protected Text tableName;
protected String auths;
protected Authorizations authorizations;
private final List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text, Text>>();
protected Collection<Range> ranges = new ArrayList<Range>();
protected boolean mock = false;
public AccumuloStorage() {
}
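/**
 * Reads the next Key/Value pair from the underlying {@link RecordReader} and wraps it
 * in a 6-field tuple of (key, colfam, colqual, colvis, timestamp, value).
 * Returns null once the input is exhausted.
 */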
@Override
public Tuple getNext() throws IOException {
try {
// load the next pair
if (!reader.nextKeyValue()) {
logger.info("Reached end of results");
return null;
}
final Key key = reader.getCurrentKey();
final Value value = reader.getCurrentValue();
assert key != null && value != null;
if (logger.isTraceEnabled()) {
logger.trace("Found key[" + key + "] and value[" + value + "]");
}
// and wrap it in a tuple
final Tuple tuple = TupleFactory.getInstance().newTuple(6);
// Text.getBytes() exposes the backing array, which may be longer than the
// actual data; copyBytes() returns exactly getLength() bytes.
tuple.set(0, new DataByteArray(key.getRow().copyBytes()));
tuple.set(1, new DataByteArray(key.getColumnFamily().copyBytes()));
tuple.set(2, new DataByteArray(key.getColumnQualifier().copyBytes()));
tuple.set(3, new DataByteArray(key.getColumnVisibility().copyBytes()));
tuple.set(4, key.getTimestamp());
tuple.set(5, new DataByteArray(value.get()));
if (logger.isTraceEnabled()) {
logger.trace("Output tuple[" + tuple + "]");
}
return tuple;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
@Override
public InputFormat getInputFormat() {
return new AccumuloInputFormat();
}
@Override
public void prepareToRead(final RecordReader reader, final PigSplit split) {
this.reader = reader;
}
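/**
 * Parses the accumulo:// location URI and configures the {@link AccumuloInputFormat}
 * with the connector info, table, scan authorizations, fetched columns, and ranges.
 * At least one range must be supplied via the URI.
 */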
@Override
public void setLocation(final String location, final Job job) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("Set Location[" + location + "] for job[" + job.getJobName() + "]");
}
conf = job.getConfiguration();
setLocationFromUri(location, job);
if (!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf)) {
try {
AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(userP.getBytes(StandardCharsets.UTF_8)));
} catch (final AccumuloSecurityException e) {
throw new RuntimeException(e);
}
AccumuloInputFormat.setInputTableName(job, table);
AccumuloInputFormat.setScanAuthorizations(job, authorizations);
if (!mock) {
AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
} else {
AccumuloInputFormat.setMockInstance(job, inst);
}
}
if (!columnFamilyColumnQualifierPairs.isEmpty()) {
AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
}
logger.info("Set ranges[" + ranges + "] for job[" + job.getJobName() + "] on table[" + table + "] " +
"for columns[" + columnFamilyColumnQualifierPairs + "] with authorizations[" + authorizations + "]");
if (ranges.isEmpty()) {
throw new IOException("At least one Accumulo Range must be specified via the 'range' URI parameter");
}
AccumuloInputFormat.setRanges(job, ranges);
}
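/**
 * Extracts the table name and the query parameters (instance, user, password,
 * zookeepers, auths, columns, range, mock) from a location URI of the form
 * shown in the example below.
 */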
protected void setLocationFromUri(final String uri, final Job job) throws IOException {
// ex: accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&range=a|z&range=1|9&mock=true
try {
if (!uri.startsWith("accumulo://")) {
throw new Exception("Bad scheme.");
}
final String[] urlParts = uri.split("\\?");
setLocationFromUriParts(urlParts);
} catch (final Exception e) {
throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&[range=startRow|endRow[...],columns=[cf1|cq1,cf2|cq2,...]],mock=true(false)]': " + e.getMessage(), e);
}
}
protected void setLocationFromUriParts(final String[] urlParts) {
String columns = "";
if (urlParts.length > 1) {
for (final String param : urlParts[1].split("&")) {
final String[] pair = param.split("=", 2); // limit 2 so values may themselves contain '='
if (pair[0].equals("instance")) {
inst = pair[1];
} else if (pair[0].equals("user")) {
user = pair[1];
} else if (pair[0].equals("password")) {
userP = pair[1];
} else if (pair[0].equals("zookeepers")) {
zookeepers = pair[1];
} else if (pair[0].equals("auths")) {
auths = pair[1];
} else if (pair[0].equals("columns")) {
columns = pair[1];
} else if (pair[0].equals("range")) {
final String[] r = pair[1].split("\\|");
if (r.length == 2) {
addRange(new Range(r[0], r[1]));
} else {
addRange(new Range(r[0]));
}
} else if (pair[0].equals("mock")) {
this.mock = Boolean.parseBoolean(pair[1]);
}
addLocationFromUriPart(pair);
}
}
final String[] parts = urlParts[0].split("/+");
table = parts[1];
tableName = new Text(table);
if (auths == null || auths.equals("")) {
authorizations = new Authorizations();
} else {
authorizations = new Authorizations(auths.split(","));
}
if (!columns.equals("")) {
for (final String cfCq : columns.split(",")) {
if (cfCq.contains("|")) {
final String[] c = cfCq.split("\\|");
final String cf = c[0];
final String cq = c[1];
addColumnPair(cf, cq);
} else {
addColumnPair(cfCq, null);
}
}
}
}
protected void addColumnPair(final String cf, final String cq) {
columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>((cf != null) ? new Text(cf) : null, (cq != null) ? new Text(cq) : null));
}
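/**
 * Extension hook invoked for every key=value pair in the location URI; subclasses
 * may override it to consume additional parameters. The default implementation is a no-op.
 */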
protected void addLocationFromUriPart(final String[] pair) {
}
protected void addRange(final Range range) {
ranges.add(range);
}
@Override
public String relativeToAbsolutePath(final String location, final Path curDir) throws IOException {
return location;
}
@Override
public void setUDFContextSignature(final String signature) {
}
/* StoreFunc methods */
@Override
public void setStoreFuncUDFContextSignature(final String signature) {
}
@Override
public String relToAbsPathForStoreLocation(final String location, final Path curDir) throws IOException {
return relativeToAbsolutePath(location, curDir);
}
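/**
 * Parses the accumulo:// location URI and configures the {@link AccumuloOutputFormat},
 * including a BatchWriterConfig with a 10 second max latency, a 10 MB write buffer,
 * and 10 write threads.
 */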
@Override
public void setStoreLocation(final String location, final Job job) throws IOException {
conf = job.getConfiguration();
setLocationFromUri(location, job);
if (!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured", false)) {
try {
AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(userP.getBytes(StandardCharsets.UTF_8)));
} catch (final AccumuloSecurityException e) {
throw new RuntimeException(e);
}
AccumuloOutputFormat.setDefaultTableName(job, table);
AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
final BatchWriterConfig config = new BatchWriterConfig();
config.setMaxLatency(10, TimeUnit.SECONDS);
config.setMaxMemory(10 * 1000 * 1000);
config.setMaxWriteThreads(10);
AccumuloOutputFormat.setBatchWriterOptions(job, config);
}
}
@Override
public OutputFormat getOutputFormat() {
return new AccumuloOutputFormat();
}
@Override
public void checkSchema(final ResourceSchema schema) throws IOException {
// types don't matter here; every field is converted to bytes when written
}
@Override
public void prepareToWrite(final RecordWriter writer) {
this.writer = writer;
}
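/**
 * Writes one tuple as a Mutation. Tuples with more than 4 fields are treated as
 * (key, colfam, colqual, colvis, value); otherwise as (key, colfam, colqual, value).
 * An empty column visibility is written as an unlabeled entry.
 */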
@Override
public void putNext(final Tuple t) throws ExecException, IOException {
final Mutation mut = new Mutation(objToText(t.get(0)));
final Text cf = objToText(t.get(1));
final Text cq = objToText(t.get(2));
if (t.size() > 4) {
final Text cv = objToText(t.get(3));
final Value val = new Value(objToBytes(t.get(4)));
if (cv.getLength() == 0) {
mut.put(cf, cq, val);
} else {
mut.put(cf, cq, new ColumnVisibility(cv), val);
}
} else {
final Value val = new Value(objToBytes(t.get(3)));
mut.put(cf, cq, val);
}
try {
writer.write(tableName, mut);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
private static Text objToText(final Object o) {
return new Text(objToBytes(o));
}
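/**
 * Converts the supported Pig scalar types (String, Long, Integer, Boolean, Float,
 * Double) to their UTF-8 string bytes; anything else is assumed to be a DataByteArray.
 */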
private static byte[] objToBytes(final Object o) {
if (o instanceof String) {
return ((String) o).getBytes(StandardCharsets.UTF_8);
} else if (o instanceof Long || o instanceof Integer || o instanceof Boolean
|| o instanceof Float || o instanceof Double) {
// numeric and boolean scalars are stored as their UTF-8 string form
return o.toString().getBytes(StandardCharsets.UTF_8);
}
// TODO: handle DataBag, Map<Object, Object>, and Tuple
return ((DataByteArray) o).get();
}
@Override
public void cleanupOnFailure(final String failure, final Job job) {
}
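/**
 * Recovers the split's Range for ordering. The Range is not exposed directly by the
 * split API, so the split is serialized and the Range (the first field that
 * RangeInputSplit writes) is read back from the resulting bytes.
 */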
@Override
public WritableComparable<?> getSplitComparable(final InputSplit inputSplit) throws IOException {
//cannot get access to the range directly
final AccumuloInputFormat.RangeInputSplit rangeInputSplit = (AccumuloInputFormat.RangeInputSplit) inputSplit;
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos);
rangeInputSplit.write(out);
out.close();
final DataInputStream stream = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
final Range range = new Range();
range.readFields(stream);
stream.close();
return range;
}
}