/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.avro.store;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Properties;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.gora.avro.query.AvroQuery;
import org.apache.gora.avro.query.AvroResult;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.FileSplitPartitionQuery;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.impl.FileBackedDataStoreBase;
import org.apache.gora.util.OperationNotSupportedException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
/**
 * An adapter DataStore for binary-compatible Avro serializations.
 * AvroStore supports binary and JSON serializations.
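 * <p>
 * A minimal usage sketch (it assumes a generated {@link Persistent} bean
 * {@code Employee} keyed by {@code String}; {@code properties} supplies the
 * store configuration, including the output location):
 * <pre>
 *   AvroStore&lt;String, Employee&gt; store = new AvroStore&lt;String, Employee&gt;();
 *   store.initialize(String.class, Employee.class, properties);
 *   store.put("employee1", employee);
 *   store.flush();
 *   store.close();
 * </pre>
 * @param <K> the class of the keys
 * @param <T> the class of the persistent objects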
*/
public class AvroStore<K, T extends Persistent>
extends FileBackedDataStoreBase<K, T> implements Configurable {
  /** The property key specifying the Avro encoder/decoder type to use.
   * Accepted values are "BINARY" and "JSON"; the default is "BINARY". */
public static final String CODEC_TYPE_KEY = "codec.type";
  /**
   * The type of the Avro Encoder/Decoder.
   */
  public enum CodecType {
    /** Avro binary encoder */
    BINARY,
    /** Avro JSON encoder */
    JSON
  }
private DatumReader<T> datumReader;
private DatumWriter<T> datumWriter;
private Encoder encoder;
private Decoder decoder;
  /** Set via {@link #setCodecType(CodecType)} or read from CODEC_TYPE_KEY during initialization. */
  private CodecType codecType;
@Override
public void initialize(Class<K> keyClass, Class<T> persistentClass,
Properties properties) throws IOException {
super.initialize(keyClass, persistentClass, properties);
    if(properties != null) {
      // Honor an explicit setCodecType() call; otherwise read the codec
      // type from the properties, defaulting to BINARY.
      if(this.codecType == null) {
        String codecTypeString = DataStoreFactory.findProperty(
            properties, this, CODEC_TYPE_KEY, "BINARY");
        this.codecType = CodecType.valueOf(codecTypeString);
      }
    }
}
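  /** Sets the codec type explicitly, taking precedence over CODEC_TYPE_KEY. */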
public void setCodecType(CodecType codecType) {
this.codecType = codecType;
}
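  /** Sets the Encoder to use instead of lazily creating one. */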
public void setEncoder(Encoder encoder) {
this.encoder = encoder;
}
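  /** Sets the Decoder to use instead of lazily creating one. */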
public void setDecoder(Decoder decoder) {
this.decoder = decoder;
}
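  /** Sets the DatumReader to use instead of lazily creating one. */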
public void setDatumReader(DatumReader<T> datumReader) {
this.datumReader = datumReader;
}
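  /** Sets the DatumWriter to use instead of lazily creating one. */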
public void setDatumWriter(DatumWriter<T> datumWriter) {
this.datumWriter = datumWriter;
}
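  /**
   * Flushes any buffered Encoder output, closes the underlying streams and
   * releases the codec objects.
   */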
@Override
  public void close() throws IOException {
    // Flush the encoder before the underlying streams are closed,
    // so buffered output is not lost.
    if(encoder != null) {
      encoder.flush();
    }
    super.close();
    encoder = null;
    decoder = null;
  }
@Override
public boolean delete(K key) throws IOException {
throw new OperationNotSupportedException("delete is not supported for AvroStore");
}
@Override
public long deleteByQuery(Query<K, T> query) throws IOException {
    throw new OperationNotSupportedException("deleteByQuery is not supported for AvroStore");
}
  /**
   * Executes a normal Query, reading the whole data. {@code execute()} calls
   * this method for queries that are not partition queries.
   */
@Override
protected Result<K,T> executeQuery(Query<K,T> query) throws IOException {
return new AvroResult<K,T>(this, (AvroQuery<K,T>)query,
getDatumReader(), getDecoder());
}
  /**
   * Executes a {@link FileSplitPartitionQuery}, reading the data between the
   * start and end of the file split.
   */
@Override
protected Result<K,T> executePartial(FileSplitPartitionQuery<K,T> query)
throws IOException {
throw new OperationNotSupportedException("Not yet implemented");
}
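  /** Flushes the store, including any buffered Encoder output. */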
@Override
public void flush() throws IOException {
super.flush();
if(encoder != null)
encoder.flush();
}
@Override
public T get(K key, String[] fields) throws IOException {
    throw new OperationNotSupportedException("get is not supported for AvroStore");
}
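  /** Returns a new {@link AvroQuery} over this store. */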
@Override
public AvroQuery<K,T> newQuery() {
return new AvroQuery<K,T>(this);
}
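  /**
   * Serializes the object to the output stream with the configured
   * DatumWriter and Encoder. Note that the key itself is not written.
   */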
@Override
public void put(K key, T obj) throws IOException {
getDatumWriter().write(obj, getEncoder());
}
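  /** Returns the Encoder, lazily creating it on first use. */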
public Encoder getEncoder() throws IOException {
if(encoder == null) {
encoder = createEncoder();
}
return encoder;
}
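  /** Returns the Decoder, lazily creating it on first use. */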
public Decoder getDecoder() throws IOException {
if(decoder == null) {
decoder = createDecoder();
}
return decoder;
}
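  /** Returns the DatumReader, lazily creating it on first use. */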
public DatumReader<T> getDatumReader() {
if(datumReader == null) {
datumReader = createDatumReader();
}
return datumReader;
}
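  /** Returns the DatumWriter, lazily creating it on first use. */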
public DatumWriter<T> getDatumWriter() {
if(datumWriter == null) {
datumWriter = createDatumWriter();
}
return datumWriter;
}
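  /** Creates a binary or JSON Encoder over the output stream, depending on the codec type. */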
protected Encoder createEncoder() throws IOException {
switch(codecType) {
case BINARY:
return new BinaryEncoder(getOrCreateOutputStream());
case JSON:
return new JsonEncoder(schema, getOrCreateOutputStream());
}
return null;
}
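  /** Creates a binary or JSON Decoder over the input stream, depending on the codec type. */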
@SuppressWarnings("deprecation")
protected Decoder createDecoder() throws IOException {
switch(codecType) {
case BINARY:
return new BinaryDecoder(getOrCreateInputStream());
case JSON:
return new JsonDecoder(schema, getOrCreateInputStream());
}
return null;
}
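  /** Creates a {@link SpecificDatumWriter} for this store's schema. */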
protected DatumWriter<T> createDatumWriter() {
return new SpecificDatumWriter<T>(schema);
}
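  /** Creates a {@link SpecificDatumReader} for this store's schema. */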
protected DatumReader<T> createDatumReader() {
return new SpecificDatumReader<T>(schema);
}
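  /** Returns the Hadoop Configuration, lazily creating an empty one if none was set. */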
@Override
public Configuration getConf() {
if(conf == null) {
conf = new Configuration();
}
return conf;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
}
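  /** AvroStore has no named schema, so a constant name is returned. */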
@Override
public String getSchemaName() {
return "default";
}
}