| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.ambari.logfeeder.output; |
| |
| import org.apache.ambari.logfeeder.common.IdGeneratorHelper; |
| import org.apache.ambari.logfeeder.common.LogFeederSolrClientFactory; |
| import org.apache.ambari.logfeeder.conf.LogFeederProps; |
| import org.apache.ambari.logfeeder.plugin.input.InputMarker; |
| import org.apache.ambari.logfeeder.plugin.output.Output; |
| import org.apache.ambari.logfeeder.util.DateUtil; |
| import org.apache.ambari.logfeeder.util.LogFeederUtil; |
| import org.apache.ambari.logsearch.config.api.ShipperConfigElementDescription; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.logging.log4j.Level; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| import org.apache.solr.client.solrj.SolrClient; |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.client.solrj.impl.CloudSolrClient; |
| import org.apache.solr.client.solrj.response.SolrPingResponse; |
| import org.apache.solr.client.solrj.response.UpdateResponse; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Calendar; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Ships (transformed) input data to a Solr destination. Works with both Solr cloud mode and static Solr URL(s). |
| * In Solr cloud mode, Log Feeder manages and listens on its own ZooKeeper connection, so a large number of Log Feeder |
| * nodes can require a lot of client connections. (For static URLs use the "solr_urls" field; for Solr cloud mode use "zk_connect_string".) |
| * Example configuration (using JSON config api): |
| * <pre> |
| * { |
| * "output": [ |
| * { |
| * "is_enabled": "true", |
| * "comment": "Output to solr for service logs", |
| * "collection" : "hadoop_logs", |
| * "destination": "solr", |
| * "zk_connect_string": "localhost:9983", |
| * "type": "service", |
| * "skip_logtime": "true", |
| * "conditions": { |
| * "fields": { |
| * "rowtype": [ |
| * "service" |
| * ] |
| * } |
| * } |
| * } |
| * ] |
| * } |
| * </pre> |
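| * For static Solr URLs, replace "zk_connect_string" with a "solr_urls" list, for example: |
| * <pre> |
| *   "solr_urls": ["http://localhost:8886/solr"] |
| * </pre> |
| * Optional tuning fields include "workers" (writer threads), "flush_size" (batch size) and |
| * "idle_flush_time_ms" (how long a batch may wait before being flushed). |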
| */ |
| public class OutputSolr extends Output<LogFeederProps, InputMarker> { |
| |
| private static final Logger logger = LogManager.getLogger(OutputSolr.class); |
| |
| private static final int DEFAULT_MAX_BUFFER_SIZE = 5000; |
| private static final int DEFAULT_MAX_INTERVAL_MS = 3000; |
| private static final int DEFAULT_NUMBER_OF_WORKERS = 1; |
| private static final boolean DEFAULT_SKIP_LOGTIME = false; |
| |
| private static final int RETRY_INTERVAL = 30; |
| |
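| // Standard JVM system properties for the JAAS/Kerberos client setup; setupSecurity() only |
| // logs their values, the Solr client picks them up from the JVM on its own. |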
| private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config"; |
| private static final String SOLR_HTTPCLIENT_BUILDER_FACTORY = "solr.httpclient.builder.factory"; |
| |
| @ShipperConfigElementDescription( |
| path = "/output/[]/type", |
| type = "string", |
| description = "Output type name, right now it can be service or audit", |
| examples = {"\"service\"", "\"audit\""} |
| ) |
| private String type; |
| private String collection; |
| private int splitInterval; |
| private String zkConnectString; |
| private String[] solrUrls = null; |
| private int maxIntervalMS; |
| private int workers; |
| private int maxBufferSize; |
| private boolean implicitRouting = false; |
| private int lastSlotByMin = -1; |
| private boolean skipLogtime = false; |
| private boolean discoverSolrNodes = false; |
| private List<String> idFields = new ArrayList<>(); |
| |
| private BlockingQueue<OutputData> outgoingBuffer = null; |
| private List<SolrWorkerThread> workerThreadList = new ArrayList<>(); |
| |
| private LogFeederProps logFeederProps; |
| |
| @Override |
| public String getOutputType() { |
| return type; |
| } |
| |
| @Override |
| public String getStatMetricName() { |
| return "output.solr.write_logs"; |
| } |
| |
| @Override |
| public String getWriteBytesMetricName() { |
| return "output.solr.write_bytes"; |
| } |
| |
| @Override |
| public void init(LogFeederProps logFeederProps) throws Exception { |
| this.logFeederProps = logFeederProps; |
| initParams(logFeederProps); |
| setupSecurity(); |
| createOutgoingBuffer(); |
| createSolrWorkers(); |
| } |
| |
| private void initParams(LogFeederProps logFeederProps) throws Exception { |
| type = getStringValue("type"); |
| |
| zkConnectString = getStringValue("zk_connect_string"); |
| List<String> solrUrlsList = getListValue("solr_urls"); |
| |
| if (StringUtils.isBlank(zkConnectString) |
| && CollectionUtils.isEmpty(solrUrlsList) |
| && StringUtils.isBlank(logFeederProps.getSolrUrlsStr())) { |
| throw new Exception("For solr output the zk_connect_string or solr_urls property need to be set"); |
| } |
| |
| if (StringUtils.isNotBlank(logFeederProps.getSolrUrlsStr())) { |
| solrUrls = logFeederProps.getSolrUrls(); |
| } else if (CollectionUtils.isNotEmpty(solrUrlsList)) { |
| solrUrls = solrUrlsList.toArray(new String[0]); |
| } |
| |
| idFields = getListValue("id_fields", new ArrayList<>()); |
| |
| skipLogtime = getBooleanValue("skip_logtime", DEFAULT_SKIP_LOGTIME); |
| |
| maxIntervalMS = getIntValue("idle_flush_time_ms", DEFAULT_MAX_INTERVAL_MS); |
| workers = getIntValue("workers", DEFAULT_NUMBER_OF_WORKERS); |
| |
| splitInterval = 0; |
| String splitMode = getStringValue("split_interval", "none"); |
| if (!splitMode.equals("none")) { |
| splitInterval = Integer.parseInt(splitMode); |
| } |
| |
| collection = getStringValue("collection"); |
| if (StringUtils.isEmpty(collection)) { |
| throw new IllegalStateException("Collection property is mandatory"); |
| } |
| |
| discoverSolrNodes = logFeederProps.isSolrCloudDiscover(); |
| |
| maxBufferSize = getIntValue("flush_size", DEFAULT_MAX_BUFFER_SIZE); |
| if (maxBufferSize < 1) { |
| logger.warn("maxBufferSize is less than 1. Making it 1"); |
| maxBufferSize = 1; |
| } |
| |
| logger.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d." |
| + getShortDescription(), workers, splitMode, splitInterval)); |
| |
| implicitRouting = logFeederProps.isSolrImplicitRouting(); // TODO: in the future, load it from output config (can be a use case to use different routing for audit/service logs) |
| if (implicitRouting) { |
| logger.info("Config: Use implicit routing globally for adding docs to Solr."); |
| } else { |
| logger.info("Config: Use compositeId globally for adding docs to Solr."); |
| } |
| } |
| |
| private void setupSecurity() { |
| boolean securityEnabled = logFeederProps.getLogFeederSecurityConfig().isSolrKerberosEnabled(); |
| if (securityEnabled) { |
| String javaSecurityConfig = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG); |
| String solrHttpBuilderFactory = System.getProperty(SOLR_HTTPCLIENT_BUILDER_FACTORY); |
| logger.info("setupSecurity() called for kerberos configuration, jaas file: " |
| + javaSecurityConfig + ", solr http client factory: " + solrHttpBuilderFactory); |
| } |
| } |
| |
| private void createOutgoingBuffer() { |
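| // Size the queue for a few batches per worker: with the defaults (flush_size=5000, |
| // workers=1) this is 5000 * (1 + 3) = 20000 queued entries. |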
| int bufferSize = maxBufferSize * (workers + 3); |
| logger.info("Creating blocking queue with bufferSize=" + bufferSize); |
| outgoingBuffer = new LinkedBlockingQueue<>(bufferSize); |
| } |
| |
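| /** |
| * Starts one daemon worker thread per configured "workers" value, each with its own Solr client. |
| */ |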
| private void createSolrWorkers() throws Exception { |
| for (int count = 0; count < workers; count++) { |
| SolrClient solrClient = getSolrClient(count); |
| createSolrWorkerThread(count, solrClient); |
| } |
| } |
| |
| private SolrClient getSolrClient(int count) throws Exception { |
| SolrClient solrClient = new LogFeederSolrClientFactory().createSolrClient(zkConnectString, solrUrls, collection, discoverSolrNodes); |
| pingSolr(count, solrClient); |
| return solrClient; |
| } |
| |
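| /** |
| * Pings Solr once per worker at startup so connectivity problems show up in the logs early; |
| * a failed ping is only logged, the client is used anyway and writes are retried later. |
| */ |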
| private void pingSolr(int count, SolrClient solrClient) { |
| try { |
| logger.info("Pinging Solr server."); |
| SolrPingResponse response = solrClient.ping(); |
| if (response.getStatus() == 0) { |
| logger.info("Ping to Solr server is successful for worker=" + count); |
| } else { |
| logger.warn( |
| String.format("Ping to Solr server failed. Continuing anyway. worker=%d, collection=%s, " + |
| "response=%s", count, collection, response)); |
| } |
| } catch (Throwable t) { |
| logger.warn(String.format( |
| "Ping to Solr server failed. Continuing anyway. worker=%d, collection=%s", count, collection), t); |
| } |
| } |
| |
| private void createSolrWorkerThread(int count, SolrClient solrClient) { |
| SolrWorkerThread solrWorkerThread = new SolrWorkerThread(solrClient); |
| solrWorkerThread.setName(getNameForThread() + "," + collection + ",worker=" + count); |
| solrWorkerThread.setDaemon(true); |
| solrWorkerThread.start(); |
| workerThreadList.add(solrWorkerThread); |
| } |
| |
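| /** |
| * Queues one parsed log entry for the worker threads. The put() blocks when the outgoing |
| * buffer is full, which throttles the upstream inputs. |
| */ |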
| @Override |
| public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception { |
| try { |
| trimStrValue(jsonObj); |
| useActualDateIfNeeded(jsonObj); |
| outgoingBuffer.put(new OutputData(jsonObj, inputMarker)); |
| } catch (InterruptedException e) { |
| // ignore: interrupted while queueing, this record is dropped |
| } |
| } |
| |
| private void useActualDateIfNeeded(Map<String, Object> jsonObj) { |
| if (skipLogtime) { |
| jsonObj.put("logtime", DateUtil.getActualDateStr()); |
| if (jsonObj.get("evtTime") != null) { |
| jsonObj.put("evtTime", DateUtil.getActualDateStr()); |
| } |
| } |
| } |
| |
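| /** |
| * Signals drain mode and gives the worker threads up to 30 seconds to flush their local buffers. |
| */ |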
| public void flush() { |
| logger.info("Flush called..."); |
| setDrain(true); |
| |
| int wrapUpTimeSecs = 30; |
| // Give wrapUpTimeSecs seconds to wrap up |
| for (int i = 0; i < wrapUpTimeSecs; i++) { |
| boolean isPending = false; |
| for (SolrWorkerThread solrWorkerThread : workerThreadList) { |
| if (solrWorkerThread.isDone()) { |
| try { |
| solrWorkerThread.interrupt(); |
| } catch (Throwable t) { |
| // ignore |
| } |
| } else { |
| isPending = true; |
| } |
| } |
| if (!isPending) { |
| break; |
| } |
| try { |
| logger.info("Will give " + (wrapUpTimeSecs - i) + " seconds to wrap up"); |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| // ignore |
| } |
| } |
| } |
| |
| @Override |
| public void setDrain(boolean drain) { |
| super.setDrain(drain); |
| } |
| |
| @Override |
| public Long getPendingCount() { |
| long pendingCount = 0; |
| for (SolrWorkerThread solrWorkerThread : workerThreadList) { |
| pendingCount += solrWorkerThread.localBuffer.size(); |
| } |
| return pendingCount; |
| } |
| |
| @Override |
| public void close() { |
| logger.info("Closing Solr client..."); |
| flush(); |
| |
| logger.info("Closed Solr client"); |
| super.close(); |
| } |
| |
| @Override |
| public String getShortDescription() { |
| return "output:destination=solr,collection=" + collection; |
| } |
| |
| class SolrWorkerThread extends Thread { |
| private static final String ROUTER_FIELD = "_router_field_"; |
| |
| private final SolrClient solrClient; |
| private final Collection<SolrInputDocument> localBuffer = new ArrayList<>(); |
| private final Map<String, InputMarker> latestInputMarkers = new HashMap<>(); |
| |
| private long localBufferBytesSize = 0; |
| |
| public SolrWorkerThread(SolrClient solrClient) { |
| this.solrClient = solrClient; |
| } |
| |
| @Override |
| public void run() { |
| logger.info("SolrWorker thread started"); |
| long lastDispatchTime = System.currentTimeMillis(); |
| |
| while (true) { |
| long currTimeMS = System.currentTimeMillis(); |
| OutputData outputData = null; |
| try { |
| long nextDispatchDuration = maxIntervalMS - (currTimeMS - lastDispatchTime); |
| outputData = getOutputData(nextDispatchDuration); |
| |
| if (outputData != null) { |
| createSolrDocument(outputData); |
| } else { |
| if (isDrain() && outgoingBuffer.isEmpty()) { |
| break; |
| } |
| } |
| |
| if (!localBuffer.isEmpty() && |
| ((outputData == null && isDrain()) || nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize) |
| ) { |
| boolean response = sendToSolr(outputData); |
| if (isDrain() && !response) { |
| // The send to Solr failed and we are draining, so give up and exit |
| logger.warn("In drain mode and sending to Solr failed. So exiting. output=" + getShortDescription()); |
| break; |
| } |
| } |
| if (localBuffer.isEmpty()) { |
| //If localBuffer is empty, then reset the timer |
| lastDispatchTime = currTimeMS; |
| } |
| } catch (InterruptedException e) { |
| // Handle thread exiting |
| } catch (Throwable t) { |
| String logMessageKey = this.getClass().getSimpleName() + "_SOLR_MAINLOOP_EXCEPTION"; |
| LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in main loop. " + outputData, t, logger, |
| Level.ERROR); |
| } |
| } |
| |
| closeSolrClient(); |
| |
| resetLocalBuffer(); |
| logger.info("Exiting Solr worker thread. output=" + getShortDescription()); |
| } |
| |
| /** |
| * Loops until Solr is available and Log Feeder successfully writes to the collection |
| * or shard; blocks until the write succeeds. The outgoingBuffer is a BlockingQueue, |
| * so while it is full the upstream log file parsing automatically stops. |
| */ |
| private boolean sendToSolr(OutputData outputData) { |
| boolean result = false; |
| while (!isDrain()) { |
| try { |
| if (implicitRouting) { |
| // Compute the current router value |
| addRouterField(); |
| } |
| addToSolr(outputData); |
| resetLocalBuffer(); |
| //Send successful, will return |
| result = true; |
| break; |
| } catch (IOException | SolrException | SolrServerException exception) { |
| // Transient error, let's block till Solr is available again |
| try { |
| logger.warn("Solr is not reachable. Going to retry after " + RETRY_INTERVAL + " seconds. " + "output=" |
| + getShortDescription(), exception); |
| Thread.sleep(RETRY_INTERVAL * 1000); |
| } catch (Throwable t) { |
| // ignore |
| } |
| } catch (Throwable serverException) { |
| // Something unknown happened. Let's not block because of this error. |
| // Clear the buffer |
| String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_EXCEPTION"; |
| LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error sending log message to server. Dropping logs", |
| serverException, logger, Level.ERROR); |
| resetLocalBuffer(); |
| break; |
| } |
| } |
| return result; |
| } |
| |
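| /** |
| * Polls the shared queue, waiting at most until the next scheduled flush, and fills in a |
| * generated "id" (based on the configured id_fields) when the record has none. |
| */ |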
| private OutputData getOutputData(long nextDispatchDuration) throws InterruptedException { |
| OutputData outputData = outgoingBuffer.poll(); |
| if (outputData == null && !isDrain() && nextDispatchDuration > 0) { |
| outputData = outgoingBuffer.poll(nextDispatchDuration, TimeUnit.MILLISECONDS); |
| } |
| if (outputData != null && outputData.jsonObj.get("id") == null) { |
| outputData.jsonObj.put("id", IdGeneratorHelper.generateUUID(outputData.jsonObj, idFields)); |
| } |
| return outputData; |
| } |
| |
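| /** |
| * Converts a parsed entry into a SolrInputDocument, tracks its approximate byte size and |
| * remembers the latest input marker per source file for check-in after a successful write. |
| */ |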
| private void createSolrDocument(OutputData outputData) { |
| SolrInputDocument document = new SolrInputDocument(); |
| for (String name : outputData.jsonObj.keySet()) { |
| Object obj = outputData.jsonObj.get(name); |
| document.addField(name, obj); |
| try { |
| localBufferBytesSize += obj.toString().length(); |
| } catch (Throwable t) { |
| String logMessageKey = this.getClass().getSimpleName() + "_BYTE_COUNT_ERROR"; |
| LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error calculating byte size. object=" + obj, t, logger, |
| Level.ERROR); |
| } |
| } |
| Object fileKey = outputData.inputMarker.getAllProperties().get("file_key"); |
| if (fileKey != null) { |
| latestInputMarkers.put(fileKey.toString(), outputData.inputMarker); |
| } |
| localBuffer.add(document); |
| } |
| |
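| /** |
| * Implicit routing: picks the target shard from the current minute of the week. With N |
| * shards and split_interval=M minutes, the shard index is (minuteOfWeek / M) % N; e.g. with |
| * M=15 and N=7, minutes 0-14 of the week go to the first shard, 15-29 to the second, and so |
| * on, wrapping around every week. |
| */ |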
| private void addRouterField() { |
| ZkStateReader reader = ((CloudSolrClient) solrClient).getZkStateReader(); |
| DocCollection docCollection = reader.getClusterState().getCollection(collection); |
| Collection<Slice> slices = docCollection.getSlices(); |
| List<String> shards = slices.stream().map(Slice::getName).collect(Collectors.toList()); |
| |
| Calendar cal = Calendar.getInstance(); |
| int weekDay = cal.get(Calendar.DAY_OF_WEEK); |
| int currHour = cal.get(Calendar.HOUR_OF_DAY); |
| int currMin = cal.get(Calendar.MINUTE); |
| |
| int minOfWeek = (weekDay - 1) * 24 * 60 + currHour * 60 + currMin; |
| int slotByMin = minOfWeek / splitInterval % shards.size(); |
| |
| String shard = shards.get(slotByMin); |
| |
| if (lastSlotByMin != slotByMin) { |
| logger.info("Switching to shard " + shard + ", output=" + getShortDescription()); |
| lastSlotByMin = slotByMin; |
| } |
| |
| for (SolrInputDocument solrInputDocument : localBuffer) { |
| solrInputDocument.setField(ROUTER_FIELD, shard); |
| } |
| } |
| |
| private void addToSolr(OutputData outputData) throws SolrServerException, IOException { |
| UpdateResponse response = solrClient.add(localBuffer); |
| if (response.getStatus() != 0) { |
| String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_ERROR"; |
| LogFeederUtil.logErrorMessageByInterval(logMessageKey, |
| String.format("Error writing to Solr. response=%s, log=%s", response, outputData), null, logger, Level.ERROR); |
| } |
| statMetric.value += localBuffer.size(); |
| writeBytesMetric.value += localBufferBytesSize; |
| for (InputMarker inputMarker : latestInputMarkers.values()) { |
| inputMarker.getInput().checkIn(inputMarker); |
| } |
| } |
| |
| private void closeSolrClient() { |
| if (solrClient != null) { |
| try { |
| solrClient.close(); |
| } catch (IOException e) { |
| // Ignore |
| } |
| } |
| } |
| |
| public void resetLocalBuffer() { |
| localBuffer.clear(); |
| localBufferBytesSize = 0; |
| latestInputMarkers.clear(); |
| } |
| |
| public boolean isDone() { |
| return localBuffer.isEmpty(); |
| } |
| } |
| |
| @Override |
| public void write(String block, InputMarker inputMarker) { |
| // No-op: raw text blocks are not shipped to Solr, only structured (Map) entries are. |
| } |
| |
| @Override |
| public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException { |
| throw new UnsupportedOperationException("copyFile method is not yet supported for output=solr"); |
| } |
| |
| @Override |
| public List<String> getIdFields() { |
| return idFields; |
| } |
| } |