blob: 17f2bfebbf9bbe56b91d25b6b623fce8df6cc70f [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sdap.mudrod.tools;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import javax.net.ssl.SSLHandshakeException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.sdap.mudrod.discoveryengine.MudrodAbstract;
import org.apache.sdap.mudrod.driver.ESDriver;
import org.apache.sdap.mudrod.driver.SparkDriver;
import org.apache.sdap.mudrod.main.MudrodConstants;
import org.apache.sdap.mudrod.main.MudrodEngine;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.get.GetResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
/**
 * Entry point providing ingestion logic of <a href="https://eonet.sci.gsfc.nasa.gov/">
 * Earth Observatory Natural Event Tracker (EONET)</a> data into
 * the SDAP search server.
 * <p>
 * Closed and open events are fetched from the EONET v2.1 events API and
 * upserted, one document per event, into the index named by the
 * {@link MudrodConstants#ES_INDEX_NAME} property under the
 * {@code eonet_event} type.
 */
public class EONETIngester extends MudrodAbstract {

  private static final long serialVersionUID = 1L;
  private static final Logger LOG = LoggerFactory.getLogger(EONETIngester.class);

  /** EONET v2.1 endpoints, queried in this order: closed events first, then open events. */
  private static final String[] EVENTS_URLS = {
      "https://eonet.sci.gsfc.nasa.gov/api/v2.1/events?status=closed",
      "https://eonet.sci.gsfc.nasa.gov/api/v2.1/events?status=open"};

  /** Elasticsearch document type used for every ingested event. */
  private static final String EVENT_TYPE = "eonet_event";

  /** User-Agent header sent with every EONET request. */
  private static final String USER_AGENT = "Apache SDAP MUDROD EONETIngester";

  /**
   * Creates an ingester bound to the given configuration and drivers and
   * then calls {@code initMudrod()}.
   *
   * @param props MUDROD configuration properties
   * @param es    Elasticsearch driver
   * @param spark Spark driver (may be {@code null}; {@link #main} passes null)
   */
  public EONETIngester(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
    initMudrod();
  }

  /**
   * Ingests the entire EONET database (all closed and open events).
   *
   * @param args there are no arguments for this service. In the future this
   *             will most likely change to provide finer grained controls over
   *             what kind of EONET data we wish to ingest, e.g. recent events
   *             from the last day only.
   */
  public static void main(String[] args) {
    MudrodEngine mudrodEngine = checkConfigInit(new MudrodEngine());
    EONETIngester eonetIngester = new EONETIngester(
        mudrodEngine.getConfig(), mudrodEngine.getESDriver(), null);
    eonetIngester.acquireAllEvents(mudrodEngine);
  }

  /**
   * Loads the configuration and starts (and stores) an ES driver on the
   * engine when its configuration is still empty.
   *
   * @param mudrodEngine engine to initialise
   * @return the same engine instance
   */
  private static MudrodEngine checkConfigInit(MudrodEngine mudrodEngine) {
    if (mudrodEngine.getConfig().isEmpty()) {
      mudrodEngine.loadConfig();
      mudrodEngine.setESDriver(mudrodEngine.startESDriver());
    }
    return mudrodEngine;
  }

  /**
   * Downloads every EONET event and upserts them into Elasticsearch.
   *
   * @param mEngine engine supplying configuration and the ES driver; if its
   *                configuration is empty it is loaded and a new ES driver is
   *                started and stored on the engine first
   * @return currently always an empty string (no indexing summary is produced)
   */
  public String acquireAllEvents(MudrodEngine mEngine) {
    // Reuse the shared initialisation so a freshly started driver is also
    // registered on the engine instead of living only in a local variable.
    ESDriver esDriver = checkConfigInit(mEngine).getESDriver();
    try {
      return executeBulkIndexRequest(mEngine, esDriver, executeEonetGetOperations());
    } catch (SSLHandshakeException e) {
      LOG.error("SSL handshake is failed whilst acquiring all events!", e);
      return "";
    }
  }

  /**
   * Queues one upsert per event on the driver's bulk processor, then closes
   * the processor (which is done even if queuing fails).
   *
   * @param mEngine         engine whose configuration names the target index
   * @param esDriver        driver owning the bulk processor
   * @param jsonEventsArray EONET event objects
   * @return always an empty string
   */
  private String executeBulkIndexRequest(MudrodEngine mEngine, ESDriver esDriver, JsonArray jsonEventsArray) {
    String index = mEngine.getConfig().getProperty(MudrodConstants.ES_INDEX_NAME);
    esDriver.createBulkProcessor();
    BulkProcessor bp = esDriver.getBulkProcessor();
    try {
      for (JsonElement jsonElement : jsonEventsArray) {
        if (!jsonElement.isJsonObject()) {
          LOG.warn("Skipping EONET entry that is not a JSON object: {}", jsonElement);
          continue;
        }
        JsonObject event = jsonElement.getAsJsonObject();
        // getAsString() yields the raw id; toString() would keep the JSON quotes.
        String eventId = primitiveAsString(event.get("id"));
        if (eventId == null) {
          LOG.warn("Skipping EONET event without a usable id: {}", event);
          continue;
        }
        XContentBuilder source = executeEventMapping(event);
        if (source == null) {
          // The mapping failure has already been logged.
          continue;
        }
        IndexRequest indexRequest = new IndexRequest(index, EVENT_TYPE, eventId).source(source);
        UpdateRequest updateRequest =
            new UpdateRequest(index, EVENT_TYPE, eventId).upsert(indexRequest);
        updateRequest.doc(indexRequest);
        // Only the bulk processor writes the document; no extra synchronous update.
        bp.add(updateRequest);
      }
    } finally {
      esDriver.destroyBulkProcessor();
    }
    return "";
  }

  /**
   * Builds the Elasticsearch source for a single event. The fields are id,
   * title, description, link, closed (only when present and non-null),
   * categories, sources and geometries.
   *
   * @param event one EONET event object
   * @return the document source, or {@code null} if building it failed
   */
  private XContentBuilder executeEventMapping(JsonObject event) {
    try {
      XContentBuilder eventMapping = jsonBuilder()
          .startObject()
          .field("id", toPlainValue(event.get("id")))
          .field("title", toPlainValue(event.get("title")))
          .field("description", toPlainValue(event.get("description")))
          .field("link", toPlainValue(event.get("link")));
      JsonElement closed = event.get("closed");
      if (closed != null && !closed.isJsonNull()) {
        eventMapping.field("closed", toPlainValue(closed));
      }
      return eventMapping
          .field("categories", toPlainValue(event.get("categories")))
          .field("sources", toPlainValue(event.get("sources")))
          .field("geometries", toPlainValue(event.get("geometries")))
          .endObject();
    } catch (IOException e) {
      LOG.error("Failed to create event mapping : ", e);
      return null;
    }
  }

  /**
   * Returns the text of a JSON primitive, or {@code null} for a missing,
   * null, object or array element.
   */
  private static String primitiveAsString(JsonElement element) {
    return element != null && element.isJsonPrimitive() ? element.getAsString() : null;
  }

  /**
   * Converts a Gson element into plain Java values (Map, List, String, Long,
   * Double, Boolean or null). Gson elements handed straight to XContentBuilder
   * would be stored as their JSON text rather than as structured values.
   */
  private static Object toPlainValue(JsonElement element) {
    if (element == null || element.isJsonNull()) {
      return null;
    }
    if (element.isJsonObject()) {
      Map<String, Object> map = new LinkedHashMap<>();
      for (Map.Entry<String, JsonElement> member : element.getAsJsonObject().entrySet()) {
        map.put(member.getKey(), toPlainValue(member.getValue()));
      }
      return map;
    }
    if (element.isJsonArray()) {
      List<Object> list = new ArrayList<>();
      for (JsonElement item : element.getAsJsonArray()) {
        list.add(toPlainValue(item));
      }
      return list;
    }
    if (element.getAsJsonPrimitive().isBoolean()) {
      return element.getAsBoolean();
    }
    if (element.getAsJsonPrimitive().isNumber()) {
      return toNumber(element.getAsString());
    }
    return element.getAsString();
  }

  /**
   * Parses JSON number text as a Long when it is integral and fits, and as a
   * Double otherwise.
   */
  private static Number toNumber(String text) {
    boolean integral = text.indexOf('.') < 0 && text.indexOf('e') < 0 && text.indexOf('E') < 0;
    if (integral) {
      try {
        return Long.valueOf(text);
      } catch (NumberFormatException tooLargeForLong) {
        // Fall through to floating point.
      }
    }
    return Double.valueOf(text);
  }

  /**
   * Fetches every URL in {@link #EVENTS_URLS} and concatenates their
   * {@code events} arrays. Non-200 responses, I/O errors and unparsable
   * bodies are logged and that URL is skipped.
   *
   * @return all events gathered (possibly empty, never null)
   * @throws SSLHandshakeException if the TLS handshake with EONET fails
   */
  private JsonArray executeEonetGetOperations() throws SSLHandshakeException {
    JsonArray eventArray = new JsonArray();
    try (CloseableHttpClient client = HttpClientBuilder.create().build()) {
      for (String url : EVENTS_URLS) {
        HttpGet request = new HttpGet(url);
        request.addHeader("User-Agent", USER_AGENT);
        LOG.info("Executing: {}", request);
        try (CloseableHttpResponse response = client.execute(request)) {
          int status = response.getStatusLine().getStatusCode();
          HttpEntity entity = response.getEntity();
          if (status != HttpStatus.SC_OK || entity == null) {
            LOG.error("HTTP GET {} returned status {} without usable content; skipping it", request, status);
            continue;
          }
          JsonArray partialEventsArray =
              extractEventsArrayFromJsonResponse(EntityUtils.toString(entity, "UTF-8"));
          for (JsonElement jsonElement : partialEventsArray) {
            eventArray.add(jsonElement);
          }
        } catch (SSLHandshakeException e) {
          // SSLHandshakeException is an IOException; rethrow it explicitly so the
          // declared throws clause is honoured instead of being logged and swallowed.
          throw e;
        } catch (IOException | JsonParseException e) {
          LOG.error("Failed to execute HTTP GET {} : ", request, e);
        }
      }
    } catch (SSLHandshakeException e) {
      throw e;
    } catch (IOException e) {
      LOG.error("Failed to close the EONET HTTP client", e);
    }
    return eventArray;
  }

  /**
   * Extracts the {@code events} array from an EONET response body.
   *
   * @param string response body
   * @return the events array, or an empty array if the body is not an object
   *         with an {@code events} array
   * @throws JsonParseException if the body is not valid JSON
   */
  private JsonArray extractEventsArrayFromJsonResponse(String string) {
    JsonElement root = new JsonParser().parse(string);
    if (root.isJsonObject()) {
      JsonElement events = root.getAsJsonObject().get("events");
      if (events != null && events.isJsonArray()) {
        return events.getAsJsonArray();
      }
    }
    LOG.warn("EONET response did not contain an 'events' array; ignoring it");
    return new JsonArray();
  }
}