//package org.apache.s2graph.core.utils
//
//import java.util.Properties
//
//import io.reactivex._
//import io.reactivex.schedulers.Schedulers
//import org.apache.kafka.clients.consumer.KafkaConsumer
//import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
//import org.apache.s2graph.core.ExceptionHandler
//
//
//trait Sync[VALUE] {
// type KV = (String, VALUE)
//
// val EmptyValue: VALUE
// val SyncId: String
// val SkipSelfBroadcast: Boolean
//
// val delim = "___" // ! do not change
//
// // public
// val emitter = io.reactivex.processors.PublishProcessor.create[KV]()
//
// def send(key: String): Unit = send(key, EmptyValue)
//
// def send(key: String, value: VALUE): Unit = sendImpl(encodeKey(key), value)
//
// def shutdown(): Unit = {
// emitter.onComplete()
// }
//
// protected def emit(kv: KV): Unit = {
// val (encodedKey, value) = kv
// val (sid, key) = decodeKey(encodedKey)
//
// if (SkipSelfBroadcast && sid == SyncId) {
// logger.syncInfo(s"[Message ignore] sent by self: ${SyncId}")
// // pass
// } else {
// emitter.onNext(key -> value)
// logger.syncInfo(s"[Message emit success]: selfId: ${SyncId}, from: ${sid}")
// }
// }
//
// protected def sendImpl(key: String, value: VALUE): Unit
//
// private def encodeKey(key: String): String = s"${SyncId}${delim}${key}"
//
// // Split on the first delimiter only, so a key that itself contains the delimiter still round-trips.
// private def decodeKey(key: String): (String, String) = {
// val Array(cid, decodedKey) = key.split(delim, 2)
// cid -> decodedKey
// }
//}
//
//class MemorySync(syncId: String) extends Sync[Array[Byte]] {
//
// override val EmptyValue: Array[Byte] = Array.empty[Byte]
//
// override val SyncId: String = syncId
//
// override val SkipSelfBroadcast: Boolean = false
//
// override protected def sendImpl(key: String, value: Array[Byte]): Unit = emit(key -> value)
//}
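//
// // Usage sketch (not part of the original file): MemorySync broadcasts purely in-process
// // through `emitter`, so a subscriber sees every key/value passed to `send`. The sync id,
// // key and payload below are illustrative placeholders; RxJava 2 (io.reactivex) is assumed,
// // as it is used above.
// //
// // val sync = new MemorySync("local-node")
// //
// // sync.emitter.subscribe(new io.reactivex.functions.Consumer[(String, Array[Byte])] {
// //   override def accept(kv: (String, Array[Byte])): Unit = {
// //     val (key, value) = kv
// //     println(s"received key=$key (${value.length} bytes)")
// //   }
// // })
// //
// // sync.send("cache.invalidate", "label-1".getBytes("UTF-8"))
// // sync.shutdown()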
//
//object KafkaSync {
//
// import java.util.Properties
//
// val keySerializer = "org.apache.kafka.common.serialization.StringSerializer"
// val valueSerializer = "org.apache.kafka.common.serialization.ByteArraySerializer"
// val keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"
// val valueDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
//
// val maxMessageSize = "15728640" // 15 MB
//
// def producerConfig(brokers: String): Properties = {
// val props = ExceptionHandler.toKafkaProp(brokers)
//
// props.setProperty("key.serializer", keySerializer)
// props.setProperty("value.serializer", valueSerializer)
// props.setProperty("message.max.bytes", maxMessageSize)
// props.setProperty("max.request.size", maxMessageSize)
//
// props
// }
//
// def consumerConfig(brokers: String, groupId: String): Properties = {
// val props = new Properties()
//
// props.put("bootstrap.servers", brokers)
// props.put("group.id", groupId)
// props.put("enable.auto.commit", "false")
// props.put("key.deserializer", keyDeserializer)
// props.put("value.deserializer", valueDeserializer)
// props.put("max.partition.fetch.bytes", maxMessageSize)
// props.put("fetch.message.max.bytes", maxMessageSize)
//
// props
// }
//}
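//
// // Sketch of how these helpers would typically be combined (broker list and group id are
// // placeholders, not values from this file). Each node would normally use its own group.id
// // so that every instance receives every broadcast message.
// //
// // val brokers = "broker-1:9092,broker-2:9092"
// // val producerProps = KafkaSync.producerConfig(brokers)
// // val consumerProps = KafkaSync.consumerConfig(brokers, groupId = s"s2graph-sync-${java.util.UUID.randomUUID()}")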
//
//class KafkaSync(topic: String,
// syncId: String,
// producerConfig: Properties,
// consumerConfig: Properties,
// skipSelfBroadcast: Boolean = true,
// seekTo: String = "end"
// ) extends Sync[Array[Byte]] {
//
// type VALUE = Array[Byte]
//
// val consumerTimeout: Int = 1000
//
// override val EmptyValue = Array.empty[Byte]
//
// override val SyncId = syncId
//
// override val SkipSelfBroadcast: Boolean = skipSelfBroadcast
//
// lazy val producer = new KafkaProducer[String, VALUE](producerConfig)
//
// lazy val consumer = {
// import scala.collection.JavaConverters._
//
// val kc = new KafkaConsumer[String, VALUE](consumerConfig)
// kc.subscribe(Seq(topic).asJava)
// kc.poll(consumerTimeout * 10) // initial poll so partition assignment is completed before seeking below
//
// if (seekTo == "end") {
// kc.seekToEnd(kc.assignment())
// } else if (seekTo == "beginning") {
// kc.seekToBeginning(kc.assignment())
// } else {
// // pass
// }
//
// kc
// }
//
// // Emit events from the Kafka consumer. The Flowable only drives the polling loop on a dedicated scheduler; it is not used as a general Observable.
// Flowable.create(new FlowableOnSubscribe[KV] {
// import scala.collection.JavaConverters._
//
// override def subscribe(e: FlowableEmitter[KV]) = {
// // Poll indefinitely; the loop ends only when the process is shut down.
// while (true) {
// val ls = consumer.poll(consumerTimeout).asScala.map(record => record.key() -> record.value())
// ls.foreach(emit)
//
// // Commit offsets only after the polled records have been handed to the emitter.
// if (ls.nonEmpty) {
// consumer.commitSync()
// logger.syncInfo(s"[Kafka consume success]: message size: ${ls.size}")
// }
// }
//
// e.onComplete()
// }
// }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.single()).subscribe()
//
// override protected def sendImpl(key: String, value: VALUE): Unit = {
// val record = new ProducerRecord[String, VALUE](topic, key, value)
//
// producer.send(record, new Callback {
// override def onCompletion(metadata: RecordMetadata, exception: Exception) = {
// if (exception != null) {
// logger.syncInfo(s"[Kafka produce failed]: from: key: ${key} e: ${exception}")
// } else {
// logger.syncInfo(s"[Kafka produce success]: from: ${SyncId} ${metadata}")
// }
// }
// })
// }
//
// override def shutdown(): Unit = {
// super.shutdown()
//
// producer.close()
// consumer.close()
// }
//}
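//
// // End-to-end usage sketch (not part of the original file): wire the config helpers from the
// // companion object into a KafkaSync, subscribe to the decoded stream, and broadcast a key.
// // Topic name, broker list, sync id and payload are placeholders.
// //
// // val brokers = "broker-1:9092"
// // val syncId = java.util.UUID.randomUUID().toString
// // val sync = new KafkaSync(
// //   topic = "s2graph-sync",
// //   syncId = syncId,
// //   producerConfig = KafkaSync.producerConfig(brokers),
// //   consumerConfig = KafkaSync.consumerConfig(brokers, s"s2graph-sync-${syncId}"))
// //
// // sync.emitter.subscribe(new io.reactivex.functions.Consumer[(String, Array[Byte])] {
// //   override def accept(kv: (String, Array[Byte])): Unit =
// //     println(s"broadcast received: key=${kv._1} (${kv._2.length} bytes)")
// // })
// //
// // // With skipSelfBroadcast = true (the default) this node ignores its own messages;
// // // other nodes subscribed to the same topic will emit them.
// // sync.send("meta.changed", "label-1".getBytes("UTF-8"))
// //
// // sync.shutdown()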