/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.javaapi

import kafka.serializer.Encoder
import kafka.producer.async.QueueItem
import kafka.utils.Logging
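
/**
 * Implicit conversions between the kafka.javaapi wrapper types and their
 * underlying Scala counterparts, so code on either side of the Java API
 * boundary can pass the other side's objects without explicit unwrapping.
 */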
private[javaapi] object Implicits extends Logging {
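
  // ByteBufferMessageSet: unwrap the Java message set, or rebuild the Java
  // wrapper from the Scala message set's buffer, initial offset and error code.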
  implicit def javaMessageSetToScalaMessageSet(messageSet: kafka.javaapi.message.ByteBufferMessageSet):
    kafka.message.ByteBufferMessageSet = messageSet.underlying

  implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
    kafka.javaapi.message.ByteBufferMessageSet = {
    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset,
                                                   messageSet.getErrorCode)
  }
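
  // SyncProducer: wrap a Scala SyncProducer in the Java API facade, or pull
  // the underlying Scala producer back out of the Java facade.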
  implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = {
    debug("Implicit instantiation of Java Sync Producer")
    new kafka.javaapi.producer.SyncProducer(producer)
  }

  implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = {
    debug("Implicit instantiation of Sync Producer")
    producer.underlying
  }
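
  // Adapt a Java-API async EventHandler to the Scala EventHandler trait,
  // converting the event batch and producer types on each handle() call.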
  implicit def toScalaEventHandler[T](eventHandler: kafka.javaapi.producer.async.EventHandler[T])
      : kafka.producer.async.EventHandler[T] = {
    new kafka.producer.async.EventHandler[T] {
      override def init(props: java.util.Properties) { eventHandler.init(props) }
      override def handle(events: Seq[QueueItem[T]], producer: kafka.producer.SyncProducer, encoder: Encoder[T]) {
        import collection.JavaConversions._
        eventHandler.handle(asList(events), producer, encoder)
      }
      override def close { eventHandler.close }
    }
  }
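
  // The reverse adaptation: expose a Scala EventHandler through the Java-API
  // EventHandler interface, converting the java.util.List of events to a Scala Buffer.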
  implicit def toJavaEventHandler[T](eventHandler: kafka.producer.async.EventHandler[T])
      : kafka.javaapi.producer.async.EventHandler[T] = {
    new kafka.javaapi.producer.async.EventHandler[T] {
      override def init(props: java.util.Properties) { eventHandler.init(props) }
      override def handle(events: java.util.List[QueueItem[T]], producer: kafka.javaapi.producer.SyncProducer,
                          encoder: Encoder[T]) {
        import collection.JavaConversions._
        eventHandler.handle(asBuffer(events), producer, encoder)
      }
      override def close { eventHandler.close }
    }
  }
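
  // Adapt a Java-API CallbackHandler to the Scala CallbackHandler trait,
  // translating the queue-item collections between java.util.List and Scala Seq.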
  implicit def toScalaCbkHandler[T](cbkHandler: kafka.javaapi.producer.async.CallbackHandler[T])
      : kafka.producer.async.CallbackHandler[T] = {
    new kafka.producer.async.CallbackHandler[T] {
      import collection.JavaConversions._
      override def init(props: java.util.Properties) { cbkHandler.init(props) }
      override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
        cbkHandler.beforeEnqueue(data)
      }
      override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) {
        cbkHandler.afterEnqueue(data, added)
      }
      override def afterDequeuingExistingData(data: QueueItem[T] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
        cbkHandler.afterDequeuingExistingData(data)
      }
      override def beforeSendingData(data: Seq[QueueItem[T]] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
        asBuffer(cbkHandler.beforeSendingData(asList(data)))
      }
      override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[T]] = {
        asBuffer(cbkHandler.lastBatchBeforeClose)
      }
      override def close { cbkHandler.close }
    }
  }
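
  // The reverse adaptation: expose a Scala CallbackHandler through the
  // Java-API CallbackHandler interface.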
  implicit def toJavaCbkHandler[T](cbkHandler: kafka.producer.async.CallbackHandler[T])
      : kafka.javaapi.producer.async.CallbackHandler[T] = {
    new kafka.javaapi.producer.async.CallbackHandler[T] {
      import collection.JavaConversions._
      override def init(props: java.util.Properties) { cbkHandler.init(props) }
      override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
        cbkHandler.beforeEnqueue(data)
      }
      override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) {
        cbkHandler.afterEnqueue(data, added)
      }
      override def afterDequeuingExistingData(data: QueueItem[T] = null)
          : java.util.List[QueueItem[T]] = {
        asList(cbkHandler.afterDequeuingExistingData(data))
      }
      override def beforeSendingData(data: java.util.List[QueueItem[T]] = null)
          : java.util.List[QueueItem[T]] = {
        asList(cbkHandler.beforeSendingData(asBuffer(data)))
      }
      override def lastBatchBeforeClose: java.util.List[QueueItem[T]] = {
        asList(cbkHandler.lastBatchBeforeClose)
      }
      override def close { cbkHandler.close }
    }
  }
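
  // MultiFetchResponse: unwrap the Java response, or rebuild the Java wrapper
  // from the Scala response's buffer, number of sets and offsets.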
  implicit def toMultiFetchResponse(response: kafka.javaapi.MultiFetchResponse): kafka.api.MultiFetchResponse =
    response.underlying

  implicit def toJavaMultiFetchResponse(response: kafka.api.MultiFetchResponse): kafka.javaapi.MultiFetchResponse =
    new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets, response.offsets)
}