blob: 45827505150495be1f7f8b758d4be7dac83b9f08 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata;
import static org.testng.Assert.assertTrue;
import io.etcd.jetcd.launcher.EtcdCluster;
import io.etcd.jetcd.launcher.EtcdClusterFactory;
import java.io.File;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.pulsar.tests.TestRetrySupport;
import org.assertj.core.util.Files;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
/**
 * Base class for metadata store tests.
 *
 * <p>Before the test class runs it creates a {@link TestZKServer} and starts an etcd cluster built with
 * {@code EtcdClusterFactory.buildCluster("test", 1, false)}; both are closed again after the class has run.
 * The {@code "impl"} data provider hands each test a backend name together with a lazily evaluated
 * connection string for that backend.
 */
public abstract class BaseMetadataStoreTest extends TestRetrySupport {
    /** ZooKeeper test server; created in {@link #setup()} and reset to {@code null} by {@link #cleanup()}. */
    protected TestZKServer zks;
    /** Etcd test cluster; started in {@link #setup()} and reset to {@code null} by {@link #cleanup()}. */
    protected EtcdCluster etcdCluster;

    /**
     * Creates the ZooKeeper test server and starts the etcd cluster.
     *
     * @throws Exception if either backend fails to start
     */
    @BeforeClass(alwaysRun = true)
    @Override
    public final void setup() throws Exception {
        incrementSetupNumber();
        zks = new TestZKServer();
        etcdCluster = EtcdClusterFactory.buildCluster("test", 1, false);
        etcdCluster.start();
    }

    /**
     * Closes the ZooKeeper test server and the etcd cluster.
     *
     * <p>The etcd cluster is closed even when closing the ZooKeeper server throws, and each reference is
     * cleared once its resource has been closed so that a repeated call does not close it a second time.
     *
     * @throws Exception if closing one of the backends fails
     */
    @AfterClass(alwaysRun = true)
    @Override
    public final void cleanup() throws Exception {
        markCurrentSetupNumberCleaned();
        try {
            if (zks != null) {
                zks.close();
                zks = null;
            }
        } finally {
            // Runs regardless of the outcome above so a ZooKeeper shutdown failure cannot leak the cluster.
            if (etcdCluster != null) {
                etcdCluster.close();
                etcdCluster = null;
            }
        }
    }

    /**
     * Creates a new temporary folder and returns its absolute path.
     *
     * @return absolute path of the newly created folder
     */
    private static String createTempFolder() {
        File temp = Files.newTemporaryFolder();
        // Best effort only: File#deleteOnExit removes a directory only if it is empty when the JVM exits.
        temp.deleteOnExit();
        return temp.getAbsolutePath();
    }

    /**
     * Supplies the metadata store implementations to test against.
     *
     * @return rows of {@code { implementation name, Supplier<String> of the connection string }}
     */
    @DataProvider(name = "impl")
    public Object[][] implementations() {
        // A Supplier<String> must be used for the connection string parameter: a retried test run receives
        // the same arguments as the failed attempt, and TestRetrySupport restarts the ZooKeeper test server
        // before the retry. Only a lazily evaluated Supplier<String> lets the test method observe the new
        // connection string.
        return new Object[][]{
                {"ZooKeeper", stringSupplier(() -> zks.getConnectionString())},
                {"Memory", stringSupplier(() -> "memory://" + UUID.randomUUID())},
                {"RocksDB", stringSupplier(() -> "rocksdb://" + createTempFolder())},
                {"Etcd", stringSupplier(() -> "etcd:" + etcdCluster.getClientEndpoints().stream()
                        .map(Object::toString)
                        .collect(Collectors.joining(",")))},
        };
    }

    /**
     * Returns the given supplier unchanged; exists to give a lambda the {@code Supplier<String>} target type
     * when it is placed into an {@code Object[]}.
     *
     * @param supplier the supplier to return
     * @return the same {@code supplier} instance
     */
    public static Supplier<String> stringSupplier(Supplier<String> supplier) {
        return supplier;
    }

    /**
     * Builds a key of the form {@code /key-<System.nanoTime()>}.
     *
     * @return the generated key
     */
    protected String newKey() {
        return "/key-" + System.nanoTime();
    }

    /**
     * Asserts that the cause of the given {@link CompletionException} is an instance of {@code clazz}.
     *
     * @param e     the exception whose cause is checked
     * @param clazz the expected type of the cause
     */
    static void assertException(CompletionException e, Class<?> clazz) {
        assertException(e.getCause(), clazz);
    }

    /**
     * Asserts that {@code t} is an instance of {@code clazz}.
     *
     * <p>A {@code null} throwable fails the assertion with a regular assertion message instead of raising a
     * {@link NullPointerException} while that message is being built.
     *
     * @param t     the throwable to check, may be {@code null}
     * @param clazz the expected type
     */
    static void assertException(Throwable t, Class<?> clazz) {
        // Resolve the runtime type defensively; String.format renders a null argument as "null".
        Object actualType = (t == null) ? null : t.getClass();
        assertTrue(clazz.isInstance(t), String.format("Exception %s is not of type %s", actualType, clazz));
    }
}