Merge branch 'TIKA-4181-grpc' into tika-grpc-3x-features
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index 1ef13c5..c5e1347 100644
--- a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -200,38 +200,18 @@
}
Metadata tikaMetadata = new Metadata();
try {
- Map<String, Object> metadataJsonObject = new HashMap<>();
- if (!StringUtils.isBlank(request.getMetadataJson())) {
- try {
- metadataJsonObject = OBJECT_MAPPER.readValue(request.getMetadataJson(), new TypeReference<>() {});
- } catch (JsonProcessingException e) {
- metadataJsonObject = new HashMap<>();
- }
- }
- for (Map.Entry<String, Object> entry : metadataJsonObject.entrySet()) {
- if (entry.getValue() instanceof List) {
- List<Object> list = (List<Object>) entry.getValue();
- tikaMetadata.set(Property.externalText(entry.getKey()), list.stream()
- .map(String::valueOf)
- .collect(Collectors.toList())
- .toArray(new String[] {}));
- } else if (entry.getValue() instanceof String) {
- tikaMetadata.set(Property.externalText(entry.getKey()), (String) entry.getValue());
- } else if (entry.getValue() instanceof Integer) {
- tikaMetadata.set(Property.externalText(entry.getKey()), (Integer) entry.getValue());
- } else if (entry.getValue() instanceof Double) {
- tikaMetadata.set(Property.externalText(entry.getKey()), (Double) entry.getValue());
- } else if (entry.getValue() instanceof Float) {
- tikaMetadata.set(Property.externalText(entry.getKey()), (Float) entry.getValue());
- } else if (entry.getValue() instanceof Boolean) {
- tikaMetadata.set(Property.externalText(entry.getKey()), (Boolean) entry.getValue());
- }
- }
+ String metadataJson = request.getMetadataJson();
+ loadMetadata(metadataJson, tikaMetadata);
PipesResult pipesResult = pipesClient.process(new FetchEmitTuple(request.getFetchKey(),
new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(), tikaMetadata,
- HandlerConfig.DEFAULT_HANDLER_CONFIG, FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+ HandlerConfig.DEFAULT_HANDLER_CONFIG, FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
FetchAndParseReply.Builder fetchReplyBuilder =
- FetchAndParseReply.newBuilder().setFetchKey(request.getFetchKey());
+ FetchAndParseReply.newBuilder()
+ .setFetchKey(request.getFetchKey())
+ .setStatus(pipesResult.getStatus().name());
+ if (pipesResult.getStatus().equals(PipesResult.STATUS.FETCH_EXCEPTION)) {
+ fetchReplyBuilder.setErrorMessage(pipesResult.getMessage());
+ }
if (pipesResult.getEmitData() != null && pipesResult.getEmitData().getMetadataList() != null) {
for (Metadata metadata : pipesResult.getEmitData().getMetadataList()) {
for (String name : metadata.names()) {
@@ -250,6 +230,36 @@
}
}
+    /**
+     * Parses {@code metadataJson} as a JSON object and copies its top-level entries into
+     * {@code tikaMetadata} as external text properties.
+     * <p>
+     * Every value is stored as text: strings as-is, numbers and booleans via
+     * {@link String#valueOf(Object)}, and JSON arrays as multi-valued text (each element
+     * converted with {@link String#valueOf(Object)}). Entries whose value is {@code null}
+     * or a nested JSON object are skipped.
+     * <p>
+     * Numbers and booleans are deliberately NOT passed to the typed
+     * {@code Metadata.set(Property, int|double|boolean)} overloads: those overloads reject
+     * properties whose value type is not INTEGER/REAL/BOOLEAN, while
+     * {@link Property#externalText(String)} always creates a TEXT property, so the typed
+     * overloads would throw at runtime.
+     * <p>
+     * A blank, malformed, or {@code null} JSON document is treated as "no metadata"
+     * (best-effort), so bad client metadata never aborts the fetch.
+     *
+     * @param metadataJson JSON object text from the request; may be {@code null} or blank
+     * @param tikaMetadata metadata instance to populate
+     * @throws JsonProcessingException declared for caller compatibility only; parse failures
+     *                                 are currently swallowed, never propagated
+     */
+    private static void loadMetadata(String metadataJson, Metadata tikaMetadata) throws JsonProcessingException {
+        if (StringUtils.isBlank(metadataJson)) {
+            return;
+        }
+        Map<String, Object> metadataJsonObject;
+        try {
+            metadataJsonObject = OBJECT_MAPPER.readValue(metadataJson, new TypeReference<>() {});
+        } catch (JsonProcessingException e) {
+            // Best-effort: malformed metadata JSON (or a non-object document) is ignored.
+            return;
+        }
+        if (metadataJsonObject == null) {
+            // The JSON literal "null" deserializes to a null map.
+            return;
+        }
+        for (Map.Entry<String, Object> entry : metadataJsonObject.entrySet()) {
+            Object value = entry.getValue();
+            // Property.externalText registers the name globally, so only create it for
+            // values that are actually stored.
+            if (value instanceof List) {
+                String[] values = ((List<?>) value).stream()
+                        .map(String::valueOf)
+                        .toArray(String[]::new);
+                tikaMetadata.set(Property.externalText(entry.getKey()), values);
+            } else if (value instanceof String || value instanceof Number || value instanceof Boolean) {
+                tikaMetadata.set(Property.externalText(entry.getKey()), String.valueOf(value));
+            }
+        }
+    }
+
@SuppressWarnings("raw")
@Override
public void saveFetcher(SaveFetcherRequest request,
diff --git a/tika-pipes/tika-grpc/src/main/proto/tika.proto b/tika-pipes/tika-grpc/src/main/proto/tika.proto
index 18e2fd1..6b8ac4a 100644
--- a/tika-pipes/tika-grpc/src/main/proto/tika.proto
+++ b/tika-pipes/tika-grpc/src/main/proto/tika.proto
@@ -53,6 +53,8 @@
message FetchAndParseReply {
string fetch_key = 1;
map<string, string> fields = 2;
+ string status = 3; // Name of the PipesResult.STATUS for this fetch key (the server sets it from status.name()).
+ string error_message = 4; // Set by the server only when status is FETCH_EXCEPTION (from PipesResult.getMessage()); empty otherwise.
}
message DeleteFetcherRequest {
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
index 2def77e..0b698cf 100644
--- a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
@@ -19,6 +19,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.nio.charset.StandardCharsets;
@@ -32,6 +33,7 @@
import java.util.List;
import java.util.Locale;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.asarkar.grpc.test.GrpcCleanupExtension;
import com.asarkar.grpc.test.Resources;
@@ -62,13 +64,14 @@
import org.apache.tika.SaveFetcherReply;
import org.apache.tika.SaveFetcherRequest;
import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
@ExtendWith(GrpcCleanupExtension.class)
public class TikaGrpcServerTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(TikaGrpcServerTest.class);
- public static final int NUM_TEST_DOCS = 50;
+ public static final int NUM_TEST_DOCS = 2;
static File tikaConfigXmlTemplate = Paths
.get("src", "test", "resources", "tika-pipes-test-config.xml")
.toFile();
@@ -208,28 +211,34 @@
.put("basePath", targetFolder)
.put("extractFileSystemMetadata", true)
.build()))
-
.build());
assertEquals(fetcherId, reply.getFetcherId());
- List<FetchAndParseReply> fetchAndParseReplys = Collections.synchronizedList(new ArrayList<>());
+ List<FetchAndParseReply> successes = Collections.synchronizedList(new ArrayList<>());
+ List<FetchAndParseReply> errors = Collections.synchronizedList(new ArrayList<>());
+ AtomicBoolean finished = new AtomicBoolean(false);
StreamObserver<FetchAndParseReply> replyStreamObserver = new StreamObserver<>() {
@Override
public void onNext(FetchAndParseReply fetchAndParseReply) {
LOG.debug("Fetched {} with metadata {}", fetchAndParseReply.getFetchKey(), fetchAndParseReply.getFieldsMap());
- fetchAndParseReplys.add(fetchAndParseReply);
+ if (PipesResult.STATUS.FETCH_EXCEPTION.name().equals(fetchAndParseReply.getStatus())) {
+ errors.add(fetchAndParseReply);
+ } else {
+ successes.add(fetchAndParseReply);
+ }
}
@Override
public void onError(Throwable throwable) {
- LOG.error("Fetched error found", throwable);
+ fail(throwable);
}
@Override
public void onCompleted() {
LOG.info("Stream completed");
+ finished.set(true);
}
};
@@ -253,9 +262,16 @@
.setFetchKey(testDocument.getAbsolutePath())
.build());
}
+ // Now test error condition
+ requestStreamObserver.onNext(FetchAndParseRequest
+ .newBuilder()
+ .setFetcherId(fetcherId)
+ .setFetchKey("does not exist")
+ .build());
requestStreamObserver.onCompleted();
-
- assertEquals(NUM_TEST_DOCS, fetchAndParseReplys.size());
+ assertEquals(NUM_TEST_DOCS, successes.size());
+ assertEquals(1, errors.size());
+ assertTrue(finished.get());
} finally {
FileUtils.deleteDirectory(testDocumentFolder);
}