blob: 33d3e1163fc8fe865695ef44963d5d69e631efff [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.dataloads.bulk;
import org.apache.commons.collections.IteratorUtils;
import org.apache.metron.domain.Configuration;
import org.easymock.EasyMock;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.hppc.ObjectObjectOpenHashMap;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.powermock.api.easymock.PowerMock.replayAll;
import static org.powermock.api.easymock.PowerMock.verifyAll;
@RunWith(PowerMockRunner.class)
@PrepareForTest(DeleteIndexResponse.class)
public class ElasticsearchDataPrunerTest {
private Date testDate;
private DateFormat dateFormat = new SimpleDateFormat("yyyy.MM.dd.HH");
private Configuration configuration;
private Client indexClient = mock(Client.class);
private AdminClient adminClient = mock(AdminClient.class);
private IndicesAdminClient indicesAdminClient = mock(FilterClient.IndicesAdmin.class);
private DeleteIndexRequestBuilder deleteIndexRequestBuilder = mock(DeleteIndexRequestBuilder.class);
private DeleteIndexRequest deleteIndexRequest = mock(DeleteIndexRequest.class);
private ActionFuture<DeleteIndexResponse> deleteIndexAction = mock(ActionFuture.class);
private DeleteIndexResponse deleteIndexResponse = PowerMock.createMock(DeleteIndexResponse.class);
private ByteArrayOutputStream outContent;
private ByteArrayOutputStream errContent;
@Before
public void setUp() throws Exception {
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.MONTH, Calendar.MARCH);
calendar.set(Calendar.YEAR, 2016);
calendar.set(Calendar.DATE, 31);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND,0);
testDate = calendar.getTime();
when(indexClient.admin()).thenReturn(adminClient);
when(adminClient.indices()).thenReturn(indicesAdminClient);
when(indicesAdminClient.prepareDelete(Matchers.<String>anyVararg())).thenReturn(deleteIndexRequestBuilder);
when(indicesAdminClient.delete((DeleteIndexRequest) any())).thenReturn(deleteIndexAction);
when(deleteIndexRequestBuilder.request()).thenReturn(deleteIndexRequest);
when(deleteIndexAction.actionGet()).thenReturn(deleteIndexResponse);
File resourceFile = new File("../Metron-Testing/src/main/resources/sample/config/");
Path resourcePath = Paths.get(resourceFile.getCanonicalPath());
configuration = new Configuration(resourcePath);
outContent = new ByteArrayOutputStream();
errContent = new ByteArrayOutputStream();
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));
}
@Test(expected = IndexMissingException.class)
public void testWillThrowOnMissingIndex() throws Exception {
when(indicesAdminClient.delete((DeleteIndexRequest) any())).thenThrow(new IndexMissingException(new Index("Test Exception")));
ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 30, configuration, indexClient,"*");
pruner.deleteIndex(adminClient, "baz");
}
@Test
public void testDeletesCorrectIndexes() throws Exception {
//Mock Cluster Admin
ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
ClusterStateRequestBuilder clusterStateRequestBuilder = mock(ClusterStateRequestBuilder.class);
ClusterStateResponse clusterStateResponse = mock(ClusterStateResponse.class);
ClusterState clusterState = mock(ClusterState.class);
ObjectObjectOpenHashMap<String, IndexMetaData> clusterIndexes = new ObjectObjectOpenHashMap();
MetaData clusterMetadata = mock(MetaData.class);
when(adminClient.cluster()).thenReturn(clusterAdminClient);
when(clusterAdminClient.prepareState()).thenReturn(clusterStateRequestBuilder);
when(clusterStateRequestBuilder.get()).thenReturn(clusterStateResponse);
when(clusterStateResponse.getState()).thenReturn(clusterState);
when(clusterState.getMetaData()).thenReturn(clusterMetadata);
int numDays = 5;
Date indexDate = new Date();
indexDate.setTime(testDate.getTime() - TimeUnit.DAYS.toMillis(numDays));
for (int i = 0; i < numDays * 24; i++) {
String indexName = "sensor_index_" + dateFormat.format(indexDate);
clusterIndexes.put(indexName, null);
indexDate.setTime(indexDate.getTime() + TimeUnit.HOURS.toMillis(1));
}
when(clusterMetadata.getIndices()).thenReturn(ImmutableOpenMap.copyOf(clusterIndexes));
EasyMock.expect(deleteIndexResponse.isAcknowledged()).andReturn(true);
replayAll();
ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 1, configuration, indexClient, "sensor_index_");
pruner.indexClient = indexClient;
Long deleteCount = pruner.prune();
assertEquals("Should have pruned 24 indices", 24L, deleteCount.longValue());
verifyAll();
}
@Test
public void testFilter() throws Exception {
ObjectObjectOpenHashMap<String, IndexMetaData> indexNames = new ObjectObjectOpenHashMap();
SimpleDateFormat dateChecker = new SimpleDateFormat("yyyyMMdd");
int numDays = 5;
String[] expectedIndices = new String[24];
Date indexDate = new Date();
indexDate.setTime(testDate.getTime() - TimeUnit.DAYS.toMillis(numDays));
for (int i = 0, j=0; i < numDays * 24; i++) {
String indexName = "sensor_index_" + dateFormat.format(indexDate);
//Delete 20160330
if( dateChecker.format(indexDate).equals("20160330") ){
expectedIndices[j++] = indexName;
}
indexNames.put(indexName, null);
indexDate.setTime(indexDate.getTime() + TimeUnit.HOURS.toMillis(1));
}
ImmutableOpenMap<String, IndexMetaData> testIndices = ImmutableOpenMap.copyOf(indexNames);
ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 1, configuration, indexClient, "sensor_index_");
pruner.indexClient = indexClient;
Iterable<String> filteredIndices = pruner.getFilteredIndices(testIndices);
Object[] indexArray = IteratorUtils.toArray(filteredIndices.iterator());
Arrays.sort(indexArray);
Arrays.sort(expectedIndices);
assertArrayEquals(expectedIndices,indexArray);
}
}