| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
 *
| * http://www.apache.org/licenses/LICENSE-2.0 |
 *
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nutch.indexwriter.cloudsearch; |
| |
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.AbstractMap;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.indexer.IndexWriter;
import org.apache.nutch.indexer.IndexWriterParams;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexer.NutchField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.cloudsearchdomain.AmazonCloudSearchDomainClient;
import com.amazonaws.services.cloudsearchdomain.model.ContentType;
import com.amazonaws.services.cloudsearchdomain.model.UploadDocumentsRequest;
import com.amazonaws.services.cloudsearchdomain.model.UploadDocumentsResult;
import com.amazonaws.services.cloudsearchv2.AmazonCloudSearchClient;
import com.amazonaws.services.cloudsearchv2.model.DescribeDomainsRequest;
import com.amazonaws.services.cloudsearchv2.model.DescribeDomainsResult;
import com.amazonaws.services.cloudsearchv2.model.DescribeIndexFieldsRequest;
import com.amazonaws.services.cloudsearchv2.model.DescribeIndexFieldsResult;
import com.amazonaws.services.cloudsearchv2.model.DomainStatus;
import com.amazonaws.services.cloudsearchv2.model.IndexFieldStatus;
import com.amazonaws.util.json.JSONException;
import com.amazonaws.util.json.JSONObject;
| |
| /** |
| * Writes documents to CloudSearch. |
| */ |
| public class CloudSearchIndexWriter implements IndexWriter { |
| private static final Logger LOG = LoggerFactory |
| .getLogger(MethodHandles.lookup().lookupClass()); |
| |
| private static final int MAX_SIZE_BATCH_BYTES = 5242880; |
| private static final int MAX_SIZE_DOC_BYTES = 1048576; |
| |
| private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat( |
| "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); |
| |
| private AmazonCloudSearchDomainClient client; |
| |
| private int maxDocsInBatch = -1; |
| |
| private StringBuffer buffer; |
| |
| private int numDocsInBatch = 0; |
| |
| private boolean dumpBatchFilesToTemp = false; |
| |
| private Configuration conf; |
| |
| private Map<String, String> csfields = new HashMap<String, String>(); |
| |
| private String endpoint; |
| private String regionName; |
| |
| @Override |
| public void open(Configuration conf, String name) throws IOException { |
| //Implementation not required |
| } |
| |
| @Override |
| public void open(IndexWriterParams parameters) throws IOException { |
| // LOG.debug("CloudSearchIndexWriter.open() name={} ", name); |
| |
| endpoint = parameters.get(CloudSearchConstants.ENDPOINT); |
| dumpBatchFilesToTemp = parameters |
| .getBoolean(CloudSearchConstants.BATCH_DUMP, false); |
| this.regionName = parameters.get(CloudSearchConstants.REGION); |
| |
| if (StringUtils.isBlank(endpoint) && !dumpBatchFilesToTemp) { |
| String message = "Missing CloudSearch endpoint. Should set it set via -D " |
| + CloudSearchConstants.ENDPOINT + " or in nutch-site.xml"; |
| message += "\n" + describe(); |
| LOG.error(message); |
| throw new RuntimeException(message); |
| } |
| |
| maxDocsInBatch = parameters.getInt(CloudSearchConstants.MAX_DOCS_BATCH, -1); |
| |
| buffer = new StringBuffer(MAX_SIZE_BATCH_BYTES).append('['); |
| |
| if (dumpBatchFilesToTemp) { |
| // only dumping to local file |
| // no more config required |
| return; |
| } |
| |
| if (StringUtils.isBlank(endpoint)) { |
| throw new RuntimeException("endpoint not set for CloudSearch"); |
| } |
| |
| AmazonCloudSearchClient cl = new AmazonCloudSearchClient(); |
| if (StringUtils.isNotBlank(regionName)) { |
| cl.setRegion(RegionUtils.getRegion(regionName)); |
| } |
| |
| String domainName = null; |
| |
| // retrieve the domain name |
| DescribeDomainsResult domains = cl |
| .describeDomains(new DescribeDomainsRequest()); |
| |
| Iterator<DomainStatus> dsiter = domains.getDomainStatusList().iterator(); |
| while (dsiter.hasNext()) { |
| DomainStatus ds = dsiter.next(); |
| if (ds.getDocService().getEndpoint().equals(endpoint)) { |
| domainName = ds.getDomainName(); |
| break; |
| } |
| } |
| |
| // check domain name |
| if (StringUtils.isBlank(domainName)) { |
| throw new RuntimeException( |
| "No domain name found for CloudSearch endpoint"); |
| } |
| |
| DescribeIndexFieldsResult indexDescription = cl.describeIndexFields( |
| new DescribeIndexFieldsRequest().withDomainName(domainName)); |
| for (IndexFieldStatus ifs : indexDescription.getIndexFields()) { |
| String indexname = ifs.getOptions().getIndexFieldName(); |
| String indextype = ifs.getOptions().getIndexFieldType(); |
| LOG.info("CloudSearch index name {} of type {}", indexname, indextype); |
| csfields.put(indexname, indextype); |
| } |
| |
| client = new AmazonCloudSearchDomainClient(); |
| client.setEndpoint(endpoint); |
| } |
| |
| @Override |
| public void delete(String url) throws IOException { |
| |
| try { |
| JSONObject doc_builder = new JSONObject(); |
| |
| doc_builder.put("type", "delete"); |
| |
| // generate the id from the url |
| String ID = CloudSearchUtils.getID(url); |
| doc_builder.put("id", ID); |
| |
| // add to the batch |
| addToBatch(doc_builder.toString(2), url); |
| |
| } catch (JSONException e) { |
| LOG.error("Exception caught while building JSON object", e); |
| } |
| |
| } |
| |
| @Override |
| public void update(NutchDocument doc) throws IOException { |
| write(doc); |
| } |
| |
| @Override |
| public void write(NutchDocument doc) throws IOException { |
| try { |
| JSONObject doc_builder = new JSONObject(); |
| |
| doc_builder.put("type", "add"); |
| |
| String url = doc.getField("url").toString(); |
| |
| // generate the id from the url |
| String ID = CloudSearchUtils.getID(url); |
| doc_builder.put("id", ID); |
| |
| JSONObject fields = new JSONObject(); |
| |
| for (final Entry<String, NutchField> e : doc) { |
| String fieldname = cleanFieldName(e.getKey()); |
| String type = csfields.get(fieldname); |
| |
| // undefined in index |
| if (!dumpBatchFilesToTemp && type == null) { |
| LOG.info( |
| "Field {} not defined in CloudSearch domain for {} - skipping.", |
| fieldname, url); |
| continue; |
| } |
| |
| List<Object> values = e.getValue().getValues(); |
| // write the values |
| for (Object value : values) { |
| // Convert dates to an integer |
| if (value instanceof Date) { |
| Date d = (Date) value; |
| value = DATE_FORMAT.format(d); |
| } |
| // normalise strings |
| else if (value instanceof String) { |
| value = CloudSearchUtils.stripNonCharCodepoints((String) value); |
| } |
| |
| fields.accumulate(fieldname, value); |
| } |
| } |
| |
| doc_builder.put("fields", fields); |
| |
| addToBatch(doc_builder.toString(2), url); |
| |
| } catch (JSONException e) { |
| LOG.error("Exception caught while building JSON object", e); |
| } |
| } |
| |
| private void addToBatch(String currentDoc, String url) throws IOException { |
| int currentDocLength = currentDoc.getBytes(StandardCharsets.UTF_8).length; |
| |
| // check that the doc is not too large -> skip it if it does |
| if (currentDocLength > MAX_SIZE_DOC_BYTES) { |
| LOG.error("Doc too large. currentDoc.length {} : {}", currentDocLength, |
| url); |
| return; |
| } |
| |
| int currentBufferLength = buffer.toString() |
| .getBytes(StandardCharsets.UTF_8).length; |
| |
| LOG.debug("currentDoc.length {}, buffer length {}", currentDocLength, |
| currentBufferLength); |
| |
| // can add it to the buffer without overflowing? |
| if (currentDocLength + 2 + currentBufferLength < MAX_SIZE_BATCH_BYTES) { |
| if (numDocsInBatch != 0) |
| buffer.append(','); |
| buffer.append(currentDoc); |
| numDocsInBatch++; |
| } |
| // flush the previous batch and create a new one with this doc |
| else { |
| commit(); |
| buffer.append(currentDoc); |
| numDocsInBatch++; |
| } |
| |
| // have we reached the max number of docs in a batch after adding |
| // this doc? |
| if (maxDocsInBatch > 0 && numDocsInBatch == maxDocsInBatch) { |
| commit(); |
| } |
| } |
| |
| @Override |
| public void commit() throws IOException { |
| |
| // nothing to do |
| if (numDocsInBatch == 0) { |
| return; |
| } |
| |
| // close the array |
| buffer.append(']'); |
| |
| LOG.info("Sending {} docs to CloudSearch", numDocsInBatch); |
| |
| byte[] bb = buffer.toString().getBytes(StandardCharsets.UTF_8); |
| |
| if (dumpBatchFilesToTemp) { |
| try { |
| File temp = File.createTempFile("CloudSearch_", ".json"); |
| FileUtils.writeByteArrayToFile(temp, bb); |
| LOG.info("Wrote batch file {}", temp.getName()); |
| } catch (IOException e1) { |
| LOG.error("Exception while generating batch file", e1); |
| } finally { |
| // reset buffer and doc counter |
| buffer = new StringBuffer(MAX_SIZE_BATCH_BYTES).append('['); |
| numDocsInBatch = 0; |
| } |
| return; |
| } |
| // not in debug mode |
| try (InputStream inputStream = new ByteArrayInputStream(bb)) { |
| UploadDocumentsRequest batch = new UploadDocumentsRequest(); |
| batch.setContentLength((long) bb.length); |
| batch.setContentType(ContentType.Applicationjson); |
| batch.setDocuments(inputStream); |
| @SuppressWarnings("unused") |
| UploadDocumentsResult result = client.uploadDocuments(batch); |
| } catch (Exception e) { |
| LOG.error("Exception while sending batch", e); |
| LOG.error(buffer.toString()); |
| } finally { |
| // reset buffer and doc counter |
| buffer = new StringBuffer(MAX_SIZE_BATCH_BYTES).append('['); |
| numDocsInBatch = 0; |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| // This will flush any unsent documents. |
| commit(); |
| // close the client |
| if (client != null) { |
| client.shutdown(); |
| } |
| } |
| |
| public Configuration getConf() { |
| return this.conf; |
| } |
| |
| @Override |
| public void setConf(Configuration conf) { |
| this.conf = conf; |
| } |
| |
| /** |
| * Returns {@link Map} with the specific parameters the IndexWriter instance can take. |
| * |
| * @return The values of each row. It must have the form <KEY,<DESCRIPTION,VALUE>>. |
| */ |
| @Override |
| public Map<String, Entry<String, Object>> describe() { |
| Map<String, Map.Entry<String, Object>> properties = new LinkedHashMap<>(); |
| |
| properties.put(CloudSearchConstants.ENDPOINT, new AbstractMap.SimpleEntry<>( |
| "Endpoint where service requests should be submitted.", this.endpoint)); |
| properties.put(CloudSearchConstants.REGION, |
| new AbstractMap.SimpleEntry<>("Region name.", this.regionName)); |
| properties.put(CloudSearchConstants.BATCH_DUMP, |
| new AbstractMap.SimpleEntry<>("true to send documents to a local file.", |
| this.dumpBatchFilesToTemp)); |
| properties.put(CloudSearchConstants.MAX_DOCS_BATCH, |
| new AbstractMap.SimpleEntry<>( |
| "Maximum number of documents to send as a batch to CloudSearch.", |
| this.maxDocsInBatch)); |
| |
| return properties; |
| } |
| |
| /** |
| * Remove the non-cloudSearch-legal characters. Note that this might convert |
| * two fields to the same name. |
| * |
| * @param name |
| * @return |
| */ |
| String cleanFieldName(String name) { |
| String lowercase = name.toLowerCase(); |
| return lowercase.replaceAll("[^a-z_0-9]", "_"); |
| } |
| |
| } |