blob: 36babe472dce79c4d1d1f7fd14b1ef86f3d29ded [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.redis
import org.apache.gearpump.Message
import org.apache.gearpump.redis.RedisMessage.Geo.GEOADD
import org.apache.gearpump.redis.RedisMessage.Hashes._
import org.apache.gearpump.redis.RedisMessage.HyperLogLog._
import org.apache.gearpump.redis.RedisMessage.Keys._
import org.apache.gearpump.redis.RedisMessage.Lists._
import org.apache.gearpump.redis.RedisMessage.Sets._
import org.apache.gearpump.redis.RedisMessage.SortedSets._
import org.apache.gearpump.redis.RedisMessage.String._
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.LogUtil
import redis.clients.jedis.Jedis
import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT}
/**
* Save message in Redis Instance
*
* @param host
* @param port
* @param timeout
* @param database
* @param password
*/
class RedisSink(
host: String = DEFAULT_HOST,
port: Int = DEFAULT_PORT,
timeout: Int = DEFAULT_TIMEOUT,
database: Int = DEFAULT_DATABASE,
password: String = "") extends DataSink {
private val LOG = LogUtil.getLogger(getClass)
@transient private lazy val client = new Jedis(host, port, timeout)
override def open(context: TaskContext): Unit = {
client.select(database)
if (password != null && password.length != 0) {
client.auth(password)
}
}
override def write(message: Message): Unit = {
message.value match {
// GEO
case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member)
// Hashes
case msg: HDEL => client.hdel(msg.key, msg.field)
case msg: HINCRBY => client.hincrBy(msg.key, msg.field, msg.increment)
case msg: HINCRBYFLOAT => client.hincrByFloat(msg.key, msg.field, msg.increment)
case msg: HSET => client.hset(msg.key, msg.field, msg.value)
case msg: HSETNX => client.hsetnx(msg.key, msg.field, msg.value)
// HyperLogLog
case msg: PFADD => client.pfadd(msg.key, msg.element)
// Lists
case msg: LPUSH => client.lpush(msg.key, msg.value)
case msg: LPUSHX => client.lpushx(msg.key, msg.value)
case msg: LSET => client.lset(msg.key, msg.index, msg.value)
case msg: RPUSH => client.rpush(msg.key, msg.value)
case msg: RPUSHX => client.rpushx(msg.key, msg.value)
// Keys
case msg: DEL => client.del(msg.message)
case msg: EXPIRE => client.expire(msg.key, msg.seconds)
case msg: EXPIREAT => client.expireAt(msg.key, msg.timestamp)
case msg: MIGRATE => client.migrate(msg.host, msg.port, msg.key, msg.database, msg.timeout)
case msg: MOVE => client.move(msg.key, msg.db)
case msg: PERSIST => client.persist(msg.key)
case msg: PEXPIRE => client.pexpire(msg.key, msg.milliseconds)
case msg: PEXPIREAT => client.pexpireAt(msg.key, msg.timestamp)
case msg: RENAME => client.rename(msg.key, msg.newKey)
case msg: RENAMENX => client.renamenx(msg.key, msg.newKey)
// Sets
case msg: SADD => client.sadd(msg.key, msg.members)
case msg: SMOVE => client.smove(msg.source, msg.destination, msg.member)
case msg: SREM => client.srem(msg.key, msg.member)
// String
case msg: APPEND => client.append(msg.key, msg.value)
case msg: DECR => client.decr(msg.key)
case msg: DECRBY => client.decrBy(msg.key, msg.decrement)
case msg: INCR => client.incr(msg.key)
case msg: INCRBY => client.incrBy(msg.key, msg.increment)
case msg: INCRBYFLOAT => client.incrByFloat(msg.key, msg.increment)
case msg: SET => client.set(msg.key, msg.value)
case msg: SETBIT => client.setbit(msg.key, msg.offset, msg.value)
case msg: SETEX => client.setex(msg.key, msg.seconds, msg.value)
case msg: SETNX => client.setnx(msg.key, msg.value)
case msg: SETRANGE => client.setrange(msg.key, msg.offset, msg.value)
// Sorted Set
case msg: ZADD => client.zadd(msg.key, msg.score, msg.member)
case msg: ZINCRBY => client.zincrby(msg.key, msg.score, msg.member)
case msg: ZREM => client.zrem(msg.key, msg.member)
case msg: ZREMRANGEBYLEX => client.zremrangeByLex(msg.key, msg.min, msg.max)
case msg: ZREMRANGEBYRANK => client.zremrangeByRank(msg.key, msg.start, msg.stop)
case msg: ZREMRANGEBYSCORE => client.zremrangeByScore(msg.key, msg.min, msg.max)
case msg: ZSCORE => client.zscore(msg.key, msg.member)
}
}
override def close(): Unit = {
client.close()
}
}