blob: 3cecadb91d64844ceb639ff68bf609c8da121c33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.restart;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.tests.system.NiFiInstance;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class FlowFileRestorationIT extends NiFiSystemIT {
@Test
public void testDataInMissingQueueRestoredWhenQueueRestored() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generator = getClientUtil().createProcessor("GenerateFlowFile");
getClientUtil().updateProcessorProperties(generator, Collections.singletonMap("File Size", "1 KB"));
getClientUtil().updateProcessorSchedulingPeriod(generator, "100 min");
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
final ConnectionEntity connection = getClientUtil().createConnection(generator, terminate, "success");
getNifiClient().getProcessorClient().startProcessor(generator);
waitForQueueCount(connection.getId(), 1);
getNifiClient().getProcessorClient().stopProcessor(generator);
final byte[] flowFileContents = getFlowFileContents(connection.getId(), 0);
assertEquals(1024, flowFileContents.length);
final NiFiInstance nifiInstance = getNiFiInstance();
nifiInstance.stop();
final File nifiHome = nifiInstance.getInstanceDirectory();
final File confDir = new File(nifiHome, "conf");
final File flowXmlGz = new File(confDir, "flow.xml.gz");
assertTrue(flowXmlGz.delete());
final File flowJsonGz = new File(confDir, "flow.json.gz");
final byte[] flowJsonGzBytes = Files.readAllBytes(flowJsonGz.toPath());
assertTrue(flowJsonGz.delete());
nifiInstance.start();
try {
getNifiClient().getConnectionClient().getConnection(connection.getId());
Assert.fail("Didn't expect to retrieve a connection");
} catch (final NiFiClientException nfce) {
// Expected because the connection no longer exists.
}
// Stop the instance, restore the flow.xml.gz, and restart
nifiInstance.stop();
Files.write(flowJsonGz.toPath(), flowJsonGzBytes, StandardOpenOption.CREATE);
nifiInstance.start();
// Ensure that there's a FlowFile queued up and that its contents are still accessible and have not changed.
final ConnectionEntity retrievedConnection = getNifiClient().getConnectionClient().getConnection(connection.getId());
assertNotNull(retrievedConnection);
waitForQueueCount(connection.getId(), 1);
final byte[] contentsAfterRestart = getFlowFileContents(connection.getId(), 0);
assertArrayEquals(flowFileContents, contentsAfterRestart);
}
private byte[] getFlowFileContents(final String connectionId, final int flowFileIndex) throws IOException, NiFiClientException {
final byte[] flowFileContents;
try (final InputStream in = getClientUtil().getFlowFileContent(connectionId, flowFileIndex);
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
StreamUtils.copy(in, baos);
return baos.toByteArray();
}
}
}