| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package io.ceresdb; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ForkJoinPool; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import org.apache.avro.Schema; |
| import org.apache.avro.generic.GenericData; |
| import org.apache.avro.generic.GenericDatumWriter; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.avro.io.BinaryEncoder; |
| import org.apache.avro.io.DatumWriter; |
| import org.apache.avro.io.EncoderFactory; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.mockito.Mock; |
| import org.mockito.Mockito; |
| import org.mockito.runners.MockitoJUnitRunner; |
| |
| import io.ceresdb.common.Endpoint; |
| import io.ceresdb.common.util.internal.ThrowUtil; |
| import io.ceresdb.errors.IteratorException; |
| import io.ceresdb.errors.StreamException; |
| import io.ceresdb.models.Err; |
| import io.ceresdb.models.QueryOk; |
| import io.ceresdb.models.QueryRequest; |
| import io.ceresdb.models.Record; |
| import io.ceresdb.models.Result; |
| import io.ceresdb.options.QueryOptions; |
| import io.ceresdb.proto.Common; |
| import io.ceresdb.proto.Storage; |
| import io.ceresdb.rpc.Context; |
| import io.ceresdb.rpc.Observer; |
| |
| import com.google.protobuf.ByteStringHelper; |
| |
| /** |
| * @author jiachun.fjc |
| */ |
| @RunWith(value = MockitoJUnitRunner.class) |
| public class QueryClientTest { |
| |
| private QueryClient queryClient; |
| @Mock |
| private RouterClient routerClient; |
| |
| @Before |
| public void before() { |
| final QueryOptions queryOpts = new QueryOptions(); |
| queryOpts.setAsyncPool(ForkJoinPool.commonPool()); |
| queryOpts.setRouterClient(this.routerClient); |
| |
| this.queryClient = new QueryClient(); |
| this.queryClient.init(queryOpts); |
| } |
| |
| @After |
| public void after() { |
| this.queryClient.shutdownGracefully(); |
| this.routerClient.shutdownGracefully(); |
| } |
| |
| @Test |
| public void queryOkNoRouteTest() throws ExecutionException, InterruptedException { |
| final Common.ResponseHeader header = Common.ResponseHeader.newBuilder() // |
| .setCode(Result.SUCCESS) // |
| .build(); |
| final Storage.QueryResponse resp = Storage.QueryResponse.newBuilder().setHeader(header) // |
| .addRows(ByteStringHelper.wrap(new byte[] { 1, 2, 3 })) // |
| .build(); |
| final Endpoint ep = Endpoint.of("127.0.0.1", 8081); |
| |
| Mockito.when(this.routerClient.invoke(Mockito.eq(ep), Mockito.any(), Mockito.any())) // |
| .thenReturn(Utils.completedCf(resp)); |
| Mockito.when(this.routerClient.routeFor(Mockito.any())) // |
| .thenReturn(Utils.completedCf(new HashMap<>())); |
| Mockito.when(this.routerClient.clusterRoute()) // |
| .thenReturn(Route.of(ep)); |
| |
| final QueryRequest req = QueryRequest.newBuilder().forMetrics("query_test_table") // |
| .ql("select number from query_test_table") // |
| .build(); |
| final CompletableFuture<Result<QueryOk, Err>> f = this.queryClient.query(req, Context.newDefault()); |
| |
| final Result<QueryOk, Err> r = f.get(); |
| |
| Assert.assertTrue(r.isOk()); |
| |
| final QueryOk queryOk = r.getOk(); |
| |
| final Stream<String> strs = queryOk.map(Arrays::toString); |
| |
| Assert.assertEquals(Collections.singletonList("[1, 2, 3]"), strs.collect(Collectors.toList())); |
| } |
| |
| @Test |
| public void queryOkByValidRouteTest() throws ExecutionException, InterruptedException { |
| final Common.ResponseHeader header = Common.ResponseHeader.newBuilder() // |
| .setCode(Result.SUCCESS) // |
| .build(); |
| final Storage.QueryResponse resp = Storage.QueryResponse.newBuilder().setHeader(header) // |
| .addRows(ByteStringHelper.wrap(new byte[] { 1, 2, 3 })) // |
| .build(); |
| final Endpoint ep = Endpoint.of("127.0.0.1", 8081); |
| |
| Mockito.when(this.routerClient.invoke(Mockito.eq(ep), Mockito.any(), Mockito.any())) // |
| .thenReturn(Utils.completedCf(resp)); |
| Mockito.when(this.routerClient.routeFor(Mockito.eq(Collections.singletonList("query_test_table")))) // |
| .thenReturn(Utils.completedCf(new HashMap<String, Route>() { |
| private static final long serialVersionUID = -6260265905617276356L; |
| |
| { |
| put("query_test_table", Route.of("query_test_table", ep)); |
| } |
| })); |
| |
| final QueryRequest req = QueryRequest.newBuilder().forMetrics("query_test_table") // |
| .ql("select number from query_test_table") // |
| .build(); |
| final CompletableFuture<Result<QueryOk, Err>> f = this.queryClient.query(req, Context.newDefault()); |
| |
| final Result<QueryOk, Err> r = f.get(); |
| |
| Assert.assertTrue(r.isOk()); |
| |
| final QueryOk queryOk = r.getOk(); |
| |
| final Stream<String> strs = queryOk.map(Arrays::toString); |
| |
| Assert.assertEquals(Collections.singletonList("[1, 2, 3]"), strs.collect(Collectors.toList())); |
| } |
| |
| @Test |
| public void queryFailedTest() throws ExecutionException, InterruptedException { |
| final Common.ResponseHeader header = Common.ResponseHeader.newBuilder() // |
| .setCode(500) // failed code |
| .build(); |
| final Storage.QueryResponse resp = Storage.QueryResponse.newBuilder().setHeader(header) // |
| .build(); |
| final Endpoint ep = Endpoint.of("127.0.0.1", 8081); |
| |
| Mockito.when(this.routerClient.invoke(Mockito.eq(ep), Mockito.any(), Mockito.any())) // |
| .thenReturn(Utils.completedCf(resp)); |
| Mockito.when(this.routerClient.routeFor(Mockito.eq(Collections.singletonList("query_test_table")))) // |
| .thenReturn(Utils.completedCf(new HashMap<String, Route>() { |
| private static final long serialVersionUID = 4517371149948738282L; |
| |
| { |
| put("query_test_table", Route.of("query_test_table", ep)); |
| } |
| })); |
| Mockito.when(this.routerClient.routeRefreshFor(Mockito.any())) |
| .thenReturn(Utils.completedCf(new HashMap<String, Route>() { |
| private static final long serialVersionUID = 2347114952231996366L; |
| |
| { |
| put("query_test_table", Route.of("query_test_table", ep)); |
| } |
| })); |
| |
| final QueryRequest req = QueryRequest.newBuilder().forMetrics("query_test_table") // |
| .ql("select number from query_test_table") // |
| .build(); |
| final CompletableFuture<Result<QueryOk, Err>> f = this.queryClient.query(req, Context.newDefault()); |
| |
| final Result<QueryOk, Err> r = f.get(); |
| |
| Assert.assertFalse(r.isOk()); |
| |
| final Err err = r.getErr(); |
| |
| Assert.assertEquals(ep, err.getErrTo()); |
| Assert.assertEquals(Collections.singletonList("query_test_table"), err.getFailedMetrics()); |
| } |
| |
| @Test |
| public void queryByAvroAndReturnGenericRecordTest() throws ExecutionException, InterruptedException, IOException { |
| final Result<QueryOk, Err> r = queryByAvro(); |
| Assert.assertTrue(r.isOk()); |
| final QueryOk queryOk = r.getOk(); |
| queryOk.mapToRecord().forEach(this::checkRecord); |
| } |
| |
| private void checkRecord(final Record row) { |
| Assert.assertEquals(7, row.getFieldCount()); |
| Assert.assertEquals("f1", row.getFieldDescriptors().get(0).getName()); |
| Assert.assertEquals("f2", row.getFieldDescriptors().get(1).getName()); |
| Assert.assertEquals("f3", row.getFieldDescriptors().get(2).getName()); |
| Assert.assertEquals("f4", row.getFieldDescriptors().get(3).getName()); |
| Assert.assertEquals("f5", row.getFieldDescriptors().get(4).getName()); |
| Assert.assertEquals("f6", row.getFieldDescriptors().get(5).getName()); |
| Assert.assertEquals("f7", row.getFieldDescriptors().get(6).getName()); |
| |
| Assert.assertEquals(Record.Type.String, row.getFieldDescriptors().get(0).getType().getType()); |
| Assert.assertEquals(Record.Type.Int, row.getFieldDescriptors().get(1).getType().getType()); |
| Assert.assertEquals(Record.Type.Long, row.getFieldDescriptors().get(2).getType().getType()); |
| Assert.assertEquals(Record.Type.Float, row.getFieldDescriptors().get(3).getType().getType()); |
| Assert.assertEquals(Record.Type.Double, row.getFieldDescriptors().get(4).getType().getType()); |
| Assert.assertEquals(Record.Type.Boolean, row.getFieldDescriptors().get(5).getType().getType()); |
| Assert.assertEquals(Record.Type.Null, row.getFieldDescriptors().get(6).getType().getType()); |
| |
| Assert.assertEquals("a string value", row.getString("f1")); |
| Assert.assertEquals(new Integer(100), row.getInteger("f2")); |
| Assert.assertEquals(new Long(200), row.getLong("f3")); |
| Assert.assertEquals(new Float(2.6), row.getFloat("f4")); |
| Assert.assertEquals(new Double(3.75), row.getDouble("f5")); |
| Assert.assertEquals(Boolean.TRUE, row.getBoolean("f6")); |
| Assert.assertNull(row.get("f7")); |
| |
| Assert.assertEquals("a string value", row.getString(0)); |
| Assert.assertEquals(new Integer(100), row.getInteger(1)); |
| Assert.assertEquals(new Long(200), row.getLong(2)); |
| Assert.assertEquals(new Float(2.6), row.getFloat(3)); |
| Assert.assertEquals(new Double(3.75), row.getDouble(4)); |
| Assert.assertEquals(Boolean.TRUE, row.getBoolean(5)); |
| Assert.assertNull(row.get(6)); |
| } |
| |
| @Test |
| public void queryByAvroAndReturnObjectsTest() throws ExecutionException, InterruptedException, IOException { |
| final Result<QueryOk, Err> r = queryByAvro(); |
| Assert.assertTrue(r.isOk()); |
| final QueryOk queryOk = r.getOk(); |
| queryOk.mapToArray().forEach(row -> { |
| Assert.assertEquals(7, row.length); |
| Assert.assertEquals("a string value", String.valueOf(row[0])); |
| Assert.assertEquals(100, row[1]); |
| Assert.assertEquals(200L, row[2]); |
| Assert.assertEquals((float) 2.6, row[3]); |
| Assert.assertEquals(3.75, row[4]); |
| Assert.assertEquals(Boolean.TRUE, row[5]); |
| Assert.assertNull(row[6]); |
| }); |
| } |
| |
| private Result<QueryOk, Err> queryByAvro() throws IOException, ExecutionException, InterruptedException { |
| final Storage.QueryResponse resp = mockQueryResponse(); |
| final Endpoint ep = Endpoint.of("127.0.0.1", 8081); |
| |
| Mockito.when(this.routerClient.invoke(Mockito.eq(ep), Mockito.any(), Mockito.any())) // |
| .thenReturn(Utils.completedCf(resp)); |
| Mockito.when(this.routerClient.routeFor(Mockito.any())) // |
| .thenReturn(Utils.completedCf(new HashMap<>())); |
| Mockito.when(this.routerClient.clusterRoute()) // |
| .thenReturn(Route.of(ep)); |
| |
| final QueryRequest req = QueryRequest.newBuilder().forMetrics("query_test_table") // |
| .ql("select number from query_test_table") // |
| .build(); |
| final CompletableFuture<Result<QueryOk, Err>> f = this.queryClient.query(req, Context.newDefault()); |
| |
| return f.get(); |
| } |
| |
| private QueryOk mockQueryOk() { |
| try { |
| Thread.sleep(20); |
| return queryByAvro().unwrapOr(QueryOk.emptyOk()); |
| } catch (final Throwable t) { |
| ThrowUtil.throwException(t); |
| } |
| return null; |
| } |
| |
| private Storage.QueryResponse mockQueryResponse() throws IOException { |
| final Common.ResponseHeader header = Common.ResponseHeader.newBuilder() // |
| .setCode(Result.SUCCESS) // |
| .build(); |
| final String userSchema = "{\"type\":\"record\"," + "\"name\":\"my_record\"," + "\"fields\":[" // |
| + "{\"name\":\"f1\",\"type\":\"string\"}," // |
| + "{\"name\":\"f2\",\"type\":\"int\"}," // |
| + "{\"name\":\"f3\",\"type\":\"long\"}," // |
| + "{\"name\":\"f4\",\"type\":\"float\"}," // |
| + "{\"name\":\"f5\",\"type\":\"double\"}," // |
| + "{\"name\":\"f6\",\"type\":\"boolean\"}," // |
| + "{\"name\":\"f7\",\"type\":\"null\"}" // |
| + "]}"; |
| final Schema.Parser parser = new Schema.Parser(); |
| final Schema schema = parser.parse(userSchema); |
| final GenericRecord record = new GenericData.Record(schema); |
| record.put("f1", "a string value"); |
| record.put("f2", 100); |
| record.put("f3", 200L); |
| record.put("f4", 2.6f); |
| record.put("f5", 3.75d); |
| record.put("f6", true); |
| final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null); |
| datumWriter.write(record, encoder); |
| encoder.flush(); |
| baos.flush(); |
| |
| return Storage.QueryResponse.newBuilder() // |
| .setHeader(header) // |
| .setSchemaType(Storage.QueryResponse.SchemaType.AVRO) // |
| .setSchemaContent(userSchema) // |
| .addRows(ByteStringHelper.wrap(baos.toByteArray())) // |
| .build(); |
| } |
| |
| @Test |
| public void blockingStreamQueryTest() { |
| final QueryOk data = mockQueryOk(); |
| final int respCount = 10; |
| final BlockingStreamIterator streams = new BlockingStreamIterator(1000, TimeUnit.MILLISECONDS); |
| final Observer<QueryOk> obs = streams.getObserver(); |
| new Thread(() -> { |
| for (int i = 0; i < respCount; i++) { |
| if (i == 0) { |
| obs.onNext(data); |
| } else { |
| obs.onNext(mockQueryOk()); |
| } |
| } |
| obs.onCompleted(); |
| }).start(); |
| |
| final Iterator<Record> it = new RecordIterator(streams); |
| |
| int i = 0; |
| while (it.hasNext()) { |
| i++; |
| checkRecord(it.next()); |
| } |
| Assert.assertEquals(respCount, i); |
| } |
| |
| @Test(expected = IteratorException.class) |
| public void blockingStreamQueryOnErrTest() { |
| final int respCount = 10; |
| final BlockingStreamIterator streams = new BlockingStreamIterator(1000, TimeUnit.MILLISECONDS); |
| final Observer<QueryOk> obs = streams.getObserver(); |
| new Thread(() -> { |
| for (int i = 0; i < respCount; i++) { |
| if (i == 5) { |
| obs.onError(new StreamException("fail to stream query")); |
| } else { |
| obs.onNext(mockQueryOk()); |
| } |
| } |
| obs.onCompleted(); |
| }).start(); |
| |
| final Iterator<Record> it = new RecordIterator(streams); |
| |
| int i = 0; |
| while (it.hasNext()) { |
| i++; |
| checkRecord(it.next()); |
| } |
| Assert.assertEquals(respCount, i); |
| } |
| } |