Merge pull request #1513 from dxichen/merge-state-backend-async-commit
Merge changes from master onto feature branch.
diff --git a/build.gradle b/build.gradle
index a3512e0..2b98b8a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -127,11 +127,6 @@
toolVersion = "$checkstyleVersion"
}
- tasks.withType(ScalaCompile) {
- // show compile errors in console output
- logging.setLevel LogLevel.WARN
- }
-
tasks.withType(Test) {
test {
testLogging {
@@ -196,7 +191,6 @@
compile "org.scala-lang:scala-library:$scalaVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
compile "net.jodah:failsafe:$failsafeVersion"
- compile "com.linkedin.cytodynamics:cytodynamics-nucleus:$cytodynamicsVersion"
testCompile project(":samza-api").sourceSets.test.output
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
@@ -221,6 +215,7 @@
dependencies {
compile "com.azure:azure-storage-blob:12.0.1"
+ compile "com.azure:azure-identity:1.0.0"
compile "com.microsoft.azure:azure-storage:5.3.1"
compile "com.microsoft.azure:azure-eventhubs:1.0.1"
compile "com.fasterxml.jackson.core:jackson-core:$jacksonVersion"
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index b588e64..18a782f 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1912,6 +1912,22 @@
</tr>
<tr>
+ <td class="property" id="stores-rocksdb-max-open-files">stores.<span class="store">store-name</span>.<br>rocksdb.max.open.files</td>
+ <td class="default">-1</td>
+ <td class="description">
+ Limits the number of open files that RocksDB can have open at one time.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="stores-rocksdb-max-file-opening-threads">stores.<span class="store">store-name</span>.<br>rocksdb.max.file.opening.threads</td>
+ <td class="default">16</td>
+ <td class="description">
+ Sets the number of threads used to open RocksDB files.
+ </td>
+ </tr>
+
+ <tr>
<td class="property" id="stores-rocksdb-metrics">stores.<span class="store">store-name</span>.<br>rocksdb.metrics.list</td>
<td class="default"></td>
<td class="description">
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 596dc52..1ef74cd 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -258,7 +258,7 @@
#### <a name="advanced-azure-blob-storage"></a>[Advanced Azure Blob Storage Configurations](#advanced-azure-blob-storage)
|Name|Default|Description|
|--- |--- |--- |
-|systems.**_system-name_**.azureblob.proxy.use |false|if true, proxy will be used to connect to Azure.|
+|systems.**_system-name_**.azureblob.proxy.use |false|if true, proxy will be used to connect to Azure for blob creation.|
|systems.**_system-name_**.azureblob.proxy.hostname| |if proxy.use is true then host name of proxy.|
|systems.**_system-name_**.azureblob.proxy.port| |if proxy.use is true then port of proxy.|
|systems.**_system-name_**.azureblob.writer.factory.class|`org.apache.samza.system.`<br>`azureblob.avro.`<br>`AzureBlobAvroWriterFactory`|Fully qualified class name of the `org.apache.samza.system.azureblob.producer.AzureBlobWriter` impl for the system producer.<br><br>The default writer creates blobs that are of type AVRO and require the messages sent to a blob to be AVRO records. The blobs created by the default writer are of type [Block Blobs](https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs#about-block-blobs).|
@@ -272,7 +272,14 @@
|systems.**_system-name_**.azureblob.closeTimeoutMs|300000 (5 mins)|timeout to finish committing all the blobs currently being written to. This does not include the flush timeout per blob.|
|systems.**_system-name_**.azureblob.suffixRandomStringToBlobName|true|if true, a random string of 8 chars is suffixed to the blob name to prevent name collision when more than one Samza tasks are writing to the same SSP.|
|systems.**_system-name_**.azureblob.metadataPropertiesGeneratorFactory|`org.apache.samza.system.`<br>`azureblob.utils.`<br>`NullBlobMetadataGeneratorFactory`|Fully qualified class name of the `org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory` impl for the system producer. <br><br>The default metadata generator does not add any metadata to the blob.|
-|systems.**_system-name_**.azureblob.metadataGeneratorConfig| |Additional configs for the metadata generator should be prefixed with this string which is passed to the generator.<br>For example, to pass a "key":"value" pair to the metadata generator, add config like systems.<system-name>.azureblob.metadataGeneratorConfig.\<key\> with value \<value\>|
+|systems.**_system-name_**.azureblob.metadataGeneratorConfig| |Additional configs for the metadata generator should be prefixed with this string which is passed to the generator.<br>For example, to pass a "key":"value" pair to the metadata generator, add config like systems.<system-name>.azureblob.metadataGeneratorConfig.\<key\> with value \<value\>|
+|systems.**_system-name_**.azureblob.useTokenCredentialAuthentication|false| if true, then com.azure.core.credential.TokenCredential is used to authenticate with Azure.|
+|systems.**_system-name_**.azureblob.client.id| | if TokenCredential authentication, then the client id to be used for authentication.|
+|systems.**_system-name_**.azureblob.client.secret| | if TokenCredential authentication, then the client secret to be used for authentication.|
+|systems.**_system-name_**.azureblob.tenant.id| | if TokenCredential authentication, then the tenant id to be used for authentication.|
+|systems.**_system-name_**.azureblob.authProxy.use|false| if TokenCredential authentication, then setting this to true will result in using a proxy for authentication.|
+|systems.**_system-name_**.azureblob.authProxy.hostname| |if authProxy.use is true then host name of proxy.|
+|systems.**_system-name_**.azureblob.authProxy.port| |if authProxy.use is true then port of proxy.|
### <a name="state-storage"></a>[4. State Storage](#state-storage)
diff --git a/gradle.properties b/gradle.properties
index d7c4cd7..1a6ee97 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -25,3 +25,5 @@
systemProp.file.encoding=utf-8
checkstyleVersion=6.11.2
+
+org.gradle.parallel=true
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index ebd2d38..019cde3 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -25,7 +25,6 @@
commonsHttpClientVersion = "3.1"
commonsIoVersion = "2.8.0"
commonsLang3Version = "3.11"
- cytodynamicsVersion = "0.2.0"
elasticsearchVersion = "2.2.0"
gsonVersion = "2.8.6"
guavaVersion = "30.1-jre"
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java
new file mode 100644
index 0000000..b036dc6
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob;
+
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+import com.azure.core.http.policy.HttpLogDetailLevel;
+import com.azure.core.http.policy.HttpLogOptions;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.storage.blob.BlobServiceAsyncClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import java.net.InetSocketAddress;
+import com.azure.core.http.HttpClient;
+import java.util.Locale;
+
+
+/**
+ * This class provides a method to create {@link BlobServiceAsyncClient} to be used by the
+ * {@link org.apache.samza.system.azureblob.producer.AzureBlobSystemProducer}. It creates the client based on the
+ * configs given to the SystemProducer - such as which authentication method to use, whether to use proxy to authenticate,
+ * and so on.
+ */
+public final class AzureBlobClientBuilder {
+ private final String systemName;
+ private final String azureUrlFormat;
+ private final AzureBlobConfig azureBlobConfig;
+ public AzureBlobClientBuilder(String systemName, String azureUrlFormat, AzureBlobConfig azureBlobConfig) {
+ this.systemName = systemName;
+ this.azureUrlFormat = azureUrlFormat;
+ this.azureBlobConfig = azureBlobConfig;
+ }
+
+ /**
+   * Creates a {@link BlobServiceAsyncClient} using the configs provided earlier.
+ * if the authentication method is set to {@link TokenCredential} then a {@link com.azure.identity.ClientSecretCredential}
+ * is created and used for the Blob client. Else authentication is done via account name and key using the
+ * {@link StorageSharedKeyCredential}.
+   * The config systems.%s.azureblob.useTokenCredentialAuthentication = true determines that
+   * TokenCredential authentication is to be used.
+ * @return BlobServiceAsyncClient
+ */
+ public BlobServiceAsyncClient getBlobServiceAsyncClient() {
+ BlobServiceClientBuilder blobServiceClientBuilder = getBlobServiceClientBuilder();
+
+ if (azureBlobConfig.getUseTokenCredentialAuthentication(systemName)) {
+ // Use your Azure Blob Storage account's name and client details to create a token credential object to access your account.
+ TokenCredential tokenCredential = getTokenCredential();
+ return blobServiceClientBuilder.credential(tokenCredential).buildAsyncClient();
+ }
+
+ // Use your Azure Blob Storage account's name and key to create a credential object to access your account.
+ StorageSharedKeyCredential storageSharedKeyCredential = getStorageSharedKeyCredential();
+ return blobServiceClientBuilder.credential(storageSharedKeyCredential).buildAsyncClient();
+ }
+
+ /**
+ * Method to get the builder {@link BlobServiceClientBuilder} for creating BlobServiceAsyncClient.
+ * This builder is not provided the credential for authentication here.
+   * This builder is given an endpoint for the Azure Storage account and a http client to be passed on to the
+   * BlobServiceAsyncClient for blob creation.
+ * @return BlobServiceClientBuilder
+ */
+ private BlobServiceClientBuilder getBlobServiceClientBuilder() {
+ // From the Azure portal, get your Storage account blob service AsyncClient endpoint.
+ String endpoint = String.format(Locale.ROOT, azureUrlFormat, azureBlobConfig.getAzureAccountName(systemName));
+
+ HttpLogOptions httpLogOptions = new HttpLogOptions();
+ httpLogOptions.setLogLevel(HttpLogDetailLevel.BASIC);
+
+ BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder()
+ .httpLogOptions(httpLogOptions)
+ .endpoint(endpoint)
+ .httpClient(getHttpClient());
+
+ return blobServiceClientBuilder;
+ }
+
+ /**
+ * Method to create the {@link com.azure.identity.ClientSecretCredential} to be used for authenticating with
+ * Azure Storage. If the config systems.%s.azureblob.authProxy.use is set to true, then authentication happens
+ * via the proxy specified by systems.%s.azureblob.authProxy.hostname and systems.%s.azureblob.authProxy.port configs.
+ * @return ClientSecretCredential which extends from {@link TokenCredential}
+ */
+ private TokenCredential getTokenCredential() {
+ ClientSecretCredentialBuilder clientSecretCredentialBuilder = new ClientSecretCredentialBuilder()
+ .clientId(azureBlobConfig.getAzureClientId(systemName))
+ .clientSecret(azureBlobConfig.getAzureClientSecret(systemName))
+ .tenantId(azureBlobConfig.getAzureTenantId(systemName));
+
+ if (azureBlobConfig.getUseAuthProxy(systemName)) {
+ return clientSecretCredentialBuilder
+ .proxyOptions(new ProxyOptions(ProxyOptions.Type.HTTP,
+ new InetSocketAddress(azureBlobConfig.getAuthProxyHostName(systemName),
+ azureBlobConfig.getAuthProxyPort(systemName))))
+ .build();
+ }
+ return clientSecretCredentialBuilder
+ .build();
+ }
+
+ /**
+   * Method to create {@link StorageSharedKeyCredential} to be used for authenticating with Azure Storage.
+ * @return StorageSharedKeyCredential
+ */
+ private StorageSharedKeyCredential getStorageSharedKeyCredential() {
+ return new StorageSharedKeyCredential(azureBlobConfig.getAzureAccountName(systemName),
+ azureBlobConfig.getAzureAccountKey(systemName));
+ }
+
+ /**
+ * Method to create {@link HttpClient} to be used by the {@link BlobServiceAsyncClient} while creating blobs
+ * in the Azure Storage. If the config systems.%s.azureblob.proxy.use is set to true then the http client
+ * uses the proxy provided by systems.%s.azureblob.proxy.hostname and systems.%s.azureblob.proxy.port configs.
+ * @return HttpClient
+ */
+ private HttpClient getHttpClient() {
+ HttpClient httpClient;
+ if (azureBlobConfig.getUseBlobProxy(systemName)) {
+ httpClient = new NettyAsyncHttpClientBuilder()
+ .proxy(new ProxyOptions(ProxyOptions.Type.HTTP,
+ new InetSocketAddress(azureBlobConfig.getAzureBlobProxyHostname(systemName),
+ azureBlobConfig.getAzureBlobProxyPort(systemName))))
+ .build();
+ } else {
+ httpClient = HttpClient.createDefault();
+ }
+ return httpClient;
+ }
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
index 58f206e..3026a89 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
@@ -33,11 +33,27 @@
public static final String AZURE_BLOB_LOG_SLOW_REQUESTS_MS = "samza.azureblob.log.slowRequestMs";
private static final long AZURE_BLOB_LOG_SLOW_REQUESTS_MS_DEFAULT = Duration.ofSeconds(30).toMillis();
+  // Azure authentication - via account name+key or ClientSecretCredential
// system Level Properties.
// fully qualified class name of the AzureBlobWriter impl for the producer system
public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME = SYSTEM_AZUREBLOB_PREFIX + "writer.factory.class";
public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT = "org.apache.samza.system.azureblob.avro.AzureBlobAvroWriterFactory";
+ public static final String SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "useTokenCredentialAuthentication";
+ private static final boolean SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION_DEFAULT = false;
+
+ // ClientSecretCredential needs client id, client secret, tenant id, vault name, service principal
+ public static final String SYSTEM_AZURE_CLIENT_ID = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "client.id";
+ public static final String SYSTEM_AZURE_CLIENT_SECRET = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "client.secret";
+ public static final String SYSTEM_AZURE_TENANT_ID = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "tenant.id";
+ // Whether to use proxy while authenticating with Azure
+ public static final String SYSTEM_AZURE_USE_AUTH_PROXY = SYSTEM_AZUREBLOB_PREFIX + "authProxy.use";
+ public static final boolean SYSTEM_AZURE_USE_AUTH_PROXY_DEFAULT = false;
+
+ // name of the host to be used as auth proxy
+ public static final String SYSTEM_AZURE_AUTH_PROXY_HOSTNAME = SYSTEM_AZUREBLOB_PREFIX + "authProxy.hostname";
+ // port in the auth proxy host to be used
+ public static final String SYSTEM_AZURE_AUTH_PROXY_PORT = SYSTEM_AZUREBLOB_PREFIX + "authProxy.port";
// Azure Storage Account name under which the Azure container representing this system is.
// System name = Azure container name
// (https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#container-names)
@@ -101,7 +117,6 @@
public static final String SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY = SYSTEM_AZUREBLOB_PREFIX + "metadataPropertiesGeneratorFactory";
private static final String SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY_DEFAULT =
"org.apache.samza.system.azureblob.utils.NullBlobMetadataGeneratorFactory";
-
// Additional configs for the metadata generator should be prefixed with this string which is passed to the generator.
// for example, to pass a "key":"value" pair to the metadata generator, add config like
// systems.<system-name>.azureblob.metadataGeneratorConfig.<key> with value <value>
@@ -127,11 +142,11 @@
return accountName;
}
- public boolean getUseProxy(String systemName) {
+ public boolean getUseBlobProxy(String systemName) {
return getBoolean(String.format(SYSTEM_AZURE_USE_PROXY, systemName), SYSTEM_AZURE_USE_PROXY_DEFAULT);
}
- public String getAzureProxyHostname(String systemName) {
+ public String getAzureBlobProxyHostname(String systemName) {
String hostname = get(String.format(SYSTEM_AZURE_PROXY_HOSTNAME, systemName));
if (hostname == null) {
throw new ConfigException("Azure proxy host name is required.");
@@ -139,7 +154,7 @@
return hostname;
}
- public int getAzureProxyPort(String systemName) {
+ public int getAzureBlobProxyPort(String systemName) {
return getInt(String.format(SYSTEM_AZURE_PROXY_PORT, systemName));
}
@@ -207,4 +222,50 @@
public Config getSystemBlobMetadataGeneratorConfigs(String systemName) {
return subset(String.format(SYSTEM_BLOB_METADATA_GENERATOR_CONFIG_PREFIX, systemName));
}
+
+ public boolean getUseTokenCredentialAuthentication(String systemName) {
+ return getBoolean(String.format(SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION, systemName),
+ SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION_DEFAULT);
+ }
+
+ public boolean getUseAuthProxy(String systemName) {
+ return getBoolean(String.format(SYSTEM_AZURE_USE_AUTH_PROXY, systemName), SYSTEM_AZURE_USE_AUTH_PROXY_DEFAULT);
+ }
+
+ public String getAuthProxyHostName(String systemName) {
+ String hostname = get(String.format(SYSTEM_AZURE_AUTH_PROXY_HOSTNAME, systemName));
+ if (hostname == null) {
+ throw new ConfigException("Azure proxy host name is required.");
+ }
+ return hostname;
+ }
+
+ public int getAuthProxyPort(String systemName) {
+ return getInt(String.format(SYSTEM_AZURE_AUTH_PROXY_PORT, systemName));
+ }
+
+ public String getAzureClientId(String systemName) {
+ String clientId = get(String.format(SYSTEM_AZURE_CLIENT_ID, systemName));
+ if (clientId == null) {
+ throw new ConfigException("Azure Client id is required.");
+ }
+ return clientId;
+ }
+
+ public String getAzureClientSecret(String systemName) {
+ String clientSecret = get(String.format(SYSTEM_AZURE_CLIENT_SECRET, systemName));
+ if (clientSecret == null) {
+ throw new ConfigException("Azure Client secret is required.");
+ }
+ return clientSecret;
+ }
+
+
+ public String getAzureTenantId(String systemName) {
+ String tenantId = get(String.format(SYSTEM_AZURE_TENANT_ID, systemName));
+ if (tenantId == null) {
+ throw new ConfigException("Azure tenant id is required.");
+ }
+ return tenantId;
+ }
}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index e615808..222b99c 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -273,7 +273,7 @@
// SAMZA-2476 stubbing BlockBlobAsyncClient.stageBlock was causing flaky tests.
@VisibleForTesting
- void stageBlock(String blockIdEncoded, ByteBuffer outputStream, int blockSize) {
+ void stageBlock(String blockIdEncoded, ByteBuffer outputStream, int blockSize) throws InterruptedException {
blobAsyncClient.stageBlock(blockIdEncoded, Flux.just(outputStream), blockSize).block();
}
@@ -335,6 +335,11 @@
// StageBlock generates exception on Failure.
stageBlock(blockIdEncoded, outputStream, blockSize);
break;
+ } catch (InterruptedException e) {
+ String msg = String.format("Upload block for blob: %s failed for blockid: %s due to InterruptedException.",
+ blobAsyncClient.getBlobUrl().toString(), blockId);
+ LOG.error(msg, e);
+ throw new AzureException("InterruptedException encountered during block upload. Will not retry.", e);
} catch (Exception e) {
attemptCount += 1;
String msg = "Upload block for blob: " + blobAsyncClient.getBlobUrl().toString()
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index d89f38f..781dce4 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -19,27 +19,16 @@
package org.apache.samza.system.azureblob.producer;
-import com.azure.core.http.HttpClient;
import com.azure.core.http.HttpResponse;
-import com.azure.core.http.ProxyOptions;
-import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
-import com.azure.core.http.policy.HttpLogDetailLevel;
-import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.util.Configuration;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobServiceAsyncClient;
-import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.SkuName;
-import com.azure.storage.common.StorageSharedKeyCredential;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.samza.system.azureblob.AzureBlobConfig;
-import org.apache.samza.system.azureblob.compression.CompressionFactory;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -55,6 +44,9 @@
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemProducerException;
+import org.apache.samza.system.azureblob.AzureBlobClientBuilder;
+import org.apache.samza.system.azureblob.AzureBlobConfig;
+import org.apache.samza.system.azureblob.compression.CompressionFactory;
import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -196,11 +188,7 @@
LOG.warn("Attempting to start an already started producer.");
return;
}
-
- String accountName = config.getAzureAccountName(systemName);
- String accountKey = config.getAzureAccountKey(systemName);
-
- setupAzureContainer(accountName, accountKey);
+ setupAzureContainer();
LOG.info("Starting producer.");
isStarted = true;
@@ -354,47 +342,11 @@
}
@VisibleForTesting
- void setupAzureContainer(String accountName, String accountKey) {
+ void setupAzureContainer() {
try {
- // Use your Azure Blob Storage account's name and key to create a credential object to access your account.
- StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
-
- HttpClient httpClient;
- if (config.getUseProxy(systemName)) {
- LOG.info("HTTP Proxy setup for AzureBlob pipeline");
- httpClient = new NettyAsyncHttpClientBuilder()
- .proxy(new ProxyOptions(ProxyOptions.Type.HTTP,
- new InetSocketAddress(config.getAzureProxyHostname(systemName), config.getAzureProxyPort(systemName)))).build();
- } else {
- httpClient = HttpClient.createDefault();
- }
-
- // From the Azure portal, get your Storage account blob service AsyncClient endpoint.
- String endpoint = String.format(Locale.ROOT, AZURE_URL, accountName);
-
- HttpLogOptions httpLogOptions = new HttpLogOptions();
- httpLogOptions.setLogLevel(HttpLogDetailLevel.BASIC);
- BlobServiceAsyncClient storageClient =
- new BlobServiceClientBuilder()
- .httpLogOptions(httpLogOptions)
- .endpoint(endpoint)
- .credential(credential)
- .httpClient(httpClient)
- .buildAsyncClient();
-
-
- SkuName accountType = storageClient.getAccountInfo().block().getSkuName();
- long flushThresholdSize = config.getMaxFlushThresholdSize(systemName);
- boolean isPremiumAccount = SkuName.PREMIUM_LRS == accountType;
- if (isPremiumAccount && flushThresholdSize > PREMIUM_MAX_BLOCK_SIZE) { // 100 MB
- throw new SystemProducerException("Azure storage account with name: " + accountName
- + " is a premium account and can only handle upto " + PREMIUM_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is "
- + flushThresholdSize);
- } else if (!isPremiumAccount && flushThresholdSize > STANDARD_MAX_BLOCK_SIZE) { // STANDARD account
- throw new SystemProducerException("Azure storage account with name: " + accountName
- + " is a standard account and can only handle upto " + STANDARD_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is "
- + flushThresholdSize);
- }
+ BlobServiceAsyncClient storageClient = new AzureBlobClientBuilder(systemName, AZURE_URL, config)
+ .getBlobServiceAsyncClient();
+ validateFlushThresholdSizeSupported(storageClient);
containerAsyncClient = storageClient.getBlobContainerAsyncClient(systemName);
@@ -406,6 +358,22 @@
}
}
+ void validateFlushThresholdSizeSupported(BlobServiceAsyncClient storageClient) {
+ SkuName accountType = storageClient.getAccountInfo().block().getSkuName();
+ String accountName = storageClient.getAccountName();
+ long flushThresholdSize = config.getMaxFlushThresholdSize(systemName);
+ boolean isPremiumAccount = SkuName.PREMIUM_LRS == accountType;
+ if (isPremiumAccount && flushThresholdSize > PREMIUM_MAX_BLOCK_SIZE) { // 100 MB
+ throw new SystemProducerException("Azure storage account with name: " + accountName
+ + " is a premium account and can only handle upto " + PREMIUM_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is "
+ + flushThresholdSize);
+ } else if (!isPremiumAccount && flushThresholdSize > STANDARD_MAX_BLOCK_SIZE) { // STANDARD account
+ throw new SystemProducerException(
+ "Azure storage account with name: " + accountName + " is a standard account and can only handle upto " + STANDARD_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is "
+ + flushThresholdSize);
+ }
+ }
+
/**
* // find the writer in the writerMap else create one
* @param source for which to find/create the writer
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index b713ec7..4412edf 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -127,7 +127,7 @@
}
@Test
- public void testWrite() {
+ public void testWrite() throws InterruptedException {
byte[] b = new byte[THRESHOLD - 10];
azureBlobOutputStream.write(b, 0, THRESHOLD - 10);
verify(azureBlobOutputStream, never()).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
@@ -136,7 +136,7 @@
}
@Test
- public void testWriteLargerThanThreshold() {
+ public void testWriteLargerThanThreshold() throws InterruptedException {
byte[] largeRecord = RANDOM_STRING.substring(0, 2 * THRESHOLD).getBytes();
byte[] largeRecordFirstHalf = RANDOM_STRING.substring(0, THRESHOLD).getBytes();
byte[] largeRecordSecondHalf = RANDOM_STRING.substring(THRESHOLD, 2 * THRESHOLD).getBytes();
@@ -165,7 +165,7 @@
}
@Test
- public void testWriteLargeRecordWithSmallRecordInBuffer() {
+ public void testWriteLargeRecordWithSmallRecordInBuffer() throws InterruptedException {
byte[] halfBlock = new byte[THRESHOLD / 2];
byte[] fullBlock = new byte[THRESHOLD];
byte[] largeRecord = new byte[2 * THRESHOLD];
@@ -229,6 +229,36 @@
azureBlobOutputStream.close();
}
+ @Test(expected = AzureException.class)
+ public void testWriteFailedInterruptedException() throws InterruptedException {
+
+ doThrow(new InterruptedException("Lets interrupt the thread"))
+ .when(azureBlobOutputStream).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
+ byte[] b = new byte[100];
+ doReturn(COMPRESSED_BYTES).when(mockCompression).compress(b);
+
+ try {
+ azureBlobOutputStream.write(b, 0, THRESHOLD); // threshold crossed so stageBlock is scheduled.
+ // azureBlobOutputStream.close waits on the CompletableFuture which does the actual stageBlock in uploadBlockAsync
+ azureBlobOutputStream.close();
+ } catch (AzureException exception) {
+ // get root cause of the exception - to confirm its an InterruptedException
+ Throwable dupException = exception;
+ while (dupException.getCause() != null && dupException.getCause() != dupException) {
+ dupException = dupException.getCause();
+ }
+
+ Assert.assertTrue(dupException.getClass().getName().equals(InterruptedException.class.getCanonicalName()));
+ Assert.assertEquals("Lets interrupt the thread", dupException.getMessage());
+
+ // verify stageBlock was called exactly once - aka no retries happen when interrupted exception is thrown
+ verify(azureBlobOutputStream).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
+
+ // rethrow the exception so that the test will fail if no exception was thrown in the try block
+ throw exception;
+ }
+ }
+
@Test
public void testClose() {
azureBlobOutputStream.write(BYTES, 0, THRESHOLD);
@@ -278,7 +308,7 @@
}
@Test(expected = AzureException.class)
- public void testCloseFailed() {
+ public void testCloseFailed() throws InterruptedException {
azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
@@ -320,7 +350,7 @@
}
@Test (expected = AzureException.class)
- public void testFlushFailed() throws IOException {
+ public void testFlushFailed() throws IOException, InterruptedException {
azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
60000, THRESHOLD, mockByteArrayOutputStream, mockCompression));
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
index acf4cfb..a364cf6 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
@@ -98,14 +98,14 @@
// use mock writer impl
setupWriterForProducer(systemProducer, mockAzureWriter, STREAM);
// bypass Azure connection setup
- doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+ doNothing().when(systemProducer).setupAzureContainer();
}
@Test
public void testStart() {
systemProducer.start();
- verify(systemProducer).setupAzureContainer(ACCOUNT_NAME, ACCOUNT_KEY);
+ verify(systemProducer).setupAzureContainer();
}
public void testMultipleStart() {
@@ -264,7 +264,7 @@
mockMetricsRegistry));
PowerMockito.whenNew(AzureBlobAvroWriter.class).withAnyArguments().thenThrow(new SystemProducerException("Failed"));
// bypass Azure connection setup
- doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+ doNothing().when(systemProducer).setupAzureContainer();
systemProducer.register(SOURCE);
systemProducer.start();
@@ -315,7 +315,7 @@
AzureBlobSystemProducer systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
// bypass Azure connection setup
- doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+ doNothing().when(systemProducer).setupAzureContainer();
doReturn(mockAzureWriter1).when(systemProducer).getOrCreateWriter(source1, ome1);
doReturn(mockAzureWriter2).when(systemProducer).getOrCreateWriter(source2, ome2);
@@ -359,7 +359,7 @@
AzureBlobSystemProducer systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
// bypass Azure connection setup
- doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+ doNothing().when(systemProducer).setupAzureContainer();
doReturn(mockAzureWriter1).when(systemProducer).getOrCreateWriter(source1, ome1);
doReturn(mockAzureWriter2).when(systemProducer).getOrCreateWriter(source2, ome2);
@@ -411,7 +411,7 @@
AzureBlobSystemProducer systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
// bypass Azure connection setup
- doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+ doNothing().when(systemProducer).setupAzureContainer();
systemProducer.register(source1);
systemProducer.start();
@@ -450,7 +450,7 @@
AzureBlobSystemProducer systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
// bypass Azure connection setup
- doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+ doNothing().when(systemProducer).setupAzureContainer();
setupWriterForProducer(systemProducer, mockAzureWriter1, stream1);
@@ -492,7 +492,7 @@
AzureBlobSystemProducer systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
// bypass Azure connection setup
- doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+ doNothing().when(systemProducer).setupAzureContainer();
setupWriterForProducer(systemProducer, mockAzureWriter1, STREAM);
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java b/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
deleted file mode 100644
index a0b5d1e..0000000
--- a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-public class DependencyIsolationUtils {
- /**
- * Directory inside the home directory of the cluster-based job coordinator in which the framework API artifacts are
- * placed, for usage in dependency isolation for the cluster-based job coordinator.
- * TODO make this configurable or taken from an environment variable
- */
- public static final String FRAMEWORK_API_DIRECTORY = "__samzaFrameworkApi";
-
- /**
- * Directory inside the home directory of the cluster-based job coordinator in which the framework infrastructure
- * artifacts are placed, for usage in dependency isolation for the cluster-based job coordinator.
- * TODO make this configurable or taken from an environment variable
- */
- public static final String FRAMEWORK_INFRASTRUCTURE_DIRECTORY = "__samzaFrameworkInfrastructure";
-
- /**
- * Directory inside the home directory of the cluster-based job coordinator in which the application artifacts are
- * placed, for usage in dependency isolation for the cluster-based job coordinator.
- * TODO make this configurable or taken from an environment variable
- */
- public static final String APPLICATION_DIRECTORY = "__package";
-
- /**
- * Name of the file which contains the class names (or globs) which should be loaded from the framework API
- * classloader.
- */
- public static final String FRAMEWORK_API_CLASS_LIST_FILE_NAME = "samza-framework-api-classes.txt";
-
- public static final String RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME = "runtime-framework-resources-pathing.jar";
-}
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java b/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
deleted file mode 100644
index 344a034..0000000
--- a/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-import com.linkedin.cytodynamics.matcher.BootstrapClassPredicate;
-import com.linkedin.cytodynamics.matcher.GlobMatcher;
-import com.linkedin.cytodynamics.nucleus.DelegateRelationship;
-import com.linkedin.cytodynamics.nucleus.DelegateRelationshipBuilder;
-import com.linkedin.cytodynamics.nucleus.IsolationLevel;
-import com.linkedin.cytodynamics.nucleus.LoaderBuilder;
-import com.linkedin.cytodynamics.nucleus.OriginRestriction;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Use this to build a classloader for running Samza which isolates the Samza framework code/dependencies from the
- * application code/dependencies.
- */
-public class IsolatingClassLoaderFactory {
- private static final Logger LOG = LoggerFactory.getLogger(IsolatingClassLoaderFactory.class);
-
- private static final String LIB_DIRECTORY = "lib";
-
- /**
- * Build a classloader which will isolate Samza framework code from application code. Samza framework classes and
- * application-specific classes will be loaded using a different classloaders. This will enable dependencies of each
- * category of classes to also be loaded separately, so that runtime dependency conflicts do not happen.
- * Each call to this method will build a different instance of a classloader.
- *
- * Samza framework API classes need to be specified in a file called
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} which is in the lib directory which is in the
- * API package. The file needs to be generated when building the framework API package. This class will not generate
- * the file.
- *
- * Implementation notes:
- *
- * The cytodynamics isolating classloader is used for this. It provides more control than the built-in
- * {@link URLClassLoader}. Cytodynamics provides the ability to compose multiple classloaders together and have more
- * granular delegation strategies between the classloaders.
- *
- * In order to share objects between classes loaded by different classloaders, the classes for the shared objects must
- * be loaded by a common classloader. Those common classes will be loaded through a common API classloader. The
- * cytodynamics classloader can be set up to only use the common API classloader for an explicit set of classes. The
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file should include the framework API classes.
- * Also, bootstrap classes (e.g. java.lang.String) need to be loaded by a common classloader, since objects of those
- * types need to be shared across different framework and application. There are also some static bootstrap classes
- * which should be shared (e.g. java.lang.System). Bootstrap classes will be loaded through a common classloader by
- * default.
- *
- * These are the classloaders which are used to make up the final classloader.
- * <ul>
- * <li>bootstrap classloader: Built-in Java classes (e.g. java.lang.String)</li>
- * <li>API classloader: Common Samza framework API classes</li>
- * <li>infrastructure classloader: Core Samza framework classes and plugins that are included in the framework</li>
- * <li>
- * application classloader: Application code and plugins that are needed in the app but are not included in the
- * framework
- * </li>
- * </ul>
- *
- * This is the delegation structure for the classloaders:
- * <pre>
- * (bootstrap (API (application
- * classloader) <---- classloader) <------- classloader)
- * ^ ^
- * | /
- * | /
- * | /
- * | /
- * (infrastructure classloader)
- * </pre>
- * The cytodynamics classloader allows control over when the delegation should happen.
- * <ol>
- * <li>API classloader delegates to the bootstrap classloader if the bootstrap classloader has the class.</li>
- * <li>
- * Infrastructure classloader only delegates to the API classloader for the common classes specified by
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
- * </li>
- * <li>
- * Infrastructure classloader delegates to the application classloader when a class can't be found in the
- * infrastructure classloader.
- * </li>
- * <li>
- * Application classloader only delegates to the API classloader for the common classes specified by
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
- * </li>
- * </ol>
- */
- public ClassLoader buildClassLoader() {
- // start at the user.dir to find the resources for the classpaths
- File baseJobDirectory = new File(System.getProperty("user.dir"));
- File apiLibDirectory = libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.FRAMEWORK_API_DIRECTORY));
- LOG.info("Using API lib directory: {}", apiLibDirectory);
- File infrastructureLibDirectory =
- libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY));
- LOG.info("Using infrastructure lib directory: {}", infrastructureLibDirectory);
- File applicationLibDirectory =
- libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.APPLICATION_DIRECTORY));
- LOG.info("Using application lib directory: {}", applicationLibDirectory);
-
- ClassLoader apiClassLoader = buildApiClassLoader(apiLibDirectory);
- ClassLoader applicationClassLoader =
- buildApplicationClassLoader(applicationLibDirectory, apiLibDirectory, apiClassLoader);
-
- // the classloader to return is the one with the infrastructure classpath
- return buildInfrastructureClassLoader(infrastructureLibDirectory, baseJobDirectory, apiLibDirectory, apiClassLoader,
- applicationClassLoader);
- }
-
- /**
- * Build the {@link ClassLoader} which can load framework API classes.
- *
- * This sets up the link between the bootstrap classloader and the API classloader (see {@link #buildClassLoader()}.
- */
- private static ClassLoader buildApiClassLoader(File apiLibDirectory) {
- /*
- * This can just use the built-in classloading, which checks the parent classloader first and then checks its own
- * classpath. A null parent means bootstrap classloader, which contains core Java classes (e.g. java.lang.String).
- * This doesn't need to be isolated from the parent, because we only want to load all bootstrap classes from the
- * bootstrap classloader.
- */
- return new URLClassLoader(getClasspathAsURLs(apiLibDirectory), null);
- }
-
- /**
- * Build the {@link ClassLoader} which can load application classes.
- *
- * This sets up the link between the application classloader and the API classloader (see {@link #buildClassLoader()}.
- */
- private static ClassLoader buildApplicationClassLoader(File applicationLibDirectory, File apiLibDirectory,
- ClassLoader apiClassLoader) {
- return LoaderBuilder.anIsolatingLoader()
- // look in application lib directory for JARs
- .withClasspath(getClasspathAsURIs(applicationLibDirectory))
- // getClasspathAsURIs should only return JARs within applicationLibDirectory anyways, but doing it to be safe
- .withOriginRestriction(OriginRestriction.denyByDefault().allowingDirectory(applicationLibDirectory, false))
- // delegate to the api classloader for API classes
- .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
- .build();
- }
-
- /**
- * Build the {@link ClassLoader} which can load Samza framework core classes. If a file with the name
- * {@link DependencyIsolationUtils#RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME} is found in {@code baseJobDirectory},
- * then it will be included in the classpath.
- * This may also fall back to loading application classes.
- *
- * This sets up two links: One link between the infrastructure classloader and the API and another link between the
- * infrastructure classloader and the application classloader (see {@link #buildClassLoader()}.
- */
- private static ClassLoader buildInfrastructureClassLoader(File infrastructureLibDirectory,
- File baseJobDirectory,
- File apiLibDirectory,
- ClassLoader apiClassLoader,
- ClassLoader applicationClassLoader) {
- // start with JARs in infrastructure lib directory
- List<URI> classpathURIs = new ArrayList<>(getClasspathAsURIs(infrastructureLibDirectory));
- OriginRestriction originRestriction = OriginRestriction.denyByDefault()
- // getClasspathAsURIs should only return JARs within infrastructureLibDirectory anyways, but doing it to be safe
- .allowingDirectory(infrastructureLibDirectory, false);
- File runtimeFrameworkResourcesPathingJar =
- new File(baseJobDirectory, DependencyIsolationUtils.RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME);
- if (canAccess(runtimeFrameworkResourcesPathingJar)) {
- // if there is a runtime framework resources pathing JAR, then include that in the classpath as well
- classpathURIs.add(runtimeFrameworkResourcesPathingJar.toURI());
- originRestriction.allowingGlobPattern(fileURL(runtimeFrameworkResourcesPathingJar).toExternalForm());
- LOG.info("Added {} to infrastructure classpath", runtimeFrameworkResourcesPathingJar.getPath());
- } else {
- LOG.info("Unable to access {}, so not adding to infrastructure classpath",
- runtimeFrameworkResourcesPathingJar.getPath());
- }
- return LoaderBuilder.anIsolatingLoader()
- .withClasspath(Collections.unmodifiableList(classpathURIs))
- .withOriginRestriction(originRestriction)
- .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
- /*
- * Fall back to the application classloader for certain classes. For example, the application might implement
- * some pluggable classes (e.g. SystemFactory). Another example is message schemas that are supplied by the
- * application.
- */
- .addFallbackDelegate(DelegateRelationshipBuilder.builder()
- .withDelegateClassLoader(applicationClassLoader)
- /*
- * NONE means that a class will be loaded from here if it is not found in the classpath of the loader that uses
- * this relationship.
- */
- .withIsolationLevel(IsolationLevel.NONE)
- .build())
- .build();
- }
-
- /**
- * Build a {@link DelegateRelationship} which defines how to delegate to the API classloader.
- *
- * Delegation will only happen for classes specified in
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} and the Java bootstrap classes.
- */
- private static DelegateRelationship buildApiParentRelationship(File apiLibDirectory, ClassLoader apiClassLoader) {
- DelegateRelationshipBuilder apiParentRelationshipBuilder = DelegateRelationshipBuilder.builder()
- // needs to load API classes from the API classloader
- .withDelegateClassLoader(apiClassLoader)
- /*
- * FULL means to only load classes explicitly specified as "API" from the API classloader. We will use
- * delegate-preferred class predicates to specify which classes are "API" (see below).
- */
- .withIsolationLevel(IsolationLevel.FULL);
-
- // bootstrap classes need to be loaded from a common classloader
- apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new BootstrapClassPredicate());
- // the classes which are Samza framework API classes are added here
- getFrameworkApiClassGlobs(apiLibDirectory).forEach(
- apiClassName -> apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new GlobMatcher(apiClassName)));
- return apiParentRelationshipBuilder.build();
- }
-
- /**
- * Gets the globs for matching against classes to load from the framework API classloader. This will read the
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file in {@code directoryWithClassList} to get
- * the globs.
- *
- * @param directoryWithClassList Directory in which
- * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} lives
- * @return {@link List} of globs for matching against classes to load from the framework API classloader
- */
- @VisibleForTesting
- static List<String> getFrameworkApiClassGlobs(File directoryWithClassList) {
- File parentPreferredFile =
- new File(directoryWithClassList, DependencyIsolationUtils.FRAMEWORK_API_CLASS_LIST_FILE_NAME);
- validateCanAccess(parentPreferredFile);
- try {
- return Files.readAllLines(Paths.get(parentPreferredFile.toURI()), StandardCharsets.UTF_8)
- .stream()
- .filter(StringUtils::isNotBlank)
- .collect(Collectors.toList());
- } catch (IOException e) {
- throw new SamzaException("Error while reading samza-api class list", e);
- }
- }
-
- /**
- * Get the {@link URL}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
- * not recursive.
- */
- @VisibleForTesting
- static URL[] getClasspathAsURLs(File jarsLocation) {
- validateCanAccess(jarsLocation);
- File[] filesInJarsLocation = jarsLocation.listFiles();
- if (filesInJarsLocation == null) {
- throw new SamzaException(
- String.format("Could not find any files inside %s, probably because it is not a directory",
- jarsLocation.getPath()));
- }
- URL[] urls = Stream.of(filesInJarsLocation)
- .filter(file -> file.getName().endsWith(".jar") || file.getName().endsWith(".war"))
- .map(IsolatingClassLoaderFactory::fileURL)
- .toArray(URL[]::new);
- LOG.info("Found {} items to load into classpath from {}", urls.length, jarsLocation);
- Stream.of(urls).forEach(url -> LOG.debug("Found {} from {}", url, jarsLocation));
- return urls;
- }
-
- /**
- * Get the {@link URI}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
- * not recursive.
- */
- @VisibleForTesting
- static List<URI> getClasspathAsURIs(File jarsLocation) {
- return Stream.of(getClasspathAsURLs(jarsLocation))
- .map(IsolatingClassLoaderFactory::urlToURI)
- .collect(Collectors.toList());
- }
-
- private static boolean canAccess(File file) {
- return file.exists() && file.canRead();
- }
-
- /**
- * Makes sure that a file exists and can be read.
- */
- private static void validateCanAccess(File file) {
- if (!canAccess(file)) {
- throw new SamzaException("Unable to access file: " + file);
- }
- }
-
- /**
- * Get the {@link URL} for a {@link File}.
- * Converts checked exceptions into {@link SamzaException}s.
- */
- private static URL fileURL(File file) {
- URI uri = file.toURI();
- try {
- return uri.toURL();
- } catch (MalformedURLException e) {
- throw new SamzaException("Unable to get URL for file: " + file, e);
- }
- }
-
- /**
- * Get the {@link URI} for a {@link URL}.
- * Converts checked exceptions into {@link SamzaException}s.
- */
- private static URI urlToURI(URL url) {
- try {
- return url.toURI();
- } catch (URISyntaxException e) {
- throw new SamzaException("Unable to get URI for URL: " + url, e);
- }
- }
-
- /**
- * Get the {@link File} representing the {@link #LIB_DIRECTORY} inside the given {@code file}.
- */
- private static File libDirectory(File file) {
- return new File(file, LIB_DIRECTORY);
- }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
index a152032..523742c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
@@ -26,7 +26,6 @@
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
-import org.apache.samza.classloader.IsolatingClassLoaderFactory;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
@@ -35,7 +34,6 @@
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.util.CoordinatorStreamUtil;
-import org.apache.samza.util.SplitDeploymentUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,19 +50,12 @@
LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. Exiting job coordinator", exception);
System.exit(1);
});
- if (!SplitDeploymentUtil.isSplitDeploymentEnabled()) {
- // no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
- runClusterBasedJobCoordinator(args);
- } else {
- SplitDeploymentUtil.runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(),
- ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
- }
+ runClusterBasedJobCoordinator(args);
System.exit(0);
}
/**
- * This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
- * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
+ * This is the actual execution for the {@link ClusterBasedJobCoordinator}.
*/
@VisibleForTesting
static void runClusterBasedJobCoordinator(String[] args) {
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManager.java
new file mode 100644
index 0000000..34a5aad
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import java.util.Collections;
+import java.util.Set;
+
+
+/**
+ * Simple placeholder implementation of {@link FaultDomainManager} which contains a single fault domain for all hosts.
+ * This can be used when another concrete {@link FaultDomainManager} is undesirable or unavailable, but features which
+ * depend on a {@link FaultDomainManager} (such as standby containers) may have unexpected behavior.
+ */
+public class SingleFaultDomainManager implements FaultDomainManager {
+ private static final FaultDomain SINGLE_FAULT_DOMAIN = new FaultDomain(FaultDomainType.RACK, "0");
+
+ @Override
+ public Set<FaultDomain> getAllFaultDomains() {
+ return Collections.singleton(SINGLE_FAULT_DOMAIN);
+ }
+
+ @Override
+ public Set<FaultDomain> getFaultDomainsForHost(String host) {
+ return Collections.singleton(SINGLE_FAULT_DOMAIN);
+ }
+
+ @Override
+ public boolean hasSameFaultDomains(String host1, String host2) {
+ return true;
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManagerFactory.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManagerFactory.java
new file mode 100644
index 0000000..db96b15
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManagerFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Produces a simple placeholder {@link FaultDomainManager}. See {@link SingleFaultDomainManager}.
+ */
+public class SingleFaultDomainManagerFactory implements FaultDomainManagerFactory {
+ @Override
+ public FaultDomainManager getFaultDomainManager(Config config, MetricsRegistry metricsRegistry) {
+ return new SingleFaultDomainManager();
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index 3f27991..acb5a0a 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -137,6 +137,20 @@
private static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";
private static final String CLUSTER_MANAGER_JMX_ENABLED = "cluster-manager.jobcoordinator.jmx.enabled";
+ /**
+ * Use this to configure a static port for the job coordinator url for a Samza job. This url is used to provide
+ * information such as job model and locality.
+ * If the value is set to 0, then the port will be dynamically allocated from the available free ports on the node.
+ * The default value of this config is 0.
+ *
+ * Be careful when using this configuration. If the configured port is already in use on the node, then the job
+ * coordinator will fail to start.
+ *
+ * This configuration is experimental, and it might be removed in a future release.
+ */
+ private static final String JOB_COORDINATOR_URL_PORT = "cluster-manager.jobcoordinator.url.port";
+ private static final int DEFAULT_JOB_COORDINATOR_URL_PORT = 0;
+
public ClusterManagerConfig(Config config) {
super(config);
}
@@ -280,4 +294,8 @@
return true;
}
}
+
+ public int getCoordinatorUrlPort() {
+ return getInt(JOB_COORDINATOR_URL_PORT, DEFAULT_JOB_COORDINATOR_URL_PORT);
+ }
}
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 12257e0..4822067 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -155,8 +155,6 @@
public static final String COORDINATOR_STREAM_FACTORY = "job.coordinatorstream.config.factory";
public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = "org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";
- public static final String JOB_SPLIT_DEPLOYMENT_ENABLED = "job.split.deployment.enabled";
-
private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
// Enable ClusterBasedJobCoordinator aka ApplicationMaster High Availability (AM-HA).
@@ -414,10 +412,6 @@
return getStandbyTaskReplicationFactor() > 1;
}
- public boolean isSplitDeploymentEnabled() {
- return getBoolean(JOB_SPLIT_DEPLOYMENT_ENABLED, false);
- }
-
/**
* The metadata file is written in a {@code exec-env-container-id}.metadata file in the log-dir of the container.
* Here the {@code exec-env-container-id} refers to the ID assigned by the cluster manager (e.g., YARN) to the container,
diff --git a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
index 73093a8..73bcf8e 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
@@ -57,29 +57,6 @@
*/
public static final String ENV_EXECUTION_ENV_CONTAINER_ID = "EXECUTION_ENV_CONTAINER_ID";
- /**
- * Set to "true" if split deployment feature is enabled. Otherwise, will be considered false.
- *
- * The launch process for the cluster-based job coordinator and job container depends on the value of this, since it
- * needs to be known if the cluster-based job coordinator and job container should be launched in a split deployment
- * mode.
- * This needs to be an environment variable, because the value needs to be known before the full configs can be read
- * from the metadata store (full configs are only read after launch is complete).
- */
- public static final String ENV_SPLIT_DEPLOYMENT_ENABLED = "ENV_SPLIT_DEPLOYMENT_ENABLED";
-
- /**
- * When running the cluster-based job coordinator and job container in a split deployment mode, it uses JARs and
- * resources from a lib directory which is provided by the framework. In some cases, it is necessary to use some
- * resources specified by the application as well. This environment variable can be set to a directory which is
- * different from the framework lib directory in order to tell Samza where application resources live.
- * This is an environment variable because it is needed in order to launch the cluster-based job coordinator and job
- * container Java processes, which means access to full configs is not available yet.
- * For example, this is used to set a system property for the location of an application-specified log4j configuration
- * file when launching the cluster-based job coordinator and job container Java processes.
- */
- public static final String ENV_APPLICATION_LIB_DIR = "APPLICATION_LIB_DIR";
-
/*
* The base directory for storing logged data stores used in Samza. This has to be set on all machine running Samza
* containers. For example, when using YARN, it has to be set in all NMs and passed to the containers.
diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 4367244..b170bac 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -262,17 +262,6 @@
}
/**
- * Helper method to check if a system has a changelog attached to it.
- */
- public boolean isChangelogSystem(String systemName) {
- return getStoreNames().stream()
- .map(this::getChangelogStream)
- .filter(Optional::isPresent)
- .map(systemStreamName -> StreamUtil.getSystemStreamFromNames(systemStreamName.get()).getSystem())
- .anyMatch(system -> system.equals(systemName));
- }
-
- /**
* Helper method to check if there is any stores configured w/ a changelog
*/
public boolean hasDurableStores() {
diff --git a/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
index 3d3a53d..804ef93 100644
--- a/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
+++ b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
@@ -21,12 +21,24 @@
import org.apache.samza.job.model.JobModel;
import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStreamPartition;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class is used for passing objects around for the implementation of the high-level API.
+ * 1) Container for objects that need to be passed between different components in the implementation of the high-level
+ * API.
+ * 2) The implementation of the high-level API is built on top of the low-level API. The low-level API only exposes
+ * {@link TaskContext}, but the implementation of the high-level API needs some other internal Samza components (e.g.
+ * {@link StreamMetadataCache}). We internally make these components available through {@link TaskContextImpl} so that we
+ * can do a cast to access the components. This class hides some of the messiness of casting. It's still not ideal to
+ * need to do any casting, even in this class.
+ */
public class InternalTaskContext {
-
private final Context context;
private final Map<String, Object> objectRegistry = new HashMap<>();
@@ -46,6 +58,10 @@
return context;
}
+ /**
+ * TODO: The public {@link JobContext} exposes {@link JobModel} now, so can this internal method be replaced by the
+ * public API?
+ */
public JobModel getJobModel() {
return ((TaskContextImpl) this.context.getTaskContext()).getJobModel();
}
@@ -53,4 +69,11 @@
public StreamMetadataCache getStreamMetadataCache() {
return ((TaskContextImpl) this.context.getTaskContext()).getStreamMetadataCache();
}
+
+ /**
+ * See {@link TaskContextImpl#getSspsExcludingSideInputs()}.
+ */
+ public Set<SystemStreamPartition> getSspsExcludingSideInputs() {
+ return ((TaskContextImpl) this.context.getTaskContext()).getSspsExcludingSideInputs();
+ }
}
diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
index edec17d..d87a5bc 100644
--- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
@@ -29,9 +29,15 @@
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.TableManager;
+import java.util.Set;
import java.util.function.Function;
+/**
+ * This class provides the implementation for the public {@link TaskContext} interface.
+ * It also allows us to pass certain internal Samza components around so that the implementation of the high-level API
+ * can use them (see InternalTaskContext for some more details).
+ */
public class TaskContextImpl implements TaskContext {
private final TaskModel taskModel;
private final MetricsRegistry taskMetricsRegistry;
@@ -39,8 +45,13 @@
private final TableManager tableManager;
private final CallbackScheduler callbackScheduler;
private final OffsetManager offsetManager;
+
+ // The instance variables below are not used for implementing any public API methods. They are here so that we can
+ // pass some internal components over to the implementation of the high-level API. See InternalTaskContext.
+
private final JobModel jobModel;
private final StreamMetadataCache streamMetadataCache;
+ private final Set<SystemStreamPartition> sspsExcludingSideInputs;
public TaskContextImpl(TaskModel taskModel,
MetricsRegistry taskMetricsRegistry,
@@ -49,7 +60,8 @@
CallbackScheduler callbackScheduler,
OffsetManager offsetManager,
JobModel jobModel,
- StreamMetadataCache streamMetadataCache) {
+ StreamMetadataCache streamMetadataCache,
+ Set<SystemStreamPartition> sspsExcludingSideInputs) {
this.taskModel = taskModel;
this.taskMetricsRegistry = taskMetricsRegistry;
this.keyValueStoreProvider = keyValueStoreProvider;
@@ -58,6 +70,7 @@
this.offsetManager = offsetManager;
this.jobModel = jobModel;
this.streamMetadataCache = streamMetadataCache;
+ this.sspsExcludingSideInputs = sspsExcludingSideInputs;
}
@Override
@@ -101,4 +114,14 @@
public StreamMetadataCache getStreamMetadataCache() {
return this.streamMetadataCache;
}
-}
+
+ /**
+ * Returns the {@link SystemStreamPartition}s excluding the side-input SSPs. For the high-level API, watermarks and
+ * end-of-stream messages are propagated based on their input SSPs. However, the Samza framework does not give side
+ * input messages to the high-level operator tasks. Therefore, the operators need to know the input SSPs excluding the
+ * side input SSPs. See SAMZA-2303 for more details.
+ */
+ public Set<SystemStreamPartition> getSspsExcludingSideInputs() {
+ return this.sspsExcludingSideInputs;
+ }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
index 860c596..7e40382 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
@@ -36,7 +36,6 @@
import org.apache.samza.job.JobCoordinatorMetadata;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
@@ -284,44 +283,44 @@
private static final String METADATA_WRITE_FAILED_COUNT = "metadata-write-failed-count";
private static final String NEW_DEPLOYMENT = "new-deployment";
- private final Counter applicationAttemptCount;
- private final Counter metadataGenerationFailedCount;
- private final Counter metadataReadFailedCount;
- private final Counter metadataWriteFailedCount;
+ private final Gauge<Integer> applicationAttemptCount;
+ private final Gauge<Integer> metadataGenerationFailedCount;
+ private final Gauge<Integer> metadataReadFailedCount;
+ private final Gauge<Integer> metadataWriteFailedCount;
private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
private final Gauge<Integer> configChangedAcrossApplicationAttempt;
private final Gauge<Integer> newDeployment;
public JobCoordinatorMetadataManagerMetrics(MetricsRegistry registry) {
- applicationAttemptCount = registry.newCounter(GROUP, APPLICATION_ATTEMPT_COUNT);
+ applicationAttemptCount = registry.newGauge(GROUP, APPLICATION_ATTEMPT_COUNT, 0);
configChangedAcrossApplicationAttempt =
registry.newGauge(GROUP, CONFIG_CHANGED, 0);
jobModelChangedAcrossApplicationAttempt =
registry.newGauge(GROUP, JOB_MODEL_CHANGED, 0);
- metadataGenerationFailedCount = registry.newCounter(GROUP,
- METADATA_GENERATION_FAILED_COUNT);
- metadataReadFailedCount = registry.newCounter(GROUP, METADATA_READ_FAILED_COUNT);
- metadataWriteFailedCount = registry.newCounter(GROUP, METADATA_WRITE_FAILED_COUNT);
+ metadataGenerationFailedCount = registry.newGauge(GROUP,
+ METADATA_GENERATION_FAILED_COUNT, 0);
+ metadataReadFailedCount = registry.newGauge(GROUP, METADATA_READ_FAILED_COUNT, 0);
+ metadataWriteFailedCount = registry.newGauge(GROUP, METADATA_WRITE_FAILED_COUNT, 0);
newDeployment = registry.newGauge(GROUP, NEW_DEPLOYMENT, 0);
}
@VisibleForTesting
- Counter getApplicationAttemptCount() {
+ Gauge<Integer> getApplicationAttemptCount() {
return applicationAttemptCount;
}
@VisibleForTesting
- Counter getMetadataGenerationFailedCount() {
+ Gauge<Integer> getMetadataGenerationFailedCount() {
return metadataGenerationFailedCount;
}
@VisibleForTesting
- Counter getMetadataReadFailedCount() {
+ Gauge<Integer> getMetadataReadFailedCount() {
return metadataReadFailedCount;
}
@VisibleForTesting
- Counter getMetadataWriteFailedCount() {
+ Gauge<Integer> getMetadataWriteFailedCount() {
return metadataWriteFailedCount;
}
@@ -341,19 +340,19 @@
}
void incrementApplicationAttemptCount() {
- applicationAttemptCount.inc();
+ applicationAttemptCount.set(applicationAttemptCount.getValue() + 1);
}
void incrementMetadataGenerationFailedCount() {
- metadataGenerationFailedCount.inc();
+ metadataGenerationFailedCount.set(metadataGenerationFailedCount.getValue() + 1);
}
void incrementMetadataReadFailedCount() {
- metadataReadFailedCount.inc();
+ metadataReadFailedCount.set(metadataReadFailedCount.getValue() + 1);
}
void incrementMetadataWriteFailedCount() {
- metadataWriteFailedCount.inc();
+ metadataWriteFailedCount.set(metadataWriteFailedCount.getValue() + 1);
}
void setConfigChangedAcrossApplicationAttempt(int value) {
diff --git a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java
new file mode 100644
index 0000000..3725344
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job;
+
+import java.io.File;
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.ShellCommandConfig;
+
+
+public class ShellCommandBuilder extends CommandBuilder {
+ @Override
+ public String buildCommand() {
+ ShellCommandConfig shellCommandConfig = new ShellCommandConfig(config);
+ if (StringUtils.isEmpty(this.commandPath)) {
+ return shellCommandConfig.getCommand();
+ } else {
+ return this.commandPath + File.separator + shellCommandConfig.getCommand();
+ }
+ }
+
+ @Override
+ public Map<String, String> buildEnvironment() {
+ ShellCommandConfig shellCommandConfig = new ShellCommandConfig(config);
+ ImmutableMap.Builder<String, String> envBuilder = new ImmutableMap.Builder<>();
+ envBuilder.put(ShellCommandConfig.ENV_CONTAINER_ID, this.id);
+ envBuilder.put(ShellCommandConfig.ENV_COORDINATOR_URL, this.url.toString());
+ envBuilder.put(ShellCommandConfig.ENV_JAVA_OPTS, shellCommandConfig.getTaskOpts().orElse(""));
+ envBuilder.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR,
+ shellCommandConfig.getAdditionalClasspathDir().orElse(""));
+ shellCommandConfig.getJavaHome().ifPresent(javaHome -> envBuilder.put(ShellCommandConfig.ENV_JAVA_HOME, javaHome));
+ return envBuilder.build();
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporter.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporter.java
new file mode 100644
index 0000000..5961d7a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.Metric;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsVisitor;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of {@link MetricsReporter} which logs metrics which match a regex.
+ * The regex is checked against "[source name]-[group name]-[metric name]".
+ */
+public class LoggingMetricsReporter implements MetricsReporter {
+ private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class);
+ /**
+ * First part is source, second part is group name, third part is metric name
+ */
+ private static final String FULL_METRIC_FORMAT = "%s-%s-%s";
+
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final Pattern metricsToLog;
+ private final long loggingIntervalSeconds;
+ private final Queue<Runnable> loggingTasks = new ConcurrentLinkedQueue<>();
+
+ /**
+ * @param scheduledExecutorService executes the logging tasks
+ * @param metricsToLog Only log the metrics which match this regex. The strings for matching against this metric are
+ * constructed by concatenating source name, group name, and metric name, delimited by dashes.
+ * @param loggingIntervalSeconds interval at which to log metrics
+ */
+ public LoggingMetricsReporter(ScheduledExecutorService scheduledExecutorService, Pattern metricsToLog,
+ long loggingIntervalSeconds) {
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.metricsToLog = metricsToLog;
+ this.loggingIntervalSeconds = loggingIntervalSeconds;
+ }
+
+ @Override
+ public void start() {
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> this.loggingTasks.forEach(Runnable::run),
+ this.loggingIntervalSeconds, this.loggingIntervalSeconds, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void register(String source, ReadableMetricsRegistry registry) {
+ this.loggingTasks.add(buildLoggingTask(source, registry));
+ }
+
+ @Override
+ public void stop() {
+ this.scheduledExecutorService.shutdown();
+ try {
+ this.scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while shutting down executor", e);
+ }
+ if (!this.scheduledExecutorService.isTerminated()) {
+ LOG.warn("Unable to shutdown executor");
+ }
+ }
+
+ /**
+ * VisibleForTesting so that the logging call can be verified in unit tests.
+ */
+ @VisibleForTesting
+ void doLog(String logString) {
+ LOG.info(logString);
+ }
+
+ private Runnable buildLoggingTask(String source, ReadableMetricsRegistry registry) {
+ return () -> {
+ for (String group : registry.getGroups()) {
+ for (Map.Entry<String, Metric> metricGroupEntry : registry.getGroup(group).entrySet()) {
+ metricGroupEntry.getValue().visit(new MetricsVisitor() {
+ @Override
+ public void counter(Counter counter) {
+ logMetric(source, group, counter.getName(), counter.getCount());
+ }
+
+ @Override
+ public <T> void gauge(Gauge<T> gauge) {
+ logMetric(source, group, gauge.getName(), gauge.getValue());
+ }
+
+ @Override
+ public void timer(Timer timer) {
+ logMetric(source, group, timer.getName(), timer.getSnapshot().getAverage());
+ }
+ });
+ }
+ }
+ };
+ }
+
+ private <T> void logMetric(String source, String group, String metricName, T value) {
+ String fullMetricName = String.format(FULL_METRIC_FORMAT, source, group, metricName);
+ if (this.metricsToLog.matcher(fullMetricName).matches()) {
+ doLog(String.format("Metric: %s, Value: %s", fullMetricName, value));
+ }
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterConfig.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterConfig.java
new file mode 100644
index 0000000..e7a256f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Optional;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+
+
+public class LoggingMetricsReporterConfig extends MapConfig {
+ private static final String METRICS_TO_LOG_REGEX_CONFIG = "metrics.reporter.%s.log.regex";
+ private static final String LOGGING_INTERVAL_SECONDS_CONFIG = "metrics.reporter.%s.logging.interval.seconds";
+ private static final long LOGGING_INTERVAL_SECONDS_DEFAULT = 60;
+
+ public LoggingMetricsReporterConfig(Config config) {
+ super(config);
+ }
+
+ public String getMetricsToLogRegex(String reporterName) {
+ String metricsToLogConfigKey = String.format(METRICS_TO_LOG_REGEX_CONFIG, reporterName);
+ return Optional.ofNullable(get(metricsToLogConfigKey))
+ .orElseThrow(() -> new ConfigException("Missing value for " + metricsToLogConfigKey));
+ }
+
+ public long getLoggingIntervalSeconds(String reporterName) {
+ return getLong(String.format(LOGGING_INTERVAL_SECONDS_CONFIG, reporterName), LOGGING_INTERVAL_SECONDS_DEFAULT);
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterFactory.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterFactory.java
new file mode 100644
index 0000000..88d3892
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.regex.Pattern;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsReporterFactory;
+
+
+/**
+ * Creates a {@link MetricsReporter} which logs metrics and their values.
+ * This can be used to access metric values when no other external metrics system is available.
+ */
+public class LoggingMetricsReporterFactory implements MetricsReporterFactory {
+ @Override
+ public MetricsReporter getMetricsReporter(String name, String processorId, Config config) {
+ ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("Samza LoggingMetricsReporter Thread-%d").setDaemon(true).build());
+ LoggingMetricsReporterConfig loggingMetricsReporterConfig = new LoggingMetricsReporterConfig(config);
+ Pattern metricsToLog = Pattern.compile(loggingMetricsReporterConfig.getMetricsToLogRegex(name));
+ return new LoggingMetricsReporter(scheduledExecutorService, metricsToLog,
+ loggingMetricsReporterConfig.getLoggingIntervalSeconds(name));
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 705f0cb..19cec80 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -111,17 +111,13 @@
LOG.info("{} has {} producer tasks.", stream, count);
});
- // set states for end-of-stream
+ // set states for end-of-stream; don't include side inputs (see SAMZA-2303)
internalTaskContext.registerObject(EndOfStreamStates.class.getName(),
- new EndOfStreamStates(
- internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
- producerTaskCounts));
- // set states for watermark
+ new EndOfStreamStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts));
+ // set states for watermark; don't include side inputs (see SAMZA-2303)
internalTaskContext.registerObject(WatermarkStates.class.getName(),
- new WatermarkStates(
- internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
- producerTaskCounts,
- context.getContainerContext().getContainerMetricsRegistry()));
+ new WatermarkStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts,
+ context.getContainerContext().getContainerMetricsRegistry()));
specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index bc4b227..87429ed 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -68,7 +68,7 @@
List<IncomingMessageEnvelope> messages = bufferedMessages.get(ssp);
String offset = String.valueOf(messages.size());
- if (message instanceof EndOfStreamMessage) {
+ if (shouldUseEndOfStreamOffset(message)) {
offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET;
}
@@ -224,4 +224,22 @@
return ImmutableList.copyOf(messageEnvelopesForSSP.subList(startingOffset, messageEnvelopesForSSP.size()));
}
+
+ /**
+ * We don't always want to use {@link IncomingMessageEnvelope#END_OF_STREAM_OFFSET} for all
+ * {@link EndOfStreamMessage}s. Certain control message flows (e.g. end-of-stream) have an aggregation partition,
+ * which needs to listen for messages from all other partitions. These aggregation messages are marked by the task
+ * name being non-null. If we use {@link IncomingMessageEnvelope#END_OF_STREAM_OFFSET} for the aggregation messages,
+ * then the aggregation partition would stop listening once it got the message from one of the tasks, but that means
+ * it would miss the aggregation messages from all other tasks. See SAMZA-2300 for more details.
+ * One other note: If there is a serializer set for the stream, then by the time the message gets to this check, it
+ * will be a byte array, so this check will not return true, even if the deserialized message was an
+ * {@link EndOfStreamMessage}. So far this isn't a problem, because we only really need this to return true for
+ * input streams (not intermediate streams), and in-memory input stream data doesn't get serialized. For intermediate
+ * streams, we don't need END_OF_STREAM_OFFSET to be used since the high-level operators take care of end-of-stream
+ * messages based on MessageType.
+ */
+ private static boolean shouldUseEndOfStreamOffset(Object message) {
+ return (message instanceof EndOfStreamMessage) && ((EndOfStreamMessage) message).getTaskName() == null;
+ }
}
diff --git a/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java b/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
deleted file mode 100644
index 200cd3c..0000000
--- a/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.ShellCommandConfig;
-
-
-public final class SplitDeploymentUtil {
-
- /**
- * The split deployment feature uses system env {@code ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED} to represent
- * if the user chooses to enable it.
- * This function helps to detect if the split deployment feature is enabled.
- *
- * @return true if split deployment is enabled; vice versa
- */
- public static boolean isSplitDeploymentEnabled() {
- return Boolean.parseBoolean(System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
- }
-
- /**
- * Execute the runner class using a separate isolated classloader.
- * @param classLoader {@link ClassLoader} to use to load the runner class which will run
- * @param originalRunnerClass {@link Class} for which will be executed with the new class loader.
- * @param runMethodName run method name of runner class
- * @param runMethodArgs arguments to pass to run method
- */
- public static void runWithClassLoader(ClassLoader classLoader, Class<?> originalRunnerClass, String runMethodName,
- String[] runMethodArgs) {
- // need to use the isolated classloader to load run method and then execute using that new class
- Class<?> runnerClass;
- try {
- runnerClass = classLoader.loadClass(originalRunnerClass.getName());
- } catch (ClassNotFoundException e) {
- throw new SamzaException(String.format(
- "Isolation was enabled, but unable to find %s in isolated classloader", originalRunnerClass.getName()), e);
- }
-
- // save the current context classloader so it can be reset after finishing the call to run method
- ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
- // this is needed because certain libraries (e.g. log4j) use the context classloader
- Thread.currentThread().setContextClassLoader(classLoader);
-
- try {
- executeRunForRunnerClass(runnerClass, runMethodName, runMethodArgs);
- } finally {
- // reset the context class loader; it's good practice, and could be important when running a test suite
- Thread.currentThread().setContextClassLoader(previousContextClassLoader);
- }
- }
-
- private static void executeRunForRunnerClass(Class<?> runnerClass, String runMethodName, String[] runMethodArgs) {
- Method runMethod;
- try {
- runMethod = runnerClass.getDeclaredMethod(runMethodName, String[].class);
- } catch (NoSuchMethodException e) {
- throw new SamzaException(String.format("Isolation was enabled, but unable to find %s method", runMethodName), e);
- }
- // only sets accessible flag for this method instance
- runMethod.setAccessible(true);
-
- try {
- // wrapping args in object array so that args is passed as a single argument to the method
- runMethod.invoke(null, new Object[]{runMethodArgs});
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new SamzaException(String.format("Exception while executing %s method", runMethodName), e);
- }
- }
-}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index f91aae1..4e1c41a 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -88,7 +88,7 @@
}
})
private val taskContext = new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager,
- new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache)
+ new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache, systemStreamPartitions)
// need separate field for this instead of using it through Context, since Context throws an exception if it is null
private val applicationTaskContextOption = applicationTaskContextFactoryOption
.map(_.create(externalContextOption.orNull, jobContext, containerContext, taskContext,
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 7c0e747..5dd662b 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -100,7 +100,8 @@
updateTaskAssignments(jobModel, taskAssignmentManager, taskPartitionAssignmentManager, grouperMetadata)
- val server = new HttpServer
+ val clusterManagerConfig = new ClusterManagerConfig(config)
+ val server = new HttpServer(port = clusterManagerConfig.getCoordinatorUrlPort)
server.addServlet("/", new JobServlet(serializedJobModelRef))
server.addServlet("/locality", new LocalityServlet(localityManager))
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
deleted file mode 100644
index 9b95648..0000000
--- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job
-
-
-import java.io.File
-
-import org.apache.samza.config.ShellCommandConfig
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
-
-import scala.collection.JavaConverters._
-
-class ShellCommandBuilder extends CommandBuilder {
- def buildCommand() = {
- val shellCommandConfig = new ShellCommandConfig(config)
- if(commandPath == null || commandPath.isEmpty())
- shellCommandConfig.getCommand
- else
- commandPath + File.separator + shellCommandConfig.getCommand
- }
-
- def buildEnvironment(): java.util.Map[String, String] = {
- val shellCommandConfig = new ShellCommandConfig(config)
- val envMap = Map(
- ShellCommandConfig.ENV_CONTAINER_ID -> id.toString,
- ShellCommandConfig.ENV_COORDINATOR_URL -> url.toString,
- ShellCommandConfig.ENV_JAVA_OPTS -> shellCommandConfig.getTaskOpts.orElse(""),
- ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR -> shellCommandConfig.getAdditionalClasspathDir.orElse(""))
-
- val envMapWithJavaHome = JavaOptionals.toRichOptional(shellCommandConfig.getJavaHome).toOption match {
- case Some(javaHome) => envMap + (ShellCommandConfig.ENV_JAVA_HOME -> javaHome)
- case None => envMap
- }
-
- envMapWithJavaHome.asJava
- }
-}
diff --git a/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java b/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
deleted file mode 100644
index 7444fbf..0000000
--- a/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-import java.io.File;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Set;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.samza.SamzaException;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-public class TestIsolatingClassLoaderFactory {
- @Test
- public void testGetApiClasses() throws URISyntaxException {
- File apiClassListFile = Paths.get(getClass().getResource("/classloader").toURI()).toFile();
- List<String> apiClassNames = IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(apiClassListFile);
- List<String> expected = ImmutableList.of(
- "org.apache.samza.JavaClass",
- "org.apache.samza.JavaClass$InnerJavaClass",
- "org.apache.samza.ScalaClass$",
- "org.apache.samza.ScalaClass$$anon$1",
- "my.package.with.wildcard.*",
- "my.package.with.question.mark?");
- assertEquals(expected, apiClassNames);
- }
-
- @Test(expected = SamzaException.class)
- public void testGetApiClassesFileDoesNotExist() throws URISyntaxException {
- File nonExistentDirectory =
- new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
- IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(nonExistentDirectory);
- }
-
- @Test
- public void testGetClasspathAsURLs() throws URISyntaxException {
- File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
- URL[] classpath = IsolatingClassLoaderFactory.getClasspathAsURLs(classpathDirectory);
- assertEquals(2, classpath.length);
- Set<URL> classpathSet = ImmutableSet.copyOf(classpath);
- URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
- assertTrue(classpathSet.contains(jarUrl));
- URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
- assertTrue(classpathSet.contains(warUrl));
- }
-
- @Test(expected = SamzaException.class)
- public void testGetClasspathAsURLsDirectoryDoesNotExist() throws URISyntaxException {
- File nonExistentDirectory =
- new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
- IsolatingClassLoaderFactory.getClasspathAsURLs(nonExistentDirectory);
- }
-
- @Test
- public void testGetClasspathAsURIs() throws URISyntaxException {
- File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
- List<URI> classpath = IsolatingClassLoaderFactory.getClasspathAsURIs(classpathDirectory);
- assertEquals(2, classpath.size());
- Set<URI> classpathSet = ImmutableSet.copyOf(classpath);
- URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
- assertTrue(classpathSet.contains(jarUrl.toURI()));
- URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
- assertTrue(classpathSet.contains(warUrl.toURI()));
- }
-
- @Test(expected = SamzaException.class)
- public void testGetClasspathAsURIsDirectoryDoesNotExist() throws URISyntaxException {
- File nonExistentDirectory =
- new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
- IsolatingClassLoaderFactory.getClasspathAsURIs(nonExistentDirectory);
- }
-}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 00c2d5e..af4bd1d 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -546,17 +546,6 @@
}
@Test
- public void testGetClusterBasedJobCoordinatorDependencyIsolationEnabled() {
- Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
- assertTrue(new JobConfig(config).isSplitDeploymentEnabled());
-
- config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false"));
- assertFalse(new JobConfig(config).isSplitDeploymentEnabled());
-
- assertFalse(new JobConfig(new MapConfig()).isSplitDeploymentEnabled());
- }
-
- @Test
public void testGetMetadataFile() {
String execEnvContainerId = "container-id";
String containerMetadataDirectory = "/tmp/samza/log/dir";
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 5dba698..0d06c3b 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -359,18 +359,6 @@
}
@Test
- public void testIsChangelogSystem() {
- StorageConfig storageConfig = new StorageConfig(new MapConfig(ImmutableMap.of(
- // store0 has a changelog stream
- String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class",
- String.format(CHANGELOG_STREAM, STORE_NAME0), "system0.changelog-stream",
- // store1 does not have a changelog stream
- String.format(StorageConfig.FACTORY, STORE_NAME1), "factory.class")));
- assertTrue(storageConfig.isChangelogSystem("system0"));
- assertFalse(storageConfig.isChangelogSystem("other-system"));
- }
-
- @Test
public void testHasDurableStores() {
// no changelog, which means no durable stores
StorageConfig storageConfig = new StorageConfig(
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
index 0e8f78e..094583e 100644
--- a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
@@ -62,7 +62,7 @@
MockitoAnnotations.initMocks(this);
taskContext =
new TaskContextImpl(taskModel, taskMetricsRegistry, keyValueStoreProvider, tableManager, callbackScheduler,
- offsetManager, null, null);
+ offsetManager, null, null, null);
when(this.taskModel.getTaskName()).thenReturn(TASK_NAME);
}
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
index d623aed..b1739d9 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
@@ -141,13 +141,13 @@
JobCoordinatorMetadata newMetadataWithNoChange =
new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
assertEquals("Application attempt count should be 0", 0,
- metrics.getApplicationAttemptCount().getCount());
+ metrics.getApplicationAttemptCount().getValue().intValue());
metadataChanged =
jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithNoChange);
assertFalse("Metadata check should return false", metadataChanged);
assertEquals("Application attempt count should be 1", 1,
- metrics.getApplicationAttemptCount().getCount());
+ metrics.getApplicationAttemptCount().getValue().intValue());
}
@Test
@@ -161,7 +161,7 @@
} catch (Exception e) {
assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
assertEquals("Metadata generation failed count should be 1", 1,
- jobCoordinatorMetadataManager.getMetrics().getMetadataGenerationFailedCount().getCount());
+ jobCoordinatorMetadataManager.getMetrics().getMetadataGenerationFailedCount().getValue().intValue());
}
}
@@ -211,7 +211,7 @@
JobCoordinatorMetadata actualMetadata = jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
assertNull("Read failed should return null", actualMetadata);
assertEquals("Metadata read failed count should be 1", 1,
- jobCoordinatorMetadataManager.getMetrics().getMetadataReadFailedCount().getCount());
+ jobCoordinatorMetadataManager.getMetrics().getMetadataReadFailedCount().getValue().intValue());
}
@Test
@@ -240,7 +240,7 @@
} catch (Exception e) {
assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
assertEquals("Metadata write failed count should be 1", 1,
- jobCoordinatorMetadataManager.getMetrics().getMetadataWriteFailedCount().getCount());
+ jobCoordinatorMetadataManager.getMetrics().getMetadataWriteFailedCount().getValue().intValue());
}
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java
new file mode 100644
index 0000000..ca7be0e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestShellCommandBuilder {
+ private static final String URL_STRING = "http://www.google.com";
+
+ @Test
+ public void testBasicBuild() throws MalformedURLException {
+ Config config = new MapConfig(ImmutableMap.of(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo"));
+ ShellCommandBuilder shellCommandBuilder = new ShellCommandBuilder();
+ shellCommandBuilder.setConfig(config);
+ shellCommandBuilder.setId("1");
+ shellCommandBuilder.setUrl(new URL(URL_STRING));
+ Map<String, String> expectedEnvironment = ImmutableMap.of(
+ ShellCommandConfig.ENV_CONTAINER_ID, "1",
+ ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING,
+ ShellCommandConfig.ENV_JAVA_OPTS, "",
+ ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
+ // assertions when command path is not set
+ assertEquals("foo", shellCommandBuilder.buildCommand());
+ assertEquals(expectedEnvironment, shellCommandBuilder.buildEnvironment());
+ // assertions when command path is set to empty string
+ shellCommandBuilder.setCommandPath("");
+ assertEquals("foo", shellCommandBuilder.buildCommand());
+ assertEquals(expectedEnvironment, shellCommandBuilder.buildEnvironment());
+ }
+
+ @Test
+ public void testBuildEnvironment() throws MalformedURLException {
+ Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
+ .put(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo")
+ .put(ShellCommandConfig.TASK_JVM_OPTS, "-Xmx4g")
+ .put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath")
+ .put(ShellCommandConfig.TASK_JAVA_HOME, "/path/to/java/home")
+ .build());
+ ShellCommandBuilder shellCommandBuilder = new ShellCommandBuilder();
+ shellCommandBuilder.setConfig(config);
+ shellCommandBuilder.setId("1");
+ shellCommandBuilder.setUrl(new URL(URL_STRING));
+ Map<String, String> expectedEnvironment = new ImmutableMap.Builder<String, String>()
+ .put(ShellCommandConfig.ENV_CONTAINER_ID, "1")
+ .put(ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING)
+ .put(ShellCommandConfig.ENV_JAVA_OPTS, "-Xmx4g")
+ .put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath")
+ .put(ShellCommandConfig.ENV_JAVA_HOME, "/path/to/java/home")
+ .build();
+ assertEquals(expectedEnvironment, shellCommandBuilder.buildEnvironment());
+ }
+
+ @Test
+ public void testBuildCommandWithCommandPath() throws MalformedURLException {
+ Config config = new MapConfig(ImmutableMap.of(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo"));
+ ShellCommandBuilder shellCommandBuilder = new ShellCommandBuilder();
+ shellCommandBuilder.setConfig(config);
+ shellCommandBuilder.setId("1");
+ shellCommandBuilder.setUrl(new URL(URL_STRING));
+ shellCommandBuilder.setCommandPath("/package/path");
+ assertEquals("/package/path/foo", shellCommandBuilder.buildCommand());
+ }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporter.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporter.java
new file mode 100644
index 0000000..c1f5972
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.Metric;
+import org.apache.samza.metrics.MetricsVisitor;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.metrics.Snapshot;
+import org.apache.samza.metrics.Timer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestLoggingMetricsReporter {
+ private static final long LOGGING_INTERVAL_SECONDS = 15;
+ private static final String COUNTER_NAME = "counter_name";
+ private static final long COUNTER_VALUE = 10;
+ private static final String GAUGE_NAME = "gauge_name";
+ private static final double GAUGE_VALUE = 20.0;
+ private static final String TIMER_NAME = "timer_name";
+ private static final double TIMER_VALUE = 30.0;
+ private static final Pattern DEFAULT_PATTERN = Pattern.compile(".*_name");
+ private static final String GROUP_NAME = "group_name";
+ private static final String SOURCE_NAME = "source_name";
+
+ @Mock
+ private ScheduledExecutorService scheduledExecutorService;
+ @Mock
+ private ReadableMetricsRegistry readableMetricsRegistry;
+ @Mock
+ private Counter counter;
+ @Mock
+ private Gauge<Double> gauge;
+ @Mock
+ private Timer timer;
+ @Mock
+ private Snapshot timerSnapshot;
+
+ private LoggingMetricsReporter loggingMetricsReporter;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+
+ when(this.scheduledExecutorService.scheduleAtFixedRate(any(), eq(LOGGING_INTERVAL_SECONDS),
+ eq(LOGGING_INTERVAL_SECONDS), eq(TimeUnit.SECONDS))).thenAnswer((Answer<Void>) invocation -> {
+ Runnable runnable = invocation.getArgumentAt(0, Runnable.class);
+ runnable.run();
+ return null;
+ });
+
+ when(this.counter.getName()).thenReturn(COUNTER_NAME);
+ when(this.counter.getCount()).thenReturn(COUNTER_VALUE);
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, MetricsVisitor.class).counter(this.counter);
+ return null;
+ }).when(this.counter).visit(any());
+
+ when(this.gauge.getName()).thenReturn(GAUGE_NAME);
+ when(this.gauge.getValue()).thenReturn(GAUGE_VALUE);
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, MetricsVisitor.class).gauge(this.gauge);
+ return null;
+ }).when(this.gauge).visit(any());
+
+ when(this.timer.getName()).thenReturn(TIMER_NAME);
+ when(this.timer.getSnapshot()).thenReturn(this.timerSnapshot);
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, MetricsVisitor.class).timer(this.timer);
+ return null;
+ }).when(this.timer).visit(any());
+ when(this.timerSnapshot.getAverage()).thenReturn(TIMER_VALUE);
+
+ this.loggingMetricsReporter =
+ spy(new LoggingMetricsReporter(this.scheduledExecutorService, DEFAULT_PATTERN, LOGGING_INTERVAL_SECONDS));
+ }
+
+ @Test
+ public void testMetricTypes() {
+ when(this.readableMetricsRegistry.getGroups()).thenReturn(Collections.singleton(GROUP_NAME));
+ Map<String, Metric> metrics =
+ ImmutableMap.of(COUNTER_NAME, this.counter, GAUGE_NAME, this.gauge, TIMER_NAME, this.timer);
+ when(this.readableMetricsRegistry.getGroup(GROUP_NAME)).thenReturn(metrics);
+
+ this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+ this.loggingMetricsReporter.start();
+
+ verify(this.loggingMetricsReporter).doLog("Metric: source_name-group_name-counter_name, Value: 10");
+ verify(this.loggingMetricsReporter).doLog("Metric: source_name-group_name-gauge_name, Value: 20.0");
+ verify(this.loggingMetricsReporter).doLog("Metric: source_name-group_name-timer_name, Value: 30.0");
+ }
+
+ @Test
+ public void testMultipleRegister() {
+ when(this.readableMetricsRegistry.getGroups()).thenReturn(Collections.singleton(GROUP_NAME));
+ when(this.readableMetricsRegistry.getGroup(GROUP_NAME)).thenReturn(ImmutableMap.of(COUNTER_NAME, this.counter));
+ ReadableMetricsRegistry otherRegistry = mock(ReadableMetricsRegistry.class);
+ String otherGroupName = "other_group";
+ when(otherRegistry.getGroups()).thenReturn(Collections.singleton(otherGroupName));
+ when(otherRegistry.getGroup(otherGroupName)).thenReturn(ImmutableMap.of(GAUGE_NAME, this.gauge));
+
+ this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+ this.loggingMetricsReporter.register("other_source", otherRegistry);
+ this.loggingMetricsReporter.start();
+
+ verify(this.loggingMetricsReporter).doLog("Metric: source_name-group_name-counter_name, Value: 10");
+ verify(this.loggingMetricsReporter).doLog("Metric: other_source-other_group-gauge_name, Value: 20.0");
+ }
+
+ @Test
+ public void testFiltering() {
+ Pattern countersOnly = Pattern.compile(".*counter.*");
+ this.loggingMetricsReporter =
+ spy(new LoggingMetricsReporter(this.scheduledExecutorService, countersOnly, LOGGING_INTERVAL_SECONDS));
+
+ when(this.readableMetricsRegistry.getGroups()).thenReturn(Collections.singleton(GROUP_NAME));
+ Map<String, Metric> metrics = ImmutableMap.of(COUNTER_NAME, this.counter, GAUGE_NAME, this.gauge);
+ when(this.readableMetricsRegistry.getGroup(GROUP_NAME)).thenReturn(metrics);
+
+ this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+ this.loggingMetricsReporter.start();
+
+ ArgumentCaptor<String> logs = ArgumentCaptor.forClass(String.class);
+ verify(this.loggingMetricsReporter).doLog(logs.capture());
+ assertEquals(Collections.singletonList("Metric: source_name-group_name-counter_name, Value: 10"),
+ logs.getAllValues());
+ }
+
+ @Test
+ public void testNewMetricsAfterRegister() {
+ when(this.readableMetricsRegistry.getGroups()).thenReturn(Collections.singleton(GROUP_NAME));
+ // first round of logging has one metric (counter only), second call has two (counter and gauge)
+ when(this.readableMetricsRegistry.getGroup(GROUP_NAME)).thenReturn(ImmutableMap.of(COUNTER_NAME, this.counter))
+ .thenReturn(ImmutableMap.of(COUNTER_NAME, this.counter, GAUGE_NAME, this.gauge));
+
+ // capture the logging task so it can be directly executed by the test
+ ArgumentCaptor<Runnable> loggingRunnable = ArgumentCaptor.forClass(Runnable.class);
+ when(this.scheduledExecutorService.scheduleAtFixedRate(loggingRunnable.capture(), eq(LOGGING_INTERVAL_SECONDS),
+ eq(LOGGING_INTERVAL_SECONDS), eq(TimeUnit.SECONDS))).thenReturn(null);
+
+ this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+ this.loggingMetricsReporter.start();
+
+ // simulate first scheduled execution of logging task
+ loggingRunnable.getValue().run();
+ String expectedCounterLog = "Metric: source_name-group_name-counter_name, Value: 10";
+ // only should get log for counter for the first call
+ verify(this.loggingMetricsReporter).doLog(expectedCounterLog);
+ String expectedGaugeLog = "Metric: source_name-group_name-gauge_name, Value: 20.0";
+ verify(this.loggingMetricsReporter, never()).doLog(expectedGaugeLog);
+
+ // simulate second scheduled execution of logging task
+ loggingRunnable.getValue().run();
+ // should get second log for counter, first log for gauge
+ verify(this.loggingMetricsReporter, times(2)).doLog(expectedCounterLog);
+ verify(this.loggingMetricsReporter).doLog(expectedGaugeLog);
+ }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporterConfig.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporterConfig.java
new file mode 100644
index 0000000..40b44e7
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporterConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestLoggingMetricsReporterConfig {
+ private static final String REPORTER_NAME = "reporter_name";
+
+ @Test
+ public void testGetMetricsToLogRegex() {
+ Map<String, String> configMap = ImmutableMap.of("metrics.reporter.reporter_name.log.regex", ".*metric.*");
+ assertEquals(".*metric.*",
+ new LoggingMetricsReporterConfig(new MapConfig(configMap)).getMetricsToLogRegex(REPORTER_NAME));
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testGetMetricsToLogRegexMissing() {
+ new LoggingMetricsReporterConfig(new MapConfig()).getMetricsToLogRegex(REPORTER_NAME);
+ }
+
+ @Test
+ public void testGetLoggingIntervalSeconds() {
+ assertEquals(60, new LoggingMetricsReporterConfig(new MapConfig()).getLoggingIntervalSeconds(REPORTER_NAME));
+
+ Map<String, String> configMap = ImmutableMap.of("metrics.reporter.reporter_name.logging.interval.seconds", "100");
+ assertEquals(100,
+ new LoggingMetricsReporterConfig(new MapConfig(configMap)).getLoggingIntervalSeconds(REPORTER_NAME));
+ }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 76b79a7..8218720 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -36,6 +36,7 @@
import org.apache.samza.container.TaskName;
import org.apache.samza.context.Context;
import org.apache.samza.context.MockContext;
+import org.apache.samza.context.TaskContextImpl;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
import org.apache.samza.job.model.TaskModel;
@@ -93,11 +94,13 @@
Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+ SystemStreamPartition ssp = new SystemStreamPartition("kafka", "integers", new Partition(0));
TaskModel taskModel = mock(TaskModel.class);
- when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet
- .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+ when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet.of(ssp));
when(taskModel.getTaskName()).thenReturn(new TaskName("task 1"));
when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+ when(((TaskContextImpl) this.context.getTaskContext()).getSspsExcludingSideInputs()).thenReturn(
+ ImmutableSet.of(ssp));
when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap());
when(this.context.getTaskContext().getStore("jobName-jobId-window-w1"))
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
index 0a2e221..9439b01 100644
--- a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
@@ -127,7 +127,7 @@
}
@Test
- public void testEndOfStreamMessage() {
+ public void testEndOfStreamMessageWithTask() {
EndOfStreamMessage eos = new EndOfStreamMessage("test-task");
produceMessages(eos);
@@ -139,6 +139,24 @@
List<IncomingMessageEnvelope> results = consumeRawMessages(sspsToPoll);
assertEquals(1, results.size());
+ assertEquals("test-task", ((EndOfStreamMessage) results.get(0).getMessage()).getTaskName());
+ assertFalse(results.get(0).isEndOfStream());
+ }
+
+ @Test
+ public void testEndOfStreamMessageWithoutTask() {
+ EndOfStreamMessage eos = new EndOfStreamMessage();
+
+ produceMessages(eos);
+
+ Set<SystemStreamPartition> sspsToPoll = IntStream.range(0, PARTITION_COUNT)
+ .mapToObj(partition -> new SystemStreamPartition(SYSTEM_STREAM, new Partition(partition)))
+ .collect(Collectors.toSet());
+
+ List<IncomingMessageEnvelope> results = consumeRawMessages(sspsToPoll);
+
+ assertEquals(1, results.size());
+ assertNull(((EndOfStreamMessage) results.get(0).getMessage()).getTaskName());
assertTrue(results.get(0).isEndOfStream());
}
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
deleted file mode 100644
index 72772ba..0000000
--- a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import org.apache.samza.clustermanager.ClusterBasedJobCoordinatorRunner;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.*;
-import static org.mockito.AdditionalMatchers.*;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.*;
-
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ClusterBasedJobCoordinatorRunner.class})
-public class TestSplitDeploymentUtil {
-
- @Test
- public void testRunWithIsolatingClassLoader() throws Exception {
- // partially mock ClusterBasedJobCoordinator (mock runClusterBasedJobCoordinator method only)
- PowerMockito.spy(ClusterBasedJobCoordinatorRunner.class);
- // save the context classloader to make sure that it gets set properly once the test is finished
- ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
- ClassLoader classLoader = mock(ClassLoader.class);
- String[] args = new String[]{"arg0", "arg1"};
- doReturn(ClusterBasedJobCoordinatorRunner.class).when(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
-
- // stub the private static method which is called by reflection
- PowerMockito.doAnswer(invocation -> {
- // make sure the only calls to this method has the expected arguments
- assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
- // checks that the context classloader is set correctly
- assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
- return null;
- }).when(ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", any());
-
- try {
- SplitDeploymentUtil.runWithClassLoader(classLoader,
- ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
- assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
- } finally {
- // reset it explicitly just in case runWithClassLoader throws an exception
- Thread.currentThread().setContextClassLoader(previousContextClassLoader);
- }
- // make sure that the classloader got used
- verify(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
- // make sure runClusterBasedJobCoordinator only got called once
- verifyPrivate(ClusterBasedJobCoordinatorRunner.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
- }
-}
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
deleted file mode 100644
index c70af8d..0000000
--- a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job
-
-import org.junit.Assert._
-import org.junit.Test
-import scala.collection.JavaConverters._
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.ShellCommandConfig
-import java.net.URL
-
-class TestShellCommandBuilder {
- @Test
- def testEnvironmentVariables {
- val urlStr = "http://www.google.com"
- val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
- val scb = new ShellCommandBuilder
- scb.setConfig(config)
- scb.setId("1")
- scb.setUrl(new URL(urlStr))
- val command = scb.buildCommand
- val environment = scb.buildEnvironment
- assertEquals("foo", command)
- assertEquals("1", environment.get(ShellCommandConfig.ENV_CONTAINER_ID))
- assertEquals(urlStr, environment.get(ShellCommandConfig.ENV_COORDINATOR_URL))
- }
-
- // if cmdPath is specified, the full path to the command should be adjusted
- @Test
- def testBuildCommandWithCommandPath {
- val urlStr = "http://www.linkedin.com"
- val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
- val scb = new ShellCommandBuilder
- scb.setConfig(config)
- scb.setId("1")
- scb.setUrl(new URL(urlStr))
- val command = scb.buildCommand
- assertEquals("foo", command)
-
- scb.setCommandPath("/package/path")
- val command1 = scb.buildCommand
- assertEquals("/package/path/foo", command1)
- }
-}
\ No newline at end of file
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
index 01c22f5..97443e3 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
@@ -55,10 +55,6 @@
Preconditions.checkNotNull(type);
Preconditions.checkState(!grouperFactoryClassName.isEmpty(), "Empty grouper factory class provided");
- Preconditions.checkState(CHECKPOINT_V1_KEY_TYPE.equals(type) || CHECKPOINT_V2_KEY_TYPE.equals(type),
- String.format("Invalid type provided for checkpoint key. Expected: (%s or %s) Actual: (%s)",
- CHECKPOINT_V1_KEY_TYPE, CHECKPOINT_V2_KEY_TYPE, type));
-
this.grouperFactoryClassName = grouperFactoryClassName;
this.taskName = taskName;
this.type = type;
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
index c5c431b..59ff34f 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
@@ -57,9 +57,8 @@
public KafkaCheckpointLogKey fromBytes(byte[] bytes) {
try {
LinkedHashMap<String, String> deserializedKey = MAPPER.readValue(bytes, LinkedHashMap.class);
- String key = deserializedKey.get(TYPE_FIELD);
- return new KafkaCheckpointLogKey(key, new TaskName(deserializedKey.get(TASK_NAME_FIELD)),
+ return new KafkaCheckpointLogKey(deserializedKey.get(TYPE_FIELD), new TaskName(deserializedKey.get(TASK_NAME_FIELD)),
deserializedKey.get(SSP_GROUPER_FACTORY_FIELD));
} catch (Exception e) {
throw new SamzaException(String.format("Exception in de-serializing checkpoint bytes: %s",
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index e719439..7dbb9b3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -335,7 +335,7 @@
partitionMetaData.getOldestOffset
}
- private def buildOutgoingMessageEnvelope[T <: Checkpoint](taskName: TaskName, checkpoint: T): OutgoingMessageEnvelope = {
+ def buildOutgoingMessageEnvelope[T <: Checkpoint](taskName: TaskName, checkpoint: T): OutgoingMessageEnvelope = {
checkpoint match {
case checkpointV1: CheckpointV1 => {
val key = new KafkaCheckpointLogKey(
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 69a9966..391536a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -381,14 +381,12 @@
}
def getKafkaSystemProducerConfig( systemName: String,
- clientId: String,
- injectedProps: Map[String, String] = Map()) = {
+ clientId: String) = {
val subConf = config.subset("systems.%s.producer." format systemName, true)
val producerProps = new util.HashMap[String, String]()
producerProps.putAll(subConf)
producerProps.put("client.id", clientId)
- producerProps.putAll(injectedProps.asJava)
new KafkaProducerConfig(systemName, clientId, producerProps)
}
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 8d1fd6b..a2773f6 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -32,14 +32,6 @@
import org.apache.samza.util._
object KafkaSystemFactory extends Logging {
- @VisibleForTesting
- def getInjectedProducerProperties(systemName: String, config: Config) = if (new StorageConfig(config).isChangelogSystem(systemName)) {
- warn("System name '%s' is being used as a changelog. Disabling compression since Kafka does not support compression for log compacted topics." format systemName)
- Map[String, String]("compression.type" -> "none")
- } else {
- Map[String, String]()
- }
-
val CLIENTID_PRODUCER_PREFIX = "kafka-producer"
val CLIENTID_CONSUMER_PREFIX = "kafka-consumer"
val CLIENTID_ADMIN_PREFIX = "kafka-admin-consumer"
@@ -67,9 +59,8 @@
}
def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
- val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
val clientId = KafkaConsumerConfig.createClientId(KafkaSystemFactory.CLIENTID_PRODUCER_PREFIX, config);
- val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
+ val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
val getProducer = () => {
new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
}
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
index 7245e70..08fa02c 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
@@ -53,11 +53,23 @@
@Test
public void testCheckpointTypeV2() {
- KafkaCheckpointLogKey keyV2 = new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE,
- new TaskName("Partition 0"), GroupByPartitionFactory.class.getCanonicalName());
+ KafkaCheckpointLogKey keyV2 = new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE, new TaskName("Partition 0"),
+ GroupByPartitionFactory.class.getCanonicalName());
KafkaCheckpointLogKeySerde checkpointKeySerde = new KafkaCheckpointLogKeySerde();
// test that deserialize(serialize(k)) == k
Assert.assertEquals(keyV2, checkpointKeySerde.fromBytes(checkpointKeySerde.toBytes(keyV2)));
}
+
+ @Test
+ public void testForwardsCompatibility() {
+ // Set the key to another value; this is for the future, if we want to support multiple checkpoint keys.
+ // We do not want to throw in the Serdes layer; the key must instead be validated in the CheckpointManager.
+ KafkaCheckpointLogKey key = new KafkaCheckpointLogKey("checkpoint-v2",
+ new TaskName("Partition 0"), GroupByPartitionFactory.class.getCanonicalName());
+ KafkaCheckpointLogKeySerde checkpointSerde = new KafkaCheckpointLogKeySerde();
+
+ // test that deserialize(serialize(k)) == k
+ Assert.assertEquals(key, checkpointSerde.fromBytes(checkpointSerde.toBytes(key)));
+ }
}
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 866904b..835f53e 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -20,7 +20,6 @@
package org.apache.samza.checkpoint.kafka
import java.util.Properties
-
import kafka.integration.KafkaServerTestHarness
import kafka.utils.{CoreUtils, TestUtils}
import com.google.common.collect.ImmutableMap
@@ -29,7 +28,7 @@
import org.apache.samza.container.TaskName
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.serializers.CheckpointV1Serde
+import org.apache.samza.serializers.{CheckpointV1Serde, CheckpointV2Serde}
import org.apache.samza.system._
import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory}
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
@@ -276,6 +275,42 @@
}
@Test
+ def testReadCheckpointShouldIgnoreUnknownCheckpointKeys(): Unit = {
+ val checkpointTopic = "checkpoint-topic-1"
+ val kcm1 = createKafkaCheckpointManager(checkpointTopic)
+ kcm1.register(taskName)
+ kcm1.createResources
+ kcm1.start
+ kcm1.stop
+
+ // check that start actually creates the topic with log compaction enabled
+ val topicConfig = adminZkClient.getAllTopicConfigs().getOrElse(checkpointTopic, new Properties())
+
+ assertEquals(topicConfig, new KafkaConfig(config).getCheckpointTopicProperties())
+ assertEquals("compact", topicConfig.get("cleanup.policy"))
+ assertEquals("26214400", topicConfig.get("segment.bytes"))
+
+ // read before topic exists should result in a null checkpoint
+ val readCp = readCheckpoint(checkpointTopic, taskName)
+ assertNull(readCp)
+ // skips unknown checkpoints from checkpoint topic
+ writeCheckpoint(checkpointTopic, taskName, checkpoint1, "checkpoint-v2", useMock = true)
+ assertNull(readCheckpoint(checkpointTopic, taskName, useMock = true))
+
+ // reads latest v1 checkpoints
+ writeCheckpoint(checkpointTopic, taskName, checkpoint1, useMock = true)
+ assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName, useMock = true))
+
+ // writing checkpoint v2 still returns the previous v1 checkpoint
+ writeCheckpoint(checkpointTopic, taskName, checkpoint2, "checkpoint-v2", useMock = true)
+ assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName, useMock = true))
+
+ // writing checkpoint2 with the correct key returns the checkpoint2
+ writeCheckpoint(checkpointTopic, taskName, checkpoint2, useMock = true)
+ assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName, useMock = true))
+ }
+
+ @Test
def testWriteCheckpointShouldRetryFiniteTimesOnFailure(): Unit = {
val checkpointTopic = "checkpoint-topic-2"
val mockKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
@@ -401,7 +436,8 @@
}
private def createKafkaCheckpointManager(cpTopic: String, serde: CheckpointV1Serde = new CheckpointV1Serde,
- failOnTopicValidation: Boolean = true, overrideConfig: Config = config) = {
+ failOnTopicValidation: Boolean = true, useMock: Boolean = false, checkpointKey: String = KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE,
+ overrideConfig: Config = config) = {
val kafkaConfig = new org.apache.samza.config.KafkaConfig(overrideConfig)
val props = kafkaConfig.getCheckpointTopicProperties()
val systemName = kafkaConfig.getCheckpointSystem.getOrElse(
@@ -414,11 +450,17 @@
val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory])
val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, props)
- new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, overrideConfig, new NoOpMetricsRegistry, serde)
+
+ if (useMock) {
+ new MockKafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, serde, checkpointKey)
+ } else {
+ new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, overrideConfig, new NoOpMetricsRegistry, serde)
+ }
}
- private def readCheckpoint(checkpointTopic: String, taskName: TaskName, config: Config = config) : Checkpoint = {
- val kcm = createKafkaCheckpointManager(checkpointTopic, overrideConfig = config)
+ private def readCheckpoint(checkpointTopic: String, taskName: TaskName, config: Config = config,
+ useMock: Boolean = false) : Checkpoint = {
+ val kcm = createKafkaCheckpointManager(checkpointTopic, overrideConfig = config, useMock = useMock)
kcm.register(taskName)
kcm.start
val checkpoint = kcm.readLastCheckpoint(taskName)
@@ -426,8 +468,9 @@
checkpoint
}
- private def writeCheckpoint(checkpointTopic: String, taskName: TaskName, checkpoint: Checkpoint): Unit = {
- val kcm = createKafkaCheckpointManager(checkpointTopic)
+ private def writeCheckpoint(checkpointTopic: String, taskName: TaskName, checkpoint: Checkpoint,
+ checkpointKey: String = KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE, useMock: Boolean = false): Unit = {
+ val kcm = createKafkaCheckpointManager(checkpointTopic, checkpointKey = checkpointKey, useMock = useMock)
kcm.register(taskName)
kcm.start
kcm.writeCheckpoint(taskName, checkpoint)
@@ -456,4 +499,35 @@
}
}
+
+ class MockKafkaCheckpointManager(spec: KafkaStreamSpec, systemFactory: SystemFactory, failOnTopicValidation: Boolean,
+ serde: CheckpointV1Serde = new CheckpointV1Serde, checkpointKey: String)
+ extends KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config,
+ new NoOpMetricsRegistry, serde) {
+
+ override def buildOutgoingMessageEnvelope[T <: Checkpoint](taskName: TaskName, checkpoint: T): OutgoingMessageEnvelope = {
+ val key = new KafkaCheckpointLogKey(checkpointKey, taskName, expectedGrouperFactory)
+ val keySerde = new KafkaCheckpointLogKeySerde
+ val checkpointMsgSerde = new CheckpointV1Serde
+ val checkpointV2MsgSerde = new CheckpointV2Serde
+ val keyBytes = try {
+ keySerde.toBytes(key)
+ } catch {
+ case e: Exception => throw new SamzaException(s"Exception when writing checkpoint-key for $taskName: $checkpoint", e)
+ }
+ val msgBytes = try {
+ checkpoint match {
+ case v1: CheckpointV1 =>
+ checkpointMsgSerde.toBytes(v1)
+ case v2: CheckpointV2 =>
+ checkpointV2MsgSerde.toBytes(v2)
+ case _ =>
+ throw new IllegalArgumentException("Unknown checkpoint key type for test, please use Checkpoint v1 or v2")
+ }
+ } catch {
+ case e: Exception => throw new SamzaException(s"Exception when writing checkpoint for $taskName: $checkpoint", e)
+ }
+ new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
+ }
+ }
}
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index 596d67b..ecbc00d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -82,16 +82,4 @@
assertNotNull(producer)
assertTrue(producer.isInstanceOf[KafkaSystemProducer])
}
-
- @Test
- def testInjectedProducerProps {
- val configMap = Map[String, String](
- StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
- StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
- StorageConfig.FACTORY.format("system2") -> "some.factory.Class")
- val config = new MapConfig(configMap.asJava)
- assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config))
- assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config))
- assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config))
- }
}
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index 0908a97..ac2b3f0 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -69,6 +69,8 @@
private static final String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
private static final String ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS = "rocksdb.delete.obsolete.files.period.micros";
private static final String ROCKSDB_MAX_MANIFEST_FILE_SIZE = "rocksdb.max.manifest.file.size";
+ private static final String ROCKSDB_MAX_OPEN_FILES = "rocksdb.max.open.files";
+ private static final String ROCKSDB_MAX_FILE_OPENING_THREADS = "rocksdb.max.file.opening.threads";
public static Options options(Config storeConfig, int numTasksForContainer, File storeDir, StorageEngineFactory.StoreMode storeMode) {
Options options = new Options();
@@ -124,6 +126,8 @@
options.setMaxLogFileSize(storeConfig.getLong(ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, 64 * 1024 * 1024L));
options.setKeepLogFileNum(storeConfig.getLong(ROCKSDB_KEEP_LOG_FILE_NUM, 2));
options.setDeleteObsoleteFilesPeriodMicros(storeConfig.getLong(ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS, 21600000000L));
+ options.setMaxOpenFiles(storeConfig.getInt(ROCKSDB_MAX_OPEN_FILES, -1));
+ options.setMaxFileOpeningThreads(storeConfig.getInt(ROCKSDB_MAX_FILE_OPENING_THREADS, 16));
// The default for rocksdb is 18446744073709551615, which is larger than java Long.MAX_VALUE. Hence setting it only if it's passed.
if (storeConfig.containsKey(ROCKSDB_MAX_MANIFEST_FILE_SIZE)) {
options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE));
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
index 0044904..f9a1f24 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
@@ -49,6 +49,9 @@
static final public String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers";
static final public String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
static final public String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
+ static final public String ROCKSDB_MAX_OPEN_FILES = "rocksdb.max.open.files";
+ static final public String ROCKSDB_MAX_FILE_OPENING_THREADS = "rocksdb.max.file.opening.threads";
+
private Integer writeBatchSize;
private Integer objectCacheSize;
@@ -61,6 +64,8 @@
private Integer numLogFilesToKeep;
private String compressionType;
private String compactionStyle;
+ private Integer maxOpenFiles;
+ private Integer maxFileOpeningThreads;
/**
* Constructs a table descriptor instance
@@ -273,6 +278,35 @@
return this;
}
+ /**
+ * Limits the number of open files that can be used by the DB. You may need to increase this if your database
+ * has a large working set. Value -1 means files opened are always kept open.
+ * <p>
+ * Default value is -1.
+ * <p>
+ * @param maxOpenFiles the number of open files that can be used by the DB.
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor<K, V> withMaxOpenFiles(int maxOpenFiles) {
+ this.maxOpenFiles = maxOpenFiles;
+ return this;
+ }
+
+ /**
+ * Sets the number of threads used to open DB files.
+ * If max_open_files is -1, DB will open all files on DB::Open(). You can use this option to increase the number of
+ * threads used to open files.
+ * <p>
+ * Default is 16.
+ * <p>
+ * @param maxFileOpeningThreads The number of threads to use when opening DB files.
+ * @return the table descriptor instance
+ */
+ public RocksDbTableDescriptor<K, V> withMaxFileOpeningThreads(int maxFileOpeningThreads) {
+ this.maxFileOpeningThreads = maxFileOpeningThreads;
+ return this;
+ }
+
@Override
public String getProviderFactoryClassName() {
return LocalTableProviderFactory.class.getName();
@@ -320,6 +354,12 @@
if (numLogFilesToKeep != null) {
addStoreConfig(ROCKSDB_KEEP_LOG_FILE_NUM, numLogFilesToKeep.toString(), tableConfig);
}
+ if (maxOpenFiles != null) {
+ addStoreConfig(ROCKSDB_MAX_OPEN_FILES, maxOpenFiles.toString(), tableConfig);
+ }
+ if (maxFileOpeningThreads != null) {
+ addStoreConfig(ROCKSDB_MAX_FILE_OPENING_THREADS, maxFileOpeningThreads.toString(), tableConfig);
+ }
return Collections.unmodifiableMap(tableConfig);
}
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
index c58d123..9fc9938 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -73,10 +73,12 @@
.withTtl(7)
.withWriteBatchSize(8)
.withWriteBufferSize(9)
+ .withMaxOpenFiles(10)
+ .withMaxFileOpeningThreads(11)
.withConfig("abc", "xyz")
.toConfig(createJobConfig());
- Assert.assertEquals(14, tableConfig.size());
+ Assert.assertEquals(16, tableConfig.size());
assertEquals("1", RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES, tableConfig);
assertEquals("2", RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES, tableConfig);
assertEquals("3", RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, tableConfig);
@@ -86,6 +88,8 @@
assertEquals("7", RocksDbTableDescriptor.ROCKSDB_TTL_MS, tableConfig);
assertEquals("8", RocksDbTableDescriptor.WRITE_BATCH_SIZE, tableConfig);
assertEquals("9", RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES, tableConfig);
+ assertEquals("10", RocksDbTableDescriptor.ROCKSDB_MAX_OPEN_FILES, tableConfig);
+ assertEquals("11", RocksDbTableDescriptor.ROCKSDB_MAX_FILE_OPENING_THREADS, tableConfig);
assertEquals("snappy", RocksDbTableDescriptor.ROCKSDB_COMPRESSION, tableConfig);
assertEquals("fifo", RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE, tableConfig);
Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM, TABLE_ID)));
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index 2b03c31..f82a576 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -81,7 +81,7 @@
private static final String CREATE_STREAM_ENABLED = "task.log4j.create.stream.enabled";
private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice
- private final BlockingQueue<byte[]> logQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
+ private final BlockingQueue<EncodedLogEvent> logQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
private SystemStream systemStream = null;
private SystemProducer systemProducer = null;
@@ -233,13 +233,13 @@
*/
private void handleEvent(LogEvent event) throws InterruptedException {
if (usingAsyncLogger) {
- sendEventToSystemProducer(encodeLogEventToBytes(event));
+ sendEventToSystemProducer(encodeLogEvent(event));
return;
}
// Serialize the event before adding to the queue to leverage the caller thread
// and ensure that the transferThread can keep up.
- if (!logQueue.offer(encodeLogEventToBytes(event), queueTimeoutS, TimeUnit.SECONDS)) {
+ if (!logQueue.offer(encodeLogEvent(event), queueTimeoutS, TimeUnit.SECONDS)) {
// Do NOT retry adding system to the queue. Dropping the event allows us to alleviate the unlikely
// possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
// which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
@@ -265,8 +265,8 @@
metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
}
- protected byte[] encodeLogEventToBytes(LogEvent event) {
- return serde.toBytes(subLog(event));
+ protected EncodedLogEvent encodeLogEvent(LogEvent event) {
+ return new ByteArrayEncodedLogEvent(serde.toBytes(subLog(event)));
}
private Message subAppend(LogEvent event) {
@@ -437,21 +437,21 @@
/**
* Helper method to send a serialized log-event to the systemProducer, and increment respective methods.
- * @param serializedLogEvent
+ * @param logQueueEntry the serialized log-event to be sent to the systemProducer
*/
- private void sendEventToSystemProducer(byte[] serializedLogEvent) {
- metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
+ private void sendEventToSystemProducer(EncodedLogEvent logQueueEntry) {
+ metrics.logMessagesBytesSent.inc(logQueueEntry.getEntryValueSize());
metrics.logMessagesCountSent.inc();
- systemProducer.send(SOURCE, decorateLogEvent(serializedLogEvent));
+ systemProducer.send(SOURCE, decorateLogEvent(logQueueEntry));
}
/**
* Helper method to create an OutgoingMessageEnvelope from the serialized log event.
- * @param messageBytes message bytes
+ * @param logQueueEntry the encoded log event to wrap in an outgoing envelope
* @return OutgoingMessageEnvelope that contains the message bytes along with the system stream
*/
- protected OutgoingMessageEnvelope decorateLogEvent(byte[] messageBytes) {
- return new OutgoingMessageEnvelope(systemStream, keyBytes, messageBytes);
+ protected OutgoingMessageEnvelope decorateLogEvent(EncodedLogEvent logQueueEntry) {
+ return new OutgoingMessageEnvelope(systemStream, keyBytes, logQueueEntry.getValue());
}
protected String getStreamName(String jobName, String jobId) {
@@ -498,4 +498,42 @@
public Serde<LogEvent> getSerde() {
return serde;
}
+
+ /**
+ * An EncodedLogEvent is the element inserted into the log queue of the stream appender
+ * that holds the log messages before they are sent to the underlying system producer.
+ * @param <T> type of object held as the entry
+ */
+ protected interface EncodedLogEvent<T> {
+ /**
+ * fetches the size of the encoded log message held within this entry
+ * @return size of the log message
+ */
+ public long getEntryValueSize();
+
+ /**
+ * fetches the actual log message held within this entry
+ * @return the actual log message
+ */
+ public T getValue();
+ }
+
+ /**
+ * EncodedLogEvent implementation that holds the serialized byte[] of the log message.
+ */
+ private class ByteArrayEncodedLogEvent implements EncodedLogEvent<byte[]> {
+ final byte[] entryValue;
+ public ByteArrayEncodedLogEvent(byte[] array) {
+ entryValue = array;
+ }
+ @Override
+ public byte[] getValue() {
+ return entryValue;
+ }
+
+ @Override
+ public long getEntryValueSize() {
+ return entryValue.length;
+ }
+ }
}
\ No newline at end of file
diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh
index 9b5ac30..a162ad5 100755
--- a/samza-shell/src/main/bash/run-class.sh
+++ b/samza-shell/src/main/bash/run-class.sh
@@ -53,13 +53,13 @@
echo APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
echo BASE_LIB_DIR=$BASE_LIB_DIR
-BASE_LIB_CLASSPATH=""
+CLASSPATH=""
# all the jars need to be appended on newlines to ensure line argument length of 72 bytes is not violated
for file in $BASE_LIB_DIR/*.[jw]ar;
do
- BASE_LIB_CLASSPATH=$BASE_LIB_CLASSPATH" $file \n"
+ CLASSPATH=$CLASSPATH" $file \n"
done
-echo generated from BASE_LIB_DIR BASE_LIB_CLASSPATH=$BASE_LIB_CLASSPATH
+echo generated from BASE_LIB_DIR CLASSPATH=$CLASSPATH
# In some cases (AWS) $JAVA_HOME/bin doesn't contain jar.
if [ -z "$JAVA_HOME" ] || [ ! -e "$JAVA_HOME/bin/jar" ]; then
@@ -68,23 +68,21 @@
JAR="$JAVA_HOME/bin/jar"
fi
-# Create a pathing JAR for the JARs in the BASE_LIB_DIR
-# Newlines and spaces are intended to ensure proper parsing of manifest in pathing jar
-printf "Class-Path: \n $BASE_LIB_CLASSPATH \n" > base-lib-manifest.txt
-# Creates a new archive and adds custom manifest information to base-lib-pathing.jar
-eval "$JAR -cvmf base-lib-manifest.txt base-lib-pathing.jar"
+# Create a separate directory for writing files related to classpath management. It is easier to manage
+# permissions for the classpath-related files when they are in their own directory. An example of where
+# this is helpful is when using container images which might have predefined permissions for certain
+# directories.
+CLASSPATH_WORKSPACE_DIR=$base_dir/classpath_workspace
+mkdir -p $CLASSPATH_WORKSPACE_DIR
+# file containing the classpath string; used to avoid passing long classpaths directly to the jar command
+PATHING_MANIFEST_FILE=$CLASSPATH_WORKSPACE_DIR/manifest.txt
+# jar file to include on the classpath for running the main class
+PATHING_JAR_FILE=$CLASSPATH_WORKSPACE_DIR/pathing.jar
-# Create a pathing JAR for the runtime framework resources. It is useful to separate this from the base-lib-pathing.jar
-# because the split deployment framework may only need the resources from this runtime pathing JAR.
-if ! [[ $HADOOP_CONF_DIR =~ .*/$ ]]; then
- # manifest requires a directory to have a trailing slash
- HADOOP_CONF_DIR="$HADOOP_CONF_DIR/"
-fi
-# HADOOP_CONF_DIR should be supplied to classpath explicitly for Yarn to parse configs
-RUNTIME_FRAMEWORK_RESOURCES_CLASSPATH="$HADOOP_CONF_DIR \n"
-# TODO add JARs from ADDITIONAL_CLASSPATH_DIR to runtime-framework-resources-pathing.jar as well
-printf "Class-Path: \n $RUNTIME_FRAMEWORK_RESOURCES_CLASSPATH \n" > runtime-framework-resources-manifest.txt
-eval "$JAR -cvmf runtime-framework-resources-manifest.txt runtime-framework-resources-pathing.jar"
+# Newlines and spaces are intended to ensure proper parsing of manifest in pathing jar
+printf "Class-Path: \n $CLASSPATH \n" > $PATHING_MANIFEST_FILE
+# Creates a new archive and adds custom manifest information to pathing.jar
+eval "$JAR -cvmf $PATHING_MANIFEST_FILE $PATHING_JAR_FILE"
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
@@ -163,11 +161,12 @@
# Check if 64 bit is set. If not - try and set it if it's supported
[[ $JAVA_OPTS != *-d64* ]] && check_and_enable_64_bit_mode
-echo $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar "$@"
+# HADOOP_CONF_DIR should be supplied to classpath explicitly for Yarn to parse configs
+echo $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:$PATHING_JAR_FILE "$@"
## If localized resource lib directory is defined, then include it in the classpath.
if [[ -z "${ADDITIONAL_CLASSPATH_DIR}" ]]; then
- exec $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar "$@"
+ exec $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:$PATHING_JAR_FILE "$@"
else
- exec $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar:$ADDITIONAL_CLASSPATH_DIR "$@"
-fi
\ No newline at end of file
+ exec $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:$PATHING_JAR_FILE:$ADDITIONAL_CLASSPATH_DIR "$@"
+fi
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index fb65eea..943a8ca 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -107,6 +107,14 @@
this.configs = new HashMap<>();
this.inMemoryScope = RandomStringUtils.random(10, true, true);
configs.put(ApplicationConfig.APP_NAME, APP_NAME);
+ /*
+ * Use a unique app id to help make sure a test execution is isolated from others.
+ * A concrete example of where this helps is to avoid an issue with ControlMessageSender. It has a static cache
+ * keyed by stream id to save partition counts for intermediate streams. This means that different tests can
+ * collide in this cache if they use the same intermediate stream names. Having a unique app id makes the
+ * intermediate streams unique across tests.
+ */
+ configs.put(ApplicationConfig.APP_ID, this.inMemoryScope);
configs.put(JobConfig.PROCESSOR_ID, "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY, InMemoryMetadataStoreFactory.class.getCanonicalName());
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 0d1e49a..012eece 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -188,6 +188,7 @@
configMap.put("systems.kafka.samza.key.serde", "string");
configMap.put("systems.kafka.samza.msg.serde", "string");
configMap.put("systems.kafka.samza.offset.default", "oldest");
+ configMap.put("systems.kafka.producer.compression.type", "snappy");
configMap.put("job.coordinator.system", "kafka");
configMap.put("job.default.system", "kafka");
configMap.put("job.coordinator.replication.factor", "1");
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
index 78fc7b5..ea81f13 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -19,6 +19,7 @@
package org.apache.samza.test.table;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -27,10 +28,8 @@
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.context.Context;
@@ -39,7 +38,6 @@
import org.apache.samza.operators.MessageStream;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
@@ -47,11 +45,10 @@
import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.Table;
-import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.framework.TestRunner;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.test.util.ArraySystemFactory;
-import org.apache.samza.test.util.Base64Serializer;
-
-import org.junit.Assert;
import org.junit.Test;
import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
@@ -62,50 +59,42 @@
import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
/**
* This test class tests sendTo() and join() for local tables
*/
-public class TestLocalTableEndToEnd extends IntegrationTestHarness {
+public class TestLocalTableEndToEnd {
+ private static final String SYSTEM_NAME = "test";
+ private static final String PAGEVIEW_STREAM = "pageview";
+ private static final String PROFILE_STREAM = "profile";
@Test
- public void testSendTo() throws Exception {
-
- int count = 10;
- Profile[] profiles = TestTableData.generateProfiles(count);
-
- int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
- configs.put("streams.Profile.samza.system", "test");
- configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
+ public void testSendTo() {
MyMapFunction mapFn = new MyMapFunction();
+ StreamApplication app = appDesc -> {
+ Table<KV<Integer, Profile>> table =
+ appDesc.getTable(new InMemoryTableDescriptor<>("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+ DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+ GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
- final StreamApplication app = appDesc -> {
-
- Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
- KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-
- appDesc.getInputStream(isd)
- .map(mapFn)
- .sendTo(table);
+ appDesc.getInputStream(isd).map(mapFn).sendTo(table);
};
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
- executeRun(runner, config);
- runner.waitForFinish();
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+ InMemoryInputDescriptor<Profile> profileStreamDesc = isd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
- for (int i = 0; i < partitionCount; i++) {
+ int numProfilesPerPartition = 10;
+ int numInputPartitions = 4;
+ Map<Integer, List<Profile>> inputProfiles =
+ TestTableData.generatePartitionedProfiles(numProfilesPerPartition * numInputPartitions, numInputPartitions);
+ TestRunner.of(app).addInputStream(profileStreamDesc, inputProfiles).run(Duration.ofSeconds(10));
+
+ for (int i = 0; i < numInputPartitions; i++) {
MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
- assertEquals(count, mapFnCopy.received.size());
- mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null));
+ assertEquals(numProfilesPerPartition, mapFnCopy.received.size());
+ mapFnCopy.received.forEach(p -> assertNotNull(mapFnCopy.table.get(p.getMemberId())));
}
}
@@ -116,52 +105,49 @@
@Override
public void describe(StreamApplicationDescriptor appDesc) {
Table<KV<Integer, Profile>> table = appDesc.getTable(
- new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+ new InMemoryTableDescriptor<>("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+ DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+ GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
+ profileISD.shouldBootstrap();
appDesc.getInputStream(profileISD)
- .map(m -> new KV(m.getMemberId(), m))
+ .map(m -> new KV<>(m.getMemberId(), m))
.sendTo(table);
- GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+ GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
appDesc.getInputStream(pageViewISD)
.map(pv -> {
received.add(pv);
return pv;
})
- .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
+ .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()), "p1")
.join(table, new PageViewToProfileJoinFunction())
.sink((m, collector, coordinator) -> joined.add(m));
}
}
@Test
- public void testStreamTableJoin() throws Exception {
-
- int count = 10;
- PageView[] pageViews = TestTableData.generatePageViews(count);
- Profile[] profiles = TestTableData.generateProfiles(count);
-
+ public void testStreamTableJoin() {
+ int totalPageViews = 40;
int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+ Map<Integer, List<PageView>> inputPageViews =
+ TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+ // 10 is the max member id for page views
+ Map<Integer, List<Profile>> inputProfiles =
+ TestTableData.generatePartitionedProfiles(10, partitionCount);
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+ InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
+ .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
+ InMemoryInputDescriptor<Profile> profileStreamDesc = isd
+ .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+ TestRunner.of(new StreamTableJoinApp())
+ .addInputStream(pageViewStreamDesc, inputPageViews)
+ .addInputStream(profileStreamDesc, inputProfiles)
+ .run(Duration.ofSeconds(10));
- configs.put("streams.Profile.samza.system", "test");
- configs.put("streams.Profile.samza.bootstrap", "true");
- configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config);
- executeRun(runner, config);
- runner.waitForFinish();
-
- assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
- assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
- assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
+ assertEquals(totalPageViews, StreamTableJoinApp.received.size());
+ assertEquals(totalPageViews, StreamTableJoinApp.joined.size());
+ assertNotNull(StreamTableJoinApp.joined.get(0));
}
static class DualStreamTableJoinApp implements StreamApplication {
@@ -178,29 +164,31 @@
PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
- Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
+ Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor<>("t1", profileKVSerde));
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
- GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
+ DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+ GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor(PROFILE_STREAM + "1", new NoOpSerde<>());
+ profileISD1.shouldBootstrap();
+ GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor(PROFILE_STREAM + "2", new NoOpSerde<>());
+ profileISD2.shouldBootstrap();
MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
profileStream1
.map(m -> {
sentToProfileTable1.add(m);
- return new KV(m.getMemberId(), m);
+ return new KV<>(m.getMemberId(), m);
})
.sendTo(profileTable);
profileStream2
.map(m -> {
sentToProfileTable2.add(m);
- return new KV(m.getMemberId(), m);
+ return new KV<>(m.getMemberId(), m);
})
.sendTo(profileTable);
- GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
- GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
+ GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor(PAGEVIEW_STREAM + "1", new NoOpSerde<>());
+ GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor(PAGEVIEW_STREAM + "2", new NoOpSerde<>());
MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
@@ -217,45 +205,40 @@
}
@Test
- public void testDualStreamTableJoin() throws Exception {
-
- int count = 10;
- PageView[] pageViews = TestTableData.generatePageViews(count);
- Profile[] profiles = TestTableData.generateProfiles(count);
-
+ public void testDualStreamTableJoin() {
+ int totalPageViews = 40;
int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+ Map<Integer, List<PageView>> inputPageViews1 =
+ TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+ Map<Integer, List<PageView>> inputPageViews2 =
+ TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+ // 10 is the max member id for page views
+ int numProfiles = 10;
+ Map<Integer, List<Profile>> inputProfiles1 = TestTableData.generatePartitionedProfiles(numProfiles, partitionCount);
+ Map<Integer, List<Profile>> inputProfiles2 = TestTableData.generatePartitionedProfiles(numProfiles, partitionCount);
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+ InMemoryInputDescriptor<PageView> pageViewStreamDesc1 = isd
+ .getInputDescriptor(PAGEVIEW_STREAM + "1", new NoOpSerde<>());
+ InMemoryInputDescriptor<PageView> pageViewStreamDesc2 = isd
+ .getInputDescriptor(PAGEVIEW_STREAM + "2", new NoOpSerde<>());
+ InMemoryInputDescriptor<Profile> profileStreamDesc1 = isd
+ .getInputDescriptor(PROFILE_STREAM + "1", new NoOpSerde<>());
+ InMemoryInputDescriptor<Profile> profileStreamDesc2 = isd
+ .getInputDescriptor(PROFILE_STREAM + "2", new NoOpSerde<>());
- configs.put("streams.Profile1.samza.system", "test");
- configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile1.samza.bootstrap", "true");
- configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
+ TestRunner.of(new DualStreamTableJoinApp())
+ .addInputStream(pageViewStreamDesc1, inputPageViews1)
+ .addInputStream(pageViewStreamDesc2, inputPageViews2)
+ .addInputStream(profileStreamDesc1, inputProfiles1)
+ .addInputStream(profileStreamDesc2, inputProfiles2)
+ .run(Duration.ofSeconds(10));
- configs.put("streams.Profile2.samza.system", "test");
- configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile2.samza.bootstrap", "true");
- configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
-
- configs.put("streams.PageView1.samza.system", "test");
- configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
-
- configs.put("streams.PageView2.samza.system", "test");
- configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config);
- executeRun(runner, config);
- runner.waitForFinish();
-
- assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size());
- assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size());
-
- assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size());
- assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size());
- assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
- assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
+ assertEquals(numProfiles, DualStreamTableJoinApp.sentToProfileTable1.size());
+ assertEquals(numProfiles, DualStreamTableJoinApp.sentToProfileTable2.size());
+ assertEquals(totalPageViews, DualStreamTableJoinApp.joinedPageViews1.size());
+ assertEquals(totalPageViews, DualStreamTableJoinApp.joinedPageViews2.size());
+ assertNotNull(DualStreamTableJoinApp.joinedPageViews1.get(0));
+ assertNotNull(DualStreamTableJoinApp.joinedPageViews2.get(0));
}
static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
@@ -283,8 +266,7 @@
}
private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
-
- private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
+ private static final Map<String, MyMapFunction> TASK_TO_MAP_FUNCTION_MAP = new HashMap<>();
private transient List<Profile> received;
private transient ReadWriteTable table;
@@ -294,17 +276,17 @@
table = context.getTaskContext().getTable("t1");
this.received = new ArrayList<>();
- taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
+ TASK_TO_MAP_FUNCTION_MAP.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
}
@Override
public KV<Integer, Profile> apply(Profile profile) {
received.add(profile);
- return new KV(profile.getMemberId(), profile);
+ return new KV<>(profile.getMemberId(), profile);
}
public static MyMapFunction getMapFunctionByTask(String taskName) {
- return taskToMapFunctionMap.get(taskName);
+ return TASK_TO_MAP_FUNCTION_MAP.get(taskName);
}
}
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
index 071f65e..34ac29a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
@@ -19,128 +19,190 @@
package org.apache.samza.test.table;
-import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
-import org.apache.samza.SamzaException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.application.SamzaApplication;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
-import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.framework.StreamAssert;
import org.apache.samza.test.framework.TestRunner;
import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
-import org.apache.samza.test.harness.IntegrationTestHarness;
import org.junit.Test;
import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
import static org.apache.samza.test.table.TestTableData.PageView;
import static org.apache.samza.test.table.TestTableData.Profile;
import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-public class TestLocalTableWithSideInputsEndToEnd extends IntegrationTestHarness {
+public class TestLocalTableWithSideInputsEndToEnd {
+ private static final String SYSTEM_NAME = "test";
private static final String PAGEVIEW_STREAM = "pageview";
private static final String PROFILE_STREAM = "profile";
+ private static final String PROFILE_TABLE = "profile-table";
private static final String ENRICHED_PAGEVIEW_STREAM = "enrichedpageview";
+ private static final SystemStream OUTPUT_SYSTEM_STREAM = new SystemStream(SYSTEM_NAME, ENRICHED_PAGEVIEW_STREAM);
@Test
- public void testJoinWithSideInputsTable() {
+ public void testLowLevelJoinWithSideInputsTable() throws InterruptedException {
+ int partitionCount = 4;
+ IntegerSerde integerSerde = new IntegerSerde();
+ // for low-level, need to pre-partition the input in the same way that the profiles are partitioned
+ Map<Integer, List<PageView>> pageViewsPartitionedByMemberId =
+ TestTableData.generatePartitionedPageViews(20, partitionCount)
+ .values()
+ .stream()
+ .flatMap(List::stream)
+ .collect(Collectors.groupingBy(
+ pageView -> Math.abs(Arrays.hashCode(integerSerde.toBytes(pageView.getMemberId()))) % partitionCount));
runTest(
- "test",
+ new LowLevelPageViewProfileJoin(),
+ pageViewsPartitionedByMemberId,
+ TestTableData.generatePartitionedProfiles(10, partitionCount));
+ }
+
+ @Test
+ public void testJoinWithSideInputsTable() throws InterruptedException {
+ runTest(
new PageViewProfileJoin(),
- Arrays.asList(TestTableData.generatePageViews(10)),
- Arrays.asList(TestTableData.generateProfiles(10)));
+ TestTableData.generatePartitionedPageViews(20, 4),
+ TestTableData.generatePartitionedProfiles(10, 4));
}
@Test
- public void testJoinWithDurableSideInputTable() {
+ public void testJoinWithDurableSideInputTable() throws InterruptedException {
runTest(
- "test",
new DurablePageViewProfileJoin(),
- Arrays.asList(TestTableData.generatePageViews(5)),
- Arrays.asList(TestTableData.generateProfiles(5)));
+ TestTableData.generatePartitionedPageViews(20, 4),
+ TestTableData.generatePartitionedProfiles(10, 4));
}
- private void runTest(String systemName, StreamApplication app, List<PageView> pageViews,
- List<Profile> profiles) {
- Map<String, String> configs = new HashMap<>();
- configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PAGEVIEW_STREAM), systemName);
- configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PROFILE_STREAM), systemName);
- configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, ENRICHED_PAGEVIEW_STREAM), systemName);
-
- InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
-
+ private <T extends ApplicationDescriptor<?>> void runTest(SamzaApplication<T> app,
+ Map<Integer, List<PageView>> pageViews,
+ Map<Integer, List<Profile>> profiles) throws InterruptedException {
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
- .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<PageView>());
-
+ .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
InMemoryInputDescriptor<Profile> profileStreamDesc = isd
- .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<Profile>());
-
+ .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
InMemoryOutputDescriptor<EnrichedPageView> outputStreamDesc = isd
- .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<EnrichedPageView>());
+ .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>());
- TestRunner
- .of(app)
+ TestRunner.of(app)
.addInputStream(pageViewStreamDesc, pageViews)
.addInputStream(profileStreamDesc, profiles)
.addOutputStream(outputStreamDesc, 1)
- .addConfig(new MapConfig(configs))
- .run(Duration.ofMillis(100000));
+ .run(Duration.ofSeconds(10));
- try {
- Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
- List<EnrichedPageView> results = result.values().stream()
- .flatMap(List::stream)
- .collect(Collectors.toList());
+ List<EnrichedPageView> expectedEnrichedPageViews = buildExpectedEnrichedPageViews(pageViews, profiles);
+ StreamAssert.containsInAnyOrder(expectedEnrichedPageViews, outputStreamDesc, Duration.ofSeconds(1));
+ }
- List<EnrichedPageView> expectedEnrichedPageviews = pageViews.stream()
- .flatMap(pv -> profiles.stream()
- .filter(profile -> pv.memberId == profile.memberId)
- .map(profile -> new EnrichedPageView(pv.pageKey, profile.memberId, profile.company)))
- .collect(Collectors.toList());
+ private static List<EnrichedPageView> buildExpectedEnrichedPageViews(Map<Integer, List<PageView>> pageViews,
+ Map<Integer, List<Profile>> profiles) {
+ ImmutableMap.Builder<Integer, Profile> profilesByMemberIdBuilder = new ImmutableMap.Builder<>();
+ profiles.values()
+ .stream()
+ .flatMap(List::stream)
+ .forEach(profile -> profilesByMemberIdBuilder.put(profile.getMemberId(), profile));
+ Map<Integer, Profile> profilesByMemberId = profilesByMemberIdBuilder.build();
+ ImmutableList.Builder<EnrichedPageView> enrichedPageViewsBuilder = new ImmutableList.Builder<>();
+ pageViews.values()
+ .stream()
+ .flatMap(List::stream)
+ .forEach(pageView -> Optional.ofNullable(profilesByMemberId.get(pageView.getMemberId()))
+ .ifPresent(profile -> enrichedPageViewsBuilder.add(
+ new EnrichedPageView(pageView.getPageKey(), profile.getMemberId(), profile.getCompany()))));
+ return enrichedPageViewsBuilder.build();
+ }
- boolean successfulJoin = results.stream().allMatch(expectedEnrichedPageviews::contains);
- assertEquals("Mismatch between the expected and actual join count", expectedEnrichedPageviews.size(), results.size());
- assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin);
- } catch (SamzaException e) {
- e.printStackTrace();
+ static class LowLevelPageViewProfileJoin implements TaskApplication {
+ @Override
+ public void describe(TaskApplicationDescriptor appDescriptor) {
+ DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+ appDescriptor.withInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>()));
+ appDescriptor.withInputStream(sd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>()));
+
+ TableDescriptor<Integer, Profile, ?> tableDescriptor = new InMemoryTableDescriptor<>(PROFILE_TABLE,
+ KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())).withSideInputs(ImmutableList.of(PROFILE_STREAM))
+ .withSideInputsProcessor((msg, store) -> {
+ Profile profile = (Profile) msg.getMessage();
+ int key = profile.getMemberId();
+ return ImmutableList.of(new Entry<>(key, profile));
+ });
+ appDescriptor.withTable(tableDescriptor);
+
+ appDescriptor.withOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>()));
+
+ appDescriptor.withTaskFactory((StreamTaskFactory) PageViewProfileJoinStreamTask::new);
+ }
+ }
+
+ static class PageViewProfileJoinStreamTask implements InitableTask, StreamTask {
+ private ReadWriteTable<Integer, Profile> profileTable;
+
+ @Override
+ public void init(Context context) {
+ this.profileTable = context.getTaskContext().getTable(PROFILE_TABLE);
+ }
+
+ @Override
+ public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+ PageView pageView = (PageView) envelope.getMessage();
+ Profile profile = this.profileTable.get(pageView.getMemberId());
+ if (profile != null) {
+ EnrichedPageView enrichedPageView =
+ new EnrichedPageView(pageView.getPageKey(), profile.getMemberId(), profile.getCompany());
+ collector.send(new OutgoingMessageEnvelope(OUTPUT_SYSTEM_STREAM, enrichedPageView));
+ }
}
}
static class PageViewProfileJoin implements StreamApplication {
- static final String PROFILE_TABLE = "profile-table";
-
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(getTableDescriptor());
- KafkaSystemDescriptor sd =
- new KafkaSystemDescriptor("test");
+ DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor(SYSTEM_NAME);
appDescriptor.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
- .partitionBy(TestTableData.PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "partition-page-view")
+ .partitionBy(TestTableData.PageView::getMemberId, v -> v,
+ KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "partition-page-view")
.join(table, new PageViewToProfileJoinFunction())
.sendTo(appDescriptor.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
}
protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
- return new InMemoryTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+ return new InMemoryTableDescriptor<>(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
.withSideInputs(ImmutableList.of(PROFILE_STREAM))
.withSideInputsProcessor((msg, store) -> {
Profile profile = (Profile) msg.getMessage();
@@ -153,7 +215,7 @@
static class DurablePageViewProfileJoin extends PageViewProfileJoin {
@Override
protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
- return new RocksDbTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+ return new RocksDbTableDescriptor<>(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
.withSideInputs(ImmutableList.of(PROFILE_STREAM))
.withSideInputsProcessor((msg, store) -> {
TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
@@ -162,4 +224,4 @@
});
}
}
-}
\ No newline at end of file
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
index 76c56b0..39f9b02 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
@@ -20,19 +20,29 @@
package org.apache.samza.test.table;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Random;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerdeFactory;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
public class TestTableData {
+ private static final IntegerSerde INTEGER_SERDE = new IntegerSerde();
public static class PageView implements Serializable {
@JsonProperty("pageKey")
@@ -205,6 +215,35 @@
return pageviews;
}
+ /**
+ * Create page views and spread out page views with the same member id across different partitions.
+ * Member ids are spread out like this to make sure that partitionBy operators properly repartition the messages.
+ * Member ids are assigned deterministically as the page view index modulo 10, so they fall in [0, 10).
+ *
+ * Example
+ * generatePartitionedPageViews(20, 4) will return:
+ * 0 -> page views with member ids [0, 5)
+ * 1 -> page views with member ids [6, 10)
+ * 1 -> page views with member ids [5, 10)
+ * 3 -> page views with member ids [6, 10)
+ */
+ public static Map<Integer, List<PageView>> generatePartitionedPageViews(int numPageViews, int partitionCount) {
+ Preconditions.checkArgument(numPageViews % partitionCount == 0, "partitionCount must divide numPageViews evenly");
+ int numPerPartition = numPageViews / partitionCount;
+ Random random = new Random();
+ ImmutableMap.Builder<Integer, List<PageView>> pageViewsBuilder = new ImmutableMap.Builder<>();
+ for (int i = 0; i < partitionCount; i++) {
+ pageViewsBuilder.put(i, new ArrayList<>());
+ }
+ Map<Integer, List<PageView>> pageViews = pageViewsBuilder.build();
+ for (int i = 0; i < numPageViews; i++) {
+ String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
+ int memberId = i % 10;
+ pageViews.get(i / numPerPartition).add(new PageView(pagekey, memberId));
+ }
+ return pageViews;
+ }
+
static public PageView[] generatePageViewsWithDistinctKeys(int count) {
Random random = new Random();
PageView[] pageviews = new PageView[count];
@@ -227,4 +266,20 @@
return profiles;
}
+ /**
+ * Create profiles and partition them based on the bytes representation of the member id. This uses the bytes
+ * representation for partitioning because this needs to use the same partition function as the InMemorySystemProducer
+ * (which is used in the test framework) so that table joins can be tested.
+ * One profile for each member id in [0, numProfiles) is created.
+ */
+ public static Map<Integer, List<Profile>> generatePartitionedProfiles(int numProfiles, int partitionCount) {
+ Random random = new Random();
+ return IntStream.range(0, numProfiles)
+ .mapToObj(i -> {
+ String company = COMPANIES[random.nextInt(COMPANIES.length - 1)];
+ return new Profile(i, company);
+ })
+ .collect(Collectors.groupingBy(
+ profile -> Math.abs(Arrays.hashCode(INTEGER_SERDE.toBytes(profile.getMemberId()))) % partitionCount));
+ }
}
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index b85e3c5..fb540f7 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -41,12 +41,12 @@
val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging {
private val metricsConfig = new MetricsConfig(config)
- val containersFromPreviousAttempts = newCounter("container-from-previous-attempt")
+ val containersFromPreviousAttempts = newGauge("container-from-previous-attempt", 0L)
val reporters = MetricsReporterLoader.getMetricsReporters(metricsConfig, SamzaAppMasterMetrics.sourceName).asScala
reporters.values.foreach(_.register(SamzaAppMasterMetrics.sourceName, registry))
def setContainersFromPreviousAttempts(containerCount: Int) {
- containersFromPreviousAttempts.inc(containerCount)
+ containersFromPreviousAttempts.set(containerCount)
}
def start() {
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 7e4565b..237667d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -19,14 +19,11 @@
package org.apache.samza.job.yarn
-import java.lang.Boolean
-
import com.google.common.annotations.VisibleForTesting
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.samza.SamzaException
-import org.apache.samza.classloader.DependencyIsolationUtils
import org.apache.samza.config.{Config, JobConfig, ShellCommandConfig, YarnConfig}
import org.apache.samza.job.ApplicationStatus.{SuccessfulFinish, UnsuccessfulFinish}
import org.apache.samza.job.{ApplicationStatus, StreamJob}
@@ -46,7 +43,7 @@
def submit: YarnJob = {
try {
val jobConfig = new JobConfig(config)
- val cmdExec = YarnJob.buildJobCoordinatorCmd(config, jobConfig)
+ val cmdExec = "./__package/bin/run-jc.sh"
val environment = YarnJob.buildEnvironment(config, this.yarnConfig, jobConfig)
appId = client.submitApplication(
@@ -184,13 +181,6 @@
Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig))
}
envMapBuilder += ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(yarnConfig.getAmOpts)
- val splitDeploymentEnabled = jobConfig.isSplitDeploymentEnabled
- envMapBuilder += ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED -> Util.envVarEscape(Boolean.toString(splitDeploymentEnabled))
- if (splitDeploymentEnabled) {
- //split deployment is enabled, so need to specify where the application lib directory is for app resources
- envMapBuilder += ShellCommandConfig.ENV_APPLICATION_LIB_DIR ->
- Util.envVarEscape(String.format("./%s/lib", DependencyIsolationUtils.APPLICATION_DIRECTORY))
- }
Option.apply(yarnConfig.getAMJavaHome).foreach {
amJavaHome => envMapBuilder += ShellCommandConfig.ENV_JAVA_HOME -> amJavaHome
}
@@ -198,18 +188,4 @@
Util.envVarEscape(config.get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, ""))
envMapBuilder.result()
}
-
- /**
- * Build the command for the job coordinator execution.
- * Passing multiple separate config objects so that they can be reused in other places.
- */
- @VisibleForTesting
- private[yarn] def buildJobCoordinatorCmd(config: Config, jobConfig: JobConfig): String = {
- var cmdExec = "./__package/bin/run-jc.sh" // default location
- if (jobConfig.isSplitDeploymentEnabled) {
- cmdExec = "./%s/bin/run-jc.sh" format DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY
- logger.info("Using isolated cluster-based job coordinator path: %s" format cmdExec)
- }
- cmdExec
- }
-}
\ No newline at end of file
+}
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
index f068800..4e2c4a7 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
-import org.apache.samza.classloader.DependencyIsolationUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
@@ -38,19 +37,6 @@
public class TestYarnJob {
@Test
- public void testBuildJobCoordinatorCmd() {
- // cluster-based job coordinator dependency isolation is not enabled; use script from __package directory
- Config config = new MapConfig();
- assertEquals("./__package/bin/run-jc.sh", YarnJob$.MODULE$.buildJobCoordinatorCmd(config, new JobConfig(config)));
-
- // split deployment is enabled; use script from framework infrastructure directory
- Config splitDeploymentEnabled =
- new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
- assertEquals(String.format("./%s/bin/run-jc.sh", DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY),
- YarnJob$.MODULE$.buildJobCoordinatorCmd(splitDeploymentEnabled, new JobConfig(splitDeploymentEnabled)));
- }
-
- @Test
public void testBuildEnvironment() throws IOException {
String amJvmOptions = "-Xmx1g -Dconfig.key='config value'";
Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
@@ -58,35 +44,12 @@
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
.put(YarnConfig.AM_JVM_OPTIONS, amJvmOptions) // needs escaping
- .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
.build());
String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
.writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
ShellCommandConfig.ENV_JAVA_OPTS, Util.envVarEscape(amJvmOptions),
- ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false",
- ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
- assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
- YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
- }
-
- @Test
- public void testBuildEnvironmentJobCoordinatorDependencyIsolationEnabled() throws IOException {
- Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
- .put(JobConfig.JOB_NAME, "jobName")
- .put(JobConfig.JOB_ID, "jobId")
- .put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
- .put(YarnConfig.AM_JVM_OPTIONS, "")
- .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
- .build());
- String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
- .writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
- Map<String, String> expected = ImmutableMap.of(
- ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
- ShellCommandConfig.ENV_JAVA_OPTS, "",
- ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
- ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
@@ -99,7 +62,6 @@
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
.put(YarnConfig.AM_JVM_OPTIONS, "")
- .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
.put(YarnConfig.AM_JAVA_HOME, "/some/path/to/java/home")
.build());
String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
@@ -107,7 +69,6 @@
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
ShellCommandConfig.ENV_JAVA_OPTS, "",
- ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false",
ShellCommandConfig.ENV_JAVA_HOME, "/some/path/to/java/home",
ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
@@ -121,15 +82,12 @@
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory")
.put(YarnConfig.AM_JVM_OPTIONS, "")
- .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
.build());
String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
.writeValueAsString(config));
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
ShellCommandConfig.ENV_JAVA_OPTS, "",
- ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
- ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
@@ -142,7 +100,6 @@
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory")
.put(YarnConfig.AM_JVM_OPTIONS, "")
- .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
.put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "./sqlapp/lib/*")
.build());
String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
@@ -150,8 +107,6 @@
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
ShellCommandConfig.ENV_JAVA_OPTS, "",
- ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
- ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "./sqlapp/lib/*");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());