blob: d2637f7dfc7ee7e5c769f1b04d91597dabfe2691 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.redis;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
import org.apache.flink.util.TestLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;
import redis.embedded.RedisCluster;
import redis.embedded.util.JedisUtil;
import java.io.IOException;
import java.util.Set;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
 * Integration test that writes a value through a sentinel-backed
 * {@link RedisCommandsContainer} and reads it back through a plain
 * {@link JedisSentinelPool} connection to the same embedded Redis cluster.
 *
 * <p>The embedded cluster is started once for the whole class; the sentinel
 * configuration and the verification pool are rebuilt for every test method.
 */
public class RedisSentinelClusterTest extends TestLogger {

    private static final Logger LOG = LoggerFactory.getLogger(RedisSentinelClusterTest.class);

    /** Embedded Redis cluster shared by all tests of this class. */
    private static RedisCluster cluster;

    /** Name of the replication group; also used as the sentinel master name. */
    private static final String REDIS_MASTER = "master";

    private static final String TEST_KEY = "testKey";

    private static final String TEST_VALUE = "testValue";

    /** Pool used by the test itself to verify what the container wrote. */
    private JedisSentinelPool jedisSentinelPool;

    /** Sentinel configuration handed to the container builder. */
    private FlinkJedisSentinelConfig jedisSentinelConfig;

    /**
     * Starts an embedded cluster with ephemeral sentinels, a quorum size of 1
     * and a single replication group named {@link #REDIS_MASTER}.
     */
    @BeforeClass
    public static void setUpCluster() {
        cluster = RedisCluster.builder().ephemeralSentinels().quorumSize(1)
            .replicationGroup(REDIS_MASTER, 1)
            .build();
        cluster.start();
        LOG.info("Started redis cluster {}", cluster);
    }

    /**
     * Builds the sentinel configuration from the running cluster's sentinel
     * hosts and opens a verification pool against the same master.
     */
    @Before
    public void setUp() {
        Set<String> hosts = JedisUtil.sentinelHosts(cluster);
        jedisSentinelConfig = new FlinkJedisSentinelConfig.Builder().setMasterName(REDIS_MASTER)
            .setSentinels(hosts).build();
        jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
            jedisSentinelConfig.getSentinels());
    }

    /**
     * Sets {@link #TEST_KEY} through the container and asserts that a direct
     * Jedis connection reads back {@link #TEST_VALUE}.
     *
     * @throws IOException if closing the commands container fails
     */
    @Test
    public void testRedisSentinelOperation() throws IOException {
        RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig);
        try {
            Jedis jedis = jedisSentinelPool.getResource();
            try {
                redisContainer.set(TEST_KEY, TEST_VALUE);
                assertEquals(TEST_VALUE, jedis.get(TEST_KEY));
            } finally {
                jedis.close();
            }
        } finally {
            // The container is created by this test, so this test must release it;
            // nesting guarantees it is closed even if closing the Jedis handle fails.
            redisContainer.close();
        }
    }

    /**
     * Closes the verification pool created in {@link #setUp()}, if any.
     *
     * @throws IOException declared for compatibility with the pool's close contract
     */
    @After
    public void tearDown() throws IOException {
        if (jedisSentinelPool != null) {
            jedisSentinelPool.close();
        }
    }

    /**
     * Stops the embedded cluster if it was created.
     *
     * @throws IOException declared for compatibility with the cluster's stop contract
     */
    @AfterClass
    public static void tearDownCluster() throws IOException {
        if (cluster != null) {
            cluster.stop();
            LOG.info("Stopped redis cluster");
        }
    }
}