end to end test "passes" Not sure why we dont get all the events or don't get any events until enough polls are called
diff --git a/build.gradle b/build.gradle
index c9500b0..87a2c36 100644
--- a/build.gradle
+++ b/build.gradle
@@ -13,8 +13,8 @@
dependencies {
- compile 'org.apache.geode:geode-core:1.11.0'
- compile 'org.apache.geode:geode-cq:1.11.0'
+ compile 'org.apache.geode:geode-core:1.10.0'
+ compile 'org.apache.geode:geode-cq:1.10.0'
compile(group: 'org.apache.kafka', name: 'connect-api', version: '2.3.1')
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0'
@@ -27,4 +27,7 @@
testCompile group: 'junit', name: 'junit', version: '4.12'
+ testImplementation 'org.awaitility:awaitility:4.0.2'
+
+
}
diff --git a/src/main/java/kafka/GeodeKafkaSource.java b/src/main/java/kafka/GeodeKafkaSource.java
index 8b88e81..95afc50 100644
--- a/src/main/java/kafka/GeodeKafkaSource.java
+++ b/src/main/java/kafka/GeodeKafkaSource.java
@@ -10,11 +10,23 @@
import java.util.List;
import java.util.Map;
-public class GeodeKafkaSource extends SourceConnector {
+import static kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
+import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
+import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
+import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
+import static kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
+import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static kafka.GeodeConnectorConfig.LOCATORS;
+import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
+import static kafka.GeodeConnectorConfig.REGIONS;
+import static kafka.GeodeConnectorConfig.TOPICS;
+import static kafka.GeodeKafkaSourceTask.TASK_ID;
- public static String REGION_NAME = "GEODE_REGION_NAME";
- private String regionName;
- private static String TOPICS = "TOPICS";
+public class GeodeKafkaSource extends SourceConnector {
private Map<String, String> sharedProps;
private static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -34,9 +46,11 @@
taskProps.putAll(sharedProps);
// use the same props for all tasks at the moment
- for (int i = 0; i < maxTasks; i++)
+ for (int i = 0; i < maxTasks; i++) {
+ //TODO partition regions and topics
+ taskProps.put(TASK_ID, "" + i);
taskConfigs.add(taskProps);
-
+ }
return taskConfigs;
}
@@ -48,7 +62,17 @@
@Override
public void start(Map<String, String> props) {
- sharedProps = props;
+ sharedProps = computeMissingConfigurations(props);
+ }
+
+ private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
+ props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR);
+ props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT);
+ props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID);
+ props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
+ props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE);
+ props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX);
+ return props;
}
@Override
@@ -60,4 +84,8 @@
public String version() {
return AppInfoParser.getVersion();
}
+
+ public Map<String, String> getSharedProps() {
+ return sharedProps;
+ }
}
diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java
index 9608f07..c463e6f 100644
--- a/src/main/java/kafka/GeodeKafkaSourceTask.java
+++ b/src/main/java/kafka/GeodeKafkaSourceTask.java
@@ -4,134 +4,196 @@
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
-import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
-import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
+import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
+import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
+import static kafka.GeodeConnectorConfig.REGION_NAME;
public class GeodeKafkaSourceTask extends SourceTask {
- private static String REGION_NAME = "REGION_NAME";
- private static String OFFSET = "OFFSET";
- private static String topics[];
- private int batchSize;
- private int queueSize;
- private static BlockingQueue<CqEvent> eventBuffer;
- private Map<String, String> sourcePartition;
- private Map<String, Long> offset;
- private ClientCache clientCache;
+ //property string to pass in to identify task
+ public static final String TASK_ID = "GEODE_TASK_ID";
+ private static final String TASK_PREFIX = "TASK";
+ private static final String DOT = ".";
+ private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
- @Override
- public String version() {
- return null;
- }
+ private int taskId;
+ private ClientCache clientCache;
+ private List<String> regionNames;
+ private List<String> topics;
+ private Map<String, Map<String, String>> sourcePartitions;
+ private static BlockingQueue<GeodeEvent> eventBuffer;
+ private int batchSize;
- @Override
- public void start(Map<String, String> props) {
- System.out.println("JASON task start");
- batchSize = 100;
- queueSize = 100000;
- String regionName = "someRegion";
- eventBuffer = new LinkedBlockingQueue<>(queueSize);
- topics = new String[] {"someTopic"};
- sourcePartition = new HashMap<>();
- sourcePartition.put(REGION_NAME, regionName);
- offset = new HashMap<>();
- offset.put("OFFSET", 0L);
-
- installOnGeode("localHost", 10334, "someRegion");
- System.out.println("JASON task start end");
- }
-
- @Override
- public List<SourceRecord> poll() throws InterruptedException {
-// System.out.println("JASON polling");
- ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
- ArrayList<CqEvent> events = new ArrayList<>(batchSize);
- if (eventBuffer.drainTo(events, batchSize) > 0) {
- for (CqEvent event : events) {
-
- for (String topic : topics)
- records.add(new SourceRecord(sourcePartition, offset, topic, null, event));
- }
-
- System.out.println("JASON we polled and returning records" + records.size());
- return records;
+ private static Map<String, Long> createOffset() {
+ Map<String, Long> offset = new HashMap<>();
+ offset.put("OFFSET", 0L);
+ return offset;
}
-// System.out.println("JASON we didn't poll any records");
- return null;
- }
-
- @Override
- public void stop() {
- clientCache.close(true);
- }
-
- private void installOnGeode(String locatorHost, int locatorPort, String regionName) {
- clientCache = new ClientCacheFactory().set("durable-client-id", "someClient")
- .set("durable-client-timeout", "200")
- .setPoolSubscriptionEnabled(true).addPoolLocator(locatorHost, locatorPort).create();
- CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
- cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener());
- System.out.println("JASON installing on Geode");
- CqAttributes cqAttributes = cqAttributesFactory.create();
- try {
- System.out.println("JASON installing new cq");
- clientCache.getQueryService().newCq("kafkaCQFor" + regionName, "select * from /" + regionName, cqAttributes,
- true).execute();
- System.out.println("JASON finished installing cq");
- } catch (CqExistsException e) {
- System.out.println("UHH");
- e.printStackTrace();
- } catch (CqException | RegionNotFoundException e) {
- System.out.println("UHH e");
- e.printStackTrace();
- }
- catch (Exception e) {
- System.out.println("UHHHHHH " + e);
- }
- System.out.println("JASON task calling ready for events");
- clientCache.readyForEvents();
- System.out.println("JASON task ready for events");
- }
-
- private static class GeodeKafkaSourceListener implements CqListener {
-
@Override
- public void onEvent(CqEvent aCqEvent) {
- try {
- System.out.println("JASON cqEvent and putting into eventBuffer");
- eventBuffer.offer(aCqEvent, 2, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
+ public String version() {
+ return null;
+ }
- while (true) {
- try {
- if (!eventBuffer.offer(aCqEvent, 2, TimeUnit.SECONDS))
- break;
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- System.out.println("GeodeKafkaSource Queue is full");
+ @Override
+ public void start(Map<String, String> props) {
+ try {
+ System.out.println("JASON task start");
+ taskId = Integer.parseInt(props.get(TASK_ID));
+ batchSize = Integer.parseInt(props.get(BATCH_SIZE));
+ int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
+ eventBuffer = new LinkedBlockingQueue<>(queueSize);
+
+ //grouping will be done in the source and not the task
+ regionNames = parseNames(props.get(GeodeConnectorConfig.REGIONS));
+ topics = parseNames(props.get(GeodeConnectorConfig.TOPICS));
+ sourcePartitions = createSourcePartitionsMap(regionNames);
+
+ String durableClientId = props.get(DURABLE_CLIENT_ID_PREFIX);
+ if (!durableClientId.equals("")) {
+ durableClientId += taskId;
+ }
+ System.out.println("JASON durable client id is:" + durableClientId);
+ String durableClientTimeout = props.get(DURABLE_CLIENT_TIME_OUT);
+ String cqPrefix = props.get(CQ_PREFIX);
+
+ List<LocatorHostPort> locators = parseLocators(props.get(GeodeConnectorConfig.LOCATORS));
+ installOnGeode(taskId, eventBuffer, locators, regionNames, durableClientId, durableClientTimeout, cqPrefix);
+ System.out.println("JASON task start finished");
}
- }
+ catch (Exception e) {
+ System.out.println("Exception:" + e);
+ e.printStackTrace();
+ throw e;
+ }
}
@Override
- public void onError(CqEvent aCqEvent) {
+ public List<SourceRecord> poll() throws InterruptedException {
+ ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
+ ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
+ if (eventBuffer.drainTo(events, batchSize) > 0) {
+ for (GeodeEvent event : events) {
+ for (String topic : topics) {
+ records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
+// records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, "STRING"));
+ }
+ }
+ return records;
+ }
+
+ return null;
}
- }
+
+ @Override
+ public void stop() {
+ clientCache.close(true);
+ }
+
+ ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
+ ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName)
+ .set("durable-client-timeout", durableClientTimeOut)
+ .setPoolSubscriptionEnabled(true);
+ for (LocatorHostPort locator: locators) {
+ ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create();
+ }
+ return ccf.create();
+ }
+
+ void installOnGeode(int taskId, BlockingQueue<GeodeEvent> eventBuffer, List<LocatorHostPort> locators, List<String> regionNames, String durableClientId, String durableClientTimeout, String cqPrefix) {
+ boolean isDurable = isDurable(durableClientId);
+
+ clientCache = createClientCache(locators, durableClientId, durableClientTimeout);
+ for (String region : regionNames) {
+ installListenersToRegion(taskId, eventBuffer, region, cqPrefix, isDurable);
+ }
+ if (isDurable) {
+ clientCache.readyForEvents();
+ }
+ }
+
+ void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
+ CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
+ cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
+ System.out.println("JASON installing on Geode");
+ CqAttributes cqAttributes = cqAttributesFactory.create();
+ try {
+ System.out.println("JASON installing new cq");
+ clientCache.getQueryService().newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
+ isDurable).execute();
+ System.out.println("JASON finished installing cq");
+ } catch (CqExistsException e) {
+ System.out.println("UHH");
+ e.printStackTrace();
+ } catch (CqException | RegionNotFoundException e) {
+ System.out.println("UHH e");
+ e.printStackTrace();
+ } catch (Exception e) {
+ System.out.println("UHHHHHH " + e);
+ }
+ }
+
+
+ List<String> parseNames(String names) {
+ return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
+ }
+
+ List<LocatorHostPort> parseLocators(String locators) {
+ return Arrays.stream(locators.split(",")).map((s) -> {
+ String locatorString = s.trim();
+ return parseLocator(locatorString);
+ }).collect(Collectors.toList());
+ }
+
+ private LocatorHostPort parseLocator(String locatorString) {
+ String[] splits = locatorString.split("\\[");
+ String locator = splits[0];
+ int port = Integer.parseInt(splits[1].replace("]", ""));
+ return new LocatorHostPort(locator, port);
+ }
+
+ boolean isDurable(String durableClientId) {
+ return !durableClientId.equals("");
+ }
+
+ String generateCqName(int taskId, String cqPrefix, String regionName) {
+ return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
+ }
+
+ /**
+ * converts a list of regions names into a map of source partitions
+ *
+ * @param regionNames list of regionNames
+ * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
+ */
+ Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) {
+ return regionNames.stream().map(regionName -> {
+ Map<String, String> sourcePartition = new HashMap<>();
+ sourcePartition.put(REGION_NAME, regionName);
+ return sourcePartition;
+ }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s));
+ }
}
diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/kafka/GeodeKafkaTestCluster.java
index 2b0293f..85c0fc4 100644
--- a/src/test/java/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/kafka/GeodeKafkaTestCluster.java
@@ -1,11 +1,8 @@
package kafka;
-import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
-import kafka.zookeeper.ZooKeeperClient;
-import org.I0Itec.zkclient.ZkClient;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
@@ -16,20 +13,9 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
-import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.isolation.Plugins;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
-import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
-import org.apache.kafka.connect.storage.StringConverter;
-import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -40,10 +26,11 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.awaitility.Awaitility.await;
public class GeodeKafkaTestCluster {
@@ -51,6 +38,9 @@
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
private static boolean debug = true;
+ public static String TEST_TOPICS = "someTopic";
+ public static String TEST_REGIONS = "someRegion";
+
private static ZooKeeperLocalCluster zooKeeperLocalCluster;
private static KafkaLocalCluster kafkaLocalCluster;
private static GeodeLocalCluster geodeLocalCluster;
@@ -63,7 +53,7 @@
startKafka();
startGeode();
createTopic();
- Thread.sleep(5000);
+
startWorker();
consumer = createConsumer();
Thread.sleep(5000);
@@ -74,9 +64,8 @@
workerAndHerderCluster.stop();
KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
-
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.deleteTopic("someTopic");
+ adminZkClient.deleteTopic(TEST_TOPICS);
kafkaLocalCluster.stop();
geodeLocalCluster.stop();
@@ -92,8 +81,12 @@
private static void createTopic() {
KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+
+ Properties topicProperties = new Properties();
+ topicProperties.put("flush.messages", "1");
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.createTopic("someTopic",3,1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ adminZkClient.createTopic(TEST_TOPICS,3
+ ,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
}
private ClientCache createGeodeClient() {
@@ -125,15 +118,15 @@
private static Properties getKafkaConfig() throws IOException {
- int BROKER_PORT = 8888;
+ int BROKER_PORT = 9092;
Properties props = new Properties();
props.put("broker.id", "0");
+ props.put("log4j.configuration", "/Users/jhuynh/Pivotal/kafka/config/connect-log4j.properties");
props.put("zookeeper.connect", "localhost:2181");
props.put("host.name", "localHost");
props.put("port", BROKER_PORT);
props.put("offsets.topic.replication.factor", "1");
- props.put("log.dir", (debug)? "/tmp/kafka" : temporaryFolder.newFolder("kafka").getAbsolutePath());
props.put("log.flush.interval.messages", "1");
props.put("log.flush.interval.ms", "10");
@@ -144,22 +137,6 @@
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
//Specifically GeodeKafka connector configs
-
-
-
-/*
-name=file-source
-# The class implementing the connector
-connector.class=FileStreamSource
-# Maximum number of tasks to run for this connector instance
-tasks.max=1
-# The input file (path relative to worker's working directory)
-# This is the only setting specific to the FileStreamSource
-file=test.txt
-# The output topic in Kafka
-topic=connect-test
- */
-
return props;
}
@@ -167,37 +144,38 @@
//consumer props, less important, just for testing?
public static Consumer<String,String> createConsumer() {
final Properties props = new Properties();
- props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888");
+ props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"myGroup");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
- // Create the consumer using props.
+
+ // Create the consumer using props.
final Consumer<String, String> consumer =
new KafkaConsumer<>(props);
// Subscribe to the topic.
- consumer.subscribe(Collections.singletonList("someTopic"));
+ consumer.subscribe(Collections.singletonList(TEST_TOPICS));
return consumer;
}
@Test
- public void testX() throws InterruptedException {
+ public void endToEndSourceTest() {
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create("someRegion");
- region.put("JASON KEY", "JASON VALUE");
- System.out.println("PUT COMPLETE!");
- region.get("JASON KEY");
-// client.close();
+ Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS);
-
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
- for (ConsumerRecord<String, String> record: records) {
- System.out.println("JASON we consumed a record:" + record);
- }
- System.out.println("TEST COMPLETE!");
-
+ //right now just verify something makes it end to end
+ AtomicInteger valueReceived = new AtomicInteger(0);
+ await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ region.put("KEY", "VALUE" + System.currentTimeMillis());
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(4));
+ for (ConsumerRecord<String, String> record: records) {
+// System.out.println("WE consumed a record:" + record);
+ valueReceived.incrementAndGet();
+ }
+ return valueReceived.get() > 0;
+ });
}
}
diff --git a/src/test/java/kafka/LocatorLauncherWrapper.java b/src/test/java/kafka/LocatorLauncherWrapper.java
index b4340c4..c1a7075 100644
--- a/src/test/java/kafka/LocatorLauncherWrapper.java
+++ b/src/test/java/kafka/LocatorLauncherWrapper.java
@@ -16,7 +16,8 @@
// String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
// properties.setProperty(ConfigurationPropert/**/ies.STATISTIC_ARCHIVE_FILE, statsFile);
properties.setProperty(ConfigurationProperties.NAME, "locator1");
- Locator.startLocatorAndDS(10334, new File("/Users/jhuynh/Pivotal/geode-kafka-connector/"), properties);
+
+ Locator.startLocatorAndDS(10334, new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log"), properties);
while (true) {
}
diff --git a/src/test/java/kafka/ServerLauncherWrapper.java b/src/test/java/kafka/ServerLauncherWrapper.java
index 7493d6b..933824e 100644
--- a/src/test/java/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/kafka/ServerLauncherWrapper.java
@@ -37,7 +37,7 @@
.set(ConfigurationProperties.LOCATORS, locatorString)
.set(ConfigurationProperties.NAME,
"server-1")
- .set(ConfigurationProperties.LOG_FILE, "/Users/jhuynh/Pivotal/geode-kafka-connector/")
+ .set(ConfigurationProperties.LOG_FILE, "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
.set(ConfigurationProperties.LOG_LEVEL, "info")
// .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile)
.create();