blob: acba314ebf0794ac76aed431752ac152c85d5410 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
using Apache.Arrow.Adbc.Extensions;
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
using Google.Api.Gax;
using Google.Apis.Auth.OAuth2;
using Google.Apis.Bigquery.v2.Data;
using Google.Cloud.BigQuery.V2;
namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
    /// <summary>
    /// BigQuery-specific implementation of <see cref="AdbcConnection"/>.
    /// </summary>
    /// <remarks>
    /// Catalogs map to Google Cloud projects and database schemas map to BigQuery datasets.
    /// Table, column and constraint metadata is read from the dataset's <c>INFORMATION_SCHEMA</c> views.
    /// </remarks>
    public class BigQueryConnection : AdbcConnection
    {
        readonly IReadOnlyDictionary<string, string> properties;
        BigQueryClient? client;
        GoogleCredential? credential;

        const string infoDriverName = "ADBC BigQuery Driver";
        const string infoDriverVersion = "1.0.0";
        const string infoVendorName = "BigQuery";
        const string infoDriverArrowVersion = "1.0.0";

        // Precision/scale BigQuery uses for NUMERIC and BIGNUMERIC columns declared without parameters.
        const int defaultNumericPrecision = 38;
        const int defaultNumericScale = 9;
        const int defaultBigNumericPrecision = 76;
        const int defaultBigNumericScale = 38;

        // In SqlDecimal, an OverflowException is thrown for decimals with scale > 28,
        // so decimals with a larger scale are reported with a string XDBC type.
        const int decimalMaxScale = 28;

        readonly AdbcInfoCode[] infoSupportedCodes = new [] {
            AdbcInfoCode.DriverName,
            AdbcInfoCode.DriverVersion,
            AdbcInfoCode.DriverArrowVersion,
            AdbcInfoCode.VendorName
        };

        /// <summary>
        /// Creates a connection from the supplied driver properties.
        /// </summary>
        /// <param name="properties">Connection properties; copied, never mutated.</param>
        /// <exception cref="ArgumentNullException"><paramref name="properties"/> is null.</exception>
        public BigQueryConnection(IReadOnlyDictionary<string, string> properties)
        {
            if (properties == null) throw new ArgumentNullException(nameof(properties));

            // add the default value for now and set to true until C# has a BigDecimal
            Dictionary<string, string> modifiedProperties = properties.ToDictionary(k => k.Key, v => v.Value);
            modifiedProperties[BigQueryParameters.LargeDecimalsAsString] = BigQueryConstants.TreatLargeDecimalAsString;
            this.properties = new ReadOnlyDictionary<string, string>(modifiedProperties);
        }

        /// <summary>
        /// Initializes the internal BigQuery connection.
        /// </summary>
        /// <returns>The <see cref="BigQueryClient"/> created for the configured project.</returns>
        /// <exception cref="ArgumentException">
        /// A required parameter is missing, the authentication type is not recognized, or no access
        /// token could be obtained for user authentication.
        /// </exception>
        internal BigQueryClient Open()
        {
            string tokenEndpoint = BigQueryConstants.TokenEndpoint;
            string? authenticationType = BigQueryConstants.UserAuthenticationType;

            // TODO: handle token expiration

            if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out string? projectId))
                throw new ArgumentException($"The {BigQueryParameters.ProjectId} parameter is not present");

            if (this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out string? newAuthenticationType))
            {
                if (!string.IsNullOrEmpty(newAuthenticationType))
                    authenticationType = newAuthenticationType;

                if (!authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase) &&
                    !authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType, StringComparison.OrdinalIgnoreCase))
                {
                    throw new ArgumentException($"The {BigQueryParameters.AuthenticationType} parameter can only be `{BigQueryConstants.UserAuthenticationType}` or `{BigQueryConstants.ServiceAccountAuthenticationType}`");
                }
            }

            if (!string.IsNullOrEmpty(authenticationType) && authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase))
            {
                if (!this.properties.TryGetValue(BigQueryParameters.ClientId, out string? clientId))
                    throw new ArgumentException($"The {BigQueryParameters.ClientId} parameter is not present");

                if (!this.properties.TryGetValue(BigQueryParameters.ClientSecret, out string? clientSecret))
                    throw new ArgumentException($"The {BigQueryParameters.ClientSecret} parameter is not present");

                if (!this.properties.TryGetValue(BigQueryParameters.RefreshToken, out string? refreshToken))
                    throw new ArgumentException($"The {BigQueryParameters.RefreshToken} parameter is not present");

                string? accessToken = GetAccessToken(clientId, clientSecret, refreshToken, tokenEndpoint);

                // Fail here with a clear message rather than building a credential around a null token.
                if (accessToken == null || accessToken.Length == 0)
                    throw new ArgumentException($"Unable to obtain an access token from {tokenEndpoint} using the supplied {BigQueryParameters.ClientId}, {BigQueryParameters.ClientSecret} and {BigQueryParameters.RefreshToken}");

                this.credential = ApplyScopes(GoogleCredential.FromAccessToken(accessToken));
            }
            else
            {
                if (!this.properties.TryGetValue(BigQueryParameters.JsonCredential, out string? json))
                    throw new ArgumentException($"The {BigQueryParameters.JsonCredential} parameter is not present");

                this.credential = ApplyScopes(GoogleCredential.FromJson(json));
            }

            BigQueryClient client = BigQueryClient.Create(projectId, this.credential);
            this.client = client;
            return client;
        }

        /// <summary>
        /// Apply any additional scopes to the credential.
        /// </summary>
        /// <param name="credential"><see cref="GoogleCredential"/></param>
        /// <returns>
        /// The credential scoped with the comma-separated entries of the Scopes property (blank entries
        /// ignored, surrounding whitespace trimmed), or <paramref name="credential"/> unchanged when
        /// no scopes are configured.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="credential"/> is null.</exception>
        private GoogleCredential ApplyScopes(GoogleCredential credential)
        {
            if (credential == null) throw new ArgumentNullException(nameof(credential));

            if (this.properties.TryGetValue(BigQueryParameters.Scopes, out string? scopes) && !string.IsNullOrEmpty(scopes))
            {
                IEnumerable<string> parsedScopes = scopes.Split(',')
                    .Select(x => x.Trim())
                    .Where(x => x.Length > 0);
                return credential.CreateScoped(parsedScopes);
            }

            return credential;
        }

        /// <summary>
        /// Returns driver and vendor information for the requested codes.
        /// </summary>
        /// <param name="codes">Codes to report; an empty list means every supported code.</param>
        /// <returns>
        /// A stream in <c>StandardSchemas.GetInfoSchema</c>. Every value uses the string member of the
        /// dense union; codes this driver does not support get a null string value.
        /// </returns>
        public override IArrowArrayStream GetInfo(IReadOnlyList<AdbcInfoCode> codes)
        {
            const int strValTypeID = 0;

            UnionType infoUnionType = new UnionType(
                new Field[]
                {
                    new Field("string_value", StringType.Default, true),
                    new Field("bool_value", BooleanType.Default, true),
                    new Field("int64_value", Int64Type.Default, true),
                    new Field("int32_bitmask", Int32Type.Default, true),
                    new Field(
                        "string_list",
                        new ListType(
                            new Field("item", StringType.Default, true)
                        ),
                        false
                    ),
                    new Field(
                        "int32_to_int32_list_map",
                        new ListType(
                            new Field("entries", new StructType(
                                new Field[]
                                {
                                    new Field("key", Int32Type.Default, false),
                                    new Field("value", Int32Type.Default, true),
                                }
                                ), false)
                        ),
                        true
                    )
                },
                new int[] { 0, 1, 2, 3, 4, 5 },
                UnionMode.Dense);

            if (codes.Count == 0)
            {
                codes = infoSupportedCodes;
            }

            UInt32Array.Builder infoNameBuilder = new UInt32Array.Builder();
            ArrowBuffer.Builder<byte> typeBuilder = new ArrowBuffer.Builder<byte>();
            ArrowBuffer.Builder<int> offsetBuilder = new ArrowBuffer.Builder<int>();
            StringArray.Builder stringInfoBuilder = new StringArray.Builder();

            int nullCount = 0;
            int arrayLength = codes.Count;

            foreach (AdbcInfoCode code in codes)
            {
                // Every supported code is string-valued; unsupported codes map to null.
                string? value = code switch
                {
                    AdbcInfoCode.DriverName => infoDriverName,
                    AdbcInfoCode.DriverVersion => infoDriverVersion,
                    AdbcInfoCode.DriverArrowVersion => infoDriverArrowVersion,
                    AdbcInfoCode.VendorName => infoVendorName,
                    _ => null
                };

                infoNameBuilder.Append((UInt32)code);
                typeBuilder.Append(strValTypeID);
                offsetBuilder.Append(stringInfoBuilder.Length);

                if (value == null)
                {
                    stringInfoBuilder.AppendNull();
                    nullCount++;
                }
                else
                {
                    stringInfoBuilder.Append(value);
                }
            }

            StructType entryType = new StructType(
                new Field[] {
                    new Field("key", Int32Type.Default, false),
                    new Field("value", Int32Type.Default, true)});

            StructArray entriesDataArray = new StructArray(entryType, 0,
                new[] { new Int32Array.Builder().Build(), new Int32Array.Builder().Build() },
                new ArrowBuffer.BitmapBuilder().Build());

            IArrowArray[] childrenArrays = new IArrowArray[]
            {
                stringInfoBuilder.Build(),
                new BooleanArray.Builder().Build(),
                new Int64Array.Builder().Build(),
                new Int32Array.Builder().Build(),
                new ListArray.Builder(StringType.Default).Build(),
                new List<IArrowArray?>(){ entriesDataArray }.BuildListArrayForType(entryType)
            };

            DenseUnionArray infoValue = new DenseUnionArray(infoUnionType, arrayLength, childrenArrays, typeBuilder.Build(), offsetBuilder.Build(), nullCount);

            IArrowArray[] dataArrays = new IArrowArray[]
            {
                infoNameBuilder.Build(),
                infoValue
            };
            StandardSchemas.GetInfoSchema.Validate(dataArrays);

            return new BigQueryInfoArrowStream(StandardSchemas.GetInfoSchema, dataArrays);
        }

        /// <summary>
        /// Returns the catalog hierarchy (projects, datasets, tables, columns, constraints) down to
        /// <paramref name="depth"/>, filtered by the supplied SQL LIKE-style patterns.
        /// </summary>
        public override IArrowArrayStream GetObjects(
            GetObjectsDepth depth,
            string? catalogPattern,
            string? dbSchemaPattern,
            string? tableNamePattern,
            IReadOnlyList<string>? tableTypes,
            string? columnNamePattern)
        {
            IArrowArray[] dataArrays = GetCatalogs(depth, catalogPattern, dbSchemaPattern,
                tableNamePattern, tableTypes, columnNamePattern);

            return new BigQueryInfoArrowStream(StandardSchemas.GetObjectsSchema, dataArrays);
        }

        /// <summary>
        /// Executes the query using the BigQueryClient.
        /// </summary>
        /// <param name="sql">The query to execute.</param>
        /// <param name="parameters">Parameters to include.</param>
        /// <param name="queryOptions">Additional query options.</param>
        /// <param name="resultsOptions">Additional result options.</param>
        /// <returns>The query results, or <c>null</c> when the connection has not been opened.</returns>
        /// <remarks>
        /// Can later add logging or metrics around query calls.
        /// </remarks>
        private BigQueryResults? ExecuteQuery(string sql, IEnumerable<BigQueryParameter>? parameters, QueryOptions? queryOptions = null, GetQueryResultsOptions? resultsOptions = null)
        {
            BigQueryResults? result = this.client?.ExecuteQuery(sql, parameters, queryOptions, resultsOptions);

            return result;
        }

        /// <summary>
        /// Builds the top-level GetObjects arrays: one row per project whose id matches
        /// <paramref name="catalogPattern"/> (case-insensitive).
        /// </summary>
        private IArrowArray[] GetCatalogs(
            GetObjectsDepth depth,
            string? catalogPattern,
            string? dbSchemaPattern,
            string? tableNamePattern,
            IReadOnlyList<string>? tableTypes,
            string? columnNamePattern)
        {
            StringArray.Builder catalogNameBuilder = new StringArray.Builder();
            List<IArrowArray?> catalogDbSchemasValues = new List<IArrowArray?>();
            Regex catalogRegex = new Regex(PatternToRegEx(catalogPattern), RegexOptions.IgnoreCase);

            PagedEnumerable<ProjectList, CloudProject>? catalogs = this.client?.ListProjects();

            if (catalogs != null)
            {
                foreach (CloudProject catalog in catalogs)
                {
                    if (!catalogRegex.IsMatch(catalog.ProjectId))
                        continue;

                    catalogNameBuilder.Append(catalog.ProjectId);

                    if (depth == GetObjectsDepth.Catalogs)
                    {
                        catalogDbSchemasValues.Add(null);
                    }
                    else
                    {
                        catalogDbSchemasValues.Add(GetDbSchemas(
                            depth, catalog.ProjectId, dbSchemaPattern,
                            tableNamePattern, tableTypes, columnNamePattern));
                    }
                }
            }

            IArrowArray[] dataArrays = new IArrowArray[]
            {
                catalogNameBuilder.Build(),
                catalogDbSchemasValues.BuildListArrayForType(new StructType(StandardSchemas.DbSchemaSchema)),
            };

            StandardSchemas.GetObjectsSchema.Validate(dataArrays);

            return dataArrays;
        }

        /// <summary>
        /// Builds one struct row per dataset in <paramref name="catalog"/> whose id matches
        /// <paramref name="dbSchemaPattern"/> (case-insensitive).
        /// </summary>
        private StructArray GetDbSchemas(
            GetObjectsDepth depth,
            string catalog,
            string? dbSchemaPattern,
            string? tableNamePattern,
            IReadOnlyList<string>? tableTypes,
            string? columnNamePattern)
        {
            StringArray.Builder dbSchemaNameBuilder = new StringArray.Builder();
            List<IArrowArray?> dbSchemaTablesValues = new List<IArrowArray?>();
            ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
            int length = 0;

            Regex dbSchemaRegex = new Regex(PatternToRegEx(dbSchemaPattern), RegexOptions.IgnoreCase);

            PagedEnumerable<DatasetList, BigQueryDataset>? schemas = this.client?.ListDatasets(catalog);

            if (schemas != null)
            {
                foreach (BigQueryDataset schema in schemas)
                {
                    if (!dbSchemaRegex.IsMatch(schema.Reference.DatasetId))
                        continue;

                    dbSchemaNameBuilder.Append(schema.Reference.DatasetId);
                    length++;
                    nullBitmapBuffer.Append(true);

                    if (depth == GetObjectsDepth.DbSchemas)
                    {
                        dbSchemaTablesValues.Add(null);
                    }
                    else
                    {
                        dbSchemaTablesValues.Add(GetTableSchemas(
                            depth, catalog, schema.Reference.DatasetId,
                            tableNamePattern, tableTypes, columnNamePattern));
                    }
                }
            }

            IArrowArray[] dataArrays = new IArrowArray[]
            {
                dbSchemaNameBuilder.Build(),
                dbSchemaTablesValues.BuildListArrayForType(new StructType(StandardSchemas.TableSchema)),
            };

            StandardSchemas.DbSchemaSchema.Validate(dataArrays);

            return new StructArray(
                new StructType(StandardSchemas.DbSchemaSchema),
                length,
                dataArrays,
                nullBitmapBuffer.Build());
        }

        /// <summary>
        /// Builds one struct row per table in <c>INFORMATION_SCHEMA.TABLES</c> of the given dataset,
        /// optionally filtered by a LIKE pattern on the name and by table type.
        /// </summary>
        private StructArray GetTableSchemas(
            GetObjectsDepth depth,
            string catalog,
            string dbSchema,
            string? tableNamePattern,
            IReadOnlyList<string>? tableTypes,
            string? columnNamePattern)
        {
            StringArray.Builder tableNameBuilder = new StringArray.Builder();
            StringArray.Builder tableTypeBuilder = new StringArray.Builder();
            List<IArrowArray?> tableColumnsValues = new List<IArrowArray?>();
            List<IArrowArray?> tableConstraintsValues = new List<IArrowArray?>();
            ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
            int length = 0;

            string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.TABLES",
                Sanitize(catalog), Sanitize(dbSchema));

            List<string> filters = new List<string>();

            if (tableNamePattern != null)
            {
                filters.Add(string.Format("table_name LIKE '{0}'", Sanitize(tableNamePattern)));
            }

            if (tableTypes != null && tableTypes.Count > 0)
            {
                IEnumerable<string> sanitizedTypes = tableTypes.Select(x => Sanitize(x));
                filters.Add(string.Format("table_type IN ('{0}')", string.Join("', '", sanitizedTypes).ToUpperInvariant()));
            }

            if (filters.Count > 0)
            {
                query = string.Concat(query, " WHERE ", string.Join(" AND ", filters));
            }

            BigQueryResults? result = ExecuteQuery(query, parameters: null);

            if (result != null)
            {
                // Constraints are included unless the property holds a value that parses as "false";
                // an unparseable value keeps the default.
                bool includeConstraints = true;

                if (this.properties.TryGetValue(BigQueryParameters.IncludeConstraintsWithGetObjects, out string? includeConstraintsValue) &&
                    bool.TryParse(includeConstraintsValue, out bool parsedIncludeConstraints))
                {
                    includeConstraints = parsedIncludeConstraints;
                }

                foreach (BigQueryRow row in result)
                {
                    string tableName = GetValue(row["table_name"]);

                    tableNameBuilder.Append(tableName);
                    tableTypeBuilder.Append(GetValue(row["table_type"]));
                    nullBitmapBuffer.Append(true);
                    length++;

                    if (includeConstraints)
                    {
                        tableConstraintsValues.Add(GetConstraintSchema(
                            depth, catalog, dbSchema, tableName, columnNamePattern));
                    }
                    else
                    {
                        tableConstraintsValues.Add(null);
                    }

                    if (depth == GetObjectsDepth.Tables)
                    {
                        tableColumnsValues.Add(null);
                    }
                    else
                    {
                        tableColumnsValues.Add(GetColumnSchema(catalog, dbSchema, tableName, columnNamePattern));
                    }
                }
            }

            IArrowArray[] dataArrays = new IArrowArray[]
            {
                tableNameBuilder.Build(),
                tableTypeBuilder.Build(),
                tableColumnsValues.BuildListArrayForType(new StructType(StandardSchemas.ColumnSchema)),
                tableConstraintsValues.BuildListArrayForType(new StructType(StandardSchemas.ConstraintSchema))
            };

            StandardSchemas.TableSchema.Validate(dataArrays);

            return new StructArray(
                new StructType(StandardSchemas.TableSchema),
                length,
                dataArrays,
                nullBitmapBuffer.Build());
        }

        /// <summary>
        /// Builds one struct row per column of <paramref name="table"/> from
        /// <c>INFORMATION_SCHEMA.COLUMNS</c>, optionally filtered by a LIKE pattern on the column name.
        /// </summary>
        private StructArray GetColumnSchema(
            string catalog,
            string dbSchema,
            string table,
            string? columnNamePattern)
        {
            StringArray.Builder columnNameBuilder = new StringArray.Builder();
            Int32Array.Builder ordinalPositionBuilder = new Int32Array.Builder();
            StringArray.Builder remarksBuilder = new StringArray.Builder();
            Int16Array.Builder xdbcDataTypeBuilder = new Int16Array.Builder();
            StringArray.Builder xdbcTypeNameBuilder = new StringArray.Builder();
            Int32Array.Builder xdbcColumnSizeBuilder = new Int32Array.Builder();
            Int16Array.Builder xdbcDecimalDigitsBuilder = new Int16Array.Builder();
            Int16Array.Builder xdbcNumPrecRadixBuilder = new Int16Array.Builder();
            Int16Array.Builder xdbcNullableBuilder = new Int16Array.Builder();
            StringArray.Builder xdbcColumnDefBuilder = new StringArray.Builder();
            Int16Array.Builder xdbcSqlDataTypeBuilder = new Int16Array.Builder();
            Int16Array.Builder xdbcDatetimeSubBuilder = new Int16Array.Builder();
            Int32Array.Builder xdbcCharOctetLengthBuilder = new Int32Array.Builder();
            StringArray.Builder xdbcIsNullableBuilder = new StringArray.Builder();
            StringArray.Builder xdbcScopeCatalogBuilder = new StringArray.Builder();
            StringArray.Builder xdbcScopeSchemaBuilder = new StringArray.Builder();
            StringArray.Builder xdbcScopeTableBuilder = new StringArray.Builder();
            BooleanArray.Builder xdbcIsAutoincrementBuilder = new BooleanArray.Builder();
            BooleanArray.Builder xdbcIsGeneratedcolumnBuilder = new BooleanArray.Builder();
            ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
            int length = 0;

            string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'",
                Sanitize(catalog), Sanitize(dbSchema), Sanitize(table));

            if (columnNamePattern != null)
            {
                // Leading space keeps the filter separate from the preceding string literal.
                query = string.Concat(query, string.Format(" AND column_name LIKE '{0}'", Sanitize(columnNamePattern)));
            }

            BigQueryResults? result = ExecuteQuery(query, parameters: null);

            if (result != null)
            {
                foreach (BigQueryRow row in result)
                {
                    columnNameBuilder.Append(GetValue(row["column_name"]));
                    ordinalPositionBuilder.Append((int)(long)row["ordinal_position"]);
                    remarksBuilder.Append("");

                    string dataType = ToTypeName(GetValue(row["data_type"]));

                    if (dataType.StartsWith("NUMERIC", StringComparison.Ordinal) ||
                        dataType.StartsWith("DECIMAL", StringComparison.Ordinal) ||
                        dataType.StartsWith("BIGNUMERIC", StringComparison.Ordinal) ||
                        dataType.StartsWith("BIGDECIMAL", StringComparison.Ordinal))
                    {
                        ParsedDecimalValues values = ParsePrecisionAndScale(dataType);
                        xdbcColumnSizeBuilder.Append(values.Precision);
                        xdbcDecimalDigitsBuilder.Append(Convert.ToInt16(values.Scale));
                    }
                    else
                    {
                        xdbcColumnSizeBuilder.AppendNull();
                        xdbcDecimalDigitsBuilder.AppendNull();
                    }

                    xdbcDataTypeBuilder.AppendNull();
                    xdbcTypeNameBuilder.Append(dataType);
                    xdbcNumPrecRadixBuilder.AppendNull();
                    xdbcNullableBuilder.AppendNull();
                    xdbcColumnDefBuilder.AppendNull();
                    xdbcSqlDataTypeBuilder.Append((short)ToXdbcDataType(dataType));
                    xdbcDatetimeSubBuilder.AppendNull();
                    xdbcCharOctetLengthBuilder.AppendNull();
                    xdbcIsNullableBuilder.Append(GetValue(row["is_nullable"]));
                    xdbcScopeCatalogBuilder.AppendNull();
                    xdbcScopeSchemaBuilder.AppendNull();
                    xdbcScopeTableBuilder.AppendNull();
                    xdbcIsAutoincrementBuilder.AppendNull();
                    xdbcIsGeneratedcolumnBuilder.Append(string.Equals(GetValue(row["is_generated"]), "YES", StringComparison.OrdinalIgnoreCase));
                    nullBitmapBuffer.Append(true);
                    length++;
                }
            }

            IArrowArray[] dataArrays = new IArrowArray[]
            {
                columnNameBuilder.Build(),
                ordinalPositionBuilder.Build(),
                remarksBuilder.Build(),
                xdbcDataTypeBuilder.Build(),
                xdbcTypeNameBuilder.Build(),
                xdbcColumnSizeBuilder.Build(),
                xdbcDecimalDigitsBuilder.Build(),
                xdbcNumPrecRadixBuilder.Build(),
                xdbcNullableBuilder.Build(),
                xdbcColumnDefBuilder.Build(),
                xdbcSqlDataTypeBuilder.Build(),
                xdbcDatetimeSubBuilder.Build(),
                xdbcCharOctetLengthBuilder.Build(),
                xdbcIsNullableBuilder.Build(),
                xdbcScopeCatalogBuilder.Build(),
                xdbcScopeSchemaBuilder.Build(),
                xdbcScopeTableBuilder.Build(),
                xdbcIsAutoincrementBuilder.Build(),
                xdbcIsGeneratedcolumnBuilder.Build()
            };

            StandardSchemas.ColumnSchema.Validate(dataArrays);

            return new StructArray(
                new StructType(StandardSchemas.ColumnSchema),
                length,
                dataArrays,
                nullBitmapBuffer.Build());
        }

        /// <summary>
        /// Builds one struct row per constraint of <paramref name="table"/> from
        /// <c>INFORMATION_SCHEMA.TABLE_CONSTRAINTS</c>. Column names and foreign-key usage are only
        /// populated when <paramref name="depth"/> is All or Tables.
        /// </summary>
        private StructArray GetConstraintSchema(
            GetObjectsDepth depth,
            string catalog,
            string dbSchema,
            string table,
            string? columnNamePattern)
        {
            StringArray.Builder constraintNameBuilder = new StringArray.Builder();
            StringArray.Builder constraintTypeBuilder = new StringArray.Builder();
            List<IArrowArray?> constraintColumnNamesValues = new List<IArrowArray?>();
            List<IArrowArray?> constraintColumnUsageValues = new List<IArrowArray?>();
            ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
            int length = 0;

            string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE table_name = '{2}'",
               Sanitize(catalog), Sanitize(dbSchema), Sanitize(table));

            BigQueryResults? result = ExecuteQuery(query, parameters: null);

            if (result != null)
            {
                foreach (BigQueryRow row in result)
                {
                    string constraintName = GetValue(row["constraint_name"]);
                    constraintNameBuilder.Append(constraintName);
                    string constraintType = GetValue(row["constraint_type"]);
                    constraintTypeBuilder.Append(constraintType);
                    nullBitmapBuffer.Append(true);
                    length++;

                    if (depth == GetObjectsDepth.All || depth == GetObjectsDepth.Tables)
                    {
                        constraintColumnNamesValues.Add(GetConstraintColumnNames(
                            catalog, dbSchema, table, constraintName));

                        if (string.Equals(constraintType, "FOREIGN KEY", StringComparison.OrdinalIgnoreCase))
                        {
                            constraintColumnUsageValues.Add(GetConstraintsUsage(
                                catalog, dbSchema, table, constraintName));
                        }
                        else
                        {
                            constraintColumnUsageValues.Add(null);
                        }
                    }
                    else
                    {
                        constraintColumnNamesValues.Add(null);
                        constraintColumnUsageValues.Add(null);
                    }
                }
            }

            IArrowArray[] dataArrays = new IArrowArray[]
            {
                constraintNameBuilder.Build(),
                constraintTypeBuilder.Build(),
                constraintColumnNamesValues.BuildListArrayForType(StringType.Default),
                constraintColumnUsageValues.BuildListArrayForType(new StructType(StandardSchemas.UsageSchema))
            };

            StandardSchemas.ConstraintSchema.Validate(dataArrays);

            return new StructArray(
                new StructType(StandardSchemas.ConstraintSchema),
                length,
                dataArrays,
                nullBitmapBuffer.Build());
        }

        /// <summary>
        /// Returns the column names of a constraint from <c>INFORMATION_SCHEMA.KEY_COLUMN_USAGE</c>,
        /// ordered by <c>ordinal_position</c>.
        /// </summary>
        private StringArray GetConstraintColumnNames(
            string catalog,
            string dbSchema,
            string table,
            string constraintName)
        {
            string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE table_name = '{2}' AND constraint_name = '{3}' ORDER BY ordinal_position",
               Sanitize(catalog), Sanitize(dbSchema), Sanitize(table), Sanitize(constraintName));

            StringArray.Builder constraintColumnNamesBuilder = new StringArray.Builder();

            BigQueryResults? result = ExecuteQuery(query, parameters: null);

            if (result != null)
            {
                foreach (BigQueryRow row in result)
                {
                    constraintColumnNamesBuilder.Append(GetValue(row["column_name"]));
                }
            }

            return constraintColumnNamesBuilder.Build();
        }

        /// <summary>
        /// Returns the rows of <c>INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE</c> for a foreign-key
        /// constraint as usage structs (catalog, schema, table, column).
        /// </summary>
        /// <remarks>
        /// The query filters on <paramref name="constraintName"/> only; <paramref name="table"/> is unused.
        /// </remarks>
        private StructArray GetConstraintsUsage(
            string catalog,
            string dbSchema,
            string table,
            string constraintName)
        {
            StringArray.Builder constraintFkCatalogBuilder = new StringArray.Builder();
            StringArray.Builder constraintFkDbSchemaBuilder = new StringArray.Builder();
            StringArray.Builder constraintFkTableBuilder = new StringArray.Builder();
            StringArray.Builder constraintFkColumnNameBuilder = new StringArray.Builder();
            ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
            int length = 0;

            string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE WHERE constraint_name = '{2}'",
               Sanitize(catalog), Sanitize(dbSchema), Sanitize(constraintName));

            BigQueryResults? result = ExecuteQuery(query, parameters: null);

            if (result != null)
            {
                foreach (BigQueryRow row in result)
                {
                    constraintFkCatalogBuilder.Append(GetValue(row["constraint_catalog"]));
                    constraintFkDbSchemaBuilder.Append(GetValue(row["constraint_schema"]));
                    constraintFkTableBuilder.Append(GetValue(row["table_name"]));
                    constraintFkColumnNameBuilder.Append(GetValue(row["column_name"]));
                    nullBitmapBuffer.Append(true);
                    length++;
                }
            }

            IArrowArray[] dataArrays = new IArrowArray[]
            {
                constraintFkCatalogBuilder.Build(),
                constraintFkDbSchemaBuilder.Build(),
                constraintFkTableBuilder.Build(),
                constraintFkColumnNameBuilder.Build()
            };

            StandardSchemas.UsageSchema.Validate(dataArrays);

            return new StructArray(
                new StructType(StandardSchemas.UsageSchema),
                length,
                dataArrays,
                nullBitmapBuffer.Build());
        }

        /// <summary>
        /// Converts a SQL LIKE-style pattern (<c>%</c> = any run, <c>_</c> = any single character)
        /// into an anchored, case-insensitive regular expression. A null pattern matches everything.
        /// </summary>
        private string PatternToRegEx(string? pattern)
        {
            if (pattern == null)
                return ".*";

            // Escape regex metacharacters so only the LIKE wildcards carry special meaning;
            // Regex.Escape leaves '_' and '%' untouched.
            string convertedPattern = Regex.Escape(pattern).Replace("_", ".").Replace("%", ".*");

            StringBuilder builder = new StringBuilder("(?i)^");
            builder.Append(convertedPattern);
            builder.Append("$");

            return builder.ToString();
        }

        /// <summary>
        /// Strips type parameters from a BigQuery type name.
        /// </summary>
        /// <remarks>
        /// NOTE(review): <c>Math.Min</c> returns -1 when only one of '(' or '&lt;' is present, so the type is
        /// returned unchanged unless it contains both (e.g. "NUMERIC(10, 2)" is kept whole, while
        /// "ARRAY&lt;NUMERIC(10, 2)&gt;" becomes "ARRAY"). <c>GetColumnSchema</c> and <c>ToXdbcDataType</c>
        /// rely on parameterized decimals passing through, so they must be updated together if this changes.
        /// </remarks>
        private string ToTypeName(string type)
        {
            int index = Math.Min(type.IndexOf("("), type.IndexOf("<"));
            string dataType = index == -1 ? type : type.Substring(0, index);
            return dataType;
        }

        /// <summary>
        /// Maps a BigQuery type name to its XDBC (ODBC-style) SQL data type code.
        /// </summary>
        private XdbcDataType ToXdbcDataType(string type)
        {
            switch (type)
            {
                case "INTEGER" or "INT64":
                    return XdbcDataType.XdbcDataType_XDBC_INTEGER;
                case "FLOAT" or "FLOAT64":
                    return XdbcDataType.XdbcDataType_XDBC_FLOAT;
                case "BOOL" or "BOOLEAN":
                    return XdbcDataType.XdbcDataType_XDBC_BIT;
                case "STRING" or "GEOGRAPHY" or "JSON":
                    return XdbcDataType.XdbcDataType_XDBC_VARCHAR;
                case "BYTES":
                    return XdbcDataType.XdbcDataType_XDBC_BINARY;
                case "DATETIME":
                    return XdbcDataType.XdbcDataType_XDBC_DATETIME;
                case "TIMESTAMP":
                    return XdbcDataType.XdbcDataType_XDBC_TIMESTAMP;
                case "TIME":
                    return XdbcDataType.XdbcDataType_XDBC_TIME;
                case "DATE":
                    return XdbcDataType.XdbcDataType_XDBC_DATE;
                case "RECORD" or "STRUCT":
                    return XdbcDataType.XdbcDataType_XDBC_VARBINARY;
                case "NUMERIC" or "DECIMAL" or "BIGNUMERIC" or "BIGDECIMAL":
                    return XdbcDataType.XdbcDataType_XDBC_NUMERIC;
                default:
                    if (type.StartsWith("NUMERIC(", StringComparison.Ordinal))
                    {
                        return ToDecimalXdbcDataType(type);
                    }

                    if (type.StartsWith("BIGNUMERIC(", StringComparison.Ordinal))
                    {
                        // Large decimals are reported as strings when LargeDecimalsAsString is "true".
                        return bool.Parse(this.properties[BigQueryParameters.LargeDecimalsAsString])
                            ? XdbcDataType.XdbcDataType_XDBC_VARCHAR
                            : ToDecimalXdbcDataType(type);
                    }

                    if (type.StartsWith("STRUCT", StringComparison.Ordinal))
                        return XdbcDataType.XdbcDataType_XDBC_VARCHAR;

                    return XdbcDataType.XdbcDataType_XDBC_UNKNOWN_TYPE;
            }
        }

        /// <summary>
        /// Maps a parameterized decimal type to DECIMAL when SqlDecimal can represent its scale
        /// (at most <see cref="decimalMaxScale"/>), otherwise to VARCHAR.
        /// </summary>
        private XdbcDataType ToDecimalXdbcDataType(string type)
        {
            ParsedDecimalValues parsedDecimalValues = ParsePrecisionAndScale(type);

            return parsedDecimalValues.Scale <= decimalMaxScale
                ? XdbcDataType.XdbcDataType_XDBC_DECIMAL
                : XdbcDataType.XdbcDataType_XDBC_VARCHAR;
        }

        /// <summary>
        /// Builds an Arrow schema for a table from its <c>INFORMATION_SCHEMA.COLUMNS</c> rows.
        /// </summary>
        public override Schema GetTableSchema(string? catalog, string? dbSchema, string tableName)
        {
            string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'",
                Sanitize(catalog), Sanitize(dbSchema), Sanitize(tableName));

            BigQueryResults? result = ExecuteQuery(query, parameters: null);

            List<Field> fields = new List<Field>();

            if (result != null)
            {
                foreach (BigQueryRow row in result)
                {
                    fields.Add(DescToField(row));
                }
            }

            return new Schema(fields, null);
        }

        /// <summary>
        /// Converts one <c>INFORMATION_SCHEMA.COLUMNS</c> row into an Arrow <see cref="Field"/> with a
        /// lower-cased name, PRIMARY_KEY/ORDINAL_POSITION/DATA_TYPE metadata, and nullability taken from
        /// <c>is_nullable</c>.
        /// </summary>
        private Field DescToField(BigQueryRow row)
        {
            // Invariant casing so column names do not depend on the current culture (e.g. Turkish 'I').
            string columnName = GetValue(row["column_name"]).ToLowerInvariant();
            string dataType = GetValue(row["data_type"]);

            Dictionary<string, string> metaData = new Dictionary<string, string>
            {
                { "PRIMARY_KEY", "" },
                { "ORDINAL_POSITION", GetValue(row["ordinal_position"]) },
                { "DATA_TYPE", dataType }
            };

            Field.Builder fieldBuilder = SchemaFieldGenerator(columnName, dataType);
            fieldBuilder.Metadata(metaData);

            if (!GetValue(row["is_nullable"]).Equals("YES", StringComparison.OrdinalIgnoreCase))
            {
                fieldBuilder.Nullable(false);
            }

            fieldBuilder.Name(columnName);

            return fieldBuilder.Build();
        }

        /// <summary>
        /// Returns a row cell as a string.
        /// </summary>
        /// <exception cref="InvalidOperationException"><paramref name="value"/> is null.</exception>
        private string GetValue(object value)
        {
            switch (value)
            {
                case string sValue:
                    return sValue;
                case null:
                    throw new InvalidOperationException("Cannot parse a null value");
                default:
                    return value.ToString() ?? string.Empty;
            }
        }

        /// <summary>
        /// Creates a field builder for a BigQuery type string such as "INT64", "NUMERIC(10, 2)" or
        /// "STRUCT&lt;a INT64, b STRING&gt;". The base type name ends at the first '(' or '&lt;'.
        /// </summary>
        private Field.Builder SchemaFieldGenerator(string name, string type)
        {
            int parenIndex = type.IndexOf('(');
            int angleIndex = type.IndexOf('<');

            // Use whichever delimiter comes first; -1 means the type has no parameters.
            int index = parenIndex == -1
                ? angleIndex
                : angleIndex == -1 ? parenIndex : Math.Min(parenIndex, angleIndex);

            string dataType = index == -1 ? type : type.Substring(0, index);

            return GetFieldBuilder(name, type, dataType, index);
        }

        /// <summary>
        /// Maps a BigQuery base type (<paramref name="dataType"/>) to an Arrow type, using the full
        /// <paramref name="type"/> string and the parameter start <paramref name="index"/> for
        /// parameterized and nested types.
        /// </summary>
        /// <exception cref="InvalidOperationException">The type cannot be mapped.</exception>
        private Field.Builder GetFieldBuilder(string name, string type, string dataType, int index)
        {
            Field.Builder fieldBuilder = new Field.Builder();
            fieldBuilder.Name(name);

            switch (dataType)
            {
                case "INTEGER" or "INT64":
                    return fieldBuilder.DataType(Int64Type.Default);
                case "FLOAT" or "FLOAT64":
                    return fieldBuilder.DataType(DoubleType.Default);
                case "BOOL" or "BOOLEAN":
                    return fieldBuilder.DataType(BooleanType.Default);
                case "STRING" or "GEOGRAPHY" or "JSON":
                    return fieldBuilder.DataType(StringType.Default);
                case "BYTES":
                    return fieldBuilder.DataType(BinaryType.Default);
                case "DATETIME":
                    return fieldBuilder.DataType(TimestampType.Default);
                case "TIMESTAMP":
                    return fieldBuilder.DataType(TimestampType.Default);
                case "TIME":
                    return fieldBuilder.DataType(Time64Type.Default);
                case "DATE":
                    return fieldBuilder.DataType(Date64Type.Default);
                case "RECORD" or "STRUCT":
                    // Members sit between the outer '<' and the trailing '>', e.g. "a INT64, b NUMERIC(10, 2)".
                    string fieldRecords = type.Substring(index + 1);
                    fieldRecords = fieldRecords.Remove(fieldRecords.Length - 1);
                    List<Field> nestedFields = new List<Field>();

                    foreach (string member in SplitTopLevel(fieldRecords))
                    {
                        // Each member is "<name> <type>"; the type itself may contain spaces.
                        string fieldRecord = member.Trim();
                        int separator = fieldRecord.IndexOf(' ');
                        if (separator <= 0)
                            throw new InvalidOperationException($"{fieldRecord} cannot be handled");

                        string fieldName = fieldRecord.Substring(0, separator);
                        string fieldType = fieldRecord.Substring(separator + 1).Trim();
                        nestedFields.Add(SchemaFieldGenerator(fieldName, fieldType).Build());
                    }

                    return fieldBuilder.DataType(new StructType(nestedFields));
                case "NUMERIC" or "DECIMAL":
                    ParsedDecimalValues values128 = ParsePrecisionAndScale(type);
                    return fieldBuilder.DataType(new Decimal128Type(values128.Precision, values128.Scale));
                case "BIGNUMERIC" or "BIGDECIMAL":
                    ParsedDecimalValues values256 = ParsePrecisionAndScale(type);
                    return fieldBuilder.DataType(new Decimal256Type(values256.Precision, values256.Scale));
                case "ARRAY":
                    // NOTE(review): the column is described by its element type rather than an Arrow list
                    // type; confirm this matches what callers expect for repeated fields.
                    if (index < 0 || !type.EndsWith(">", StringComparison.Ordinal))
                        throw new InvalidOperationException($"{type} cannot be handled");

                    // Element type between the outer '<' and the trailing '>', e.g. "STRUCT<x INT64>".
                    string arrayType = type.Substring(index + 1, type.Length - index - 2).Trim();
                    return SchemaFieldGenerator(name, arrayType);
                default: throw new InvalidOperationException($"{dataType} cannot be handled");
            }
        }

        /// <summary>
        /// Splits a comma-separated member list, ignoring commas nested inside '&lt;...&gt;' or '(...)'
        /// such as in "NUMERIC(10, 2)" or a nested STRUCT.
        /// </summary>
        private static IEnumerable<string> SplitTopLevel(string value)
        {
            int depth = 0;
            int start = 0;

            for (int i = 0; i < value.Length; i++)
            {
                switch (value[i])
                {
                    case '<' or '(':
                        depth++;
                        break;
                    case '>' or ')':
                        depth--;
                        break;
                    case ',' when depth == 0:
                        yield return value.Substring(start, i - start);
                        start = i + 1;
                        break;
                }
            }

            yield return value.Substring(start);
        }

        private class ParsedDecimalValues
        {
            public int Precision { get; set; }
            public int Scale { get; set; }
        }

        /// <summary>
        /// Parses precision and scale from a decimal type such as "NUMERIC(10, 2)".
        /// </summary>
        /// <remarks>
        /// "NUMERIC(P)" has scale 0. A type without parameters gets BigQuery's defaults:
        /// (38, 9) for NUMERIC/DECIMAL and (76, 38) for BIGNUMERIC/BIGDECIMAL.
        /// </remarks>
        /// <exception cref="ArgumentNullException"><paramref name="type"/> is null or whitespace.</exception>
        private ParsedDecimalValues ParsePrecisionAndScale(string type)
        {
            if (string.IsNullOrWhiteSpace(type)) throw new ArgumentNullException(nameof(type));

            int openIndex = type.IndexOf('(');

            if (openIndex == -1)
            {
                bool isBigNumeric = type.StartsWith("BIGNUMERIC", StringComparison.OrdinalIgnoreCase) ||
                                    type.StartsWith("BIGDECIMAL", StringComparison.OrdinalIgnoreCase);

                return isBigNumeric
                    ? new ParsedDecimalValues() { Precision = defaultBigNumericPrecision, Scale = defaultBigNumericScale }
                    : new ParsedDecimalValues() { Precision = defaultNumericPrecision, Scale = defaultNumericScale };
            }

            string[] values = type.Substring(openIndex + 1).TrimEnd(')').Split(',');

            return new ParsedDecimalValues()
            {
                Precision = Convert.ToInt32(values[0]),
                Scale = values.Length > 1 ? Convert.ToInt32(values[1]) : 0
            };
        }

        /// <summary>
        /// Returns the table types this driver reports: "BASE TABLE" and "VIEW".
        /// </summary>
        public override IArrowArrayStream GetTableTypes()
        {
            StringArray.Builder tableTypesBuilder = new StringArray.Builder();
            tableTypesBuilder.AppendRange(new string[] { "BASE TABLE", "VIEW" });

            IArrowArray[] dataArrays = new IArrowArray[]
            {
                tableTypesBuilder.Build()
            };

            StandardSchemas.TableTypesSchema.Validate(dataArrays);

            return new BigQueryInfoArrowStream(StandardSchemas.TableTypesSchema, dataArrays);
        }

        /// <summary>
        /// Creates a statement bound to this connection's client and credential, opening the
        /// connection first if it is not open (or has been disposed).
        /// </summary>
        /// <exception cref="InvalidOperationException">No credential is available after opening.</exception>
        public override AdbcStatement CreateStatement()
        {
            if (this.client == null)
            {
                this.client = Open();
            }

            if (this.credential == null)
            {
                throw new InvalidOperationException("The connection does not have a credential; it could not be opened.");
            }

            BigQueryStatement statement = new BigQueryStatement(this.client, this.credential);
            statement.Options = ParseOptions();
            return statement;
        }

        /// <summary>
        /// Copies the statement-level options present in the connection properties.
        /// </summary>
        private IReadOnlyDictionary<string, string> ParseOptions()
        {
            Dictionary<string, string> options = new Dictionary<string, string>();

            string[] statementOptions = new string[] {
                BigQueryParameters.AllowLargeResults,
                BigQueryParameters.UseLegacySQL,
                BigQueryParameters.LargeDecimalsAsString,
                BigQueryParameters.LargeResultsDestinationTable,
                BigQueryParameters.GetQueryResultsOptionsTimeoutMinutes
            };

            foreach (string key in statementOptions)
            {
                if (properties.TryGetValue(key, out string? value))
                {
                    options[key] = value;
                }
            }

            return new ReadOnlyDictionary<string, string>(options);
        }

        /// <summary>
        /// Disposes the underlying <see cref="BigQueryClient"/>, if any.
        /// </summary>
        public override void Dispose()
        {
            this.client?.Dispose();
            this.client = null;
        }

        // Values passed to Sanitize are embedded in SQL inside backticks or single quotes. The WHOLE value
        // (anchored by ^ and \z) may only contain Unicode letters, marks, digits, connector or dash
        // punctuation, space separators, '.', ':' (domain-scoped project ids) and '%' (LIKE wildcard).
        // None of these can terminate a quoted identifier or string literal.
        private static readonly Regex sanitizedInputRegex = new Regex(@"^[\p{L}\p{M}\p{N}\p{Pc}\p{Pd}\p{Zs}.:%]+\z");

        /// <summary>
        /// Validates a value before it is interpolated into an INFORMATION_SCHEMA query.
        /// </summary>
        /// <returns>The input unchanged, or <see cref="string.Empty"/> for null/empty input.</returns>
        /// <exception cref="AdbcException">The input contains a disallowed character.</exception>
        private string Sanitize(string? input)
        {
            if (string.IsNullOrEmpty(input))
                return string.Empty;

            bool isValidInput = sanitizedInputRegex.IsMatch(input);

            if (isValidInput)
            {
                return input!;
            }
            else
            {
                throw new AdbcException($"{input} is invalid", AdbcStatusCode.InvalidArgument);
            }
        }

        /// <summary>
        /// Gets the access token from the token endpoint using the OAuth refresh-token grant.
        /// </summary>
        /// <param name="clientId">OAuth client id.</param>
        /// <param name="clientSecret">OAuth client secret.</param>
        /// <param name="refreshToken">Refresh token to exchange.</param>
        /// <param name="tokenEndpoint">Token endpoint URL.</param>
        /// <returns>The access token, or <c>null</c> if the response did not contain one.</returns>
        private string? GetAccessToken(string clientId, string clientSecret, string refreshToken, string tokenEndpoint)
        {
            // Every value is form-encoded so that characters such as '&', '=' or '+' cannot corrupt the body.
            string body = string.Format(
                "grant_type=refresh_token&client_id={0}&client_secret={1}&refresh_token={2}",
                Uri.EscapeDataString(clientId),
                Uri.EscapeDataString(clientSecret),
                Uri.EscapeDataString(refreshToken));

            using HttpClient httpClient = new HttpClient();
            using HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, tokenEndpoint);
            request.Headers.Add("Accept", "application/json");
            request.Content = new StringContent(body, Encoding.UTF8, "application/x-www-form-urlencoded");

            // Open() is synchronous, so block here; GetAwaiter().GetResult() surfaces the original
            // exception instead of wrapping it in an AggregateException.
            using HttpResponseMessage response = httpClient.SendAsync(request).GetAwaiter().GetResult();
            string responseBody = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();

            BigQueryTokenResponse? bigQueryTokenResponse = JsonSerializer.Deserialize<BigQueryTokenResponse>(responseBody);

            return bigQueryTokenResponse?.AccessToken;
        }

        enum XdbcDataType
        {
            XdbcDataType_XDBC_UNKNOWN_TYPE = 0,
            XdbcDataType_XDBC_CHAR = 1,
            XdbcDataType_XDBC_NUMERIC = 2,
            XdbcDataType_XDBC_DECIMAL = 3,
            XdbcDataType_XDBC_INTEGER = 4,
            XdbcDataType_XDBC_SMALLINT = 5,
            XdbcDataType_XDBC_FLOAT = 6,
            XdbcDataType_XDBC_REAL = 7,
            XdbcDataType_XDBC_DOUBLE = 8,
            XdbcDataType_XDBC_DATETIME = 9,
            XdbcDataType_XDBC_INTERVAL = 10,
            XdbcDataType_XDBC_VARCHAR = 12,
            XdbcDataType_XDBC_DATE = 91,
            XdbcDataType_XDBC_TIME = 92,
            XdbcDataType_XDBC_TIMESTAMP = 93,
            XdbcDataType_XDBC_LONGVARCHAR = -1,
            XdbcDataType_XDBC_BINARY = -2,
            XdbcDataType_XDBC_VARBINARY = -3,
            XdbcDataType_XDBC_LONGVARBINARY = -4,
            XdbcDataType_XDBC_BIGINT = -5,
            XdbcDataType_XDBC_TINYINT = -6,
            XdbcDataType_XDBC_BIT = -7,
            XdbcDataType_XDBC_WCHAR = -8,
            XdbcDataType_XDBC_WVARCHAR = -9,
        }
    }
}