blob: bf64b24d9c1db8eec79e7c1d1f4e3a9d4c8993ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.writer.solr;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.common.SolrInputDocument;
/**
 * Pipeline writer that indexes Chukwa {@link Chunk}s into a Solr Cloud
 * collection. Each chunk becomes one Solr document carrying the chunk
 * metadata (source, stream, tags, sequence id, data type) plus the raw
 * payload, with best-effort extraction of a user name, a service tag
 * (hdfs / yarn / mapreduce) and a leading log timestamp.
 *
 * Failures to index a chunk are logged and skipped; chunks are always
 * forwarded to the next writer in the pipeline regardless.
 */
public class SolrWriter extends PipelineableWriter {
  private static Logger log = Logger.getLogger(SolrWriter.class);
  private CloudSolrServer server;
  private final static String ID = "id";
  private final static String SEQ_ID = "seqId";
  private final static String DATA_TYPE = "type";
  private final static String STREAM_NAME = "stream";
  private final static String TAGS = "tags";
  private final static String SOURCE = "source";
  private final static String DATA = "data";
  private final static String USER = "user";
  private final static String SERVICE = "service";
  private final static String DATE = "date";
  // Captures the value of "user=..." up to the next comma or space.
  private final static Pattern userPattern = Pattern.compile("user=(.+?)[, ]");
  // NOTE(review): SimpleDateFormat is not thread-safe; sharing it as a field
  // is only safe if add() is never invoked concurrently — confirm against the
  // pipeline's threading model before adding parallelism.
  private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

  /**
   * Builds a writer configured from the running agent's static configuration.
   *
   * @throws WriterException if {@code solr.cloud.address} is not configured
   */
  public SolrWriter() throws WriterException {
    init(ChukwaAgent.getStaticConfiguration());
  }

  /**
   * Initializes the Solr Cloud client.
   *
   * @param c configuration; must define {@code solr.cloud.address} (the
   *          ZooKeeper ensemble address) and may define
   *          {@code solr.collection} (defaults to "logs")
   * @throws WriterException if the Solr server address is not configured
   */
  @Override
  public void init(Configuration c) throws WriterException {
    String serverName = c.get("solr.cloud.address");
    if (serverName == null) {
      throw new WriterException("Solr server address is not defined.");
    }
    String collection = c.get("solr.collection", "logs");
    // Create the client only once; init() may be called again on an
    // already-initialized writer.
    if (server == null) {
      server = new CloudSolrServer(serverName);
      server.setDefaultCollection(collection);
    }
  }

  /** Releases the Solr client and its ZooKeeper/HTTP resources. */
  @Override
  public void close() throws WriterException {
    // Fix: previously a no-op, leaking the CloudSolrServer's connections.
    if (server != null) {
      server.shutdown();
      server = null;
    }
  }

  /**
   * Indexes each chunk as a Solr document, then forwards the chunks to the
   * next writer in the pipeline. Per-chunk Solr failures are logged and the
   * chunk is skipped; they do not fail the batch.
   *
   * @param chunks chunks to index
   * @return the commit status of the downstream writer, or
   *         {@link ChukwaWriter#COMMIT_OK} if this is the last writer
   * @throws WriterException if the downstream writer fails
   */
  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    CommitStatus rv = ChukwaWriter.COMMIT_OK;
    for (Chunk chunk : chunks) {
      try {
        SolrInputDocument doc = new SolrInputDocument();
        // Decode the payload once; it was previously decoded twice per chunk.
        String data = new String(chunk.getData(), StandardCharsets.UTF_8);
        doc.addField(ID, chunk.getSource() + "_" + chunk.getSeqID());
        doc.addField(TAGS, chunk.getTags());
        doc.addField(STREAM_NAME, chunk.getStreamName());
        doc.addField(SOURCE, chunk.getSource());
        doc.addField(SEQ_ID, chunk.getSeqID());
        doc.addField(DATA_TYPE, chunk.getDataType());
        doc.addField(DATA, data);
        // TODO: improve parsing logic for more sophisticated tagging
        Matcher m = userPattern.matcher(data);
        if (m.find()) {
          doc.addField(USER, m.group(1));
        }
        if (data.contains("hdfs")) {
          doc.addField(SERVICE, "hdfs");
        }
        if (data.contains("yarn")) {
          doc.addField(SERVICE, "yarn");
        }
        // Bug fix: the substring was misspelled "mapredice", so MapReduce
        // log lines were never tagged with the service field.
        if (data.contains("mapreduce")) {
          doc.addField(SERVICE, "mapreduce");
        }
        try {
          // parse() consumes only the leading "yyyy-MM-dd HH:mm:ss,SSS"
          // prefix, so this works on full log lines.
          Date d = sdf.parse(data);
          doc.addField(DATE, d, 1.0f);
        } catch (ParseException ignored) {
          // Best effort: chunks without a leading timestamp are indexed
          // without a date field.
        }
        server.add(doc);
        server.commit();
      } catch (SolrServerException | IOException e) {
        log.warn("Failed to store data to Solr Cloud.");
        log.warn(ExceptionUtil.getStackTrace(e));
      }
    }
    if (next != null) {
      rv = next.add(chunks); // pass data through
    }
    return rv;
  }
}