blob: e4012ca6924089392c6f33614a1ecce9adb2b3bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.riot.protobuf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import org.apache.jena.atlas.io.IO;
import org.apache.jena.riot.protobuf.wire.PB_RDF.RDF_StreamRow;
import org.apache.jena.riot.system.PrefixMapFactory;
import org.apache.jena.riot.system.StreamRDF;
import org.apache.jena.riot.thrift.ThriftRDF;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.exec.RowSet;
import org.apache.jena.sparql.exec.RowSetStream;
/**
* Operations on binary RDF using <a href="https://developers.google.com/protocol-buffers">Google Protobuf</a>.
* <p>
* An encoding use Apache Thrift is available in {@link ThriftRDF}.
*/
public class ProtobufRDF {
/**
* Create an {@link StreamRDF} for output. A filename ending {@code .gz} will have
* a gzip compressor added to the output path. A filename of "-" is {@code System.out}.
* The file is closed when {@link StreamRDF#finish()} is called unless it is {@code System.out}.
* Call {@link StreamRDF#start()}...{@link StreamRDF#finish()}.
*
* @param filename The file
* @return StreamRDF A stream to send to.
*/
public static StreamRDF streamToFile(String filename) {
return streamToFile(filename, false) ;
}
/**
* Create an {@link StreamRDF} for output. A filenames ending {@code .gz} or {@code .bz2} will have
* the respective compressor added to the output path. A filename of "-" is {@code System.out}.
* The file is closed when {@link StreamRDF#finish()} is called unless it is {@code System.out}.
* Call {@link StreamRDF#start()}...{@link StreamRDF#finish()}.
*
* @param filename The file
* @param withValues - whether to encode numeric values as values.
* @return StreamRDF A stream to send to.
*/
public static StreamRDF streamToFile(String filename, boolean withValues) {
OutputStream out = IO.openOutputFile(filename) ;
out = IO.ensureBuffered(out);
return streamToOutputStream(out);
}
/**
* An {@link StreamRDF} that writes to an output stream.
* The output stream is written with "writeDelimitedTo".
*/
public static StreamRDF streamToOutputStream(OutputStream output) {
return streamToOutputStream(output, false);
}
/**
* An {@link StreamRDF} that writes to an output stream.
* The output stream is written with "writeDelimitedTo".
*/
public static StreamRDF streamToOutputStream(OutputStream output, boolean withValues) {
output = IO.ensureBuffered(output);
try {
return StreamRDF2Protobuf.createDelimited(output, withValues);
} finally { IO.flush(output); }
}
/**
* Decode the contents of the file and send to the {@link StreamRDF}.
* A filename ending {@code .gz} will have a gzip decompressor added.
* A filename of "-" is {@code System.in}.
* @param filename The file.
* @param dest Sink
*/
public static void fileToStream(String filename, StreamRDF dest) {
InputStream in = IO.openFileBuffered(filename) ;
inputStreamToStreamRDF(in, dest);
}
/**
* Read an input stream and send item to the {@link StreamRDF}. The input stream
* will be read with "parseDelimitedTo".
* @param input InputStream
* @param stream StreamRDF
*/
public static void inputStreamToStreamRDF(InputStream input, StreamRDF stream) {
Protobuf2StreamRDF visitor = new Protobuf2StreamRDF(PrefixMapFactory.create(), stream);
stream.start();
try {
apply(input, visitor);
}
finally { stream.finish(); }
}
/**
* Send the contents of a RDF-encoded protobuf file to an "action"
* @param input InputStream
* @param action Code to act on the row.
*/
public static void apply(InputStream input, VisitorStreamRowProtoRDF action) {
input = IO.ensureBuffered(input);
try {
while(true) {
RDF_StreamRow x = RDF_StreamRow.parseDelimitedFrom(input);
boolean b = PBufRDF.visit(x, action);
if ( !b )
return;
}
} catch(IOException ex) { IO.exception(ex); }
}
/**
* Return a streaming {@link RowSet} read from an input stream (with delimiters per row)
*/
public static RowSet readRowSet(InputStream input) {
Protobuf2Binding p2b = new Protobuf2Binding(input);
return RowSetStream.create(p2b.getVars(), p2b);
}
/**
* Write a {@link RowSet} to an output stream (with delimiters per row)
*/
public static void writeRowSet(OutputStream out, RowSet rowSet) {
writeRowSet(out, rowSet, false);
}
/**
* Write a {@link RowSet} to an output stream (with delimiters per row)
*/
public static void writeRowSet(OutputStream out, RowSet rowSet, boolean withValues) {
out = IO.ensureBuffered(out);
try {
List<Var> vars = rowSet.getResultVars();
try ( Binding2Protobuf b2p = new Binding2Protobuf(out, vars, false) ) {
rowSet.forEachRemaining(b2p::output);
}
} finally { IO.flush(out); }
}
}