| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.jena.riot.thrift; |
| |
| import java.io.OutputStream; |
| |
| import org.apache.jena.atlas.logging.Log; |
| import org.apache.jena.graph.Node; |
| import org.apache.jena.graph.Triple; |
| import org.apache.jena.riot.RiotException; |
| import org.apache.jena.riot.system.PrefixMap; |
| import org.apache.jena.riot.system.PrefixMapFactory; |
| import org.apache.jena.riot.system.StreamRDF; |
| import org.apache.jena.riot.thrift.wire.*; |
| import org.apache.jena.sparql.core.Quad; |
| import org.apache.thrift.TException; |
| import org.apache.thrift.protocol.TProtocol; |
| |
| /** Encode StreamRDF in Thrift. |
| * Usually used via {@link BinRDF} functions. |
| * |
| * @see Thrift2StreamRDF (for each RDF_StreamRow) for the reverse process. |
| */ |
| public class StreamRDF2Thrift implements StreamRDF, AutoCloseable |
| { |
| // No REPEAT support. |
| private final OutputStream out; |
| private final TProtocol protocol; |
| private PrefixMap pmap = PrefixMapFactory.create(); |
| private final boolean encodeValues; |
| |
| public StreamRDF2Thrift(OutputStream out) { |
| this(out, false); |
| } |
| |
| public StreamRDF2Thrift(OutputStream out, boolean encodeValues) { |
| this(TRDF.protocol(out), encodeValues); |
| } |
| |
| // public StreamRDF2Thrift(TProtocol out) { |
| // this(out, false); |
| // } |
| |
| public StreamRDF2Thrift(TProtocol out, boolean encodeValues) { |
| this.out = null; |
| this.protocol = out; |
| this.pmap = PrefixMapFactory.create(); |
| this.encodeValues = encodeValues; |
| } |
| |
| @Override |
| public void start() { } |
| |
| private final RDF_StreamRow tStreamRow = new RDF_StreamRow(); |
| |
| private final RDF_Triple ttriple = new RDF_Triple(); |
| private final RDF_Quad tquad = new RDF_Quad(); |
| |
| private final RDF_Term tsubject = new RDF_Term(); |
| private final RDF_Term tpredicate = new RDF_Term(); |
| private final RDF_Term tobject = new RDF_Term(); |
| private final RDF_Term tgraph = new RDF_Term(); |
| |
| @Override |
| public void triple(Triple triple) { |
| doTriple(triple.getSubject(), triple.getPredicate(), triple.getObject()); |
| } |
| |
| private void doTriple(Node subject, Node predicate, Node object) { |
| ThriftConvert.toThrift(subject, pmap, tsubject, encodeValues); |
| ThriftConvert.toThrift(predicate, pmap, tpredicate, encodeValues); |
| ThriftConvert.toThrift(object, pmap, tobject, encodeValues); |
| ttriple.setS(tsubject); |
| ttriple.setP(tpredicate); |
| ttriple.setO(tobject); |
| |
| tStreamRow.setTriple(ttriple); |
| try { tStreamRow.write(protocol); } |
| catch (TException e) { TRDF.exception(e); } |
| finally { |
| tStreamRow.clear(); |
| ttriple.clear(); |
| tsubject.clear(); |
| tpredicate.clear(); |
| tobject.clear(); |
| } |
| } |
| |
| @Override |
| public void quad(Quad quad) { |
| if ( quad.getGraph() == null || quad.isDefaultGraph() ) { |
| doTriple(quad.getSubject(), quad.getPredicate(), quad.getObject()); |
| return; |
| } |
| |
| ThriftConvert.toThrift(quad.getGraph(), pmap, tgraph, encodeValues); |
| ThriftConvert.toThrift(quad.getSubject(), pmap, tsubject, encodeValues); |
| ThriftConvert.toThrift(quad.getPredicate(), pmap, tpredicate, encodeValues); |
| ThriftConvert.toThrift(quad.getObject(), pmap, tobject, encodeValues); |
| |
| tquad.setG(tgraph); |
| tquad.setS(tsubject); |
| tquad.setP(tpredicate); |
| tquad.setO(tobject); |
| tStreamRow.setQuad(tquad); |
| |
| try { tStreamRow.write(protocol); } |
| catch (TException e) { TRDF.exception(e); } |
| finally { |
| tStreamRow.clear(); |
| tquad.clear(); |
| tgraph.clear(); |
| tsubject.clear(); |
| tpredicate.clear(); |
| tobject.clear(); |
| } |
| } |
| |
| @Override |
| public void base(String base) { |
| // Ignore. |
| } |
| |
| @Override |
| public void prefix(String prefix, String iri) { |
| try { pmap.add(prefix, iri); } |
| catch ( RiotException ex) { |
| Log.warn(this, "Prefix mapping error", ex); |
| } |
| RDF_PrefixDecl tprefix = new RDF_PrefixDecl(prefix, iri); |
| tStreamRow.setPrefixDecl(tprefix); |
| try { tStreamRow.write(protocol); } |
| catch (TException e) { TRDF.exception(e); } |
| tStreamRow.clear(); |
| } |
| |
| @Override |
| public void close() { |
| finish(); |
| } |
| |
| @Override |
| public void finish() { |
| TRDF.flush(protocol); |
| } |
| } |