package kafka;

import kafka.admin.RackAwareMode;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class GeodeKafkaTestCluster {
@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
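  // When true, ZooKeeper and Kafka write their state under /tmp so it survives the
  // run for inspection; otherwise each run gets a fresh TemporaryFolder.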
private static boolean debug = true;
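  // Embedded ZooKeeper, Kafka broker, and Geode clusters, started once per test class.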
private static ZooKeeperLocalCluster zooKeeperLocalCluster;
private static KafkaLocalCluster kafkaLocalCluster;
private static GeodeLocalCluster geodeLocalCluster;
@BeforeClass
public static void setup() throws IOException, QuorumPeerConfig.ConfigException, InterruptedException {
startZooKeeper();
startKafka();
startGeode();
createTopic();
startWorker();
}
@AfterClass
  public static void shutdown() {
    // Delete the test topic, then release the ZK client before tearing down the clusters.
    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
        15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    adminZkClient.deleteTopic("someTopic");
    zkClient.close();
    kafkaLocalCluster.stop();
    geodeLocalCluster.stop();
  }
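  /**
   * Boots an embedded standalone Kafka Connect worker backed by an in-memory
   * offset store, then registers the GeodeKafkaSource connector with its herder.
   */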
private static void startWorker() {
    Map<String, String> props = new HashMap<>();
props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888");
props.put("offset.storage.file.filename", "/tmp/connect.offsets");
// fast flushing for testing.
props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
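    // Note: the internal.* converter settings below are deprecated in recent Kafka
    // releases; they are kept here for compatibility with the Connect runtime in use.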
props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
props.put("internal.key.converter.schemas.enable", "false");
props.put("internal.value.converter.schemas.enable", "false");
props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
props.put("key.converter.schemas.enable", "false");
props.put("value.converter.schemas.enable", "false");
WorkerConfig workerCfg = new StandaloneConfig(props);
MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
offBackingStore.configure(workerCfg);
Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg, offBackingStore, new AllConnectorClientConfigOverridePolicy());
worker.start();
Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg), new AllConnectorClientConfigOverridePolicy());
herder.start();
Map<String, String> sourceProps = new HashMap<>();
sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
    // Connector-specific settings (e.g. which Geode region feeds which Kafka topic)
    // would also go into sourceProps here. The callback fires once the herder has
    // accepted (or rejected) the configuration.
    herder.putConnectorConfig(
        sourceProps.get(ConnectorConfig.NAME_CONFIG),
        sourceProps, true, (error, result) -> {
          System.out.println("CALLBACK: " + result);
        });
System.out.println("Worker and herder started");
}
  private static void createTopic() {
    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
        15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    // 3 partitions, replication factor 1 (we only run a single local broker).
    adminZkClient.createTopic("someTopic", 3, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
    zkClient.close();
  }
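  // Connects to the locator started by GeodeLocalCluster, assumed here to listen on
  // Geode's default locator port (10334).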
private ClientCache createGeodeClient() {
return new ClientCacheFactory().addPoolLocator("localhost", 10334).create();
}
private static void startZooKeeper() throws IOException, QuorumPeerConfig.ConfigException {
zooKeeperLocalCluster = new ZooKeeperLocalCluster(getZooKeeperProperties());
zooKeeperLocalCluster.start();
}
private static void startKafka() throws IOException, InterruptedException, QuorumPeerConfig.ConfigException {
kafkaLocalCluster = new KafkaLocalCluster(getKafkaConfig());
kafkaLocalCluster.start();
}
private static void startGeode() throws IOException, InterruptedException {
geodeLocalCluster = new GeodeLocalCluster();
geodeLocalCluster.start();
}
private static Properties getZooKeeperProperties() throws IOException {
Properties properties = new Properties();
properties.setProperty("dataDir", (debug)? "/tmp/zookeeper" :temporaryFolder.newFolder("zookeeper").getAbsolutePath());
properties.setProperty("clientPort", "2181");
properties.setProperty("tickTime", "2000");
return properties;
}
  private static Properties getKafkaConfig() throws IOException {
    final int BROKER_PORT = 8888;
    Properties props = new Properties();
    props.put("broker.id", "0");
    props.put("zookeeper.connect", "localhost:2181");
    props.put("host.name", "localhost");
    props.put("port", String.valueOf(BROKER_PORT));
    props.put("offsets.topic.replication.factor", "1");
    props.put("log.dir", debug ? "/tmp/kafka" : temporaryFolder.newFolder("kafka").getAbsolutePath());
    // Flush aggressively so records produced during the test become visible immediately.
    props.put("log.flush.interval.messages", "1");
    props.put("log.flush.interval.ms", "10");
    // Note: connector configs (connector.class, name, tasks.max) are not broker
    // settings; they are registered with the herder in startWorker() instead.
return props;
}
  @Test
  public void testRegionPut() throws InterruptedException {
    ClientCache client = createGeodeClient();
    Region<String, String> region =
        client.<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY).create("someRegion");
    region.put("JASON KEY", "JASON VALUE");
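    // A minimal verification sketch: this assumes the GeodeKafkaSource connector
    // routes events from "someRegion" to the "someTopic" topic as String key/value
    // pairs. That mapping is not configured above, so treat this as an illustration
    // of how the put could be observed on the Kafka side, not a hard assertion.
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:8888");
    consumerProps.put("group.id", "geode-kafka-test-consumer");
    consumerProps.put("auto.offset.reset", "earliest");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    try (org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
        new org.apache.kafka.clients.consumer.KafkaConsumer<>(consumerProps)) {
      consumer.subscribe(java.util.Collections.singletonList("someTopic"));
      org.apache.kafka.clients.consumer.ConsumerRecords<String, String> records =
          consumer.poll(java.time.Duration.ofSeconds(10));
      System.out.println("Polled " + records.count() + " record(s) from someTopic");
    }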
System.out.println("TEST COMPLETE!");
}
}