blob: ee79e4ae75d3f519b0461b697badc61604e5f31b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.XMLSchema;
import org.eclipse.rdf4j.query.Binding;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.impl.ListBindingSet;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Serialize and deserialize a VisibilityBindingSet using Kyro lib. Great for exporting results of queries.
*
*/
public class KryoVisibilityBindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
@Override
protected Kryo initialValue() {
final Kryo kryo = new Kryo();
return kryo;
};
};
private static final ValueFactory VF = SimpleValueFactory.getInstance();
/**
* Deserialize a VisibilityBindingSet using Kyro lib. Exporting results of queries.
* If you don't want to use Kyro, here is an alternative:
* return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null);
*
* @param topic
* ignored
* @param data
* serialized bytes
* @return deserialized instance of VisibilityBindingSet
*/
@Override
public VisibilityBindingSet deserialize(final String topic, final byte[] data) {
final KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
final Input input = new Input(new ByteArrayInputStream(data));
return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class);
}
/**
* Ignored. Nothing to configure.
*/
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
// Do nothing.
}
/**
* Serialize a VisibilityBindingSet using Kyro lib. Exporting results of queries.
* If you don't want to use Kyro, here is an alternative:
* return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8);
*
* @param topic
* ignored
* @param data
* serialize this instance
* @return Serialized form of VisibilityBindingSet
*/
@Override
public byte[] serialize(final String topic, final VisibilityBindingSet data) {
final KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final Output output = new Output(baos);
internalSerializer.write(kryos.get(), output, data);
output.flush();
final byte[] array = baos.toByteArray();
return array;
}
/**
* Ignored. Nothing to close.
*/
@Override
public void close() {
// Do nothing.
}
private static Value makeValue(final String valueString, final IRI typeURI) {
// Convert the String Value into a Value.
if (typeURI.equals(XMLSchema.ANYURI)) {
return VF.createIRI(valueString);
} else {
return VF.createLiteral(valueString, typeURI);
}
}
/**
* De/Serialize a visibility binding set using the Kryo library.
*
*/
private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> {
@Override
public void write(final Kryo kryo, final Output output, final VisibilityBindingSet visBindingSet) {
output.writeString(visBindingSet.getVisibility());
// write the number count for the reader.
output.writeInt(visBindingSet.size());
for (final Binding binding : visBindingSet) {
output.writeString(binding.getName());
final RyaType ryaValue = RdfToRyaConversions.convertValue(binding.getValue());
final String valueString = ryaValue.getData();
final IRI type = ryaValue.getDataType();
output.writeString(valueString);
output.writeString(type.toString());
}
}
@Override
public VisibilityBindingSet read(final Kryo kryo, final Input input, final Class<VisibilityBindingSet> aClass) {
final String visibility = input.readString();
final int bindingCount = input.readInt();
final ArrayList<String> namesList = new ArrayList<String>(bindingCount);
final ArrayList<Value> valuesList = new ArrayList<Value>(bindingCount);
for (int i = bindingCount; i > 0; i--) {
namesList.add(input.readString());
final String valueString = input.readString();
final IRI type = VF.createIRI(input.readString());
valuesList.add(makeValue(valueString, type));
}
final BindingSet bindingSet = new ListBindingSet(namesList, valuesList);
return new VisibilityBindingSet(bindingSet, visibility);
}
}
}