blob: 2e3dad2d308e1a95e19db8bb5ee1b5d3e476868e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;
import org.apache.avro.Protocol.Message;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.ipc.SocketServer;
import org.apache.avro.ipc.SocketTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.generic.GenericRequestor;
import org.apache.avro.ipc.generic.GenericResponder;
import org.apache.avro.util.Utf8;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class TestProtocolGeneric {
private static final Logger LOG
= LoggerFactory.getLogger(TestProtocolGeneric.class);
protected static final File FILE = new File("../../../share/test/schemas/simple.avpr");
protected static final Protocol PROTOCOL;
static {
try {
PROTOCOL = Protocol.parse(FILE);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static boolean throwUndeclaredError;
protected static class TestResponder extends GenericResponder {
public TestResponder() { super(PROTOCOL); }
public Object respond(Message message, Object request)
throws AvroRemoteException {
GenericRecord params = (GenericRecord)request;
if ("hello".equals(message.getName())) {
LOG.info("hello: "+params.get("greeting"));
return new Utf8("goodbye");
}
if ("echo".equals(message.getName())) {
Object record = params.get("record");
LOG.info("echo: "+record);
return record;
}
if ("echoBytes".equals(message.getName())) {
Object data = params.get("data");
LOG.info("echoBytes: "+data);
return data;
}
if ("error".equals(message.getName())) {
if (throwUndeclaredError) throw new RuntimeException("foo");
GenericRecord error =
new GenericData.Record(PROTOCOL.getType("TestError"));
error.put("message", new Utf8("an error"));
throw new AvroRemoteException(error);
}
throw new AvroRuntimeException("unexpected message: "+message.getName());
}
}
protected static SocketServer server;
protected static Transceiver client;
protected static GenericRequestor requestor;
@Before
public void testStartServer() throws Exception {
if (server != null) return;
server = new SocketServer(new TestResponder(), new InetSocketAddress(0));
server.start();
client = new SocketTransceiver(new InetSocketAddress(server.getPort()));
requestor = new GenericRequestor(PROTOCOL, client);
}
@Test
public void testHello() throws IOException {
GenericRecord params =
new GenericData.Record(PROTOCOL.getMessages().get("hello").getRequest());
params.put("greeting", new Utf8("bob"));
Utf8 response = (Utf8)requestor.request("hello", params);
assertEquals(new Utf8("goodbye"), response);
}
@Test
public void testEcho() throws IOException {
GenericRecord record =
new GenericData.Record(PROTOCOL.getType("TestRecord"));
record.put("name", new Utf8("foo"));
record.put("kind", new GenericData.EnumSymbol
(PROTOCOL.getType("Kind"), "BAR"));
record.put("hash", new GenericData.Fixed
(PROTOCOL.getType("MD5"),
new byte[]{0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5}));
GenericRecord params =
new GenericData.Record(PROTOCOL.getMessages().get("echo").getRequest());
params.put("record", record);
Object echoed = requestor.request("echo", params);
assertEquals(record, echoed);
}
@Test
public void testEchoBytes() throws IOException {
Random random = new Random();
int length = random.nextInt(1024*16);
GenericRecord params =
new GenericData.Record(PROTOCOL.getMessages().get("echoBytes").getRequest());
ByteBuffer data = ByteBuffer.allocate(length);
random.nextBytes(data.array());
data.flip();
params.put("data", data);
Object echoed = requestor.request("echoBytes", params);
assertEquals(data, echoed);
}
@Test
public void testError() throws IOException {
GenericRecord params =
new GenericData.Record(PROTOCOL.getMessages().get("error").getRequest());
AvroRemoteException error = null;
try {
requestor.request("error", params);
} catch (AvroRemoteException e) {
error = e;
}
assertNotNull(error);
assertEquals("an error", ((GenericRecord)error.getValue()).get("message").toString());
}
@Test
public void testUndeclaredError() throws IOException {
this.throwUndeclaredError = true;
RuntimeException error = null;
GenericRecord params =
new GenericData.Record(PROTOCOL.getMessages().get("error").getRequest());
try {
requestor.request("error", params);
} catch (RuntimeException e) {
error = e;
} finally {
this.throwUndeclaredError = false;
}
assertNotNull(error);
assertTrue(error.toString().contains("foo"));
}
@Test
/** Construct and use a different protocol whose "hello" method has an extra
argument to check that schema is sent to parse request. */
public void testHandshake() throws IOException {
Protocol protocol = new Protocol("Simple", "org.apache.avro.test");
List<Field> fields = new ArrayList<Field>();
fields.add(new Schema.Field("extra", Schema.create(Schema.Type.BOOLEAN),
null, null));
fields.add(new Schema.Field("greeting", Schema.create(Schema.Type.STRING),
null, null));
Protocol.Message message =
protocol.createMessage("hello",
null /* doc */,
Schema.createRecord(fields),
Schema.create(Schema.Type.STRING),
Schema.createUnion(new ArrayList<Schema>()));
protocol.getMessages().put("hello", message);
Transceiver t
= new SocketTransceiver(new InetSocketAddress(server.getPort()));
try {
GenericRequestor r = new GenericRequestor(protocol, t);
GenericRecord params = new GenericData.Record(message.getRequest());
params.put("extra", Boolean.TRUE);
params.put("greeting", new Utf8("bob"));
Utf8 response = (Utf8)r.request("hello", params);
assertEquals(new Utf8("goodbye"), response);
} finally {
t.close();
}
}
@Test
/** Construct and use a different protocol whose "echo" response has an extra
field to check that correct schema is used to parse response. */
public void testResponseChange() throws IOException {
List<Field> fields = new ArrayList<Field>();
for (Field f : PROTOCOL.getType("TestRecord").getFields())
fields.add(new Field(f.name(), f.schema(), null, null));
fields.add(new Field("extra", Schema.create(Schema.Type.BOOLEAN),
null, true));
Schema record =
Schema.createRecord("TestRecord", null, "org.apache.avro.test", false);
record.setFields(fields);
Protocol protocol = new Protocol("Simple", "org.apache.avro.test");
List<Field> params = new ArrayList<Field>();
params.add(new Field("record", record, null, null));
Protocol.Message message =
protocol.createMessage("echo", null, Schema.createRecord(params),
record,
Schema.createUnion(new ArrayList<Schema>()));
protocol.getMessages().put("echo", message);
Transceiver t
= new SocketTransceiver(new InetSocketAddress(server.getPort()));
try {
GenericRequestor r = new GenericRequestor(protocol, t);
GenericRecord args = new GenericData.Record(message.getRequest());
GenericRecord rec = new GenericData.Record(record);
rec.put("name", new Utf8("foo"));
rec.put("kind", new GenericData.EnumSymbol
(PROTOCOL.getType("Kind"), "BAR"));
rec.put("hash", new GenericData.Fixed
(PROTOCOL.getType("MD5"),
new byte[]{0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5}));
rec.put("extra", Boolean.TRUE);
args.put("record", rec);
GenericRecord response = (GenericRecord)r.request("echo", args);
assertEquals(rec, response);
} finally {
t.close();
}
}
@AfterClass
public static void testStopServer() throws IOException {
client.close();
server.close();
}
}