// blob: 92789455165471f8a5afa03d8eeb4fbfa0f14fc1 [file] [log] [blame]
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.TestHelper;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
public class SQLMetadataSegmentManagerTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private SQLMetadataSegmentManager manager;
private SQLMetadataSegmentPublisher publisher;
private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
private final DataSegment segment1 = new DataSegment(
"wikipedia",
Intervals.of("2012-03-15T00:00:00.000/2012-03-16T00:00:00.000"),
"2012-03-16T00:36:30.848Z",
ImmutableMap.<String, Object>of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia/index/y=2012/m=03/d=15/2012-03-16T00:36:30.848Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
private final DataSegment segment2 = new DataSegment(
"wikipedia",
Intervals.of("2012-01-05T00:00:00.000/2012-01-06T00:00:00.000"),
"2012-01-06T22:19:12.565Z",
ImmutableMap.<String, Object>of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia/index/y=2012/m=01/d=05/2012-01-06T22:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
@Before
public void setUp() throws Exception
{
TestDerbyConnector connector = derbyConnectorRule.getConnector();
manager = new SQLMetadataSegmentManager(
jsonMapper,
Suppliers.ofInstance(new MetadataSegmentManagerConfig()),
derbyConnectorRule.metadataTablesConfigSupplier(),
connector
);
publisher = new SQLMetadataSegmentPublisher(
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
connector
);
connector.createSegmentTable();
publisher.publishSegment(segment1);
publisher.publishSegment(segment2);
}
@After
public void teardown()
{
if (manager.isStarted()) {
manager.stop();
}
}
@Test
public void testPoll()
{
manager.start();
manager.poll();
Assert.assertEquals(
ImmutableList.of("wikipedia"),
manager.getAllDatasourceNames()
);
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(manager.getInventoryValue("wikipedia").getSegments())
);
}
@Test
public void testPollWithCurroptedSegment()
{
//create a corrupted segment entry in segments table, which tests
//that overall loading of segments from database continues to work
//even in one of the entries are corrupted.
publisher.publishSegment(
"corrupt-segment-id",
"corrupt-datasource",
"corrupt-create-date",
"corrupt-start-date",
"corrupt-end-date",
true,
"corrupt-version",
true,
StringUtils.toUtf8("corrupt-payload")
);
EmittingLogger.registerEmitter(new NoopServiceEmitter());
manager.start();
manager.poll();
Assert.assertEquals(
"wikipedia", Iterables.getOnlyElement(manager.getInventory()).getName()
);
}
@Test
public void testGetUnusedSegmentsForInterval() throws Exception
{
manager.start();
manager.poll();
Assert.assertTrue(manager.removeDatasource("wikipedia"));
Assert.assertEquals(
ImmutableList.of(segment2.getInterval()),
manager.getUnusedSegmentIntervals("wikipedia", Intervals.of("1970/3000"), 1)
);
Assert.assertEquals(
ImmutableList.of(segment2.getInterval(), segment1.getInterval()),
manager.getUnusedSegmentIntervals("wikipedia", Intervals.of("1970/3000"), 5)
);
}
@Test
public void testRemoveDataSource() throws IOException
{
manager.start();
manager.poll();
final String newDataSource = "wikipedia2";
final DataSegment newSegment = new DataSegment(
newDataSource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
publisher.publishSegment(newSegment);
Assert.assertNull(manager.getInventoryValue(newDataSource));
Assert.assertTrue(manager.removeDatasource(newDataSource));
}
@Test
public void testRemoveDataSegment() throws IOException
{
manager.start();
manager.poll();
final String newDataSource = "wikipedia2";
final DataSegment newSegment = new DataSegment(
newDataSource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
publisher.publishSegment(newSegment);
Assert.assertNull(manager.getInventoryValue(newDataSource));
Assert.assertTrue(manager.removeSegment(newDataSource, newSegment.getIdentifier()));
}
@Test
public void testStopAndStart()
{
// Simulate successive losing and getting the coordinator leadership
manager.start();
manager.stop();
manager.start();
manager.stop();
}
}