blob: 670663427313f415e2c8f61322644f05ea0b1fcf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.solr.bolt;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
import org.apache.storm.solr.config.CountBasedCommit;
import org.apache.storm.solr.config.SolrCommitStrategy;
import org.apache.storm.solr.config.SolrConfig;
import org.apache.storm.solr.mapper.SolrMapper;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SolrUpdateBolt extends BaseTickTupleAwareRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(SolrUpdateBolt.class);
/**
* Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
*/
private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
private final SolrConfig solrConfig;
private final SolrMapper solrMapper;
private final SolrCommitStrategy commitStgy; // if null, acks every tuple
private SolrClient solrClient;
private OutputCollector collector;
private List<Tuple> toCommitTuples;
private int tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;
public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper) {
this(solrConfig, solrMapper, null);
}
public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper, SolrCommitStrategy commitStgy) {
this.solrConfig = solrConfig;
this.solrMapper = solrMapper;
this.commitStgy = commitStgy;
LOG.debug("Created {} with the following configuration: "
+ "[SolrConfig = {}], [SolrMapper = {}], [CommitStgy = {}]",
this.getClass().getSimpleName(), solrConfig, solrMapper, commitStgy);
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
if (solrConfig.isKerberosEnabled()) {
HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer());
}
solrMapper.configure();
this.solrClient = new CloudSolrClient(solrConfig.getZkHostString());
this.toCommitTuples = new ArrayList<>(capacity());
}
private int capacity() {
final int defArrListCpcty = 10;
return (commitStgy instanceof CountBasedCommit)
? ((CountBasedCommit) commitStgy).getThreshold()
: defArrListCpcty;
}
@Override
protected void process(Tuple tuple) {
try {
SolrRequest request = solrMapper.toSolrRequest(tuple);
solrClient.request(request, solrMapper.getCollection());
ack(tuple);
} catch (Exception e) {
fail(tuple, e);
}
}
private void ack(Tuple tuple) throws SolrServerException, IOException {
if (commitStgy == null) {
collector.ack(tuple);
} else {
final boolean isTickTuple = TupleUtils.isTick(tuple);
if (!isTickTuple) { // Don't ack tick tuples
toCommitTuples.add(tuple);
commitStgy.update();
}
if (isTickTuple || commitStgy.commit()) {
solrClient.commit(solrMapper.getCollection());
ackCommittedTuples();
}
}
}
private void ackCommittedTuples() {
List<Tuple> toAckTuples = getQueuedTuples();
for (Tuple tuple : toAckTuples) {
collector.ack(tuple);
}
}
private void fail(Tuple tuple, Exception e) {
collector.reportError(e);
if (commitStgy == null) {
collector.fail(tuple);
} else {
List<Tuple> failedTuples = getQueuedTuples();
failQueuedTuples(failedTuples);
}
}
private void failQueuedTuples(List<Tuple> failedTuples) {
for (Tuple failedTuple : failedTuples) {
collector.fail(failedTuple);
}
}
private List<Tuple> getQueuedTuples() {
List<Tuple> queuedTuples = toCommitTuples;
toCommitTuples = new ArrayList<>(capacity());
return queuedTuples;
}
@Override
public Map<String, Object> getComponentConfiguration() {
if (solrConfig.getTickTupleInterval() > 0) {
this.tickTupleInterval = solrConfig.getTickTupleInterval();
}
return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), tickTupleInterval);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) { }
@Override
public void cleanup() {
if (solrClient != null) {
try {
solrClient.close();
} catch (IOException e) {
LOG.error("Error while closing solrClient", e);
}
}
}
}