global table only if joinable (#10041)
* global table if only joinable
* oops
* fix style, add more tests
* Update sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
* better information schema columns, distinguish broadcast from joinable
* fix javadoc
* fix mistake
Co-authored-by: Jihoon Son <jihoonson@apache.org>
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index 971f8ed..23dc65f 100644
--- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Injector;
import com.google.inject.Module;
@@ -64,6 +65,7 @@
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.initialization.ServerConfig;
@@ -377,6 +379,7 @@
baseClient,
null /* local client; unused in this test, so pass in null */,
warehouse,
+ new MapJoinableFactory(ImmutableMap.of()),
retryConfig,
jsonMapper,
serverConfig,
diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java
index c3edd1c..d12a8ee 100644
--- a/processing/src/main/java/org/apache/druid/query/DataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -71,6 +71,15 @@
/**
* Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or
* for queries of those.
+ *
+ * Currently this is coupled with joinability - if this returns true then the query engine expects there exists a
+ * {@link org.apache.druid.segment.join.JoinableFactory} which might build a
+ * {@link org.apache.druid.segment.join.Joinable} for this datasource directly. If a subquery 'inline' join is
+ * required to join this datasource on the right hand side, then this value must be false for now.
+ *
+ * In the future, instead of directly using this method, the query planner and engine should consider
+ * {@link org.apache.druid.segment.join.JoinableFactory#isDirectlyJoinable(DataSource)} when determining if the
+ * right hand side is directly joinable, which would allow decoupling this property from joins.
*/
boolean isGlobal();
diff --git a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index 4237e50..5b34f76 100644
--- a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++ b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -205,7 +205,7 @@
/**
* Returns true if all servers have the ability to compute this datasource. These datasources depend only on
- * globally broadcast data, like lookups or inline data.
+ * globally broadcast data, like lookups or inline data or broadcast segments.
*/
public boolean isGlobal()
{
diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
index fc63f1c..723aba5 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
@@ -31,6 +31,13 @@
public interface JoinableFactory
{
/**
+ * Returns true if a {@link Joinable} **may** be created for a given {@link DataSource}, but is not a guarantee that
+ * {@link #build} will return a non-empty result. Successfully building a {@link Joinable} might require specific
+ * criteria of the {@link JoinConditionAnalysis}.
+ */
+ boolean isDirectlyJoinable(DataSource dataSource);
+
+ /**
* Create a Joinable object. This may be an expensive operation involving loading data, creating a hash table, etc.
*
* @param dataSource the datasource to join on
diff --git a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
index beb8106..abf4b6a 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
@@ -44,6 +44,17 @@
}
@Override
+ public boolean isDirectlyJoinable(DataSource dataSource)
+ {
+ JoinableFactory factory = joinableFactories.get(dataSource.getClass());
+ if (factory == null) {
+ return false;
+ } else {
+ return factory.isDirectlyJoinable(dataSource);
+ }
+ }
+
+ @Override
public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
{
JoinableFactory factory = joinableFactories.get(dataSource.getClass());
diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
index ae36d17..f7c4f4b 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
@@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.extraction.MapLookupExtractor;
@@ -155,13 +156,24 @@
final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
ImmutableList.of(clause),
- (dataSource, condition) -> {
- if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) {
- return Optional.of(
- LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false))
- );
- } else {
- return Optional.empty();
+ new JoinableFactory()
+ {
+ @Override
+ public boolean isDirectlyJoinable(DataSource dataSource)
+ {
+ return dataSource.equals(lookupDataSource);
+ }
+
+ @Override
+ public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
+ {
+ if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) {
+ return Optional.of(
+ LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false))
+ );
+ } else {
+ return Optional.empty();
+ }
}
},
new AtomicLong(),
diff --git a/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
index cf03360..1d00e71 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
@@ -65,6 +65,8 @@
target = new MapJoinableFactory(
ImmutableMap.of(NoopDataSource.class, noopJoinableFactory));
}
+
+
@Test
public void testBuildDataSourceNotRegisteredShouldReturnAbsent()
{
@@ -89,4 +91,18 @@
Optional<Joinable> joinable = target.build(noopDataSource, condition);
Assert.assertEquals(mockJoinable, joinable.get());
}
+
+ @Test
+ public void testIsDirectShouldBeFalseForNotRegistered()
+ {
+ Assert.assertFalse(target.isDirectlyJoinable(inlineDataSource));
+ }
+
+ @Test
+ public void testIsDirectlyJoinableShouldBeTrueForRegisteredThatIsJoinable()
+ {
+ EasyMock.expect(noopJoinableFactory.isDirectlyJoinable(noopDataSource)).andReturn(true).anyTimes();
+ EasyMock.replay(noopJoinableFactory);
+ Assert.assertTrue(target.isDirectlyJoinable(noopDataSource));
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java b/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java
index ff13804..1583b02 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java
@@ -33,6 +33,12 @@
}
@Override
+ public boolean isDirectlyJoinable(DataSource dataSource)
+ {
+ return false;
+ }
+
+ @Override
public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
{
return Optional.empty();
diff --git a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java
index 5945d42..4eee53f 100644
--- a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java
@@ -37,6 +37,15 @@
public class InlineJoinableFactory implements JoinableFactory
{
@Override
+ public boolean isDirectlyJoinable(DataSource dataSource)
+ {
+ // this should always be true if this is access through MapJoinableFactory, but check just in case...
+ // further, this should not ever be legitimately called, because this method is used to avoid subquery joins
+ // which use the InlineJoinableFactory
+ return dataSource instanceof InlineDataSource;
+ }
+
+ @Override
public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
{
final InlineDataSource inlineDataSource = (InlineDataSource) dataSource;
diff --git a/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java
index a6fd209..2dab0a6 100644
--- a/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java
@@ -43,6 +43,13 @@
}
@Override
+ public boolean isDirectlyJoinable(DataSource dataSource)
+ {
+ // this should always be true if this is access through MapJoinableFactory, but check just in case...
+ return dataSource instanceof LookupDataSource;
+ }
+
+ @Override
public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
{
final LookupDataSource lookupDataSource = (LookupDataSource) dataSource;
diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index fa35ff7..c7318a6 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -32,6 +32,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FluentQueryRunnerBuilder;
+import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.PostProcessingOperator;
import org.apache.druid.query.Query;
@@ -47,9 +48,11 @@
import org.apache.druid.query.RetryQueryRunner;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.joda.time.Interval;
@@ -77,6 +80,7 @@
private final QuerySegmentWalker clusterClient;
private final QuerySegmentWalker localClient;
private final QueryToolChestWarehouse warehouse;
+ private final JoinableFactory joinableFactory;
private final RetryQueryRunnerConfig retryConfig;
private final ObjectMapper objectMapper;
private final ServerConfig serverConfig;
@@ -88,6 +92,7 @@
QuerySegmentWalker clusterClient,
QuerySegmentWalker localClient,
QueryToolChestWarehouse warehouse,
+ JoinableFactory joinableFactory,
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper,
ServerConfig serverConfig,
@@ -99,6 +104,7 @@
this.clusterClient = clusterClient;
this.localClient = localClient;
this.warehouse = warehouse;
+ this.joinableFactory = joinableFactory;
this.retryConfig = retryConfig;
this.objectMapper = objectMapper;
this.serverConfig = serverConfig;
@@ -112,6 +118,7 @@
CachingClusteredClient clusterClient,
LocalQuerySegmentWalker localClient,
QueryToolChestWarehouse warehouse,
+ JoinableFactory joinableFactory,
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper,
ServerConfig serverConfig,
@@ -124,6 +131,7 @@
(QuerySegmentWalker) clusterClient,
(QuerySegmentWalker) localClient,
warehouse,
+ joinableFactory,
retryConfig,
objectMapper,
serverConfig,
@@ -137,10 +145,13 @@
{
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
- // First, do an inlining dry run to see if any inlining is necessary, without actually running the queries.
+ // transform TableDataSource to GlobalTableDataSource when eligible
+ // before further transformation to potentially inline
+ final DataSource freeTradeDataSource = globalizeIfPossible(query.getDataSource());
+ // do an inlining dry run to see if any inlining is necessary, without actually running the queries.
final int maxSubqueryRows = QueryContexts.getMaxSubqueryRows(query, serverConfig.getMaxSubqueryRows());
final DataSource inlineDryRun = inlineIfNecessary(
- query.getDataSource(),
+ freeTradeDataSource,
toolChest,
new AtomicInteger(),
maxSubqueryRows,
@@ -156,7 +167,7 @@
// Now that we know the structure is workable, actually do the inlining (if necessary).
final Query<T> newQuery = query.withDataSource(
inlineIfNecessary(
- query.getDataSource(),
+ freeTradeDataSource,
toolChest,
new AtomicInteger(),
maxSubqueryRows,
@@ -187,10 +198,15 @@
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
- // Inlining isn't done for segments-based queries.
+ // Inlining isn't done for segments-based queries, but we still globalify the table datasources if possible
+ final Query<T> freeTradeQuery = query.withDataSource(globalizeIfPossible(query.getDataSource()));
if (canRunQueryUsingClusterWalker(query)) {
- return decorateClusterRunner(query, clusterClient.getQueryRunnerForSegments(query, specs));
+ return new QuerySwappingQueryRunner<>(
+ decorateClusterRunner(freeTradeQuery, clusterClient.getQueryRunnerForSegments(freeTradeQuery, specs)),
+ query,
+ freeTradeQuery
+ );
} else {
// We don't expect end-users to see this message, since it only happens when specific segments are requested;
// this is not typical end-user behavior.
@@ -235,6 +251,27 @@
|| toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery()));
}
+
+ private DataSource globalizeIfPossible(
+ final DataSource dataSource
+ )
+ {
+ if (dataSource instanceof TableDataSource) {
+ GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(((TableDataSource) dataSource).getName());
+ if (joinableFactory.isDirectlyJoinable(maybeGlobal)) {
+ return maybeGlobal;
+ }
+ return dataSource;
+ } else {
+ List<DataSource> currentChildren = dataSource.getChildren();
+ List<DataSource> newChildren = new ArrayList<>(currentChildren.size());
+ for (DataSource child : currentChildren) {
+ newChildren.add(globalizeIfPossible(child));
+ }
+ return dataSource.withChildren(newChildren);
+ }
+ }
+
/**
* Replace QueryDataSources with InlineDataSources when necessary and possible. "Necessary" is defined as:
*
diff --git a/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java b/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java
index d1be698..2a5bf3e 100644
--- a/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java
@@ -80,6 +80,13 @@
Assert.assertEquals(3, joinable.getCardinality("long"));
}
+ @Test
+ public void testIsDirectlyJoinable()
+ {
+ Assert.assertTrue(factory.isDirectlyJoinable(inlineDataSource));
+ Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo")));
+ }
+
private static JoinConditionAnalysis makeCondition(final String condition)
{
return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil());
diff --git a/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java b/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java
index 44ed4b2..6e0e737 100644
--- a/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java
@@ -125,6 +125,13 @@
Assert.assertEquals(Joinable.CARDINALITY_UNKNOWN, joinable.getCardinality("v"));
}
+ @Test
+ public void testIsDirectlyJoinable()
+ {
+ Assert.assertTrue(factory.isDirectlyJoinable(lookupDataSource));
+ Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo")));
+ }
+
private static JoinConditionAnalysis makeCondition(final String condition)
{
return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil());
diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index f224b5f..565fee9 100644
--- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -33,6 +33,7 @@
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
@@ -70,7 +71,9 @@
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.InlineJoinableFactory;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinType;
+import org.apache.druid.segment.join.Joinable;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
@@ -96,6 +99,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
/**
* Tests ClientQuerySegmentWalker.
@@ -112,6 +116,7 @@
private static final String FOO = "foo";
private static final String BAR = "bar";
private static final String MULTI = "multi";
+ private static final String GLOBAL = "broadcast";
private static final Interval INTERVAL = Intervals.of("2000/P1Y");
private static final String VERSION = "A";
@@ -219,6 +224,40 @@
}
@Test
+ public void testTimeseriesOnAutomaticGlobalTable()
+ {
+ final TimeseriesQuery query =
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(GLOBAL)
+ .granularity(Granularities.ALL)
+ .intervals(Collections.singletonList(INTERVAL))
+ .aggregators(new LongSumAggregatorFactory("sum", "n"))
+ .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
+ .build();
+
+ // expect global/joinable datasource to be automatically translated into a GlobalTableDataSource
+ final TimeseriesQuery expectedClusterQuery =
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(new GlobalTableDataSource(GLOBAL))
+ .granularity(Granularities.ALL)
+ .intervals(Collections.singletonList(INTERVAL))
+ .aggregators(new LongSumAggregatorFactory("sum", "n"))
+ .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
+ .build();
+
+ testQuery(
+ query,
+ ImmutableList.of(ExpectedQuery.cluster(expectedClusterQuery)),
+ ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
+ );
+
+ Assert.assertEquals(1, scheduler.getTotalRun().get());
+ Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(1, scheduler.getTotalReleased().get());
+ }
+
+ @Test
public void testTimeseriesOnInline()
{
final TimeseriesQuery query =
@@ -606,6 +645,20 @@
final JoinableFactory joinableFactory = new MapJoinableFactory(
ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
.put(InlineDataSource.class, new InlineJoinableFactory())
+ .put(GlobalTableDataSource.class, new JoinableFactory()
+ {
+ @Override
+ public boolean isDirectlyJoinable(DataSource dataSource)
+ {
+ return ((GlobalTableDataSource) dataSource).getName().equals(GLOBAL);
+ }
+
+ @Override
+ public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
+ {
+ return Optional.empty();
+ }
+ })
.build()
);
@@ -651,7 +704,8 @@
ImmutableMap.of(
FOO, makeTimeline(FOO, FOO_INLINE),
BAR, makeTimeline(BAR, BAR_INLINE),
- MULTI, makeTimeline(MULTI, MULTI_VALUE_INLINE)
+ MULTI, makeTimeline(MULTI, MULTI_VALUE_INLINE),
+ GLOBAL, makeTimeline(GLOBAL, FOO_INLINE)
),
joinableFactory,
conglomerate,
@@ -669,6 +723,7 @@
ClusterOrLocal.LOCAL
),
conglomerate,
+ joinableFactory,
serverConfig
);
}
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index 1370ed7..445d8d6 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -95,6 +95,7 @@
final QuerySegmentWalker clusterWalker,
final QuerySegmentWalker localWalker,
final QueryRunnerFactoryConglomerate conglomerate,
+ final JoinableFactory joinableFactory,
final ServerConfig serverConfig
)
{
@@ -110,6 +111,7 @@
return conglomerate.findFactory(query).getToolchest();
}
},
+ joinableFactory,
new RetryQueryRunnerConfig(),
TestHelper.makeJsonMapper(),
serverConfig,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
index 1655d73..6faf219 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
@@ -337,6 +337,8 @@
private static boolean computeRightRequiresSubquery(final DruidRel<?> right)
{
// Right requires a subquery unless it's a scan or mapping on top of a global datasource.
+ // ideally this would involve JoinableFactory.isDirectlyJoinable to check that the global datasources
+ // are in fact possibly joinable, but for now isGlobal is coupled to joinability
return !(DruidRels.isScanOrMapping(right, false)
&& DruidRels.dataSourceIfLeafRel(right).filter(DataSource::isGlobal).isPresent());
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 6762aaf..b264749 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -56,6 +56,7 @@
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -104,6 +105,7 @@
private final PlannerConfig config;
private final SegmentManager segmentManager;
private final ViewManager viewManager;
+ private final JoinableFactory joinableFactory;
private final ExecutorService cacheExec;
private final ConcurrentMap<String, DruidTable> tables;
@@ -148,6 +150,7 @@
final QueryLifecycleFactory queryLifecycleFactory,
final TimelineServerView serverView,
final SegmentManager segmentManager,
+ final JoinableFactory joinableFactory,
final PlannerConfig config,
final ViewManager viewManager,
final Escalator escalator
@@ -156,6 +159,7 @@
this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
Preconditions.checkNotNull(serverView, "serverView");
this.segmentManager = segmentManager;
+ this.joinableFactory = joinableFactory;
this.config = Preconditions.checkNotNull(config, "config");
this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager");
this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
@@ -278,10 +282,11 @@
for (String dataSource : dataSourcesToRebuild) {
final DruidTable druidTable = buildDruidTable(dataSource);
final DruidTable oldTable = tables.put(dataSource, druidTable);
+ final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource";
if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
- log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature());
+ log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.getRowSignature());
} else {
- log.debug("dataSource [%s] signature is unchanged.", dataSource);
+ log.debug("%s [%s] signature is unchanged.", description, dataSource);
}
}
@@ -627,12 +632,21 @@
columnTypes.forEach(builder::add);
final TableDataSource tableDataSource;
- if (segmentManager.getDataSourceNames().contains(dataSource)) {
- tableDataSource = new GlobalTableDataSource(dataSource);
+
+ // to be a GlobalTableDataSource instead of a TableDataSource, it must appear on all servers (inferred by existing
+ // in the segment cache, which in this case belongs to the broker meaning only broadcast segments live here)
+ // to be joinable, it must be possibly joinable according to the factory. we only consider broadcast datasources
+ // at this time, and isGlobal is currently strongly coupled with joinable, so only make a global table datasource
+ // if also joinable
+ final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource);
+ final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal);
+ final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource);
+ if (isBroadcast && isJoinable) {
+ tableDataSource = maybeGlobal;
} else {
tableDataSource = new TableDataSource(dataSource);
}
- return new DruidTable(tableDataSource, builder.build());
+ return new DruidTable(tableDataSource, builder.build(), isJoinable, isBroadcast);
}
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java
index bf84ea1..8ee93a2 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java
@@ -53,6 +53,7 @@
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
import javax.annotation.Nullable;
@@ -83,6 +84,8 @@
.add("TABLE_SCHEMA", ValueType.STRING)
.add("TABLE_NAME", ValueType.STRING)
.add("TABLE_TYPE", ValueType.STRING)
+ .add("IS_JOINABLE", ValueType.STRING)
+ .add("IS_BROADCAST", ValueType.STRING)
.build();
private static final RowSignature COLUMNS_SIGNATURE = RowSignature
.builder()
@@ -109,6 +112,9 @@
return Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(datasourceName));
};
+ private static final String INFO_TRUE = "YES";
+ private static final String INFO_FALSE = "NO";
+
private final SchemaPlus rootSchema;
private final Map<String, Table> tableMap;
private final AuthorizerMapper authorizerMapper;
@@ -217,18 +223,27 @@
return Iterables.filter(
Iterables.concat(
FluentIterable.from(authorizedTableNames).transform(
- new Function<String, Object[]>()
- {
- @Override
- public Object[] apply(final String tableName)
- {
- return new Object[]{
- CATALOG_NAME, // TABLE_CATALOG
- schemaName, // TABLE_SCHEMA
- tableName, // TABLE_NAME
- subSchema.getTable(tableName).getJdbcTableType().toString() // TABLE_TYPE
- };
+ tableName -> {
+ final Table table = subSchema.getTable(tableName);
+ final boolean isJoinable;
+ final boolean isBroadcast;
+ if (table instanceof DruidTable) {
+ DruidTable druidTable = (DruidTable) table;
+ isJoinable = druidTable.isJoinable();
+ isBroadcast = druidTable.isBroadcast();
+ } else {
+ isJoinable = false;
+ isBroadcast = false;
}
+
+ return new Object[]{
+ CATALOG_NAME, // TABLE_CATALOG
+ schemaName, // TABLE_SCHEMA
+ tableName, // TABLE_NAME
+ table.getJdbcTableType().toString(), // TABLE_TYPE
+ isJoinable ? INFO_TRUE : INFO_FALSE, // IS_JOINABLE
+ isBroadcast ? INFO_TRUE : INFO_FALSE // IS_BROADCAST
+ };
}
),
FluentIterable.from(authorizedFunctionNames).transform(
@@ -242,7 +257,9 @@
CATALOG_NAME, // TABLE_CATALOG
schemaName, // TABLE_SCHEMA
functionName, // TABLE_NAME
- "VIEW" // TABLE_TYPE
+ "VIEW", // TABLE_TYPE
+ INFO_FALSE, // IS_JOINABLE
+ INFO_FALSE // IS_BROADCAST
};
} else {
return null;
@@ -406,7 +423,7 @@
field.getName(), // COLUMN_NAME
String.valueOf(field.getIndex()), // ORDINAL_POSITION
"", // COLUMN_DEFAULT
- type.isNullable() ? "YES" : "NO", // IS_NULLABLE
+ type.isNullable() ? INFO_TRUE : INFO_FALSE, // IS_NULLABLE
type.getSqlTypeName().toString(), // DATA_TYPE
null, // CHARACTER_MAXIMUM_LENGTH
null, // CHARACTER_OCTET_LENGTH
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java
index b3f3314..6ddeaab 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java
@@ -57,7 +57,9 @@
final ImmutableMap.Builder<String, Table> tableMapBuilder = ImmutableMap.builder();
for (final String lookupName : lookupProvider.getAllLookupNames()) {
- tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE));
+ // all lookups should be also joinable through lookup joinable factory, and lookups are effectively broadcast
+ // (if we ignore lookup tiers...)
+ tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE, true, true));
}
return tableMapBuilder.build();
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java
index 521a051..94da5ed 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java
@@ -41,14 +41,20 @@
{
private final DataSource dataSource;
private final RowSignature rowSignature;
+ private final boolean joinable;
+ private final boolean broadcast;
public DruidTable(
final DataSource dataSource,
- final RowSignature rowSignature
+ final RowSignature rowSignature,
+ final boolean isJoinable,
+ final boolean isBroadcast
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature");
+ this.joinable = isJoinable;
+ this.broadcast = isBroadcast;
}
public DataSource getDataSource()
@@ -61,6 +67,16 @@
return rowSignature;
}
+ public boolean isJoinable()
+ {
+ return joinable;
+ }
+
+ public boolean isBroadcast()
+ {
+ return broadcast;
+ }
+
@Override
public Schema.TableType getJdbcTableType()
{
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 1ad92cb..62c1519 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -708,59 +708,59 @@
public void testInformationSchemaTables() throws Exception
{
testQuery(
- "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE\n"
+ "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, IS_JOINABLE, IS_BROADCAST\n"
+ "FROM INFORMATION_SCHEMA.TABLES\n"
+ "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
ImmutableList.of(),
ImmutableList.<Object[]>builder()
- .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE"})
- .add(new Object[]{"druid", "aview", "VIEW"})
- .add(new Object[]{"druid", "bview", "VIEW"})
- .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
- .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
- .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
- .add(new Object[]{"lookup", "lookyloo", "TABLE"})
- .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
- .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
- .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
- .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"})
- .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"})
+ .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", "aview", "VIEW", "NO", "NO"})
+ .add(new Object[]{"druid", "bview", "VIEW", "NO", "NO"})
+ .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"lookup", "lookyloo", "TABLE", "YES", "YES"})
+ .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "NO", "NO"})
.build()
);
testQuery(
PLANNER_CONFIG_DEFAULT,
- "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE\n"
+ "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, IS_JOINABLE, IS_BROADCAST\n"
+ "FROM INFORMATION_SCHEMA.TABLES\n"
+ "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
CalciteTests.SUPER_USER_AUTH_RESULT,
ImmutableList.of(),
ImmutableList.<Object[]>builder()
- .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE"})
- .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE"})
- .add(new Object[]{"druid", "aview", "VIEW"})
- .add(new Object[]{"druid", "bview", "VIEW"})
- .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
- .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
- .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
- .add(new Object[]{"lookup", "lookyloo", "TABLE"})
- .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
- .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
- .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
- .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"})
- .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"})
+ .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", "aview", "VIEW", "NO", "NO"})
+ .add(new Object[]{"druid", "bview", "VIEW", "NO", "NO"})
+ .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"lookup", "lookyloo", "TABLE", "YES", "YES"})
+ .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "NO", "NO"})
+ .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "NO", "NO"})
.build()
);
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java
index 11c2b97..a5e8831 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.schema;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -38,6 +39,8 @@
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -102,6 +105,7 @@
binder -> {
binder.bind(QueryLifecycleFactory.class).toInstance(queryLifecycleFactory);
binder.bind(TimelineServerView.class).toInstance(serverView);
+ binder.bind(JoinableFactory.class).toInstance(new MapJoinableFactory(ImmutableMap.of()));
binder.bind(PlannerConfig.class).toInstance(plannerConfig);
binder.bind(ViewManager.class).toInstance(viewManager);
binder.bind(Escalator.class).toInstance(escalator);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
index fd20fb5..0ba26ea 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.SegmentManager;
@@ -54,6 +55,7 @@
),
new TestServerInventoryView(Collections.emptyList()),
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
+ new MapJoinableFactory(ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index 8455965..adb3626 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -33,6 +33,7 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.TableDataSource;
@@ -43,6 +44,10 @@
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.Joinable;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
@@ -77,6 +82,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -132,12 +138,15 @@
private SpecificSegmentsQuerySegmentWalker walker = null;
private DruidSchema schema = null;
private SegmentManager segmentManager;
- private Set<String> dataSourceNames;
+ private Set<String> segmentDataSourceNames;
+ private Set<String> joinableDataSourceNames;
@Before
public void setUp() throws Exception
{
- dataSourceNames = Sets.newConcurrentHashSet();
+ segmentDataSourceNames = Sets.newConcurrentHashSet();
+ joinableDataSourceNames = Sets.newConcurrentHashSet();
+
final File tmpDir = temporaryFolder.newFolder();
final QueryableIndex index1 = IndexBuilder.create()
.tmpDir(new File(tmpDir, "1"))
@@ -173,7 +182,7 @@
public Set<String> getDataSourceNames()
{
getDatasourcesLatch.countDown();
- return dataSourceNames;
+ return segmentDataSourceNames;
}
};
@@ -222,10 +231,30 @@
serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments);
druidServers = serverView.getDruidServers();
+ final JoinableFactory globalTableJoinable = new JoinableFactory()
+ {
+ @Override
+ public boolean isDirectlyJoinable(DataSource dataSource)
+ {
+ return dataSource instanceof GlobalTableDataSource &&
+ joinableDataSourceNames.contains(((GlobalTableDataSource) dataSource).getName());
+ }
+
+ @Override
+ public Optional<Joinable> build(
+ DataSource dataSource,
+ JoinConditionAnalysis condition
+ )
+ {
+ return Optional.empty();
+ }
+ };
+
schema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
segmentManager,
+ new MapJoinableFactory(ImmutableMap.of(GlobalTableDataSource.class, globalTableJoinable)),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()
@@ -461,12 +490,16 @@
}
@Test
- public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedException
+ public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws InterruptedException
{
DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
Assert.assertNotNull(fooTable);
Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+ Assert.assertFalse(fooTable.isJoinable());
+ Assert.assertFalse(fooTable.isBroadcast());
+
+ buildTableLatch.await(1, TimeUnit.SECONDS);
final DataSegment someNewBrokerSegment = new DataSegment(
"foo",
@@ -481,12 +514,12 @@
100L,
PruneSpecsHolder.DEFAULT
);
- dataSourceNames.add("foo");
+ segmentDataSourceNames.add("foo");
+ joinableDataSourceNames.add("foo");
serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
- // wait for build
- buildTableLatch.await(1, TimeUnit.SECONDS);
- buildTableLatch = new CountDownLatch(1);
+ // wait for build twice
+ buildTableLatch = new CountDownLatch(2);
buildTableLatch.await(1, TimeUnit.SECONDS);
// wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
@@ -497,9 +530,12 @@
Assert.assertNotNull(fooTable);
Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource);
+ Assert.assertTrue(fooTable.isJoinable());
+ Assert.assertTrue(fooTable.isBroadcast());
// now remove it
- dataSourceNames.remove("foo");
+ joinableDataSourceNames.remove("foo");
+ segmentDataSourceNames.remove("foo");
serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
// wait for build
@@ -515,6 +551,74 @@
Assert.assertNotNull(fooTable);
Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+ Assert.assertFalse(fooTable.isJoinable());
+ Assert.assertFalse(fooTable.isBroadcast());
}
+ @Test
+ public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throws InterruptedException
+ {
+ DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
+ Assert.assertNotNull(fooTable);
+ Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
+ Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+ Assert.assertFalse(fooTable.isJoinable());
+ Assert.assertFalse(fooTable.isBroadcast());
+
+ // wait for build twice
+ buildTableLatch.await(1, TimeUnit.SECONDS);
+
+ final DataSegment someNewBrokerSegment = new DataSegment(
+ "foo",
+ Intervals.of("2012/2013"),
+ "version1",
+ null,
+ ImmutableList.of("dim1", "dim2"),
+ ImmutableList.of("met1", "met2"),
+ new NumberedShardSpec(2, 3),
+ null,
+ 1,
+ 100L,
+ PruneSpecsHolder.DEFAULT
+ );
+ segmentDataSourceNames.add("foo");
+ serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
+
+ buildTableLatch = new CountDownLatch(2);
+ buildTableLatch.await(1, TimeUnit.SECONDS);
+
+ // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
+ getDatasourcesLatch = new CountDownLatch(1);
+ getDatasourcesLatch.await(1, TimeUnit.SECONDS);
+
+ fooTable = (DruidTable) schema.getTableMap().get("foo");
+ Assert.assertNotNull(fooTable);
+ Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
+ // should not be a GlobalTableDataSource for now, because isGlobal is couple with joinability. idealy this will be
+ // changed in the future and we should expect
+ Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+ Assert.assertTrue(fooTable.isBroadcast());
+ Assert.assertFalse(fooTable.isJoinable());
+
+
+ // now remove it
+ segmentDataSourceNames.remove("foo");
+ serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
+
+ // wait for build
+ buildTableLatch.await(1, TimeUnit.SECONDS);
+ buildTableLatch = new CountDownLatch(1);
+ buildTableLatch.await(1, TimeUnit.SECONDS);
+
+ // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
+ getDatasourcesLatch = new CountDownLatch(1);
+ getDatasourcesLatch.await(1, TimeUnit.SECONDS);
+
+ fooTable = (DruidTable) schema.getTableMap().get("foo");
+ Assert.assertNotNull(fooTable);
+ Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
+ Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+ Assert.assertFalse(fooTable.isBroadcast());
+ Assert.assertFalse(fooTable.isJoinable());
+ }
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 3e1356a..ace7aca 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -67,6 +67,7 @@
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
@@ -242,6 +243,7 @@
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
new TestServerInventoryView(walker.getSegments(), realtimeSegments),
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
+ new MapJoinableFactory(ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index b7d56f5..ca683a2 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -73,6 +73,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
@@ -979,6 +980,7 @@
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
new TestServerInventoryView(walker.getSegments()),
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
+ new MapJoinableFactory(ImmutableMap.of()),
plannerConfig,
viewManager,
TEST_AUTHENTICATOR_ESCALATOR
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 1da0fba..0490420 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -120,6 +120,7 @@
scheduler
),
conglomerate,
+ joinableFactoryToUse,
new ServerConfig()
);
}