blob: f77f6d6ec5933b1092b00b6f8a3aeaeb72b7232b [file] [log] [blame] [view]
---
title: Storm Redis Integration
layout: documentation
documentation: true
---
Storm/Trident integration for [Redis](http://redis.io/).
Storm-redis uses Jedis as its Redis client.
## Usage
### How do I use it?
Use it as a Maven dependency:
```xml
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>${storm.version}</version>
<type>jar</type>
</dependency>
```
### For normal Bolt
Storm-redis provides basic Bolt implementations: `RedisLookupBolt`, `RedisStoreBolt`, and `RedisFilterBolt`.
As the names suggest, `RedisLookupBolt` retrieves a value from Redis using a key, `RedisStoreBolt` stores a key / value pair to Redis, and `RedisFilterBolt` filters out tuples whose key or field doesn't exist in Redis.
One tuple will be matched to one key / value pair, and you can define the matching pattern in a `TupleMapper`.
You can also choose the data type to use via `RedisDataTypeDescription`. Please refer to `RedisDataTypeDescription.RedisDataType` to see which data types are supported. Some data types (hash and sorted set, and also set for `RedisFilterBolt` only) require an additional key, and the key converted from the tuple becomes the element.
These interfaces are combined into `RedisLookupMapper`, `RedisStoreMapper`, and `RedisFilterMapper`, which fit `RedisLookupBolt`, `RedisStoreBolt`, and `RedisFilterBolt` respectively.
(When you implement `RedisFilterMapper`, be sure that `declareOutputFields()` declares the same fields as the input stream, since `RedisFilterBolt` forwards input tuples when they exist in Redis.)
#### RedisLookupBolt example
```java
/**
 * Lookup mapper for the word-count example.
 *
 * <p>Describes a HASH data type whose additional key is "wordCount" and uses
 * the "word" field of each input tuple as the lookup key.
 */
class WordCountRedisLookupMapper implements RedisLookupMapper {
    private final String hashKey = "wordCount";
    private RedisDataTypeDescription dataTypeDescription;

    public WordCountRedisLookupMapper() {
        this.dataTypeDescription = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, this.hashKey);
    }

    /**
     * Builds the output for one lookup: a single (word, value) pair.
     *
     * @param input the tuple that triggered the lookup
     * @param value the value passed in for that tuple's key
     * @return a one-element list containing the word and the given value
     */
    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        List<Values> result = Lists.newArrayList();
        result.add(new Values(getKeyFromTuple(input), value));
        return result;
    }

    /** Declares the output fields "wordName" and "count". */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("wordName", "count");
        declarer.declare(outputFields);
    }

    /** Returns the HASH description created in the constructor. */
    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return this.dataTypeDescription;
    }

    /** Uses the tuple's "word" field as the key. */
    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    /** Always returns null; this mapper supplies no value. */
    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}
```
```java
// Connection settings for the Redis client; `host` and `port` are presumably
// defined by the surrounding topology code.
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(host).setPort(port).build();
// The mapper from the example above supplies the key and the data type description.
RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
```
#### RedisFilterBolt example
```java
/**
 * Filter mapper for the blacklist example.
 *
 * <p>Describes a SET data type whose additional key is "blacklist" and uses
 * the "word" field of each input tuple as the key to check.
 */
class BlacklistWordFilterMapper implements RedisFilterMapper {
    private final String setKey = "blacklist";
    private RedisDataTypeDescription dataTypeDescription;

    public BlacklistWordFilterMapper() {
        this.dataTypeDescription = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.SET, this.setKey);
    }

    /** Declares the output fields "word" and "count". */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("word", "count");
        declarer.declare(outputFields);
    }

    /** Returns the SET description created in the constructor. */
    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return this.dataTypeDescription;
    }

    /** Uses the tuple's "word" field as the key. */
    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    /** Always returns null; this mapper supplies no value. */
    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}
```
```java
// Connection settings for the Redis client; `host` and `port` are presumably
// defined by the surrounding topology code.
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(host).setPort(port).build();
// The mapper from the example above supplies the key and the data type description.
RedisFilterMapper filterMapper = new BlacklistWordFilterMapper();
RedisFilterBolt filterBolt = new RedisFilterBolt(poolConfig, filterMapper);
```
#### RedisStoreBolt example
```java
/**
 * Store mapper for the word-count example.
 *
 * <p>Describes a HASH data type whose additional key is "wordCount"; the
 * tuple's "word" field is the key and its "count" field is the value.
 */
class WordCountStoreMapper implements RedisStoreMapper {
    private final String hashKey = "wordCount";
    private RedisDataTypeDescription dataTypeDescription;

    public WordCountStoreMapper() {
        this.dataTypeDescription = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, this.hashKey);
    }

    /** Returns the HASH description created in the constructor. */
    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return this.dataTypeDescription;
    }

    /** Uses the tuple's "word" field as the key. */
    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    /** Uses the tuple's "count" field as the value. */
    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getStringByField("count");
    }
}
```
```java
// Connection settings for the Redis client; `host` and `port` are presumably
// defined by the surrounding topology code.
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(host).setPort(port).build();
// The mapper from the example above supplies the key, value and data type description.
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
```
### For non-simple Bolt
If your scenario doesn't fit `RedisStoreBolt`, `RedisLookupBolt`, or `RedisFilterBolt`, storm-redis also provides `AbstractRedisBolt`, which you can extend to apply your own business logic.
```java
/**
 * Example bolt that looks up the stored count of each incoming word in Redis
 * and emits a (wordName, count) tuple for every word that has one.
 *
 * <p>Words with no entry in Redis are logged and acked without emitting.
 * Any exception during processing is reported and the input tuple is failed,
 * so that it can be replayed instead of being silently acked.
 */
public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
    private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
    private static final Random RANDOM = new Random();

    public LookupWordTotalCountBolt(JedisPoolConfig config) {
        super(config);
    }

    public LookupWordTotalCountBolt(JedisClusterConfig config) {
        super(config);
    }

    /**
     * Looks up the "word" field of the input tuple in Redis and emits its count.
     *
     * @param input tuple carrying the word to look up in its "word" field
     */
    @Override
    public void execute(Tuple input) {
        JedisCommands jedisCommands = null;
        try {
            jedisCommands = getInstance();
            String wordName = input.getStringByField("word");
            String countStr = jedisCommands.get(wordName);
            if (countStr != null) {
                int count = Integer.parseInt(countStr);
                // Anchor the output to the input so that downstream failures
                // are propagated back to the originating tuple.
                this.collector.emit(input, new Values(wordName, count));

                // print lookup result with low probability
                if (RANDOM.nextInt(1000) > 995) {
                    LOG.info("Lookup result - word : {} / count : {}", wordName, count);
                }
            } else {
                // skip
                LOG.warn("Word not found in Redis - word : {}", wordName);
            }
            // Ack only after the tuple was processed successfully.
            this.collector.ack(input);
        } catch (Exception e) {
            // Do not ack a tuple whose processing failed; report and fail it instead.
            this.collector.reportError(e);
            this.collector.fail(input);
        } finally {
            // Always hand the Redis client back, whether or not processing succeeded.
            if (jedisCommands != null) {
                returnInstance(jedisCommands);
            }
        }
    }

    /** Declares the output fields "wordName" and "count". */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // wordName, count
        declarer.declare(new Fields("wordName", "count"));
    }
}
```
### Trident State usage
1. `RedisState` and `RedisMapState`, which provide a Jedis interface for a single Redis instance.
2. `RedisClusterState` and `RedisClusterMapState`, which provide a JedisCluster interface for a Redis cluster.
RedisState
```java
// Connection settings for a single Redis instance.
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(redisHost).setPort(redisPort)
        .build();

RedisStoreMapper storeMapper = new WordCountStoreMapper();
// Use the lookup mapper defined in the RedisLookupBolt example.
RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
RedisState.Factory factory = new RedisState.Factory(poolConfig);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

// Persist incoming tuples to Redis through the store mapper.
// NOTE(review): confirm the unit withExpire expects; 86400000 is one day
// only if the unit is milliseconds.
stream.partitionPersist(factory,
        fields,
        new RedisStateUpdater(storeMapper).withExpire(86400000),
        new Fields());

// Query the stored state by the "word" field through the lookup mapper.
TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
        new RedisStateQuerier(lookupMapper),
        new Fields("columnName", "columnValue"));
```
RedisClusterState
```java
// Parse a comma-separated "host:port,host:port" list into cluster node addresses.
Set<InetSocketAddress> nodes = new HashSet<>();
for (String hostPort : redisHostPort.split(",")) {
    String[] hostAndPort = hostPort.split(":");
    nodes.add(new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
        .build();

RedisStoreMapper storeMapper = new WordCountStoreMapper();
// Use the lookup mapper defined in the RedisLookupBolt example.
RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

// Persist incoming tuples to the Redis cluster through the store mapper.
// NOTE(review): confirm the unit withExpire expects; 86400000 is one day
// only if the unit is milliseconds.
stream.partitionPersist(factory,
        fields,
        new RedisClusterStateUpdater(storeMapper).withExpire(86400000),
        new Fields());

// Query the stored state by the "word" field through the lookup mapper.
TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
        new RedisClusterStateQuerier(lookupMapper),
        new Fields("columnName", "columnValue"));
```