Merge pull request #1430 from MabelYC/addLabel
SAMZA-2574 : improve flexibility of SystemFactory interface
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index 23234cd..4c45dde 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -70,6 +70,10 @@
public static final String READ_FN = "io.read.func";
public static final String WRITE_FN = "io.write.func";
public static final String RATE_LIMITER = "io.ratelimiter";
+ //Key name for table api read rate limit
+ public static final String READ_CREDITS = "io.read.credits";
+ //Key name for table api write rate limit
+ public static final String WRITE_CREDITS = "io.write.credits";
public static final String READ_CREDIT_FN = "io.read.credit.func";
public static final String WRITE_CREDIT_FN = "io.write.credit.func";
public static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
@@ -283,27 +287,7 @@
Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
- // Handle rate limiter
- if (!tagCreditsMap.isEmpty()) {
- RateLimiter defaultRateLimiter;
- try {
- @SuppressWarnings("unchecked")
- Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
- Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
- defaultRateLimiter = ctor.newInstance(tagCreditsMap);
- } catch (Exception ex) {
- throw new SamzaException("Failed to create default rate limiter", ex);
- }
- addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter), tableConfig);
- if (defaultRateLimiter instanceof TablePart) {
- addTablePartConfig(RATE_LIMITER, (TablePart) defaultRateLimiter, jobConfig, tableConfig);
- }
- } else if (rateLimiter != null) {
- addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter), tableConfig);
- if (rateLimiter instanceof TablePart) {
- addTablePartConfig(RATE_LIMITER, (TablePart) rateLimiter, jobConfig, tableConfig);
- }
- }
+ writeRateLimiterConfig(jobConfig, tableConfig);
// Handle readCredit functions
if (readCreditFn != null) {
@@ -350,6 +334,37 @@
return Collections.unmodifiableMap(tableConfig);
}
+ // Handle rate limiter
+ private void writeRateLimiterConfig(Config jobConfig, Map<String, String> tableConfig) {
+ if (!tagCreditsMap.isEmpty()) {
+ RateLimiter defaultRateLimiter;
+ try {
+ @SuppressWarnings("unchecked")
+ Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
+ Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
+ defaultRateLimiter = ctor.newInstance(tagCreditsMap);
+ } catch (Exception ex) {
+ throw new SamzaException("Failed to create default rate limiter", ex);
+ }
+ addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter), tableConfig);
+ if (defaultRateLimiter instanceof TablePart) {
+ addTablePartConfig(RATE_LIMITER, (TablePart) defaultRateLimiter, jobConfig, tableConfig);
+ }
+ } else if (rateLimiter != null) {
+ addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter), tableConfig);
+ if (rateLimiter instanceof TablePart) {
+ addTablePartConfig(RATE_LIMITER, (TablePart) rateLimiter, jobConfig, tableConfig);
+ }
+ }
+ //Write table api read/write rate limit
+ if (this.enableReadRateLimiter && tagCreditsMap.containsKey(RL_READ_TAG)) {
+ addTableConfig(READ_CREDITS, String.valueOf(tagCreditsMap.get(RL_READ_TAG)), tableConfig);
+ }
+ if (this.enableWriteRateLimiter && tagCreditsMap.containsKey(RL_WRITE_TAG)) {
+ addTableConfig(WRITE_CREDITS, String.valueOf(tagCreditsMap.get(RL_WRITE_TAG)), tableConfig);
+ }
+ }
+
@Override
protected void validate() {
Preconditions.checkArgument(writeFn != null || readFn != null,
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index cf03599..18f2582 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -254,6 +254,17 @@
"{\"exponentialFactor\":0.0,\"backoffType\":\"NONE\",\"retryPredicate\":{}}");
}
+ @Test
+ public void testReadWriteRateLimitToConfig() {
+ Map<String, String> tableConfig = new RemoteTableDescriptor("1").withReadFunction(createMockTableReadFunction())
+ .withReadRetryPolicy(new TableRetryPolicy())
+ .withWriteRateLimit(1000)
+ .withReadRateLimit(2000)
+ .toConfig(new MapConfig());
+ Assert.assertEquals(String.valueOf(2000), tableConfig.get("tables.1.io.read.credits"));
+ Assert.assertEquals(String.valueOf(1000), tableConfig.get("tables.1.io.write.credits"));
+ }
+
private Context createMockContext(TableDescriptor tableDescriptor) {
Context context = mock(Context.class);