/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.system.azureblob.avro;

import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.compression.CompressionFactory;
import org.apache.samza.system.azureblob.compression.CompressionType;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

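/**
 * Tests for {@link AzureBlobAvroWriter}: record encoding, appending to the underlying Avro
 * {@link org.apache.avro.file.DataFileWriter}, blob rollover on size and record-count limits,
 * flush/close failure handling, and concurrent writes.
 */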
@RunWith(PowerMockRunner.class)
@PrepareForTest({BlobContainerAsyncClient.class, BlockBlobAsyncClient.class, AzureBlobAvroWriter.class, AzureBlobOutputStream.class})
public class TestAzureBlobAvroWriter {
  private ThreadPoolExecutor threadPool;
  private OutgoingMessageEnvelope ome;
  private byte[] encodedRecord;
  private AzureBlobAvroWriter azureBlobAvroWriter;
  private DataFileWriter mockDataFileWriter;
  private AzureBlobOutputStream mockAzureBlobOutputStream;
  private BlockBlobAsyncClient mockBlockBlobAsyncClient;
  private Compression mockCompression;

  private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory = mock(BlobMetadataGeneratorFactory.class);
  private final Config blobMetadataGeneratorConfig = mock(Config.class);
  private static final String STREAM_NAME = "FAKE_STREAM";
  private static final String VALUE = "FAKE_VALUE";
  private static final String SYSTEM_NAME = "FAKE_SYSTEM";
  private static final int THRESHOLD = 100;

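  // Minimal SpecificRecord with an empty schema, used as a stand-in Avro event in these tests.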
  private static class SpecificRecordEvent extends org.apache.avro.specific.SpecificRecordBase
      implements org.apache.avro.specific.SpecificRecord {
    public final org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"SpecificRecordEvent\",\"namespace\":\"org.apache.samza.events\",\"fields\":[]}");

    public org.apache.avro.Schema getSchema() {
      return schema;
    }

    public java.lang.Object get(int field) {
      return null;
    }

    public void put(int field, Object value) {}
  }

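  // Minimal GenericRecord counterpart, used to exercise the generic-record write path.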
  private static class GenericRecordEvent implements org.apache.avro.generic.GenericRecord {
    public final org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"GenericRecordEvent\",\"namespace\":\"org.apache.samza.events\",\"fields\":[]}");

    public org.apache.avro.Schema getSchema() {
      return schema;
    }

    public java.lang.Object get(String key) {
      return null;
    }

    public java.lang.Object get(int field) {
      return null;
    }

    public void put(int field, Object value) {}
    public void put(String key, Object value) {}
  }

  private OutgoingMessageEnvelope createOME(String streamName) {
    SystemStream systemStream = new SystemStream(SYSTEM_NAME, streamName);
    SpecificRecord record = new SpecificRecordEvent();
    return new OutgoingMessageEnvelope(systemStream, record);
  }

  private OutgoingMessageEnvelope createOMEGenericRecord(String streamName) {
    SystemStream systemStream = new SystemStream(SYSTEM_NAME, streamName);
    GenericRecord record = new GenericRecordEvent();
    return new OutgoingMessageEnvelope(systemStream, record);
  }

  @Before
  public void setup() throws Exception {
    threadPool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
    ome = createOME("Topic1");

    encodedRecord = new byte[100];
    BlobContainerAsyncClient mockContainerAsyncClient = PowerMockito.mock(BlobContainerAsyncClient.class);
    mockDataFileWriter = mock(DataFileWriter.class);
    mockAzureBlobOutputStream = mock(AzureBlobOutputStream.class);
    mockBlockBlobAsyncClient = PowerMockito.mock(BlockBlobAsyncClient.class);
    when(mockBlockBlobAsyncClient.getBlobUrl()).thenReturn("https://samza.blob.core.windows.net/fake-blob-url");

    mockCompression = CompressionFactory.getInstance().getCompression(CompressionType.GZIP);
    azureBlobAvroWriter =
        spy(new AzureBlobAvroWriter(mockContainerAsyncClient, mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
            60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient,
            blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // keeping blob size and number of records unlimited
    doReturn(encodedRecord).when(azureBlobAvroWriter).encodeRecord((IndexedRecord) ome.getMessage());
  }
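
  // Shut down the test thread pool so executor threads do not leak across tests.
  @After
  public void teardown() {
    threadPool.shutdownNow();
  }

  // Every record written should be appended to the DataFileWriter and counted towards the blob.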
  @Test
  public void testWrite() throws Exception {
    int numberOfMessages = 10;
    for (int i = 0; i < numberOfMessages; ++i) {
      azureBlobAvroWriter.write(ome);
    }
    verify(mockDataFileWriter, times(numberOfMessages)).appendEncoded(ByteBuffer.wrap(encodedRecord));
    verify(mockAzureBlobOutputStream, times(numberOfMessages)).incrementNumberOfRecordsInBlob();
  }

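  // GenericRecord payloads take the same append path as SpecificRecords.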
  @Test
  public void testWriteGenericRecord() throws Exception {
    OutgoingMessageEnvelope omeGenericRecord = createOMEGenericRecord("Topic1");
    doReturn(encodedRecord).when(azureBlobAvroWriter).encodeRecord((IndexedRecord) omeGenericRecord.getMessage());
    int numberOfMessages = 10;
    for (int i = 0; i < numberOfMessages; ++i) {
      azureBlobAvroWriter.write(omeGenericRecord);
    }
    verify(mockDataFileWriter, times(numberOfMessages)).appendEncoded(ByteBuffer.wrap(encodedRecord));
    verify(mockAzureBlobOutputStream, times(numberOfMessages)).incrementNumberOfRecordsInBlob();
  }

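  // byte[] messages are treated as already-encoded Avro and are appended as-is, without re-encoding.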
  @Test
  public void testWriteByteArray() throws Exception {
    OutgoingMessageEnvelope omeEncoded = new OutgoingMessageEnvelope(new SystemStream(SYSTEM_NAME, "Topic1"), "randomString".getBytes());
    int numberOfMessages = 10;
    azureBlobAvroWriter.write(ome);
    for (int i = 0; i < numberOfMessages; ++i) {
      azureBlobAvroWriter.write(omeEncoded);
    }
    verify(mockDataFileWriter).appendEncoded(ByteBuffer.wrap(encodedRecord));
    verify(mockDataFileWriter, times(numberOfMessages)).appendEncoded(ByteBuffer.wrap((byte[]) omeEncoded.getMessage()));
    verify(mockAzureBlobOutputStream, times(numberOfMessages + 1)).incrementNumberOfRecordsInBlob(); // +1 to account for the first ome, which is not pre-encoded
  }

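  // Writing a raw byte[] before any schema'd record must fail: without a schema the writer cannot
  // create a DataFileWriter for a new blob.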
  @Test(expected = IllegalStateException.class)
  public void testWriteByteArrayWithoutSchema() throws Exception {
    azureBlobAvroWriter =
        spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), mock(AzureBlobWriterMetrics.class),
            threadPool, THRESHOLD, 60000, "test",
            null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
            1000, 100, mockCompression, false));
    OutgoingMessageEnvelope omeEncoded = new OutgoingMessageEnvelope(new SystemStream(SYSTEM_NAME, "Topic1"), new byte[100]);
    azureBlobAvroWriter.write(omeEncoded);
  }

  @Test(expected = IOException.class)
  public void testWriteWhenDataFileWriterFails() throws Exception {
    doThrow(new IOException("Failed")).when(mockDataFileWriter).appendEncoded(ByteBuffer.wrap(encodedRecord));
    azureBlobAvroWriter.write(ome);
  }

  @Test
  public void testClose() throws Exception {
    azureBlobAvroWriter.close();
    verify(mockDataFileWriter).close();
  }

  @Test
  public void testCloseWhenDataFileWriterFails() throws Exception {
    doThrow(new IOException("Failed")).when(mockDataFileWriter).close();

    azureBlobAvroWriter.flush();
    try {
      azureBlobAvroWriter.close();
      Assert.fail("Expected close to throw SamzaException when the DataFileWriter fails");
    } catch (SamzaException e) {
      // the DataFileWriter failure propagates before the output stream is ever closed
      verify(mockAzureBlobOutputStream, never()).close();
    }
  }

  @Test(expected = RuntimeException.class)
  public void testCloseWhenOutputStreamFails() throws Exception {
    doThrow(new IOException("DataFileWriter failed")).when(mockDataFileWriter).close();
    doThrow(new RuntimeException("failed")).when(mockAzureBlobOutputStream).close();

    azureBlobAvroWriter.close();
  }

  @Test
  public void testFlush() throws Exception {
    azureBlobAvroWriter.flush();
    verify(mockDataFileWriter).flush();
  }

  @Test(expected = IOException.class)
  public void testFlushWhenDataFileWriterFails() throws Exception {
    doThrow(new IOException("Failed")).when(mockDataFileWriter).flush();
    azureBlobAvroWriter.flush();
  }

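  // A write that would push the current blob past maxBlobSize must roll over to a fresh blob,
  // i.e. a new block blob client, AzureBlobOutputStream and DataFileWriter.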
  @Test
  public void testMaxBlobSizeExceeded() throws Exception {
    String blobUrlPrefix = "test";
    String blobNameRegex = "test/[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}/[0-9]{2}-[0-9]{2}-.{8}\\.avro\\.gz";
    long maxBlobSize = 1000;
    AzureBlobWriterMetrics mockMetrics = mock(AzureBlobWriterMetrics.class);
    BlobContainerAsyncClient mockContainerClient = PowerMockito.mock(BlobContainerAsyncClient.class);
    azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
        mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
        null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
        maxBlobSize, 10, mockCompression, true));

    DataFileWriter mockDataFileWriter1 = mock(DataFileWriter.class);
    PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);

    BlobAsyncClient mockBlobAsyncClient1 = mock(BlobAsyncClient.class);
    doReturn(mockBlobAsyncClient1).when(mockContainerClient).getBlobAsyncClient(Matchers.matches(blobNameRegex));
    BlockBlobAsyncClient mockBlockBlobAsyncClient1 = mock(BlockBlobAsyncClient.class);
    doReturn(mockBlockBlobAsyncClient1).when(mockBlobAsyncClient1).getBlockBlobAsyncClient();

    AzureBlobOutputStream mockAzureBlobOutputStream1 = mock(AzureBlobOutputStream.class);
    PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1, threadPool,
        mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
        60000L, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream1);
    when(mockAzureBlobOutputStream1.getSize()).thenReturn(maxBlobSize - 1);

    // first OME creates the first blob
    azureBlobAvroWriter.write(ome);

    OutgoingMessageEnvelope ome2 = createOME("Topic2");
    DataFileWriter mockDataFileWriter2 = mock(DataFileWriter.class);
    PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter2);

    BlobAsyncClient mockBlobAsyncClient2 = mock(BlobAsyncClient.class);
    doReturn(mockBlobAsyncClient2).when(mockContainerClient).getBlobAsyncClient(Matchers.matches(blobNameRegex));
    BlockBlobAsyncClient mockBlockBlobAsyncClient2 = mock(BlockBlobAsyncClient.class);
    doReturn(mockBlockBlobAsyncClient2).when(mockBlobAsyncClient2).getBlockBlobAsyncClient();

    AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
    PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2, threadPool,
        mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
        60000L, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream2);
    when(mockAzureBlobOutputStream2.getSize()).thenReturn(maxBlobSize - 1);

    // second OME creates the second blob: the first blob already reports 999 bytes against a maxBlobSize of 1000
    azureBlobAvroWriter.write(ome2);

    ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
    verify(mockContainerClient, times(2)).getBlobAsyncClient(argument.capture());
    argument.getAllValues().forEach(blobName -> {
      Assert.assertTrue(blobName.contains(blobUrlPrefix));
    });
    List<String> allBlobNames = argument.getAllValues();
    Assert.assertNotEquals(allBlobNames.get(0), allBlobNames.get(1));

    verify(mockDataFileWriter1).appendEncoded(ByteBuffer.wrap(encodeRecord((IndexedRecord) ome.getMessage())));
    verify(mockDataFileWriter2).appendEncoded(ByteBuffer.wrap(encodeRecord((IndexedRecord) ome2.getMessage())));

    verify(mockDataFileWriter1).create(((IndexedRecord) ome.getMessage()).getSchema(), mockAzureBlobOutputStream1);
    verify(mockDataFileWriter2).create(((IndexedRecord) ome2.getMessage()).getSchema(), mockAzureBlobOutputStream2);
  }

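  // The same rollover must trigger on record count: maxRecordsPerBlob writes fill the first blob,
  // so the next write lands in a second one.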
  @Test
  public void testRecordLimitExceeded() throws Exception {
    String blobUrlPrefix = "test";
    String blobNameRegex = "test/[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}/[0-9]{2}-[0-9]{2}-.{8}\\.avro\\.gz";
    AzureBlobWriterMetrics mockMetrics = mock(AzureBlobWriterMetrics.class);
    long maxBlobSize = AzureBlobAvroWriter.DATAFILEWRITER_OVERHEAD + 1000;
    long maxRecordsPerBlob = 10;
    BlobContainerAsyncClient mockContainerClient = PowerMockito.mock(BlobContainerAsyncClient.class);
    azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
        mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
        null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
        maxBlobSize, maxRecordsPerBlob, mockCompression, true));

    DataFileWriter mockDataFileWriter1 = mock(DataFileWriter.class);
    PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);

    BlobAsyncClient mockBlobAsyncClient1 = mock(BlobAsyncClient.class);
    doReturn(mockBlobAsyncClient1).when(mockContainerClient).getBlobAsyncClient(Matchers.matches(blobNameRegex));
    BlockBlobAsyncClient mockBlockBlobAsyncClient1 = mock(BlockBlobAsyncClient.class);
    doReturn(mockBlockBlobAsyncClient1).when(mockBlobAsyncClient1).getBlockBlobAsyncClient();

    AzureBlobOutputStream mockAzureBlobOutputStream1 = mock(AzureBlobOutputStream.class);
    PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1, threadPool,
        mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
        60000L, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream1);
    when(mockAzureBlobOutputStream1.getSize()).thenReturn(1L);

    // first OME creates the first blob and the 11th OME (ome2) creates the second blob
    for (int i = 0; i < maxRecordsPerBlob; i++) {
      azureBlobAvroWriter.write(ome);
    }

    OutgoingMessageEnvelope ome2 = createOME("Topic2");
    DataFileWriter mockDataFileWriter2 = mock(DataFileWriter.class);
    PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter2);

    BlobAsyncClient mockBlobAsyncClient2 = mock(BlobAsyncClient.class);
    doReturn(mockBlobAsyncClient2).when(mockContainerClient).getBlobAsyncClient(Matchers.matches(blobNameRegex));
    BlockBlobAsyncClient mockBlockBlobAsyncClient2 = mock(BlockBlobAsyncClient.class);
    doReturn(mockBlockBlobAsyncClient2).when(mockBlobAsyncClient2).getBlockBlobAsyncClient();

    AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
    PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2, threadPool,
        mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
        60000L, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream2);
    when(mockAzureBlobOutputStream2.getSize()).thenReturn(1L);

    azureBlobAvroWriter.write(ome2);

    ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
    verify(mockContainerClient, times(2)).getBlobAsyncClient(argument.capture());
    argument.getAllValues().forEach(blobName -> {
      Assert.assertTrue(blobName.contains(blobUrlPrefix));
    });
    List<String> allBlobNames = argument.getAllValues();
    Assert.assertNotEquals(allBlobNames.get(0), allBlobNames.get(1));

    verify(mockDataFileWriter1, times((int) maxRecordsPerBlob)).appendEncoded(ByteBuffer.wrap(encodeRecord((IndexedRecord) ome.getMessage())));
    verify(mockDataFileWriter2).appendEncoded(ByteBuffer.wrap(encodeRecord((IndexedRecord) ome2.getMessage())));

    verify(mockDataFileWriter1).create(((IndexedRecord) ome.getMessage()).getSchema(), mockAzureBlobOutputStream1);
    verify(mockDataFileWriter2).create(((IndexedRecord) ome2.getMessage()).getSchema(), mockAzureBlobOutputStream2);
  }

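  // close() must close every DataFileWriter the writer has opened, not just the most recent one.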
  @Test
  public void testMultipleBlobClose() throws Exception {
    String blobUrlPrefix = "test";
    long maxBlobSize = AzureBlobAvroWriter.DATAFILEWRITER_OVERHEAD + 1000;
    long maxRecordsPerBlob = 10;
    BlobContainerAsyncClient mockContainerClient = PowerMockito.mock(BlobContainerAsyncClient.class);
    azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
        mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD, 60000, blobUrlPrefix,
        mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
        maxBlobSize, maxRecordsPerBlob, mockCompression, false));

    DataFileWriter<IndexedRecord> mockDataFileWriter2 = mock(DataFileWriter.class);
    AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);

    when(mockAzureBlobOutputStream.getSize()).thenReturn(1L);
    BlobAsyncClient mockBlobAsyncClient = mock(BlobAsyncClient.class);
    doReturn(mockBlobAsyncClient).when(mockContainerClient).getBlobAsyncClient(anyString());
    doReturn(mockBlockBlobAsyncClient).when(mockBlobAsyncClient).getBlockBlobAsyncClient();
    PowerMockito.whenNew(AzureBlobOutputStream.class).withAnyArguments().thenReturn(mockAzureBlobOutputStream2);
    PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter2);

    // first OME creates the first blob and the 11th OME creates the second blob
    for (int i = 0; i <= maxRecordsPerBlob; i++) {
      azureBlobAvroWriter.write(ome);
    }

    azureBlobAvroWriter.close();
    verify(mockDataFileWriter).close();
    verify(mockDataFileWriter2).close();
  }

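  // encodeRecord must produce exactly the bytes of a plain Avro binary encoding of the record.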
  @Test
  public void testEncodeRecord() throws Exception {
    azureBlobAvroWriter = spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class),
        mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
        60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient,
        blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
        Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false));
    IndexedRecord record = new GenericRecordEvent();
    Assert.assertArrayEquals(encodeRecord(record), azureBlobAvroWriter.encodeRecord(record));
  }

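  // Concurrent writers appending different record types must neither lose nor double-count records.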
  @Test
  public void testMultipleThreadWrites() throws Exception {
    Thread t1 = writeInThread(ome, azureBlobAvroWriter, 10);
    OutgoingMessageEnvelope ome2 = createOMEGenericRecord("TOPIC2");
    Thread t2 = writeInThread(ome2, azureBlobAvroWriter, 10);

    t1.start();
    t2.start();
    t1.join(60000);
    t2.join(60000);

    verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodedRecord));
    verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodeRecord((IndexedRecord) ome2.getMessage())));
    verify(mockAzureBlobOutputStream, times(20)).incrementNumberOfRecordsInBlob();
  }

  @Test
  public void testMultipleThreadWriteFlush() throws Exception {
    Thread t1 = writeInThread(ome, azureBlobAvroWriter, 10);
    Thread t2 = flushInThread(azureBlobAvroWriter);

    t1.start();
    t2.start();
    t1.join(60000);
    t2.join(60000);

    verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodedRecord));
    verify(mockAzureBlobOutputStream, times(10)).incrementNumberOfRecordsInBlob();
    verify(mockDataFileWriter).flush();
  }

  @Test
  public void testMultipleThreadWriteFlushInBoth() throws Exception {
    Thread t1 = writeFlushInThread(ome, azureBlobAvroWriter, 10);
    OutgoingMessageEnvelope ome2 = createOMEGenericRecord("TOPIC2");
    Thread t2 = writeFlushInThread(ome2, azureBlobAvroWriter, 10);

    t1.start();
    t2.start();
    t1.join(60000);
    t2.join(60000);

    verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodedRecord));
    verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodeRecord((IndexedRecord) ome2.getMessage())));
    verify(mockDataFileWriter, times(2)).flush();
    verify(mockAzureBlobOutputStream, times(20)).incrementNumberOfRecordsInBlob();
  }

  @Test
  public void testMultipleThreadWriteFlushFinallyClose() throws Exception {
    Thread t1 = writeFlushInThread(ome, azureBlobAvroWriter, 10);
    OutgoingMessageEnvelope ome2 = createOMEGenericRecord("TOPIC2");
    Thread t2 = writeFlushInThread(ome2, azureBlobAvroWriter, 10);

    t1.start();
    t2.start();
    t1.join(60000);
    t2.join(60000);
    azureBlobAvroWriter.close();

    verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodedRecord));
    verify(mockDataFileWriter, times(10)).appendEncoded(ByteBuffer.wrap(encodeRecord((IndexedRecord) ome2.getMessage())));
    verify(mockDataFileWriter, times(2)).flush();
    verify(mockDataFileWriter).close();
    verify(mockAzureBlobOutputStream, times(20)).incrementNumberOfRecordsInBlob();
  }

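  // Reference encoding: the binary encoding the writer is expected to produce, using a
  // SpecificDatumWriter or GenericDatumWriter depending on the record type.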
  private byte[] encodeRecord(IndexedRecord record) throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Schema schema = record.getSchema();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    DatumWriter<IndexedRecord> writer;
    if (record instanceof SpecificRecord) {
      writer = new SpecificDatumWriter<>(schema);
    } else {
      writer = new GenericDatumWriter<>(schema);
    }
    writer.write(record, encoder);
    encoder.flush(); // the encoder may buffer
    return out.toByteArray();
  }

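  // Returns an unstarted thread that writes the given envelope numberOfSends times.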
  private Thread writeInThread(OutgoingMessageEnvelope ome, AzureBlobAvroWriter azureBlobAvroWriter,
      int numberOfSends) {
    return new Thread() {
      @Override
      public void run() {
        try {
          for (int i = 0; i < numberOfSends; i++) {
            azureBlobAvroWriter.write(ome);
          }
        } catch (IOException e) {
          throw new SamzaException(e);
        }
      }
    };
  }

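  // Returns an unstarted thread that flushes the writer once.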
  private Thread flushInThread(AzureBlobAvroWriter azureBlobAvroWriter) {
    return new Thread() {
      @Override
      public void run() {
        try {
          azureBlobAvroWriter.flush();
        } catch (IOException e) {
          throw new SamzaException(e);
        }
      }
    };
  }

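  // Returns an unstarted thread that writes numberOfSends envelopes and then flushes.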
  private Thread writeFlushInThread(OutgoingMessageEnvelope ome, AzureBlobAvroWriter azureBlobAvroWriter,
      int numberOfSends) {
    return new Thread() {
      @Override
      public void run() {
        try {
          for (int i = 0; i < numberOfSends; i++) {
            azureBlobAvroWriter.write(ome);
          }
          azureBlobAvroWriter.flush();
        } catch (IOException e) {
          throw new SamzaException(e);
        }
      }
    };
  }
}