blob: 42a00718b569e001cfe10096f6f4d2586c907204 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.ceresdb;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.ceresdb.common.util.MetricsUtil;
import io.ceresdb.common.util.UnsignedUtil;
import io.ceresdb.common.util.internal.ThrowUtil;
import io.ceresdb.models.Err;
import io.ceresdb.models.FieldValue;
import io.ceresdb.models.QueryOk;
import io.ceresdb.models.QueryRequest;
import io.ceresdb.models.Record;
import io.ceresdb.models.Result;
import io.ceresdb.models.Rows;
import io.ceresdb.models.Series;
import io.ceresdb.models.SqlResult;
import io.ceresdb.models.TagValue;
import io.ceresdb.models.WriteOk;
import io.ceresdb.options.CeresDBOptions;
import io.ceresdb.rpc.RpcOptions;
/**
*
* @author jiachun.fjc
*/
public class CeresDBTest {
private static final Logger LOG = LoggerFactory.getLogger(CeresDBTest.class);
private String TEST_TABLE_NAME = "all_type_test_table_%d";
private CeresDBOptions opts;
private CeresDBClient client;
@Before
public void before() {
final RpcOptions rpcOpts = RpcOptions.newDefault();
rpcOpts.setBlockOnLimit(false);
rpcOpts.setInitialLimit(32);
rpcOpts.setLimitKind(RpcOptions.LimitKind.Gradient);
rpcOpts.setLogOnLimitChange(true);
this.opts = CeresDBOptions.newBuilder("127.0.0.1", 8831, 5000) //
.tenant("public", "sub_test", "test_token") //
.rpcOptions(rpcOpts) //
.writeMaxRetries(0) //
.readMaxRetries(1) //
.build();
this.client = new CeresDBClient();
final boolean ret = this.client.init(this.opts);
TEST_TABLE_NAME = String.format(TEST_TABLE_NAME, System.currentTimeMillis());
final Management management = this.client.management();
final SqlResult existsResult = management.executeSql("EXISTS TABLE %s", TEST_TABLE_NAME);
LOG.info("EXISTS TABLE before: {}.", existsResult);
final SqlResult result = management.executeSql(String.format("CREATE TABLE %s(" + //
"ts TIMESTAMP NOT NULL," + //
"c1 STRING TAG NOT NULL," + //
"c2 INT64 TAG NULL," + //
"c3 DOUBLE NULL," + //
"c4 STRING NULL," + //
"c5 INT64 NULL," + //
"c6 FLOAT NULL," + //
"c7 INT32 NULL," + //
"c8 INT16 NULL," + //
"c9 INT8 NULL," + //
"c10 BOOLEAN NULL," + //
"c11 UINT64 NULL," + //
"c12 UINT32 NULL," + //
"c13 UINT16 NULL," + //
"c14 UINT8 NULL," + //
"c15 TIMESTAMP NULL," + //
"c16 VARBINARY NULL," + //
"TIMESTAMP KEY(ts)) ENGINE=Analytic WITH (ttl='7d')",
TEST_TABLE_NAME));
LOG.info("Start CeresDB client {}, with options: {}, create table {}: {}.", result(ret), this.opts,
TEST_TABLE_NAME, result);
final SqlResult existsResult2 = management.executeSql("EXISTS TABLE %s", TEST_TABLE_NAME);
LOG.info("EXISTS TABLE after: {}.", existsResult2);
}
private static String result(final boolean result) {
return result ? "success" : "failed";
}
@After
public void after() {
final SqlResult descResult = this.client.management().executeSql("DROP TABLE %s", TEST_TABLE_NAME);
LOG.info("DROP TABLE: {}.", descResult);
MetricsUtil.reportImmediately();
this.client.shutdownGracefully();
}
public CeresDBOptions getOpts() {
return opts;
}
@Ignore
@Test
public void comprehensiveTest() throws ExecutionException, InterruptedException {
final Calendar time = Calendar.getInstance();
final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSZ");
final String timeString = format.format(time.getTime());
System.out.println("time=" + timeString + " " + time.getTimeInMillis() + " " + time.getTime());
final int writeCount = ThreadLocalRandom.current().nextInt(10);
final Result<WriteOk, Err> writeR = write(time, writeCount);
LOG.info("#comprehensiveTest write result={}.", writeR);
Assert.assertTrue(writeR.isOk());
final QueryRequest req = QueryRequest.newBuilder() //
.ql("select * from %s where ts < to_timestamp_millis('%s')", TEST_TABLE_NAME, timeString) //
.build();
final Result<QueryOk, Err> queryR = this.client.query(req).get();
LOG.info("#comprehensiveTest query result={}.", queryR);
Assert.assertTrue(queryR.isOk());
final QueryOk ok = queryR.getOk();
ok.mapToRecord().forEach(rd -> {
LOG.info("Field descriptor: {}", rd.getFieldDescriptors());
LOG.info("Data: ts={}, c1={}, c2={}, c3={}, c4={}, c5={}, c6={}, c7={}, c8={}, c9={}," + //
"c10={}, c11={}, c12={}, c13={}, c14={}, c15={}, c16={}", rd.getTimestamp("ts"), //
rd.getString("c1"), //
rd.getInt64("c2"), //
rd.getDouble("c3"), //
rd.getString("c4"), //
rd.getInt64("c5"), //
rd.getFloat("c6"), //
rd.getInteger("c7"), //
rd.getInt16("c8"), //
rd.getInt8("c9"), //
rd.getBoolean("c10"), //
rd.getUInt64("c11"), //
rd.getUInt32("c12"), //
rd.getUInt16("c13"), //
rd.getUInt8("c14"), //
rd.getTimestamp("c15"), rd.getBytes("c16"));
});
final Management management = this.client.management();
final SqlResult alterResult1 = management.executeSql("ALTER TABLE %s ADD COLUMN (c18 UINT64, c19 STRING TAG)",
TEST_TABLE_NAME);
LOG.info("ALTER TABLE 1: {}.", alterResult1);
final SqlResult alterResult2 = management.executeSql("ALTER TABLE %s ADD COLUMN c20 STRING TAG",
TEST_TABLE_NAME);
LOG.info("ALTER TABLE 2: {}.", alterResult2);
final SqlResult descResult = management.executeSql("DESCRIBE %s", TEST_TABLE_NAME);
LOG.info("DESCRIBE TABLE: {}.", descResult);
final SqlResult showResult = management.executeSql("SHOW CREATE TABLE %s", TEST_TABLE_NAME);
LOG.info("SHOW CREATE TABLE: {}.", showResult);
final SqlResult queryResult = management.executeSql("SELECT * FROM %s", TEST_TABLE_NAME);
LOG.info("QUERY TABLE: {}.", queryResult);
}
private Result<WriteOk, Err> write(final Calendar time, final int writeCount) {
try {
return writeAsync(time, writeCount).get();
} catch (final Throwable t) {
ThrowUtil.throwException(t);
}
return WriteOk.emptyOk().mapToResult();
}
private CompletableFuture<Result<WriteOk, Err>> writeAsync(final Calendar time, final int writeCount) {
return this.client.write(makeRows(time, writeCount));
}
private Collection<Rows> makeRows(final Calendar time, final int count) {
final long timestamp = time.getTimeInMillis();
final Collection<Rows> rows = new ArrayList<>();
for (long ts = (timestamp - count); ts < timestamp; ts++) {
final Rows.Builder builder = Series.newBuilder(TEST_TABLE_NAME).tag("c1", "first_c1")
.tag("c2", TagValue.withInt64(12)).toRowsBuilder() //
.fields(ts, in -> {
in.put("c3", FieldValue.withDouble(0.1));
in.put("c4", FieldValue.withString("string value"));
in.put("c5", FieldValue.withInt64(64));
in.put("c6", FieldValue.withFloat(1.0f));
in.put("c7", FieldValue.withInt(32));
in.put("c8", FieldValue.withInt16(16));
in.put("c9", FieldValue.withInt8(8));
in.put("c10", FieldValue.withBoolean(true));
in.put("c11",
FieldValue.withUInt64(UnsignedUtil.getUInt64(Long.MAX_VALUE).add(BigInteger.ONE)));
in.put("c12", FieldValue.withUInt32(33));
in.put("c13", FieldValue.withUInt16(17));
in.put("c14", FieldValue.withUInt8(9));
in.put("c15", FieldValue.withTimestamp(time.getTimeInMillis()));
in.put("c16", FieldValue.withVarbinary("test".getBytes(StandardCharsets.UTF_8)));
});
rows.add(builder.build());
}
return rows;
}
@Ignore
@Test
public void holdConnectionTest() throws ExecutionException, InterruptedException {
comprehensiveTest();
for (int i = 0; i < 1000; i++) {
Thread.sleep(1000);
}
}
@SuppressWarnings("all")
@Ignore
@Test
public void loopToWriteTest() throws Exception {
final int concurrence = 32;
final List<CompletableFuture<Result<WriteOk, Err>>> fs = new ArrayList<>();
for (;;) {
try {
for (int i = 0; i < concurrence; i++) {
fs.add(writeAsync(Calendar.getInstance(), 32));
}
for (final CompletableFuture<Result<WriteOk, Err>> f : fs) {
f.get();
}
} catch (final Exception e) {
e.printStackTrace();
} finally {
fs.clear();
}
}
}
@Ignore
@Test
public void streamWriteTest() {
final StreamWriteBuf<Rows, WriteOk> writeBuf = this.client.streamWrite(TEST_TABLE_NAME);
final CompletableFuture<WriteOk> future = writeBuf.write(makeRows(Calendar.getInstance(), 2)) //
.write(makeRows(Calendar.getInstance(), 3)) //
.flush() //
.writeAndFlush(makeRows(Calendar.getInstance(), 10)) //
.write(makeRows(Calendar.getInstance(), 10)) //
.writeAndFlush(makeRows(Calendar.getInstance(), 10)) //
.completed();
Assert.assertEquals(35, future.join().getSuccess());
}
@Ignore
@Test
public void streamQueryTest() {
final Calendar time = Calendar.getInstance();
final StreamWriteBuf<Rows, WriteOk> writeBuf = this.client.streamWrite(TEST_TABLE_NAME);
for (int i = 0; i < 1000; i++) {
time.add(Calendar.MILLISECOND, 1);
writeBuf.writeAndFlush(makeRows(time, 1));
}
final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
Assert.assertEquals(1000, writeOk.join().getSuccess());
final QueryRequest req = QueryRequest.newBuilder().ql("select * from %s", TEST_TABLE_NAME).build();
final Iterator<Record> it = this.client.blockingStreamQuery(req, 3, TimeUnit.SECONDS);
int i = 0;
while (it.hasNext()) {
LOG.info("The {} row, timestamp={}", ++i, it.next().getTimestamp("ts"));
}
Assert.assertEquals(1000, i);
}
}