blob: 4109d2c0f350298336d87d0378c8320744993118 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.riot.thrift;
import java.io.OutputStream ;
import org.apache.jena.atlas.logging.Log ;
import org.apache.jena.graph.Node ;
import org.apache.jena.graph.Triple ;
import org.apache.jena.riot.RiotException ;
import org.apache.jena.riot.system.PrefixMap ;
import org.apache.jena.riot.system.PrefixMapFactory ;
import org.apache.jena.riot.system.StreamRDF ;
import org.apache.jena.riot.thrift.wire.* ;
import org.apache.jena.sparql.core.Quad ;
import org.apache.thrift.TException ;
import org.apache.thrift.protocol.TProtocol ;
/** Encode StreamRDF in Thrift.
* Usually used via {@link BinRDF} functions.
*
* @see Thrift2StreamRDF (for each RDF_StreamRow) for the reverse process.
*/
public class StreamRDF2Thrift implements StreamRDF, AutoCloseable
{
// No REPEAT support.
private final OutputStream out ;
private final TProtocol protocol ;
private PrefixMap pmap = PrefixMapFactory.create() ;
private final boolean encodeValues ;
// public StreamRDF2Thrift(OutputStream out) {
// this(out, false) ;
// }
public StreamRDF2Thrift(OutputStream out, boolean encodeValues) {
this.out = out ;
this.protocol = TRDF.protocol(out) ;
this.encodeValues = encodeValues ;
}
// public StreamRDF2Thrift(TProtocol out) {
// this(out, false) ;
// }
public StreamRDF2Thrift(TProtocol out, boolean encodeValues) {
this.out = null ;
this.protocol = out ;
this.pmap = PrefixMapFactory.create() ;
this.encodeValues = encodeValues ;
}
@Override
public void start() { }
private final RDF_StreamRow tStreamRow = new RDF_StreamRow() ;
private final RDF_Triple ttriple = new RDF_Triple() ;
private final RDF_Quad tquad = new RDF_Quad() ;
private final RDF_Term tsubject = new RDF_Term() ;
private final RDF_Term tpredicate = new RDF_Term() ;
private final RDF_Term tobject = new RDF_Term() ;
private final RDF_Term tgraph = new RDF_Term() ;
@Override
public void triple(Triple triple) {
doTriple(triple.getSubject(), triple.getPredicate(), triple.getObject()) ;
}
private void doTriple(Node subject, Node predicate, Node object) {
ThriftConvert.toThrift(subject, pmap, tsubject, encodeValues) ;
ThriftConvert.toThrift(predicate, pmap, tpredicate, encodeValues) ;
ThriftConvert.toThrift(object, pmap, tobject, encodeValues) ;
ttriple.setS(tsubject) ;
ttriple.setP(tpredicate) ;
ttriple.setO(tobject) ;
tStreamRow.setTriple(ttriple) ;
try { tStreamRow.write(protocol) ; }
catch (TException e) { TRDF.exception(e) ; }
tStreamRow.clear();
ttriple.clear();
tsubject.clear();
tpredicate.clear() ;
tobject.clear() ;
}
@Override
public void quad(Quad quad) {
if ( quad.getGraph() == null || quad.isDefaultGraph() ) {
doTriple(quad.getSubject(), quad.getPredicate(), quad.getObject()) ;
return ;
}
ThriftConvert.toThrift(quad.getGraph(), pmap, tgraph, encodeValues) ;
ThriftConvert.toThrift(quad.getSubject(), pmap, tsubject, encodeValues) ;
ThriftConvert.toThrift(quad.getPredicate(), pmap, tpredicate, encodeValues) ;
ThriftConvert.toThrift(quad.getObject(), pmap, tobject, encodeValues) ;
tquad.setG(tgraph) ;
tquad.setS(tsubject) ;
tquad.setP(tpredicate) ;
tquad.setO(tobject) ;
tStreamRow.setQuad(tquad) ;
try { tStreamRow.write(protocol) ; }
catch (TException e) { TRDF.exception(e) ; }
tStreamRow.clear() ;
tquad.clear();
tgraph.clear();
tsubject.clear();
tpredicate.clear() ;
tobject.clear() ;
}
@Override
public void base(String base) {
// Ignore.
}
@Override
public void prefix(String prefix, String iri) {
try { pmap.add(prefix, iri) ; }
catch ( RiotException ex) {
Log.warn(this, "Prefix mapping error", ex) ;
}
RDF_PrefixDecl tprefix = new RDF_PrefixDecl(prefix, iri) ;
tStreamRow.setPrefixDecl(tprefix) ;
try { tStreamRow.write(protocol) ; }
catch (TException e) { TRDF.exception(e) ; }
tStreamRow.clear();
}
@Override
public void close() {
finish() ;
}
@Override
public void finish() {
TRDF.flush(protocol) ;
}
}