Merge branch 'TIKA-4181-grpc' into tika-grpc-3x-features
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index 1ef13c5..c5e1347 100644
--- a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -200,38 +200,18 @@
         }
         Metadata tikaMetadata = new Metadata();
         try {
-            Map<String, Object> metadataJsonObject = new HashMap<>();
-            if (!StringUtils.isBlank(request.getMetadataJson())) {
-                try {
-                    metadataJsonObject = OBJECT_MAPPER.readValue(request.getMetadataJson(), new TypeReference<>() {});
-                } catch (JsonProcessingException e) {
-                    metadataJsonObject = new HashMap<>();
-                }
-            }
-            for (Map.Entry<String, Object> entry : metadataJsonObject.entrySet()) {
-                if (entry.getValue() instanceof List) {
-                    List<Object> list = (List<Object>) entry.getValue();
-                    tikaMetadata.set(Property.externalText(entry.getKey()), list.stream()
-                                                                                .map(String::valueOf)
-                                                                                .collect(Collectors.toList())
-                                                                                .toArray(new String[] {}));
-                } else if (entry.getValue() instanceof String) {
-                    tikaMetadata.set(Property.externalText(entry.getKey()), (String) entry.getValue());
-                } else if (entry.getValue() instanceof Integer) {
-                    tikaMetadata.set(Property.externalText(entry.getKey()), (Integer) entry.getValue());
-                } else if (entry.getValue() instanceof Double) {
-                    tikaMetadata.set(Property.externalText(entry.getKey()), (Double) entry.getValue());
-                } else if (entry.getValue() instanceof Float) {
-                    tikaMetadata.set(Property.externalText(entry.getKey()), (Float) entry.getValue());
-                } else if (entry.getValue() instanceof Boolean) {
-                    tikaMetadata.set(Property.externalText(entry.getKey()), (Boolean) entry.getValue());
-                }
-            }
+            String metadataJson = request.getMetadataJson();
+            loadMetadata(metadataJson, tikaMetadata);
             PipesResult pipesResult = pipesClient.process(new FetchEmitTuple(request.getFetchKey(),
                     new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(), tikaMetadata,
-                        HandlerConfig.DEFAULT_HANDLER_CONFIG, FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+                    HandlerConfig.DEFAULT_HANDLER_CONFIG, FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
             FetchAndParseReply.Builder fetchReplyBuilder =
-                    FetchAndParseReply.newBuilder().setFetchKey(request.getFetchKey());
+                    FetchAndParseReply.newBuilder()
+                                      .setFetchKey(request.getFetchKey())
+                            .setStatus(pipesResult.getStatus().name());
+            if (pipesResult.getStatus().equals(PipesResult.STATUS.FETCH_EXCEPTION)) {
+                fetchReplyBuilder.setErrorMessage(pipesResult.getMessage());
+            }
             if (pipesResult.getEmitData() != null && pipesResult.getEmitData().getMetadataList() != null) {
                 for (Metadata metadata : pipesResult.getEmitData().getMetadataList()) {
                     for (String name : metadata.names()) {
@@ -250,6 +230,36 @@
         }
     }
 
+    private static void loadMetadata(String metadataJson, Metadata tikaMetadata) throws JsonProcessingException {
+        Map<String, Object> metadataJsonObject = new HashMap<>();
+        if (!StringUtils.isBlank(metadataJson)) {
+            try {
+                metadataJsonObject = OBJECT_MAPPER.readValue(metadataJson, new TypeReference<>() {});
+            } catch (JsonProcessingException e) {
+                metadataJsonObject = new HashMap<>();
+            }
+        }
+        for (Map.Entry<String, Object> entry : metadataJsonObject.entrySet()) {
+            if (entry.getValue() instanceof List) {
+                List<Object> list = (List<Object>) entry.getValue();
+                tikaMetadata.set(Property.externalText(entry.getKey()), list.stream()
+                                                                            .map(String::valueOf)
+                                                                            .collect(Collectors.toList())
+                                                                            .toArray(new String[] {}));
+            } else if (entry.getValue() instanceof String) {
+                tikaMetadata.set(Property.externalText(entry.getKey()), (String) entry.getValue());
+            } else if (entry.getValue() instanceof Integer) {
+                tikaMetadata.set(Property.externalText(entry.getKey()), (Integer) entry.getValue());
+            } else if (entry.getValue() instanceof Double) {
+                tikaMetadata.set(Property.externalText(entry.getKey()), (Double) entry.getValue());
+            } else if (entry.getValue() instanceof Float) {
+                tikaMetadata.set(Property.externalText(entry.getKey()), (Float) entry.getValue());
+            } else if (entry.getValue() instanceof Boolean) {
+                tikaMetadata.set(Property.externalText(entry.getKey()), (Boolean) entry.getValue());
+            }
+        }
+    }
+
     @SuppressWarnings("raw")
     @Override
     public void saveFetcher(SaveFetcherRequest request,
diff --git a/tika-pipes/tika-grpc/src/main/proto/tika.proto b/tika-pipes/tika-grpc/src/main/proto/tika.proto
index 18e2fd1..6b8ac4a 100644
--- a/tika-pipes/tika-grpc/src/main/proto/tika.proto
+++ b/tika-pipes/tika-grpc/src/main/proto/tika.proto
@@ -53,6 +53,8 @@
 message FetchAndParseReply {
   string fetch_key = 1;
   map<string, string> fields = 2;
+  string status = 3;
+  string error_message = 4;
 }
 
 message DeleteFetcherRequest {
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
index 2def77e..0b698cf 100644
--- a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
@@ -19,6 +19,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -32,6 +33,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.asarkar.grpc.test.GrpcCleanupExtension;
 import com.asarkar.grpc.test.Resources;
@@ -62,13 +64,14 @@
 import org.apache.tika.SaveFetcherReply;
 import org.apache.tika.SaveFetcherRequest;
 import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.PipesResult;
 import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
 
 @ExtendWith(GrpcCleanupExtension.class)
 public class TikaGrpcServerTest {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final Logger LOG = LoggerFactory.getLogger(TikaGrpcServerTest.class);
-    public static final int NUM_TEST_DOCS = 50;
+    public static final int NUM_TEST_DOCS = 2;
     static File tikaConfigXmlTemplate = Paths
             .get("src", "test", "resources", "tika-pipes-test-config.xml")
             .toFile();
@@ -208,28 +211,34 @@
                         .put("basePath", targetFolder)
                         .put("extractFileSystemMetadata", true)
                         .build()))
-
                 .build());
 
         assertEquals(fetcherId, reply.getFetcherId());
 
-        List<FetchAndParseReply> fetchAndParseReplys = Collections.synchronizedList(new ArrayList<>());
+        List<FetchAndParseReply> successes = Collections.synchronizedList(new ArrayList<>());
+        List<FetchAndParseReply> errors = Collections.synchronizedList(new ArrayList<>());
+        AtomicBoolean finished = new AtomicBoolean(false);
 
         StreamObserver<FetchAndParseReply> replyStreamObserver = new StreamObserver<>() {
             @Override
             public void onNext(FetchAndParseReply fetchAndParseReply) {
                 LOG.debug("Fetched {} with metadata {}", fetchAndParseReply.getFetchKey(), fetchAndParseReply.getFieldsMap());
-                fetchAndParseReplys.add(fetchAndParseReply);
+                if (PipesResult.STATUS.FETCH_EXCEPTION.name().equals(fetchAndParseReply.getStatus())) {
+                    errors.add(fetchAndParseReply);
+                } else {
+                    successes.add(fetchAndParseReply);
+                }
             }
 
             @Override
             public void onError(Throwable throwable) {
-                LOG.error("Fetched error found", throwable);
+                fail(throwable);
             }
 
             @Override
             public void onCompleted() {
                 LOG.info("Stream completed");
+                finished.set(true);
             }
         };
 
@@ -253,9 +262,16 @@
                         .setFetchKey(testDocument.getAbsolutePath())
                         .build());
             }
+            // Now test error condition
+            requestStreamObserver.onNext(FetchAndParseRequest
+                    .newBuilder()
+                    .setFetcherId(fetcherId)
+                    .setFetchKey("does not exist")
+                    .build());
             requestStreamObserver.onCompleted();
-
-            assertEquals(NUM_TEST_DOCS, fetchAndParseReplys.size());
+            assertEquals(NUM_TEST_DOCS, successes.size());
+            assertEquals(1, errors.size());
+            assertTrue(finished.get());
         } finally {
             FileUtils.deleteDirectory(testDocumentFolder);
         }