Merge pull request #1430 from MabelYC/addLabel

SAMZA-2574 : improve flexibility of SystemFactory interface
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index 23234cd..4c45dde 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -70,6 +70,10 @@
   public static final String READ_FN = "io.read.func";
   public static final String WRITE_FN = "io.write.func";
   public static final String RATE_LIMITER = "io.ratelimiter";
+  // Config key for the table API read rate limit
+  public static final String READ_CREDITS = "io.read.credits";
+  // Config key for the table API write rate limit
+  public static final String WRITE_CREDITS = "io.write.credits";
   public static final String READ_CREDIT_FN = "io.read.credit.func";
   public static final String WRITE_CREDIT_FN = "io.write.credit.func";
   public static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
@@ -283,27 +287,7 @@
 
     Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
 
-    // Handle rate limiter
-    if (!tagCreditsMap.isEmpty()) {
-      RateLimiter defaultRateLimiter;
-      try {
-        @SuppressWarnings("unchecked")
-        Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
-        Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
-        defaultRateLimiter = ctor.newInstance(tagCreditsMap);
-      } catch (Exception ex) {
-        throw new SamzaException("Failed to create default rate limiter", ex);
-      }
-      addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter), tableConfig);
-      if (defaultRateLimiter instanceof TablePart) {
-        addTablePartConfig(RATE_LIMITER, (TablePart) defaultRateLimiter, jobConfig, tableConfig);
-      }
-    } else if (rateLimiter != null) {
-      addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter), tableConfig);
-      if (rateLimiter instanceof TablePart) {
-        addTablePartConfig(RATE_LIMITER, (TablePart) rateLimiter, jobConfig, tableConfig);
-      }
-    }
+    writeRateLimiterConfig(jobConfig, tableConfig);
 
     // Handle readCredit functions
     if (readCreditFn != null) {
@@ -350,6 +334,37 @@
     return Collections.unmodifiableMap(tableConfig);
   }
 
+  // Writes the rate limiter (default or user-supplied) and the read/write credits into tableConfig
+  private void writeRateLimiterConfig(Config jobConfig, Map<String, String> tableConfig) {
+    if (!tagCreditsMap.isEmpty()) {
+      RateLimiter defaultRateLimiter;
+      try {
+        @SuppressWarnings("unchecked")
+        Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
+        Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
+        defaultRateLimiter = ctor.newInstance(tagCreditsMap);
+      } catch (Exception ex) {
+        throw new SamzaException("Failed to create default rate limiter", ex);
+      }
+      addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter), tableConfig);
+      if (defaultRateLimiter instanceof TablePart) {
+        addTablePartConfig(RATE_LIMITER, (TablePart) defaultRateLimiter, jobConfig, tableConfig);
+      }
+    } else if (rateLimiter != null) {
+      addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter), tableConfig);
+      if (rateLimiter instanceof TablePart) {
+        addTablePartConfig(RATE_LIMITER, (TablePart) rateLimiter, jobConfig, tableConfig);
+      }
+    }
+    // Write the table API read/write rate limits, if enabled and present in tagCreditsMap
+    if (this.enableReadRateLimiter && tagCreditsMap.containsKey(RL_READ_TAG)) {
+      addTableConfig(READ_CREDITS, String.valueOf(tagCreditsMap.get(RL_READ_TAG)), tableConfig);
+    }
+    if (this.enableWriteRateLimiter && tagCreditsMap.containsKey(RL_WRITE_TAG)) {
+      addTableConfig(WRITE_CREDITS, String.valueOf(tagCreditsMap.get(RL_WRITE_TAG)), tableConfig);
+    }
+  }
+
   @Override
   protected void validate() {
     Preconditions.checkArgument(writeFn != null || readFn != null,
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index cf03599..18f2582 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -254,6 +254,17 @@
         "{\"exponentialFactor\":0.0,\"backoffType\":\"NONE\",\"retryPredicate\":{}}");
   }
 
+  @Test
+  public void testReadWriteRateLimitToConfig() { // rate limits set on the descriptor must appear in its config
+    Map<String, String> tableConfig = new RemoteTableDescriptor("1").withReadFunction(createMockTableReadFunction())
+        .withReadRetryPolicy(new TableRetryPolicy())
+        .withWriteRateLimit(1000)
+        .withReadRateLimit(2000)
+        .toConfig(new MapConfig());
+    Assert.assertEquals(String.valueOf(2000), tableConfig.get("tables.1.io.read.credits")); // read limit
+    Assert.assertEquals(String.valueOf(1000), tableConfig.get("tables.1.io.write.credits")); // write limit
+  }
+
   private Context createMockContext(TableDescriptor tableDescriptor) {
     Context context = mock(Context.class);