| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.reporting; |
| |
| import org.apache.nifi.attribute.expression.language.StandardPropertyValue; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.PropertyValue; |
| import org.apache.nifi.components.ValidationContext; |
| import org.apache.nifi.context.PropertyContext; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.remote.Transaction; |
| import org.apache.nifi.remote.TransferDirection; |
| import org.apache.nifi.remote.client.SiteToSiteClient; |
| import org.apache.nifi.reporting.s2s.SiteToSiteUtils; |
| import org.apache.nifi.reporting.s2s.SiteToSiteUtils.NiFiUrlValidator; |
| import org.apache.nifi.util.MockPropertyValue; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| import org.mockito.stubbing.Answer; |
| |
| import javax.json.Json; |
| import javax.json.JsonObject; |
| import javax.json.JsonReader; |
| import javax.json.JsonValue; |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Mockito.when; |
| |
| public class TestSiteToSiteBulletinReportingTask { |
| |
| @Test |
| public void testUrls() { |
| final ValidationContext context = Mockito.mock(ValidationContext.class); |
| Mockito.when(context.newPropertyValue(Mockito.anyString())).then((Answer<PropertyValue>) invocation -> { |
| String value = (String) invocation.getArguments()[0]; |
| return new StandardPropertyValue(value, null, null); |
| }); |
| |
| assertTrue(new NiFiUrlValidator().validate("url", "http://localhost:8080/nifi", context).isValid()); |
| assertTrue(new NiFiUrlValidator().validate("url", "http://localhost:8080", context).isValid()); |
| assertFalse(new NiFiUrlValidator().validate("url", "", context).isValid()); |
| assertTrue(new NiFiUrlValidator().validate("url", "https://localhost:8080/nifi", context).isValid()); |
| assertTrue(new NiFiUrlValidator().validate("url", "https://localhost:8080/nifi,https://localhost:8080/nifi", context).isValid()); |
| assertTrue(new NiFiUrlValidator().validate("url", "https://localhost:8080/nifi, https://localhost:8080/nifi", context).isValid()); |
| assertFalse(new NiFiUrlValidator().validate("url", "http://localhost:8080/nifi, https://localhost:8080/nifi", context).isValid()); |
| assertTrue(new NiFiUrlValidator().validate("url", "http://localhost:8080/nifi,http://localhost:8080/nifi", context).isValid()); |
| assertTrue(new NiFiUrlValidator().validate("url", "http://localhost:8080/nifi,http://localhost:8080", context).isValid()); |
| } |
| |
| @Test |
| public void testSerializedForm() throws IOException, InitializationException { |
| // creating the list of bulletins |
| final List<Bulletin> bulletins = new ArrayList<>(); |
| bulletins.add(BulletinFactory.createBulletin("group-id", "group-name", "source-id", ComponentType.PROCESSOR, "source-name", "category", "severity", "message", "group-path", "flowFileUuid")); |
| |
| // mock the access to the list of bulletins |
| final ReportingContext context = Mockito.mock(ReportingContext.class); |
| final BulletinRepository repository = Mockito.mock(BulletinRepository.class); |
| Mockito.when(context.getBulletinRepository()).thenReturn(repository); |
| Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins); |
| |
| // creating reporting task |
| final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask(); |
| |
| // settings properties and mocking access to properties |
| final Map<PropertyDescriptor, String> properties = new HashMap<>(); |
| for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { |
| properties.put(descriptor, descriptor.getDefaultValue()); |
| } |
| properties.put(SiteToSiteUtils.BATCH_SIZE, "1000"); |
| properties.put(SiteToSiteUtils.PLATFORM, "nifi"); |
| |
| Mockito.doAnswer((Answer<PropertyValue>) invocation -> { |
| final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); |
| return new MockPropertyValue(properties.get(descriptor)); |
| }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); |
| |
| // setup the mock initialization context |
| final ComponentLog logger = Mockito.mock(ComponentLog.class); |
| final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); |
| Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); |
| Mockito.when(initContext.getLogger()).thenReturn(logger); |
| |
| task.initialize(initContext); |
| task.onTrigger(context); |
| |
| // test checking |
| assertEquals(1, task.dataSent.size()); |
| final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); |
| JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); |
| JsonObject bulletinJson = jsonReader.readArray().getJsonObject(0); |
| assertEquals("message", bulletinJson.getString("bulletinMessage")); |
| assertEquals("group-name", bulletinJson.getString("bulletinGroupName")); |
| assertEquals("group-path", bulletinJson.getString("bulletinGroupPath")); |
| assertEquals("flowFileUuid", bulletinJson.getString("bulletinFlowFileUuid")); |
| } |
| |
| @Test |
| public void testSerializedFormWithNullValues() throws IOException, InitializationException { |
| // creating the list of bulletins |
| final List<Bulletin> bulletins = new ArrayList<>(); |
| bulletins.add(BulletinFactory.createBulletin("group-id", "group-name", "source-id", "source-name", "category", "severity", "message")); |
| |
| // mock the access to the list of bulletins |
| final ReportingContext context = Mockito.mock(ReportingContext.class); |
| final BulletinRepository repository = Mockito.mock(BulletinRepository.class); |
| Mockito.when(context.getBulletinRepository()).thenReturn(repository); |
| Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins); |
| |
| // creating reporting task |
| final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask(); |
| |
| // settings properties and mocking access to properties |
| final Map<PropertyDescriptor, String> properties = new HashMap<>(); |
| for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { |
| properties.put(descriptor, descriptor.getDefaultValue()); |
| } |
| properties.put(SiteToSiteUtils.BATCH_SIZE, "1000"); |
| properties.put(SiteToSiteUtils.PLATFORM, "nifi"); |
| properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES, "true"); |
| |
| Mockito.doAnswer((Answer<PropertyValue>) invocation -> { |
| final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); |
| return new MockPropertyValue(properties.get(descriptor)); |
| }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); |
| |
| // setup the mock initialization context |
| final ComponentLog logger = Mockito.mock(ComponentLog.class); |
| final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); |
| Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); |
| Mockito.when(initContext.getLogger()).thenReturn(logger); |
| |
| task.initialize(initContext); |
| task.onTrigger(context); |
| |
| final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); |
| JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); |
| JsonObject bulletinJson = jsonReader.readArray().getJsonObject(0); |
| assertEquals(JsonValue.NULL, bulletinJson.get("bulletinSourceType")); |
| } |
| |
| private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask { |
| |
| public MockSiteToSiteBulletinReportingTask() throws IOException { |
| super(); |
| } |
| |
| final List<byte[]> dataSent = new ArrayList<>(); |
| |
| @Override |
| public void setup(PropertyContext reportContext) { |
| if(siteToSiteClient == null) { |
| final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class); |
| final Transaction transaction = Mockito.mock(Transaction.class); |
| |
| try { |
| Mockito.doAnswer((Answer<Object>) invocation -> { |
| final byte[] data = invocation.getArgument(0, byte[].class); |
| dataSent.add(data); |
| return null; |
| }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class)); |
| |
| when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction); |
| } catch (final Exception e) { |
| e.printStackTrace(); |
| Assert.fail(e.toString()); |
| } |
| siteToSiteClient = client; |
| } |
| } |
| } |
| } |