blob: b08ec1758712a957874ae9b76384f36f195844d2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.kudu;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kudu.client.ExternalConsistencyMode;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.SessionConfiguration;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* <p>
* Represents a connection to the Kudu cluster. An instance of this class is to
* be supplied (via a builder pattern to) {@link AbstractKuduOutputOperator} to
* connect to a Kudu cluster.
* </p>
*
* @since 3.8.0
*/
public class ApexKuduConnection implements AutoCloseable, Serializable
{
private static final long serialVersionUID = 4720185362997969198L;
private transient KuduSession kuduSession;
private transient KuduTable kuduTable;
private transient KuduClient kuduClient;
public static final Logger LOG = LoggerFactory.getLogger(ApexKuduConnection.class);
private ApexKuduConnectionBuilder builderForThisConnection;
private ApexKuduConnection(ApexKuduConnectionBuilder builder)
{
checkNotNull(builder,"Builder cannot be null to establish kudu session");
checkArgument(builder.mastersCollection.size() > 0, "Atleast one kudu master needs to be specified");
checkNotNull(builder.tableName,"Kudu table cannot be null");
builderForThisConnection = builder;
KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(builder.mastersCollection);
if (builder.isOperationTimeOutSet) {
kuduClientBuilder.defaultOperationTimeoutMs(builder.operationTimeOutMs);
}
if (builder.isBossThreadCountSet) {
kuduClientBuilder.bossCount(builder.numBossThreads);
}
if (builder.isWorkerThreadsCountSet) {
kuduClientBuilder.workerCount(builder.workerThreads);
}
if (builder.isSocketReadTimeOutSet) {
kuduClientBuilder.defaultSocketReadTimeoutMs(builder.socketReadTimeOutMs);
}
kuduClient = kuduClientBuilder.build();
kuduSession = kuduClient.newSession();
if (builder.isFlushModeSet) {
kuduSession.setFlushMode(builder.flushMode);
}
if (builder.isExternalConsistencyModeSet) {
kuduSession.setExternalConsistencyMode(builder.externalConsistencyMode);
}
try {
if (!kuduClient.tableExists(builder.tableName)) {
throw new Exception("Table " + builder.tableName + " does not exist. ");
} else {
kuduTable = kuduClient.openTable(builder.tableName);
}
} catch (Exception e) {
throw new RuntimeException("Kudu table existence could not be ascertained " + e.getMessage(), e);
}
}
@Override
public void close() throws Exception
{
kuduSession.close();
kuduClient.close();
}
public KuduSession getKuduSession()
{
return kuduSession;
}
public void setKuduSession(KuduSession kuduSession)
{
this.kuduSession = kuduSession;
}
public KuduTable getKuduTable()
{
return kuduTable;
}
public void setKuduTable(KuduTable kuduTable)
{
this.kuduTable = kuduTable;
}
public KuduClient getKuduClient()
{
return kuduClient;
}
public void setKuduClient(KuduClient kuduClient)
{
this.kuduClient = kuduClient;
}
public ApexKuduConnectionBuilder getBuilderForThisConnection()
{
return builderForThisConnection;
}
public void setBuilderForThisConnection(ApexKuduConnectionBuilder builderForThisConnection)
{
this.builderForThisConnection = builderForThisConnection;
}
public static class ApexKuduConnectionBuilder implements Serializable
{
private static final long serialVersionUID = -3428649955056723311L;
List<String> mastersCollection = new ArrayList<>();
String tableName;
// optional props
int numBossThreads = 1;
boolean isBossThreadCountSet = false;
int workerThreads = 2 * Runtime.getRuntime().availableProcessors();
boolean isWorkerThreadsCountSet = false;
long socketReadTimeOutMs = 10000;
boolean isSocketReadTimeOutSet = false;
long operationTimeOutMs = 30000;
boolean isOperationTimeOutSet = false;
ExternalConsistencyMode externalConsistencyMode;
boolean isExternalConsistencyModeSet = false;
SessionConfiguration.FlushMode flushMode;
boolean isFlushModeSet = false;
public ApexKuduConnectionBuilder withTableName(String tableName)
{
this.tableName = tableName;
return this;
}
public ApexKuduConnectionBuilder withAPossibleMasterHostAs(String masterHostAndPort)
{
mastersCollection.add(masterHostAndPort);
return this;
}
public ApexKuduConnectionBuilder withNumberOfBossThreads(int numberOfBossThreads)
{
this.numBossThreads = numberOfBossThreads;
isBossThreadCountSet = true;
return this;
}
public ApexKuduConnectionBuilder withNumberOfWorkerThreads(int numberOfWorkerThreads)
{
this.workerThreads = numberOfWorkerThreads;
isWorkerThreadsCountSet = true;
return this;
}
public ApexKuduConnectionBuilder withSocketReadTimeOutAs(long socketReadTimeOut)
{
this.socketReadTimeOutMs = socketReadTimeOut;
isSocketReadTimeOutSet = true;
return this;
}
public ApexKuduConnectionBuilder withOperationTimeOutAs(long operationTimeOut)
{
this.operationTimeOutMs = operationTimeOut;
isOperationTimeOutSet = true;
return this;
}
public ApexKuduConnectionBuilder withExternalConsistencyMode(ExternalConsistencyMode externalConsistencyMode)
{
this.externalConsistencyMode = externalConsistencyMode;
isExternalConsistencyModeSet = true;
return this;
}
public ApexKuduConnectionBuilder withFlushMode(SessionConfiguration.FlushMode flushMode)
{
this.flushMode = flushMode;
isFlushModeSet = true;
return this;
}
public ApexKuduConnection build()
{
ApexKuduConnection apexKuduConnection = new ApexKuduConnection(this);
return apexKuduConnection;
}
}
}