/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server;

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.jackson.SegmentizerModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.join.BroadcastTableJoinableFactory;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.Joinable;
import org.apache.druid.segment.loading.BroadcastJoinableMMappedQueryableSegmentizerFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.SegmentLocalCacheLoader;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.loading.SegmentizerFactory;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
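
/**
 * Tests that loading broadcast segments through {@link SegmentManager} makes the corresponding
 * {@link GlobalTableDataSource} available for joins via {@link BroadcastTableJoinableFactory}, and that
 * dropping those segments makes the table unavailable again.
 */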
public class SegmentManagerBroadcastJoinIndexedTableTest extends InitializedNullHandlingTest
{
private static final String TABLE_NAME = "test";
private static final String PREFIX = "j0";
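  // equijoin condition on "market" against the broadcast table, with right-hand columns prefixed by "j0"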
private static final JoinConditionAnalysis JOIN_CONDITION_ANALYSIS = JoinConditionAnalysis.forExpression(
StringUtils.format("market == \"%s.market\"", PREFIX),
PREFIX,
ExprMacroTable.nil()
);
private static final Set<String> KEY_COLUMNS =
ImmutableSet.of("market", "longNumericNull", "doubleNumericNull", "floatNumericNull", "partial_null_column");

  @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();

  private LocalDataSegmentPuller segmentPuller;
private ObjectMapper objectMapper;
private IndexIO indexIO;
private File segmentCacheDir;
private File segmentDeepStorageDir;
private SegmentLocalCacheManager segmentCacheManager;
private SegmentManager segmentManager;
private BroadcastTableJoinableFactory joinableFactory;

  @Before
public void setup() throws IOException
{
segmentPuller = new LocalDataSegmentPuller();
objectMapper = new DefaultObjectMapper()
.registerModule(new SegmentizerModule())
.registerModule(
new SimpleModule().registerSubtypes(new NamedType(LocalLoadSpec.class, "local"))
);
indexIO = new IndexIO(objectMapper, () -> 0);
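    // injectables needed to deserialize the "local" load spec and the segmentizer factory when loading segments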
objectMapper.setInjectableValues(
new InjectableValues.Std().addValue(LocalDataSegmentPuller.class, segmentPuller)
.addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
.addValue(ObjectMapper.class.getName(), objectMapper)
.addValue(IndexIO.class, indexIO)
);
segmentCacheDir = temporaryFolder.newFolder();
segmentDeepStorageDir = temporaryFolder.newFolder();
segmentCacheManager = new SegmentLocalCacheManager(
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Collections.singletonList(
new StorageLocationConfig(segmentCacheDir, null, null)
);
}
},
objectMapper
);
segmentManager = new SegmentManager(new SegmentLocalCacheLoader(segmentCacheManager, indexIO, objectMapper));
joinableFactory = new BroadcastTableJoinableFactory(segmentManager);
EmittingLogger.registerEmitter(new NoopServiceEmitter());
}

  @After
public void teardown() throws IOException
{
FileUtils.deleteDirectory(segmentCacheDir);
}

  @Test
public void testLoadIndexedTable() throws IOException, SegmentLoadingException
{
final DataSource dataSource = new GlobalTableDataSource(TABLE_NAME);
Assert.assertFalse(joinableFactory.isDirectlyJoinable(dataSource));
final String version = DateTimes.nowUtc().toString();
IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv");
final String interval = "2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z";
DataSegment segment = createSegment(data, interval, version);
Assert.assertTrue(segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP));
Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource));
Optional<Joinable> maybeJoinable = makeJoinable(dataSource);
Assert.assertTrue(maybeJoinable.isPresent());
Joinable joinable = maybeJoinable.get();
    // cardinality is currently tied to the number of rows
Assert.assertEquals(1210, joinable.getCardinality("market"));
Assert.assertEquals(1210, joinable.getCardinality("placement"));
Assert.assertEquals(
Optional.of(ImmutableSet.of("preferred")),
joinable.getCorrelatedColumnValues(
"market",
"spot",
"placement",
Long.MAX_VALUE,
false
)
);
Optional<byte[]> bytes = joinableFactory.computeJoinCacheKey(dataSource, JOIN_CONDITION_ANALYSIS);
Assert.assertTrue(bytes.isPresent());
assertSegmentIdEquals(segment.getId(), bytes.get());
// dropping the segment should make the table no longer available
segmentManager.dropSegment(segment);
maybeJoinable = makeJoinable(dataSource);
Assert.assertFalse(maybeJoinable.isPresent());
bytes = joinableFactory.computeJoinCacheKey(dataSource, JOIN_CONDITION_ANALYSIS);
Assert.assertFalse(bytes.isPresent());
}

  @Test
public void testLoadMultipleIndexedTableOverwrite() throws IOException, SegmentLoadingException
{
final DataSource dataSource = new GlobalTableDataSource(TABLE_NAME);
Assert.assertFalse(joinableFactory.isDirectlyJoinable(dataSource));
    // the newer segment with the larger interval overshadows the older segment with the smaller interval
final String version = DateTimes.nowUtc().toString();
final String version2 = DateTimes.nowUtc().plus(1000L).toString();
final String interval = "2011-01-12T00:00:00.000Z/2011-03-28T00:00:00.000Z";
final String interval2 = "2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z";
IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.top");
IncrementalIndex data2 = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.bottom");
DataSegment segment1 = createSegment(data, interval, version);
DataSegment segment2 = createSegment(data2, interval2, version2);
Assert.assertTrue(segmentManager.loadSegment(segment1, false, SegmentLazyLoadFailCallback.NOOP));
Assert.assertTrue(segmentManager.loadSegment(segment2, false, SegmentLazyLoadFailCallback.NOOP));
Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource));
Optional<Joinable> maybeJoinable = makeJoinable(dataSource);
Assert.assertTrue(maybeJoinable.isPresent());
Joinable joinable = maybeJoinable.get();
    // cardinality is currently tied to the number of rows
Assert.assertEquals(733, joinable.getCardinality("market"));
Assert.assertEquals(733, joinable.getCardinality("placement"));
Assert.assertEquals(
Optional.of(ImmutableSet.of("preferred")),
joinable.getCorrelatedColumnValues(
"market",
"spot",
"placement",
Long.MAX_VALUE,
false
)
);
Optional<byte[]> cacheKey = joinableFactory.computeJoinCacheKey(dataSource, JOIN_CONDITION_ANALYSIS);
Assert.assertTrue(cacheKey.isPresent());
assertSegmentIdEquals(segment2.getId(), cacheKey.get());
segmentManager.dropSegment(segment2);
    // if the newer segment is dropped (which should essentially never happen), the older table should still be available
maybeJoinable = makeJoinable(dataSource);
Assert.assertTrue(maybeJoinable.isPresent());
joinable = maybeJoinable.get();
    // cardinality is currently tied to the number of rows
Assert.assertEquals(478, joinable.getCardinality("market"));
Assert.assertEquals(478, joinable.getCardinality("placement"));
Assert.assertEquals(
Optional.of(ImmutableSet.of("preferred")),
joinable.getCorrelatedColumnValues(
"market",
"spot",
"placement",
Long.MAX_VALUE,
false
)
);
cacheKey = joinableFactory.computeJoinCacheKey(dataSource, JOIN_CONDITION_ANALYSIS);
Assert.assertTrue(cacheKey.isPresent());
assertSegmentIdEquals(segment1.getId(), cacheKey.get());
}

  @Test
public void testLoadMultipleIndexedTable() throws IOException, SegmentLoadingException
{
final DataSource dataSource = new GlobalTableDataSource(TABLE_NAME);
Assert.assertFalse(joinableFactory.isDirectlyJoinable(dataSource));
final String version = DateTimes.nowUtc().toString();
final String version2 = DateTimes.nowUtc().plus(1000L).toString();
final String interval = "2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z";
final String interval2 = "2011-01-12T00:00:00.000Z/2011-03-28T00:00:00.000Z";
IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.bottom");
IncrementalIndex data2 = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.top");
Assert.assertTrue(segmentManager.loadSegment(createSegment(data, interval, version), false, SegmentLazyLoadFailCallback.NOOP));
Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource));
Optional<Joinable> maybeJoinable = makeJoinable(dataSource);
Assert.assertTrue(maybeJoinable.isPresent());
Joinable joinable = maybeJoinable.get();
    // cardinality is currently tied to the number of rows
Assert.assertEquals(733, joinable.getCardinality("market"));
Assert.assertEquals(733, joinable.getCardinality("placement"));
Assert.assertEquals(
Optional.of(ImmutableSet.of("preferred")),
joinable.getCorrelatedColumnValues(
"market",
"spot",
"placement",
Long.MAX_VALUE,
false
)
);
    // add another segment with a smaller interval; it only partially overshadows the first, so the timeline will contain 2 segments
Assert.assertTrue(segmentManager.loadSegment(createSegment(data2, interval2, version2), false, SegmentLazyLoadFailCallback.NOOP));
expectedException.expect(ISE.class);
expectedException.expectMessage(
StringUtils.format(
"Currently only single segment datasources are supported for broadcast joins, dataSource[%s] has multiple segments. Reingest the data so that it is entirely contained within a single segment to use in JOIN queries.",
TABLE_NAME
)
);
    // this will explode because the datasource has multiple segments, which is an invalid state for the joinable factory
makeJoinable(dataSource);
}

  @Test
public void emptyCacheKeyForUnsupportedCondition()
{
final DataSource dataSource = new GlobalTableDataSource(TABLE_NAME);
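    // a condition that cannot be used as a hash join is unsupported, so no joinable (and no cache key) is produced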
JoinConditionAnalysis condition = EasyMock.mock(JoinConditionAnalysis.class);
EasyMock.expect(condition.canHashJoin()).andReturn(false);
EasyMock.replay(condition);
Assert.assertNull(joinableFactory.build(dataSource, condition).orElse(null));
}
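
  /**
   * Builds a {@link Joinable} for the given datasource using the shared equijoin condition.
   */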
private Optional<Joinable> makeJoinable(DataSource dataSource)
{
return joinableFactory.build(dataSource, JOIN_CONDITION_ANALYSIS);
}
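
  /**
   * Persists the given index to local deep storage with a broadcast-joinable segmentizer factory, writes the
   * accompanying factory.json, and returns a {@link DataSegment} whose "local" load spec points at the result.
   */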
private DataSegment createSegment(IncrementalIndex data, String interval, String version) throws IOException
{
    final DataSegment tmpSegment = new DataSegment(
        TABLE_NAME,
        Intervals.of(interval),
        version,
        Collections.emptyMap(),
        Collections.emptyList(),
        Collections.emptyList(),
        new NumberedShardSpec(0, 0),
        9,   // binary version
        100  // size in bytes
    );
final String storageDir = DataSegmentPusher.getDefaultStorageDir(tmpSegment, false);
final File segmentDir = new File(segmentDeepStorageDir, storageDir);
    // fully qualified to avoid clashing with org.apache.druid.java.util.common.FileUtils
    org.apache.commons.io.FileUtils.forceMkdir(segmentDir);
IndexMerger indexMerger =
new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
SegmentizerFactory factory = new BroadcastJoinableMMappedQueryableSegmentizerFactory(indexIO, KEY_COLUMNS);
indexMerger.persist(
data,
Intervals.of(interval),
segmentDir,
        new IndexSpec(
            null,
            null,
            null,
            null,
            factory // use the broadcast-joinable segmentizer factory; defaults for everything else
        ),
null
);
final File factoryJson = new File(segmentDir, "factory.json");
objectMapper.writeValue(factoryJson, factory);
return tmpSegment.withLoadSpec(
ImmutableMap.of("type", "local", "path", segmentDir.getAbsolutePath())
);
}
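
  /**
   * Asserts that the given cache key encodes the segment id: interval start and end millis, version, datasource,
   * and partition number, each preceded by a single-byte type key.
   */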
private void assertSegmentIdEquals(SegmentId id, byte[] bytes)
{
    // The key layout: a one-byte cache key prefix, then each segment id component preceded by a single-byte type key.
    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    byteBuffer.get(); // skip the cache key prefix
    byteBuffer.get(); // skip the type key of the interval start
    long start = byteBuffer.getLong();
    byteBuffer.get(); // skip the type key of the interval end
    long end = byteBuffer.getLong();
    byteBuffer.get(); // skip the type key of the version
    String version = StringUtils.fromUtf8(byteBuffer, StringUtils.estimatedBinaryLengthAsUTF8(id.getVersion()));
    byteBuffer.get(); // skip the type key of the data source
    String dataSource = StringUtils.fromUtf8(byteBuffer, StringUtils.estimatedBinaryLengthAsUTF8(id.getDataSource()));
    byteBuffer.get(); // skip the type key of the partition number
    int partition = byteBuffer.getInt();
Assert.assertEquals(id, SegmentId.of(dataSource, Intervals.utc(start, end), version, partition));
}
}