global table only if joinable (#10041)

* global table if only joinable

* oops

* fix style, add more tests

* Update sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java

* better information schema columns, distinguish broadcast from joinable

* fix javadoc

* fix mistake

Co-authored-by: Jihoon Son <jihoonson@apache.org>
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index 971f8ed..23dc65f 100644
--- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.inject.Injector;
 import com.google.inject.Module;
@@ -64,6 +65,7 @@
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.ClientQuerySegmentWalker;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.initialization.ServerConfig;
@@ -377,6 +379,7 @@
         baseClient,
         null /* local client; unused in this test, so pass in null */,
         warehouse,
+        new MapJoinableFactory(ImmutableMap.of()),
         retryConfig,
         jsonMapper,
         serverConfig,
diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java
index c3edd1c..d12a8ee 100644
--- a/processing/src/main/java/org/apache/druid/query/DataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -71,6 +71,15 @@
   /**
    * Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or
    * for queries of those.
+   *
+   * Currently this is coupled with joinability - if this returns true then the query engine expects there exists a
+   * {@link org.apache.druid.segment.join.JoinableFactory} which might build a
+   * {@link org.apache.druid.segment.join.Joinable} for this datasource directly. If a subquery 'inline' join is
+   * required to join this datasource on the right hand side, then this value must be false for now.
+   *
+   * In the future, instead of directly using this method, the query planner and engine should consider
+   * {@link org.apache.druid.segment.join.JoinableFactory#isDirectlyJoinable(DataSource)} when determining if the
+   * right hand side is directly joinable, which would allow decoupling this property from joins.
    */
   boolean isGlobal();
 
diff --git a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index 4237e50..5b34f76 100644
--- a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++ b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -205,7 +205,7 @@
 
   /**
    * Returns true if all servers have the ability to compute this datasource. These datasources depend only on
-   * globally broadcast data, like lookups or inline data.
+   * globally broadcast data, like lookups or inline data or broadcast segments.
    */
   public boolean isGlobal()
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
index fc63f1c..723aba5 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java
@@ -31,6 +31,13 @@
 public interface JoinableFactory
 {
   /**
+   * Returns true if a {@link Joinable} **may** be created for a given {@link DataSource}, but is not a guarantee that
+   * {@link #build} will return a non-empty result. Successfully building a {@link Joinable} might require specific
+   * criteria of the {@link JoinConditionAnalysis}.
+   */
+  boolean isDirectlyJoinable(DataSource dataSource);
+
+  /**
    * Create a Joinable object. This may be an expensive operation involving loading data, creating a hash table, etc.
    *
    * @param dataSource the datasource to join on
diff --git a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
index beb8106..abf4b6a 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
@@ -44,6 +44,17 @@
   }
 
   @Override
+  public boolean isDirectlyJoinable(DataSource dataSource)
+  {
+    JoinableFactory factory = joinableFactories.get(dataSource.getClass());
+    if (factory == null) {
+      return false;
+    } else {
+      return factory.isDirectlyJoinable(dataSource);
+    }
+  }
+
+  @Override
   public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
   {
     JoinableFactory factory = joinableFactories.get(dataSource.getClass());
diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
index ae36d17..f7c4f4b 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.extraction.MapLookupExtractor;
@@ -155,13 +156,24 @@
 
     final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
         ImmutableList.of(clause),
-        (dataSource, condition) -> {
-          if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) {
-            return Optional.of(
-                LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false))
-            );
-          } else {
-            return Optional.empty();
+        new JoinableFactory()
+        {
+          @Override
+          public boolean isDirectlyJoinable(DataSource dataSource)
+          {
+            return dataSource.equals(lookupDataSource);
+          }
+
+          @Override
+          public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
+          {
+            if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) {
+              return Optional.of(
+                  LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false))
+              );
+            } else {
+              return Optional.empty();
+            }
           }
         },
         new AtomicLong(),
diff --git a/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
index cf03360..1d00e71 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java
@@ -65,6 +65,8 @@
     target = new MapJoinableFactory(
         ImmutableMap.of(NoopDataSource.class, noopJoinableFactory));
   }
+
+
   @Test
   public void testBuildDataSourceNotRegisteredShouldReturnAbsent()
   {
@@ -89,4 +91,18 @@
     Optional<Joinable> joinable = target.build(noopDataSource, condition);
     Assert.assertEquals(mockJoinable, joinable.get());
   }
+
+  @Test
+  public void testIsDirectShouldBeFalseForNotRegistered()
+  {
+    Assert.assertFalse(target.isDirectlyJoinable(inlineDataSource));
+  }
+
+  @Test
+  public void testIsDirectlyJoinableShouldBeTrueForRegisteredThatIsJoinable()
+  {
+    EasyMock.expect(noopJoinableFactory.isDirectlyJoinable(noopDataSource)).andReturn(true).anyTimes();
+    EasyMock.replay(noopJoinableFactory);
+    Assert.assertTrue(target.isDirectlyJoinable(noopDataSource));
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java b/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java
index ff13804..1583b02 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java
@@ -33,6 +33,12 @@
   }
 
   @Override
+  public boolean isDirectlyJoinable(DataSource dataSource)
+  {
+    return false;
+  }
+
+  @Override
   public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
   {
     return Optional.empty();
diff --git a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java
index 5945d42..4eee53f 100644
--- a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java
@@ -37,6 +37,15 @@
 public class InlineJoinableFactory implements JoinableFactory
 {
   @Override
+  public boolean isDirectlyJoinable(DataSource dataSource)
+  {
+    // this should always be true if this is access through MapJoinableFactory, but check just in case...
+    // further, this should not ever be legitimately called, because this method is used to avoid subquery joins
+    // which use the InlineJoinableFactory
+    return dataSource instanceof InlineDataSource;
+  }
+
+  @Override
   public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
   {
     final InlineDataSource inlineDataSource = (InlineDataSource) dataSource;
diff --git a/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java
index a6fd209..2dab0a6 100644
--- a/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java
@@ -43,6 +43,13 @@
   }
 
   @Override
+  public boolean isDirectlyJoinable(DataSource dataSource)
+  {
+    // this should always be true if this is access through MapJoinableFactory, but check just in case...
+    return dataSource instanceof LookupDataSource;
+  }
+
+  @Override
   public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
   {
     final LookupDataSource lookupDataSource = (LookupDataSource) dataSource;
diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index fa35ff7..c7318a6 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -32,6 +32,7 @@
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.FluentQueryRunnerBuilder;
+import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.PostProcessingOperator;
 import org.apache.druid.query.Query;
@@ -47,9 +48,11 @@
 import org.apache.druid.query.RetryQueryRunner;
 import org.apache.druid.query.RetryQueryRunnerConfig;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.joda.time.Interval;
 
@@ -77,6 +80,7 @@
   private final QuerySegmentWalker clusterClient;
   private final QuerySegmentWalker localClient;
   private final QueryToolChestWarehouse warehouse;
+  private final JoinableFactory joinableFactory;
   private final RetryQueryRunnerConfig retryConfig;
   private final ObjectMapper objectMapper;
   private final ServerConfig serverConfig;
@@ -88,6 +92,7 @@
       QuerySegmentWalker clusterClient,
       QuerySegmentWalker localClient,
       QueryToolChestWarehouse warehouse,
+      JoinableFactory joinableFactory,
       RetryQueryRunnerConfig retryConfig,
       ObjectMapper objectMapper,
       ServerConfig serverConfig,
@@ -99,6 +104,7 @@
     this.clusterClient = clusterClient;
     this.localClient = localClient;
     this.warehouse = warehouse;
+    this.joinableFactory = joinableFactory;
     this.retryConfig = retryConfig;
     this.objectMapper = objectMapper;
     this.serverConfig = serverConfig;
@@ -112,6 +118,7 @@
       CachingClusteredClient clusterClient,
       LocalQuerySegmentWalker localClient,
       QueryToolChestWarehouse warehouse,
+      JoinableFactory joinableFactory,
       RetryQueryRunnerConfig retryConfig,
       ObjectMapper objectMapper,
       ServerConfig serverConfig,
@@ -124,6 +131,7 @@
         (QuerySegmentWalker) clusterClient,
         (QuerySegmentWalker) localClient,
         warehouse,
+        joinableFactory,
         retryConfig,
         objectMapper,
         serverConfig,
@@ -137,10 +145,13 @@
   {
     final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
 
-    // First, do an inlining dry run to see if any inlining is necessary, without actually running the queries.
+    // transform TableDataSource to GlobalTableDataSource when eligible
+    // before further transformation to potentially inline
+    final DataSource freeTradeDataSource = globalizeIfPossible(query.getDataSource());
+    // do an inlining dry run to see if any inlining is necessary, without actually running the queries.
     final int maxSubqueryRows = QueryContexts.getMaxSubqueryRows(query, serverConfig.getMaxSubqueryRows());
     final DataSource inlineDryRun = inlineIfNecessary(
-        query.getDataSource(),
+        freeTradeDataSource,
         toolChest,
         new AtomicInteger(),
         maxSubqueryRows,
@@ -156,7 +167,7 @@
     // Now that we know the structure is workable, actually do the inlining (if necessary).
     final Query<T> newQuery = query.withDataSource(
         inlineIfNecessary(
-            query.getDataSource(),
+            freeTradeDataSource,
             toolChest,
             new AtomicInteger(),
             maxSubqueryRows,
@@ -187,10 +198,15 @@
   @Override
   public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
   {
-    // Inlining isn't done for segments-based queries.
+    // Inlining isn't done for segments-based queries, but we still globalify the table datasources if possible
+    final Query<T> freeTradeQuery = query.withDataSource(globalizeIfPossible(query.getDataSource()));
 
     if (canRunQueryUsingClusterWalker(query)) {
-      return decorateClusterRunner(query, clusterClient.getQueryRunnerForSegments(query, specs));
+      return new QuerySwappingQueryRunner<>(
+          decorateClusterRunner(freeTradeQuery, clusterClient.getQueryRunnerForSegments(freeTradeQuery, specs)),
+          query,
+          freeTradeQuery
+      );
     } else {
       // We don't expect end-users to see this message, since it only happens when specific segments are requested;
       // this is not typical end-user behavior.
@@ -235,6 +251,27 @@
                || toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery()));
   }
 
+
+  private DataSource globalizeIfPossible(
+      final DataSource dataSource
+  )
+  {
+    if (dataSource instanceof TableDataSource) {
+      GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(((TableDataSource) dataSource).getName());
+      if (joinableFactory.isDirectlyJoinable(maybeGlobal)) {
+        return maybeGlobal;
+      }
+      return dataSource;
+    } else {
+      List<DataSource> currentChildren = dataSource.getChildren();
+      List<DataSource> newChildren = new ArrayList<>(currentChildren.size());
+      for (DataSource child : currentChildren) {
+        newChildren.add(globalizeIfPossible(child));
+      }
+      return dataSource.withChildren(newChildren);
+    }
+  }
+
   /**
    * Replace QueryDataSources with InlineDataSources when necessary and possible. "Necessary" is defined as:
    *
diff --git a/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java b/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java
index d1be698..2a5bf3e 100644
--- a/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java
@@ -80,6 +80,13 @@
     Assert.assertEquals(3, joinable.getCardinality("long"));
   }
 
+  @Test
+  public void testIsDirectlyJoinable()
+  {
+    Assert.assertTrue(factory.isDirectlyJoinable(inlineDataSource));
+    Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo")));
+  }
+
   private static JoinConditionAnalysis makeCondition(final String condition)
   {
     return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil());
diff --git a/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java b/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java
index 44ed4b2..6e0e737 100644
--- a/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java
@@ -125,6 +125,13 @@
     Assert.assertEquals(Joinable.CARDINALITY_UNKNOWN, joinable.getCardinality("v"));
   }
 
+  @Test
+  public void testIsDirectlyJoinable()
+  {
+    Assert.assertTrue(factory.isDirectlyJoinable(lookupDataSource));
+    Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo")));
+  }
+
   private static JoinConditionAnalysis makeCondition(final String condition)
   {
     return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil());
diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index f224b5f..565fee9 100644
--- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -33,6 +33,7 @@
 import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.JoinDataSource;
 import org.apache.druid.query.Query;
@@ -70,7 +71,9 @@
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.join.InlineJoinableFactory;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
 import org.apache.druid.segment.join.JoinType;
+import org.apache.druid.segment.join.Joinable;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.initialization.ServerConfig;
@@ -96,6 +99,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * Tests ClientQuerySegmentWalker.
@@ -112,6 +116,7 @@
   private static final String FOO = "foo";
   private static final String BAR = "bar";
   private static final String MULTI = "multi";
+  private static final String GLOBAL = "broadcast";
 
   private static final Interval INTERVAL = Intervals.of("2000/P1Y");
   private static final String VERSION = "A";
@@ -219,6 +224,40 @@
   }
 
   @Test
+  public void testTimeseriesOnAutomaticGlobalTable()
+  {
+    final TimeseriesQuery query =
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource(GLOBAL)
+              .granularity(Granularities.ALL)
+              .intervals(Collections.singletonList(INTERVAL))
+              .aggregators(new LongSumAggregatorFactory("sum", "n"))
+              .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
+              .build();
+
+    // expect global/joinable datasource to be automatically translated into a GlobalTableDataSource
+    final TimeseriesQuery expectedClusterQuery =
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource(new GlobalTableDataSource(GLOBAL))
+              .granularity(Granularities.ALL)
+              .intervals(Collections.singletonList(INTERVAL))
+              .aggregators(new LongSumAggregatorFactory("sum", "n"))
+              .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
+              .build();
+
+    testQuery(
+        query,
+        ImmutableList.of(ExpectedQuery.cluster(expectedClusterQuery)),
+        ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
+    );
+
+    Assert.assertEquals(1, scheduler.getTotalRun().get());
+    Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(1, scheduler.getTotalReleased().get());
+  }
+
+  @Test
   public void testTimeseriesOnInline()
   {
     final TimeseriesQuery query =
@@ -606,6 +645,20 @@
     final JoinableFactory joinableFactory = new MapJoinableFactory(
         ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
             .put(InlineDataSource.class, new InlineJoinableFactory())
+            .put(GlobalTableDataSource.class, new JoinableFactory()
+            {
+              @Override
+              public boolean isDirectlyJoinable(DataSource dataSource)
+              {
+                return ((GlobalTableDataSource) dataSource).getName().equals(GLOBAL);
+              }
+
+              @Override
+              public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
+              {
+                return Optional.empty();
+              }
+            })
             .build()
     );
 
@@ -651,7 +704,8 @@
                 ImmutableMap.of(
                     FOO, makeTimeline(FOO, FOO_INLINE),
                     BAR, makeTimeline(BAR, BAR_INLINE),
-                    MULTI, makeTimeline(MULTI, MULTI_VALUE_INLINE)
+                    MULTI, makeTimeline(MULTI, MULTI_VALUE_INLINE),
+                    GLOBAL, makeTimeline(GLOBAL, FOO_INLINE)
                 ),
                 joinableFactory,
                 conglomerate,
@@ -669,6 +723,7 @@
             ClusterOrLocal.LOCAL
         ),
         conglomerate,
+        joinableFactory,
         serverConfig
     );
   }
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index 1370ed7..445d8d6 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -95,6 +95,7 @@
       final QuerySegmentWalker clusterWalker,
       final QuerySegmentWalker localWalker,
       final QueryRunnerFactoryConglomerate conglomerate,
+      final JoinableFactory joinableFactory,
       final ServerConfig serverConfig
   )
   {
@@ -110,6 +111,7 @@
             return conglomerate.findFactory(query).getToolchest();
           }
         },
+        joinableFactory,
         new RetryQueryRunnerConfig(),
         TestHelper.makeJsonMapper(),
         serverConfig,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
index 1655d73..6faf219 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
@@ -337,6 +337,8 @@
   private static boolean computeRightRequiresSubquery(final DruidRel<?> right)
   {
     // Right requires a subquery unless it's a scan or mapping on top of a global datasource.
+    // ideally this would involve JoinableFactory.isDirectlyJoinable to check that the global datasources
+    // are in fact possibly joinable, but for now isGlobal is coupled to joinability
     return !(DruidRels.isScanOrMapping(right, false)
              && DruidRels.dataSourceIfLeafRel(right).filter(DataSource::isGlobal).isPresent());
   }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 6762aaf..b264749 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -56,6 +56,7 @@
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -104,6 +105,7 @@
   private final PlannerConfig config;
   private final SegmentManager segmentManager;
   private final ViewManager viewManager;
+  private final JoinableFactory joinableFactory;
   private final ExecutorService cacheExec;
   private final ConcurrentMap<String, DruidTable> tables;
 
@@ -148,6 +150,7 @@
       final QueryLifecycleFactory queryLifecycleFactory,
       final TimelineServerView serverView,
       final SegmentManager segmentManager,
+      final JoinableFactory joinableFactory,
       final PlannerConfig config,
       final ViewManager viewManager,
       final Escalator escalator
@@ -156,6 +159,7 @@
     this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
     Preconditions.checkNotNull(serverView, "serverView");
     this.segmentManager = segmentManager;
+    this.joinableFactory = joinableFactory;
     this.config = Preconditions.checkNotNull(config, "config");
     this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager");
     this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
@@ -278,10 +282,11 @@
                 for (String dataSource : dataSourcesToRebuild) {
                   final DruidTable druidTable = buildDruidTable(dataSource);
                   final DruidTable oldTable = tables.put(dataSource, druidTable);
+                  final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource";
                   if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
-                    log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature());
+                    log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.getRowSignature());
                   } else {
-                    log.debug("dataSource [%s] signature is unchanged.", dataSource);
+                    log.debug("%s [%s] signature is unchanged.", description, dataSource);
                   }
                 }
 
@@ -627,12 +632,21 @@
       columnTypes.forEach(builder::add);
 
       final TableDataSource tableDataSource;
-      if (segmentManager.getDataSourceNames().contains(dataSource)) {
-        tableDataSource = new GlobalTableDataSource(dataSource);
+
+      // to be a GlobalTableDataSource instead of a TableDataSource, it must appear on all servers (inferred by existing
+      // in the segment cache, which in this case belongs to the broker meaning only broadcast segments live here)
+      // to be joinable, it must be possibly joinable according to the factory. we only consider broadcast datasources
+      // at this time, and isGlobal is currently strongly coupled with joinable, so only make a global table datasource
+      // if also joinable
+      final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource);
+      final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal);
+      final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource);
+      if (isBroadcast && isJoinable) {
+        tableDataSource = maybeGlobal;
       } else {
         tableDataSource = new TableDataSource(dataSource);
       }
-      return new DruidTable(tableDataSource, builder.build());
+      return new DruidTable(tableDataSource, builder.build(), isJoinable, isBroadcast);
     }
   }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java
index bf84ea1..8ee93a2 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java
@@ -53,6 +53,7 @@
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.table.DruidTable;
 import org.apache.druid.sql.calcite.table.RowSignatures;
 
 import javax.annotation.Nullable;
@@ -83,6 +84,8 @@
       .add("TABLE_SCHEMA", ValueType.STRING)
       .add("TABLE_NAME", ValueType.STRING)
       .add("TABLE_TYPE", ValueType.STRING)
+      .add("IS_JOINABLE", ValueType.STRING)
+      .add("IS_BROADCAST", ValueType.STRING)
       .build();
   private static final RowSignature COLUMNS_SIGNATURE = RowSignature
       .builder()
@@ -109,6 +112,9 @@
     return Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(datasourceName));
   };
 
+  private static final String INFO_TRUE = "YES";
+  private static final String INFO_FALSE = "NO";
+
   private final SchemaPlus rootSchema;
   private final Map<String, Table> tableMap;
   private final AuthorizerMapper authorizerMapper;
@@ -217,18 +223,27 @@
                   return Iterables.filter(
                       Iterables.concat(
                           FluentIterable.from(authorizedTableNames).transform(
-                              new Function<String, Object[]>()
-                              {
-                                @Override
-                                public Object[] apply(final String tableName)
-                                {
-                                  return new Object[]{
-                                      CATALOG_NAME, // TABLE_CATALOG
-                                      schemaName, // TABLE_SCHEMA
-                                      tableName, // TABLE_NAME
-                                      subSchema.getTable(tableName).getJdbcTableType().toString() // TABLE_TYPE
-                                  };
+                              tableName -> {
+                                final Table table = subSchema.getTable(tableName);
+                                final boolean isJoinable;
+                                final boolean isBroadcast;
+                                if (table instanceof DruidTable) {
+                                  DruidTable druidTable = (DruidTable) table;
+                                  isJoinable = druidTable.isJoinable();
+                                  isBroadcast = druidTable.isBroadcast();
+                                } else {
+                                  isJoinable = false;
+                                  isBroadcast = false;
                                 }
+
+                                return new Object[]{
+                                    CATALOG_NAME, // TABLE_CATALOG
+                                    schemaName, // TABLE_SCHEMA
+                                    tableName, // TABLE_NAME
+                                    table.getJdbcTableType().toString(), // TABLE_TYPE
+                                    isJoinable ? INFO_TRUE : INFO_FALSE, // IS_JOINABLE
+                                    isBroadcast ? INFO_TRUE : INFO_FALSE // IS_BROADCAST
+                                };
                               }
                           ),
                           FluentIterable.from(authorizedFunctionNames).transform(
@@ -242,7 +257,9 @@
                                         CATALOG_NAME, // TABLE_CATALOG
                                         schemaName, // TABLE_SCHEMA
                                         functionName, // TABLE_NAME
-                                        "VIEW" // TABLE_TYPE
+                                        "VIEW", // TABLE_TYPE
+                                        INFO_FALSE, // IS_JOINABLE
+                                        INFO_FALSE // IS_BROADCAST
                                     };
                                   } else {
                                     return null;
@@ -406,7 +423,7 @@
                       field.getName(), // COLUMN_NAME
                       String.valueOf(field.getIndex()), // ORDINAL_POSITION
                       "", // COLUMN_DEFAULT
-                      type.isNullable() ? "YES" : "NO", // IS_NULLABLE
+                      type.isNullable() ? INFO_TRUE : INFO_FALSE, // IS_NULLABLE
                       type.getSqlTypeName().toString(), // DATA_TYPE
                       null, // CHARACTER_MAXIMUM_LENGTH
                       null, // CHARACTER_OCTET_LENGTH
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java
index b3f3314..6ddeaab 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java
@@ -57,7 +57,9 @@
     final ImmutableMap.Builder<String, Table> tableMapBuilder = ImmutableMap.builder();
 
     for (final String lookupName : lookupProvider.getAllLookupNames()) {
-      tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE));
+      // all lookups should be also joinable through lookup joinable factory, and lookups are effectively broadcast
+      // (if we ignore lookup tiers...)
+      tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE, true, true));
     }
 
     return tableMapBuilder.build();
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java
index 521a051..94da5ed 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java
@@ -41,14 +41,20 @@
 {
   private final DataSource dataSource;
   private final RowSignature rowSignature;
+  private final boolean joinable;
+  private final boolean broadcast;
 
   public DruidTable(
       final DataSource dataSource,
-      final RowSignature rowSignature
+      final RowSignature rowSignature,
+      final boolean isJoinable,
+      final boolean isBroadcast
   )
   {
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature");
+    this.joinable = isJoinable;
+    this.broadcast = isBroadcast;
   }
 
   public DataSource getDataSource()
@@ -61,6 +67,16 @@
     return rowSignature;
   }
 
+  public boolean isJoinable()
+  {
+    return joinable;
+  }
+
+  public boolean isBroadcast()
+  {
+    return broadcast;
+  }
+
   @Override
   public Schema.TableType getJdbcTableType()
   {
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 1ad92cb..62c1519 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -708,59 +708,59 @@
   public void testInformationSchemaTables() throws Exception
   {
     testQuery(
-        "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE\n"
+        "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, IS_JOINABLE, IS_BROADCAST\n"
         + "FROM INFORMATION_SCHEMA.TABLES\n"
         + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
         ImmutableList.of(),
         ImmutableList.<Object[]>builder()
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE"})
-            .add(new Object[]{"druid", "aview", "VIEW"})
-            .add(new Object[]{"druid", "bview", "VIEW"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
-            .add(new Object[]{"lookup", "lookyloo", "TABLE"})
-            .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", "aview", "VIEW", "NO", "NO"})
+            .add(new Object[]{"druid", "bview", "VIEW", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"lookup", "lookyloo", "TABLE", "YES", "YES"})
+            .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "NO", "NO"})
             .build()
     );
 
     testQuery(
         PLANNER_CONFIG_DEFAULT,
-        "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE\n"
+        "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, IS_JOINABLE, IS_BROADCAST\n"
         + "FROM INFORMATION_SCHEMA.TABLES\n"
         + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
         CalciteTests.SUPER_USER_AUTH_RESULT,
         ImmutableList.of(),
         ImmutableList.<Object[]>builder()
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE"})
-            .add(new Object[]{"druid", "aview", "VIEW"})
-            .add(new Object[]{"druid", "bview", "VIEW"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
-            .add(new Object[]{"lookup", "lookyloo", "TABLE"})
-            .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
+            .add(new Object[]{"druid", "aview", "VIEW", "NO", "NO"})
+            .add(new Object[]{"druid", "bview", "VIEW", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"lookup", "lookyloo", "TABLE", "YES", "YES"})
+            .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "NO", "NO"})
+            .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "NO", "NO"})
             .build()
     );
   }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java
index 11c2b97..a5e8831 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.sql.calcite.schema;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -38,6 +39,8 @@
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -102,6 +105,7 @@
         binder -> {
           binder.bind(QueryLifecycleFactory.class).toInstance(queryLifecycleFactory);
           binder.bind(TimelineServerView.class).toInstance(serverView);
+          binder.bind(JoinableFactory.class).toInstance(new MapJoinableFactory(ImmutableMap.of()));
           binder.bind(PlannerConfig.class).toInstance(plannerConfig);
           binder.bind(ViewManager.class).toInstance(viewManager);
           binder.bind(Escalator.class).toInstance(escalator);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
index fd20fb5..0ba26ea 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.SegmentManager;
@@ -54,6 +55,7 @@
           ),
           new TestServerInventoryView(Collections.emptyList()),
           new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
+          new MapJoinableFactory(ImmutableMap.of()),
           PLANNER_CONFIG_DEFAULT,
           new NoopViewManager(),
           new NoopEscalator()
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index 8455965..adb3626 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -33,6 +33,7 @@
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.TableDataSource;
@@ -43,6 +44,10 @@
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.Joinable;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.QueryStackTests;
@@ -77,6 +82,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -132,12 +138,15 @@
   private SpecificSegmentsQuerySegmentWalker walker = null;
   private DruidSchema schema = null;
   private SegmentManager segmentManager;
-  private Set<String> dataSourceNames;
+  private Set<String> segmentDataSourceNames;
+  private Set<String> joinableDataSourceNames;
 
   @Before
   public void setUp() throws Exception
   {
-    dataSourceNames = Sets.newConcurrentHashSet();
+    segmentDataSourceNames = Sets.newConcurrentHashSet();
+    joinableDataSourceNames = Sets.newConcurrentHashSet();
+
     final File tmpDir = temporaryFolder.newFolder();
     final QueryableIndex index1 = IndexBuilder.create()
                                               .tmpDir(new File(tmpDir, "1"))
@@ -173,7 +182,7 @@
       public Set<String> getDataSourceNames()
       {
         getDatasourcesLatch.countDown();
-        return dataSourceNames;
+        return segmentDataSourceNames;
       }
     };
 
@@ -222,10 +231,30 @@
     serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments);
     druidServers = serverView.getDruidServers();
 
+    final JoinableFactory globalTableJoinable = new JoinableFactory()
+    {
+      @Override
+      public boolean isDirectlyJoinable(DataSource dataSource)
+      {
+        return dataSource instanceof GlobalTableDataSource &&
+               joinableDataSourceNames.contains(((GlobalTableDataSource) dataSource).getName());
+      }
+
+      @Override
+      public Optional<Joinable> build(
+          DataSource dataSource,
+          JoinConditionAnalysis condition
+      )
+      {
+        return Optional.empty();
+      }
+    };
+
     schema = new DruidSchema(
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         serverView,
         segmentManager,
+        new MapJoinableFactory(ImmutableMap.of(GlobalTableDataSource.class, globalTableJoinable)),
         PLANNER_CONFIG_DEFAULT,
         new NoopViewManager(),
         new NoopEscalator()
@@ -461,12 +490,16 @@
   }
 
   @Test
-  public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedException
+  public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws InterruptedException
   {
     DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
     Assert.assertNotNull(fooTable);
     Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
     Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertFalse(fooTable.isJoinable());
+    Assert.assertFalse(fooTable.isBroadcast());
+
+    buildTableLatch.await(1, TimeUnit.SECONDS);
 
     final DataSegment someNewBrokerSegment = new DataSegment(
         "foo",
@@ -481,12 +514,12 @@
         100L,
         PruneSpecsHolder.DEFAULT
     );
-    dataSourceNames.add("foo");
+    segmentDataSourceNames.add("foo");
+    joinableDataSourceNames.add("foo");
     serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
 
-    // wait for build
-    buildTableLatch.await(1, TimeUnit.SECONDS);
-    buildTableLatch = new CountDownLatch(1);
+    // wait for build twice
+    buildTableLatch = new CountDownLatch(2);
     buildTableLatch.await(1, TimeUnit.SECONDS);
 
     // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
@@ -497,9 +530,12 @@
     Assert.assertNotNull(fooTable);
     Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
     Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertTrue(fooTable.isJoinable());
+    Assert.assertTrue(fooTable.isBroadcast());
 
     // now remove it
-    dataSourceNames.remove("foo");
+    joinableDataSourceNames.remove("foo");
+    segmentDataSourceNames.remove("foo");
     serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
 
     // wait for build
@@ -515,6 +551,74 @@
     Assert.assertNotNull(fooTable);
     Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
     Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertFalse(fooTable.isJoinable());
+    Assert.assertFalse(fooTable.isBroadcast());
   }
 
+  @Test
+  public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throws InterruptedException
+  {
+    DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
+    Assert.assertNotNull(fooTable);
+    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
+    Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertFalse(fooTable.isJoinable());
+    Assert.assertFalse(fooTable.isBroadcast());
+
+    // wait for build twice
+    buildTableLatch.await(1, TimeUnit.SECONDS);
+
+    final DataSegment someNewBrokerSegment = new DataSegment(
+        "foo",
+        Intervals.of("2012/2013"),
+        "version1",
+        null,
+        ImmutableList.of("dim1", "dim2"),
+        ImmutableList.of("met1", "met2"),
+        new NumberedShardSpec(2, 3),
+        null,
+        1,
+        100L,
+        PruneSpecsHolder.DEFAULT
+    );
+    segmentDataSourceNames.add("foo");
+    serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
+
+    buildTableLatch = new CountDownLatch(2);
+    buildTableLatch.await(1, TimeUnit.SECONDS);
+
+    // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
+    getDatasourcesLatch = new CountDownLatch(1);
+    getDatasourcesLatch.await(1, TimeUnit.SECONDS);
+
+    fooTable = (DruidTable) schema.getTableMap().get("foo");
+    Assert.assertNotNull(fooTable);
+    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
+    // should not be a GlobalTableDataSource for now, because isGlobal is couple with joinability. idealy this will be
+    // changed in the future and we should expect
+    Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertTrue(fooTable.isBroadcast());
+    Assert.assertFalse(fooTable.isJoinable());
+
+
+    // now remove it
+    segmentDataSourceNames.remove("foo");
+    serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
+
+    // wait for build
+    buildTableLatch.await(1, TimeUnit.SECONDS);
+    buildTableLatch = new CountDownLatch(1);
+    buildTableLatch.await(1, TimeUnit.SECONDS);
+
+    // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
+    getDatasourcesLatch = new CountDownLatch(1);
+    getDatasourcesLatch.await(1, TimeUnit.SECONDS);
+
+    fooTable = (DruidTable) schema.getTableMap().get("foo");
+    Assert.assertNotNull(fooTable);
+    Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
+    Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
+    Assert.assertFalse(fooTable.isBroadcast());
+    Assert.assertFalse(fooTable.isJoinable());
+  }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 3e1356a..ace7aca 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -67,6 +67,7 @@
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.DruidNode;
@@ -242,6 +243,7 @@
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         new TestServerInventoryView(walker.getSegments(), realtimeSegments),
         new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
+        new MapJoinableFactory(ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
         new NoopViewManager(),
         new NoopEscalator()
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index b7d56f5..ca683a2 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -73,6 +73,7 @@
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.DruidNode;
@@ -979,6 +980,7 @@
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         new TestServerInventoryView(walker.getSegments()),
         new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
+        new MapJoinableFactory(ImmutableMap.of()),
         plannerConfig,
         viewManager,
         TEST_AUTHENTICATOR_ESCALATOR
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 1da0fba..0490420 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -120,6 +120,7 @@
             scheduler
         ),
         conglomerate,
+        joinableFactoryToUse,
         new ServerConfig()
     );
   }