/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.system.hdfs


import java.text.SimpleDateFormat
import java.util.Date

import org.apache.avro.file.DataFileReader
import org.apache.avro.reflect.{ReflectData, ReflectDatumReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.hdfs.{DFSConfigKeys, MiniDFSCluster}
import org.apache.hadoop.io.SequenceFile.Reader
import org.apache.hadoop.io.{BytesWritable, LongWritable, SequenceFile, Text}
import org.apache.samza.config.Config
import org.apache.samza.config.factories.PropertiesConfigFactory
import org.apache.samza.system.hdfs.HdfsConfig._
import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}
import org.apache.samza.util.Logging
import org.junit.Assert._
import org.junit.{AfterClass, Ignore, Test}

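/**
 * Shared fixtures for the suite: one in-process MiniDFSCluster, canned payloads,
 * and helpers that build producers from per-job .properties files on the test classpath.
 */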
object TestHdfsSystemProducerTestSuite {
  val TEST = "test"
  val BLOCK_SIZE = 128 * 1024 * 1024 // 128MB
  val BATCH_SIZE = 512 // in bytes, to get multiple file splits in one of the tests
  val PAUSE = 500 // in millis
  val JOB_NAME = "samza-hdfs-test-job" // write some data as BytesWritable
  val BATCH_JOB_NAME = "samza-hdfs-test-batch-job" // write enough binary data to force the producer to split part files
  val TEXT_JOB_NAME = "samza-hdfs-test-job-text" // write some data as String
  val AVRO_JOB_NAME = "samza-hdfs-test-job-avro" // write some data as Avro
  val TEXT_BATCH_JOB_NAME = "samza-hdfs-test-batch-job-text" // force a file split, understanding that Text does some compressing
  val AVRO_BATCH_JOB_NAME = "samza-hdfs-test-batch-job-avro" // force a file split, understanding that Avro does some compressing
  val TEST_DATE = new SimpleDateFormat("yyyy_MM_dd-HH").format(new Date)

  // Test data
  val EXPECTED = Array[String]("small_data", "medium_data", "large_data")
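  // Random payload of BATCH_SIZE chars; scala.util.Random.nextString avoids
  // surrogate characters, so the string should round-trip cleanly through UTF-8.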
  val LUMP = new scala.util.Random().nextString(BATCH_SIZE)

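  // Avro's reflect-based reader instantiates records through a no-arg constructor,
  // hence the auxiliary constructor with dummy defaults.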
  case class AvroTestClass(a1: Long, b2: String) {
    def this() = this(0L, "")
  }

  val hdfsFactory = new TestHdfsSystemFactory()
  val propsFactory = new PropertiesConfigFactory()
  val cluster = getMiniCluster

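  // The path the producers are expected to write under: /user/<user>/<job>/<yyyy_MM_dd-HH>.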
  def testWritePath(job: String): Path = new Path(
    Seq("/user", System.getProperty("user.name"), job, TEST_DATE).mkString("/")
  )

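  // Spin up an in-process, three-datanode HDFS cluster shared by every test in the suite.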
  def getMiniCluster: Option[MiniDFSCluster] = {
    val conf = new Configuration(false)
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE)

    val cluster = Some(new MiniDFSCluster.Builder(conf).numDataNodes(3).build)
    cluster.get.waitActive()

    cluster
  }

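  // Build a producer from <name>.properties on the test classpath, wired to the
  // MiniDFSCluster's FileSystem instead of a real HDFS deployment.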
  def buildProducer(name: String, cluster: MiniDFSCluster): Option[HdfsSystemProducer] = {
    Some(
      hdfsFactory.getProducer(
        name,
        propsFactory.getConfig(getClass.getResource(String.format("/%s.properties", name)).toURI),
        new HdfsSystemProducerMetrics(name),
        cluster
      )
    )
  }

  def getReader(dfs: FileSystem, path: Path): SequenceFile.Reader = {
    new SequenceFile.Reader(dfs.getConf, Reader.file(path))
  }

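  // Tear down the shared cluster once the whole suite has finished.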
  @AfterClass
  def tearDownMiniCluster(): Unit = cluster.foreach { _.shutdown() }

}

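// End-to-end tests for HdfsSystemProducer against a MiniDFSCluster, covering binary,
// text, and Avro writes plus each writer's batching/file-split behavior. The whole
// suite is currently @Ignore'd; remove the annotation to run it locally.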
@Ignore
class TestHdfsSystemProducerTestSuite extends Logging {
  import org.apache.samza.system.hdfs.TestHdfsSystemProducerTestSuite._

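  // Happy path for the binary writer: three messages should land in a single
  // SequenceFile of BytesWritable records, readable back in send order.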
  @Test
  def testHdfsSystemProducerBinaryWrite(): Unit = {
    var producer: Option[HdfsSystemProducer] = None

    try {
      producer = buildProducer(JOB_NAME, cluster.get)
      producer.get.register(TEST)
      producer.get.start

      Thread.sleep(PAUSE)

      val systemStream = new SystemStream(JOB_NAME, TEST)
      EXPECTED.map { _.getBytes("UTF-8") }.foreach {
        buffer => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, buffer))
      }

      producer.get.stop
      producer = None

      val results = cluster.get.getFileSystem.listStatus(testWritePath(JOB_NAME))
      val bytesWritten = results.toList.foldLeft(0L) { (acc, status) => acc + status.getLen }
      assertEquals(1, results.length)
      assertTrue(bytesWritten > 0L)

      results.foreach { r =>
        val reader = getReader(cluster.get.getFileSystem, r.getPath)
        val key = new BytesWritable
        val value = new BytesWritable
        (0 to 2).foreach { i =>
          // Guard the read: next() returning false would otherwise leave stale
          // bytes in `value` and let the assertion pass vacuously.
          assertTrue(reader.next(key, value))
          assertEquals(EXPECTED(i), new String(value.copyBytes, "UTF-8"))
        }
        reader.close()
      }
    } finally {
      producer.foreach { _.stop }
    }
  }

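  // Byte-based batching: each LUMP meets the batch-size threshold (presumably set to
  // BATCH_SIZE bytes in the job's .properties), so six sends should yield six part files.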
  @Test
  def testHdfsSystemProducerWriteBinaryBatches(): Unit = {
    var producer: Option[HdfsSystemProducer] = None

    try {
      producer = buildProducer(BATCH_JOB_NAME, cluster.get)

      producer.get.start
      producer.get.register(TEST)
      Thread.sleep(PAUSE)

      val systemStream = new SystemStream(BATCH_JOB_NAME, TEST)
      (1 to 6).map { i => LUMP.getBytes("UTF-8") }.foreach {
        buffer => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, buffer))
      }

      producer.get.stop
      producer = None

      val results = cluster.get.getFileSystem.listStatus(testWritePath(BATCH_JOB_NAME))

      assertEquals(6, results.length)
      results.foreach { r =>
        val reader = getReader(cluster.get.getFileSystem, r.getPath)
        val key = new BytesWritable
        val value = new BytesWritable
        // Each part file should hold exactly one LUMP record: every send meets
        // the byte-based batch threshold and rolls the producer onto a new file.
        assertTrue(reader.next(key, value))
        val data = value.copyBytes
        assertEquals(LUMP, new String(data, "UTF-8"))
        assertEquals(LUMP.getBytes("UTF-8").length, data.length)
        reader.close()
      }

    } finally {
      producer.foreach { _.stop }
    }
  }

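  // Same happy path as the binary test, but written as LongWritable/Text records.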
  @Test
  def testHdfsSystemProducerTextWrite(): Unit = {
    var producer: Option[HdfsSystemProducer] = None

    try {
      producer = buildProducer(TEXT_JOB_NAME, cluster.get)
      producer.get.register(TEST)
      producer.get.start

      Thread.sleep(PAUSE)

      val systemStream = new SystemStream(TEXT_JOB_NAME, TEST)
      EXPECTED.foreach {
        buffer => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, buffer))
      }

      producer.get.stop
      producer = None

      val results = cluster.get.getFileSystem.listStatus(testWritePath(TEXT_JOB_NAME))
      val bytesWritten = results.toList.foldLeft(0L) { (acc, status) => acc + status.getLen }
      assertEquals(1, results.length)
      assertTrue(bytesWritten > 0L)

      results.foreach { r =>
        val reader = getReader(cluster.get.getFileSystem, r.getPath)
        val key = new LongWritable
        val value = new Text
        (0 to 2).foreach { i =>
          assertTrue(reader.next(key, value))
          assertEquals(EXPECTED(i), value.toString)
        }
        reader.close()
      }
    } finally {
      producer.foreach { _.stop }
    }
  }

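  // Text analogue of the binary batch test: six string sends, six part files expected.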
  @Test
  def testHdfsSystemProducerWriteTextBatches(): Unit = {
    var producer: Option[HdfsSystemProducer] = None

    try {
      producer = buildProducer(TEXT_BATCH_JOB_NAME, cluster.get)

      producer.get.start
      producer.get.register(TEST)
      Thread.sleep(PAUSE)

      val systemStream = new SystemStream(TEXT_BATCH_JOB_NAME, TEST)
      (1 to 6).foreach {
        i => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, LUMP))
      }

      producer.get.stop
      producer = None

      val results = cluster.get.getFileSystem.listStatus(testWritePath(TEXT_BATCH_JOB_NAME))

      assertEquals(6, results.length)
      results.foreach { r =>
        val reader = getReader(cluster.get.getFileSystem, r.getPath)
        val key = new LongWritable
        val value = new Text
        // As in the binary batch test, each part file should hold a single LUMP record.
        assertTrue(reader.next(key, value))
        val data = value.toString
        assertEquals(LUMP, data)
        assertEquals(LUMP.length, data.length)
        reader.close()
      }

    } finally {
      producer.foreach { _.stop }
    }
  }

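  // Avro reflect writer: a single case-class record should round-trip through an
  // Avro data file on HDFS.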
  @Test
  def testHdfsSystemProducerAvroWrite(): Unit = {
    var producer: Option[HdfsSystemProducer] = None

    try {
      producer = buildProducer(AVRO_JOB_NAME, cluster.get)
      producer.get.register(TEST)
      producer.get.start

      Thread.sleep(PAUSE)

      val systemStream = new SystemStream(AVRO_JOB_NAME, TEST)
      val atc = new AvroTestClass(128038204592345678L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
      producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc))

      producer.get.stop
      producer = None

      val dfs = cluster.get.getFileSystem
      val results = dfs.listStatus(testWritePath(AVRO_JOB_NAME))
      val bytesWritten = results.toList.foldLeft(0L) { (acc, status) => acc + status.getLen }
      assertEquals(1, results.length)
      assertTrue(bytesWritten > 0L)

      val atf = new AvroFSInput(FileContext.getFileContext(), results.head.getPath)
      val schema = ReflectData.get().getSchema(atc.getClass)
      val datumReader = new ReflectDatumReader[Object](schema)
      val tfReader = new DataFileReader[Object](atf, datumReader)
      val atc2 = tfReader.next().asInstanceOf[AvroTestClass]
      tfReader.close()

      assertEquals(atc, atc2)

    } finally {
      producer.foreach { _.stop }
    }
  }

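  // Record-count batching for Avro: with a ten-record batch size configured, twenty
  // sends should split across exactly two Avro data files.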
  @Test
  def testHdfsSystemProducerWriteAvroBatches(): Unit = {
    var producer: Option[HdfsSystemProducer] = None

    try {
      producer = buildProducer(AVRO_BATCH_JOB_NAME, cluster.get)

      producer.get.start
      producer.get.register(TEST)
      Thread.sleep(PAUSE)

      val systemStream = new SystemStream(AVRO_BATCH_JOB_NAME, TEST)
      val atc = new AvroTestClass(128038204592345678L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")

      (1 to 20).foreach {
        i => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc))
      }

      producer.get.stop
      producer = None

      val dfs = cluster.get.getFileSystem
      val results = dfs.listStatus(testWritePath(AVRO_BATCH_JOB_NAME))
      // systems.samza-hdfs-test-batch-job-avro.producer.hdfs.write.batch.size.records=10,
      // so twenty sends should roll over into exactly two part files.
      assertEquals(2, results.length)

      results.foreach { r =>
        val atf = new AvroFSInput(FileContext.getFileContext(), r.getPath)
        val schema = ReflectData.get().getSchema(atc.getClass)
        val datumReader = new ReflectDatumReader[Object](schema)
        val tfReader = new DataFileReader[Object](atf, datumReader)
        val atc2 = tfReader.next().asInstanceOf[AvroTestClass]
        tfReader.close()
        assertEquals(atc, atc2)
      }

    } finally {
      producer.foreach { _.stop }
    }
  }

}

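// Test double that swaps in the MiniDFSCluster's FileSystem, leaving the rest of
// HdfsSystemProducer's behavior intact.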
class TestHdfsSystemProducer(systemName: String, config: HdfsConfig, clientId: String, metrics: HdfsSystemProducerMetrics, mini: MiniDFSCluster)
  extends HdfsSystemProducer(systemName, clientId, config, metrics) {
  override val dfs = mini.getFileSystem
}

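// Factory overload that threads the MiniDFSCluster through to the test producer.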
class TestHdfsSystemFactory extends HdfsSystemFactory {
  def getProducer(systemName: String, config: Config, metrics: HdfsSystemProducerMetrics, cluster: MiniDFSCluster) = {
    new TestHdfsSystemProducer(systemName, config, "test", metrics, cluster)
  }
}