| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samoa.streams.kafka; |
| |
| import java.util.Properties; |
| import java.util.Random; |
| import org.apache.samoa.instances.Attribute; |
| import org.apache.samoa.instances.DenseInstance; |
| import org.apache.samoa.instances.Instance; |
| import org.apache.samoa.instances.Instances; |
| import org.apache.samoa.instances.InstancesHeader; |
| import org.apache.samoa.learners.InstanceContentEvent; |
| import org.apache.samoa.moa.core.FastVector; |
| |
| /** |
| * |
| * @author pwawrzyniak |
| */ |
| public class TestUtilsForKafka { |
| |
| protected static InstanceContentEvent getData(Random instanceRandom, int numAtts, InstancesHeader header) { |
| double[] attVals = new double[numAtts + 1]; |
| double sum = 0.0; |
| double sumWeights = 0.0; |
| for (int i = 0; i < numAtts; i++) { |
| attVals[i] = instanceRandom.nextDouble(); |
| |
| } |
| int classLabel; |
| if (sum >= sumWeights * 0.5) { |
| classLabel = 1; |
| } else { |
| classLabel = 0; |
| } |
| |
| Instance inst = new DenseInstance(1.0, attVals); |
| inst.setDataset(header); |
| inst.setClassValue(classLabel); |
| |
| return new InstanceContentEvent(0, inst, true, false); |
| } |
| |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| protected static InstancesHeader generateHeader(int numAttributes) { |
| FastVector attributes = new FastVector(); |
| for (int i = 0; i < numAttributes; i++) { |
| attributes.addElement(new Attribute("att" + (i + 1))); |
| } |
| |
| FastVector classLabels = new FastVector(); |
| for (int i = 0; i < numAttributes; i++) { |
| classLabels.addElement("class" + (i + 1)); |
| } |
| attributes.addElement(new Attribute("class", classLabels)); |
| InstancesHeader streamHeader = new InstancesHeader(new Instances("test-kafka", attributes, 0)); |
| streamHeader.setClassIndex(streamHeader.numAttributes() - 1); |
| return streamHeader; |
| } |
| |
| |
| protected static Properties getProducerProperties(String BROKERHOST, String BROKERPORT) { |
| return getProducerProperties("test", BROKERHOST, BROKERPORT); |
| } |
| |
| /** |
| * |
| * @param clientId |
| * @return |
| */ |
| protected static Properties getProducerProperties(String clientId, String BROKERHOST, String BROKERPORT) { |
| Properties producerProps = new Properties(); |
| producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); |
| producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); |
| producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); |
| producerProps.setProperty("group.id", "test"); |
| producerProps.setProperty("client.id", clientId); |
| return producerProps; |
| } |
| |
| protected static Properties getConsumerProperties(String BROKERHOST, String BROKERPORT) { |
| Properties consumerProps = new Properties(); |
| consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); |
| consumerProps.put("enable.auto.commit", "true"); |
| consumerProps.put("auto.commit.interval.ms", "1000"); |
| consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); |
| consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); |
| consumerProps.setProperty("group.id", "test"); |
| consumerProps.setProperty("auto.offset.reset", "earliest"); |
| return consumerProps; |
| } |
| |
| protected static Properties getConsumerProducerProperties(String BROKERHOST, String BROKERPORT) { |
| Properties props = new Properties(); |
| props.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); |
| props.put("enable.auto.commit", "true"); |
| props.put("auto.commit.interval.ms", "1000"); |
| props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); |
| props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); |
| props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); |
| props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); |
| props.setProperty("group.id", "burrito"); |
| props.setProperty("auto.offset.reset", "earliest"); |
| props.setProperty("client.id", "burrito"); |
| return props; |
| } |
| } |