blob: ee515846b1a878f156bcfbad2526253667aad52e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.eventhub;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.impl.ClientConstants;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.impl.RetryExponential;
import com.microsoft.azure.eventhubs.RetryPolicy;
import com.microsoft.azure.eventhubs.EventHubException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.samza.SamzaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class SamzaEventHubClientManager implements EventHubClientManager {
private static final Logger LOG = LoggerFactory.getLogger(SamzaEventHubClientManager.class.getName());
private static final String EVENTHUB_REMOTE_HOST_FORMAT = "%s.servicebus.windows.net";
private static final Duration MIN_RETRY_BACKOFF = Duration.ofMillis(100);
private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(11000);
private static final int MAX_RETRY_COUNT = 100;
private static final String SAMZA_EVENTHUB_RETRY = "SAMZA_CONNECTOR_RETRY";
private final int numClientThreads;
private EventHubClient eventHubClient;
private final String eventHubNamespace;
private final String entityPath;
private final String sasKeyName;
private final String sasKey;
private final RetryPolicy retryPolicy;
private ExecutorService eventHubClientExecutor;
public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey,
Integer numClientThreads) {
this(eventHubNamespace, entityPath, sasKeyName, sasKey,
new RetryExponential(MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF, MAX_RETRY_COUNT, SAMZA_EVENTHUB_RETRY), numClientThreads);
}
public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey,
RetryPolicy retryPolicy, int numClientThreads) {
this.eventHubNamespace = eventHubNamespace;
this.entityPath = entityPath;
this.sasKeyName = sasKeyName;
this.sasKey = sasKey;
this.retryPolicy = retryPolicy;
this.numClientThreads = numClientThreads;
}
@Override
public void init() {
String remoteHost = String.format(EVENTHUB_REMOTE_HOST_FORMAT, eventHubNamespace);
LOG.info("Initializing SamzaEventHubClientManager for namespace: " + eventHubNamespace);
try {
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder()
.setNamespaceName(eventHubNamespace)
.setEventHubName(entityPath)
.setSasKeyName(sasKeyName)
.setSasKey(sasKey);
ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder().setNameFormat("Samza EventHubClient Thread-%d").setDaemon(true);
eventHubClientExecutor = Executors.newFixedThreadPool(numClientThreads, threadFactoryBuilder.build());
eventHubClient = EventHubClient.createSync(connectionStringBuilder.toString(), retryPolicy, eventHubClientExecutor);
} catch (IOException | EventHubException e) {
String msg = String.format("Creation of EventHub client failed for eventHub EntityPath: %s on remote host %s:%d",
entityPath, remoteHost, ClientConstants.AMQPS_PORT);
LOG.error(msg, e);
throw new SamzaException(msg, e);
}
LOG.info("SamzaEventHubClientManager initialized for namespace: " + eventHubNamespace);
}
@Override
public EventHubClient getEventHubClient() {
return eventHubClient;
}
@Override
public void close(long timeoutMS) {
try {
if (timeoutMS == EventHubClientManager.BLOCK_UNTIL_CLOSE) {
eventHubClient.closeSync();
eventHubClientExecutor.shutdown();
} else {
CompletableFuture<Void> future = eventHubClient.close();
future.get(timeoutMS, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
LOG.error("Closing the EventHub client failed", e);
}
}
}