/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| package org.apache.asterix.external.input.record.reader.azure.parquet; |
| |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.asterix.common.api.IApplicationContext; |
| import org.apache.asterix.common.exceptions.CompilationException; |
| import org.apache.asterix.external.input.HDFSDataSourceFactory; |
| import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; |
| import org.apache.asterix.external.util.ExternalDataConstants; |
| import org.apache.asterix.external.util.ExternalDataUtils; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; |
| import org.apache.hyracks.api.application.IServiceContext; |
| import org.apache.hyracks.api.exceptions.HyracksDataException; |
| import org.apache.hyracks.api.exceptions.IWarningCollector; |
| |
| import com.azure.storage.blob.BlobServiceClient; |
| import com.azure.storage.blob.models.BlobItem; |
| |
| public class AzureBlobParquetReaderFactory extends HDFSDataSourceFactory { |
| private static final long serialVersionUID = -6140824803254158253L; |
| private static final List<String> recordReaderNames = |
| Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB); |
| |
| @Override |
| public void configure(IServiceContext serviceCtx, Map<String, String> configuration, |
| IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException { |
| IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext(); |
| BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration); |
| //Get endpoint |
| String endPoint = extractEndPoint(blobServiceClient.getAccountUrl()); |
| //Get path |
| String path = buildPathURIs(configuration, warningCollector, blobServiceClient, endPoint); |
| //Put Azure configurations to AsterixDB's Hadoop configuration |
| putAzureBlobConfToHadoopConf(configuration, path); |
| |
| //Configure Hadoop Azure input splits |
| JobConf conf = createHdfsConf(serviceCtx, configuration); |
| ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint); |
| configureHdfsConf(conf, configuration); |
| } |
| |
| @Override |
| public List<String> getRecordReaderNames() { |
| return recordReaderNames; |
| } |
| |
| @Override |
| public Set<String> getReaderSupportedFormats() { |
| return Collections.singleton(ExternalDataConstants.FORMAT_PARQUET); |
| } |
| |
| /** |
| * Prepare Hadoop configurations to read parquet files |
| * |
| * @param path Comma-delimited paths |
| */ |
| private static void putAzureBlobConfToHadoopConf(Map<String, String> configuration, String path) { |
| configuration.put(ExternalDataConstants.KEY_PATH, path); |
| configuration.put(ExternalDataConstants.KEY_INPUT_FORMAT, ExternalDataConstants.INPUT_FORMAT_PARQUET); |
| configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_NOOP); |
| } |
| |
| /** |
| * Build Azure Blob Storage path-style for the requested files |
| * |
| * @param configuration properties |
| * @param warningCollector warning collector |
| * @return Comma-delimited paths (e.g., "wasbs://container@accountName.blob.core.windows.net/file1.parquet, |
| * wasbs://container@accountName.blob.core.windows.net/file2.parquet") |
| * @throws CompilationException Compilation exception |
| */ |
| private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector, |
| BlobServiceClient blobServiceClient, String endPoint) throws CompilationException { |
| IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); |
| List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration, |
| includeExcludeMatcher, warningCollector); |
| |
| StringBuilder builder = new StringBuilder(); |
| String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); |
| |
| if (!filesOnly.isEmpty()) { |
| appendFileURI(builder, container, endPoint, filesOnly.get(0)); |
| for (int i = 1; i < filesOnly.size(); i++) { |
| builder.append(','); |
| appendFileURI(builder, container, endPoint, filesOnly.get(i)); |
| } |
| } |
| |
| return builder.toString(); |
| } |
| |
| private static String extractEndPoint(String uri) { |
| //The URI is in the form http(s)://<accountName>.blob.core.windows.net |
| //We need to Remove the protocol (i.e., http(s)://) from the URI |
| return uri.substring(uri.indexOf("//") + "//".length()); |
| } |
| |
| private static void appendFileURI(StringBuilder builder, String container, String endPoint, BlobItem file) { |
| builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL); |
| builder.append("://"); |
| builder.append(container); |
| builder.append('@'); |
| builder.append(endPoint); |
| builder.append('/'); |
| builder.append(file.getName()); |
| } |
| } |