blob: cae66516ff79ba78d8f12f7dba1ebf6f19a4305f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.network;
import java.net._
import java.io._
import org.junit._
import org.scalatest.junit.JUnitSuite
import kafka.utils.TestUtils
import java.util.Random
import org.apache.log4j._
class SocketServerTest extends JUnitSuite {
Logger.getLogger("kafka").setLevel(Level.INFO)
def echo(receive: Receive): Option[Send] = {
val id = receive.buffer.getShort
Some(new BoundedByteBufferSend(receive.buffer.slice))
}
val server = new SocketServer(port = TestUtils.choosePort,
numProcessorThreads = 1,
monitoringPeriodSecs = 30,
handlerFactory = (requestId: Short, receive: Receive) => echo,
sendBufferSize = 300000,
receiveBufferSize = 300000,
maxRequestSize = 50)
server.startup()
def sendRequest(id: Short, request: Array[Byte]): Array[Byte] = {
val socket = new Socket("localhost", server.port)
val outgoing = new DataOutputStream(socket.getOutputStream)
outgoing.writeInt(request.length + 2)
outgoing.writeShort(id)
outgoing.write(request)
outgoing.flush()
val incoming = new DataInputStream(socket.getInputStream)
val len = incoming.readInt()
val response = new Array[Byte](len)
incoming.readFully(response)
socket.close()
response
}
@After
def cleanup() {
server.shutdown()
}
@Test
def simpleRequest() {
val response = new String(sendRequest(0, "hello".getBytes))
}
@Test(expected=classOf[IOException])
def tooBigRequestIsRejected() {
val tooManyBytes = new Array[Byte](server.maxRequestSize + 1)
new Random().nextBytes(tooManyBytes)
sendRequest(0, tooManyBytes)
}
}