/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
import static org.apache.beam.sdk.schemas.Schema.toSchema;
import com.alibaba.fastjson.JSON;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
/**
* Integration test for KafkaCSVTable. A Kafka server must be running, and its address must be
* passed in via {@link KafkaOptions}. (https://issues.apache.org/jira/projects/BEAM/issues/BEAM-7523)
*/
@Ignore("https://issues.apache.org/jira/projects/BEAM/issues/BEAM-7523")
public class KafkaCSVTableIT {
@Rule public transient TestPipeline pipeline = TestPipeline.create();
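// Schema of the test table: three nullable INT32 columns, mirroring the CSV records produced below.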
private static final Schema TEST_TABLE_SCHEMA =
Schema.builder()
.addNullableField("order_id", Schema.FieldType.INT32)
.addNullableField("member_id", Schema.FieldType.INT32)
.addNullableField("item_name", Schema.FieldType.INT32)
.build();
@BeforeClass
public static void prepare() {
PipelineOptionsFactory.register(KafkaOptions.class);
}
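/**
 * Checks BeamKafkaTable's rate estimation: producing records with a shorter delay between them
 * should be reported as a higher rate.
 */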
@Test
@SuppressWarnings("FutureReturnValueIgnored")
public void testFake2() throws BeamKafkaTable.NoEstimationException {
KafkaOptions kafkaOptions = pipeline.getOptions().as(KafkaOptions.class);
Table table =
Table.builder()
.name("kafka_table")
.comment("kafka" + " table")
.location("")
.schema(
Stream.of(
Schema.Field.nullable("order_id", INT32),
Schema.Field.nullable("member_id", INT32),
Schema.Field.nullable("item_name", INT32))
.collect(toSchema()))
.type("kafka")
.properties(JSON.parseObject(getKafkaPropertiesString(kafkaOptions)))
.build();
BeamKafkaTable kafkaTable = (BeamKafkaTable) new KafkaTableProvider().buildBeamSqlTable(table);
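// Produce 100 records roughly 20ms apart, then 100 more roughly 10ms apart; the faster batch
// should be estimated at a higher rate.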
produceSomeRecordsWithDelay(100, 20);
double rate1 = kafkaTable.computeRate(20);
produceSomeRecordsWithDelay(100, 10);
double rate2 = kafkaTable.computeRate(20);
Assert.assertTrue("faster production should yield a higher estimated rate", rate2 > rate1);
}
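/** Builds the table properties JSON, pointing the table at the test broker and topic. */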
private String getKafkaPropertiesString(KafkaOptions kafkaOptions) {
return "{ \"bootstrap.servers\" : \""
+ kafkaOptions.getKafkaBootstrapServerAddress()
+ "\",\"topics\":[\""
+ kafkaOptions.getKafkaTopic()
+ "\"] }";
}
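// Per-pipeline success flag, keyed by options id so concurrently running pipelines don't interfere.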
static final Map<Long, Boolean> FLAG = new ConcurrentHashMap<>();
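/**
 * Runs a streaming SELECT over the Kafka table, produces a few records, and polls until
 * {@link StreamAssertEqual} flags that all expected rows have been seen.
 */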
@Test
public void testFake() throws InterruptedException {
KafkaOptions kafkaOptions = pipeline.getOptions().as(KafkaOptions.class);
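// Run the pipeline asynchronously so records can be produced after it has started.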
pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
String createTableString =
"CREATE EXTERNAL TABLE kafka_table(\n"
+ "order_id INTEGER, \n"
+ "member_id INTEGER, \n"
+ "item_name INTEGER \n"
+ ") \n"
+ "TYPE 'kafka' \n"
+ "LOCATION '"
+ "'\n"
+ "TBLPROPERTIES '"
+ getKafkaPropertiesString(kafkaOptions)
+ "'";
TableProvider tb = new KafkaTableProvider();
BeamSqlEnv env = BeamSqlEnv.inMemory(tb);
env.executeDdl(createTableString);
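// The SELECT over the Kafka table yields an unbounded PCollection of rows.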
PCollection<Row> queryOutput =
BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM kafka_table"));
queryOutput
.apply(ParDo.of(new FakeKvPair()))
.apply(
"waitForSuccess",
ParDo.of(
new StreamAssertEqual(
ImmutableSet.of(
row(TEST_TABLE_SCHEMA, 0, 1, 0),
row(TEST_TABLE_SCHEMA, 1, 2, 1),
row(TEST_TABLE_SCHEMA, 2, 3, 2)))));
queryOutput.apply(logRecords(""));
pipeline.run();
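// Give the streaming pipeline a moment to start consuming before producing records.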
TimeUnit.MILLISECONDS.sleep(3000);
produceSomeRecords(3);
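// Poll the success flag for up to ~12 seconds (200 iterations x 60ms).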
for (int i = 0; i < 200; i++) {
if (FLAG.getOrDefault(pipeline.getOptions().getOptionsId(), false)) {
return;
}
TimeUnit.MILLISECONDS.sleep(60);
}
Assert.fail("Expected rows were not observed on the Kafka topic in time.");
}
private static MapElements<Row, Void> logRecords(String suffix) {
return MapElements.via(
new SimpleFunction<Row, Void>() {
@Override
public @Nullable Void apply(Row input) {
System.out.println(input.getValues() + suffix);
return null;
}
});
}
/** Pairs every row with a fixed key, since a stateful DoFn requires KV input. */
public static class FakeKvPair extends DoFn<Row, KV<String, Row>> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of("fake_key", c.element()));
}
}
/** Sets a success flag once all the expected elements have been seen. */
public static class StreamAssertEqual extends DoFn<KV<String, Row>, Void> {
private final Set<Row> expected;
StreamAssertEqual(Set<Row> expected) {
super();
this.expected = expected;
}
@DoFn.StateId("seenValues")
private final StateSpec<BagState<Row>> seenRows = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("seenValues") BagState<Row> seenValues,
@StateId("count") ValueState<Integer> countState) {
// NOTE: this read-modify-write of the count is likely not safe if elements arrive in parallel.
int count = MoreObjects.firstNonNull(countState.read(), 0);
count = count + 1;
countState.write(count);
seenValues.add(context.element().getValue());
if (count >= expected.size()) {
if (StreamSupport.stream(seenValues.read().spliterator(), false)
.collect(Collectors.toSet())
.containsAll(expected)) {
System.out.println("in second if");
FLAG.put(context.getPipelineOptions().getOptionsId(), true);
}
}
}
}
private Row row(Schema schema, Object... values) {
return Row.withSchema(schema).addValues(values).build();
}
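/** Publishes {@code num} CSV records of the form "order_id,member_id,item_name" to the test topic. */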
@SuppressWarnings("FutureReturnValueIgnored")
private void produceSomeRecords(int num) {
Producer<String, String> producer = new KafkaProducer<>(producerProps());
String topicName = pipeline.getOptions().as(KafkaOptions.class).getKafkaTopic();
for (int i = 0; i < num; i++) {
producer.send(
new ProducerRecord<>(
topicName, "k" + i, i + "," + ((i % 3) + 1) + "," + i));
}
producer.flush();
producer.close();
}
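/** Like {@link #produceSomeRecords}, but sleeps for the given number of milliseconds between sends. */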
@SuppressWarnings("FutureReturnValueIgnored")
private void produceSomeRecordsWithDelay(int num, int delayMillis) {
Producer<String, String> producer = new KafkaProducer<>(producerProps());
String topicName = pipeline.getOptions().as(KafkaOptions.class).getKafkaTopic();
for (int i = 0; i < num; i++) {
producer.send(
new ProducerRecord<>(
topicName, "k" + i, i + "," + ((i % 3) + 1) + "," + i));
try {
TimeUnit.MILLISECONDS.sleep(delayMillis);
} catch (InterruptedException e) {
throw new RuntimeException("Could not wait for producing", e);
}
}
producer.flush();
producer.close();
}
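/** Producer configuration for the test broker; keys and values are serialized as plain strings. */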
private Properties producerProps() {
KafkaOptions options = pipeline.getOptions().as(KafkaOptions.class);
Properties props = new Properties();
props.put("bootstrap.servers", options.getKafkaBootstrapServerAddress());
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("buffer.memory", 33554432);
props.put("acks", "all");
props.put("request.required.acks", "1");
props.put("retries", 0);
props.put("linger.ms", 1);
return props;
}
/** Pipeline options specific for this test. */
public interface KafkaOptions extends PipelineOptions {
@Description("Kafka server address")
@Validation.Required
@Default.String("localhost:9092")
String getKafkaBootstrapServerAddress();
void setKafkaBootstrapServerAddress(String address);
@Description("Kafka topic")
@Validation.Required
@Default.String("test")
String getKafkaTopic();
void setKafkaTopic(String topic);
}
}