| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hop.beam.transforms.io; |
| |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hop.beam.core.HopRow; |
| import org.apache.hop.beam.core.transform.BeamOutputTransform; |
| import org.apache.hop.beam.engines.IBeamPipelineEngineRunConfiguration; |
| import org.apache.hop.beam.metadata.FieldDefinition; |
| import org.apache.hop.beam.metadata.FileDefinition; |
| import org.apache.hop.beam.pipeline.IBeamPipelineTransformHandler; |
| import org.apache.hop.core.annotations.Transform; |
| import org.apache.hop.core.exception.HopException; |
| import org.apache.hop.core.exception.HopTransformException; |
| import org.apache.hop.core.logging.ILogChannel; |
| import org.apache.hop.core.row.IRowMeta; |
| import org.apache.hop.core.row.IValueMeta; |
| import org.apache.hop.core.row.JsonRowMeta; |
| import org.apache.hop.core.variables.IVariables; |
| import org.apache.hop.metadata.api.HopMetadataProperty; |
| import org.apache.hop.metadata.api.IHopMetadataProvider; |
| import org.apache.hop.metadata.api.IHopMetadataSerializer; |
| import org.apache.hop.pipeline.PipelineMeta; |
| import org.apache.hop.pipeline.transform.BaseTransformMeta; |
| import org.apache.hop.pipeline.transform.TransformMeta; |
| |
| @Transform( |
| id = "BeamOutput", |
| image = "beam-output.svg", |
| name = "i18n::BeamOutputDialog.DialogTitle", |
| description = "i18n::BeamOutputDialog.Description", |
| categoryDescription = "i18n:org.apache.hop.pipeline.transform:BaseTransform.Category.BigData", |
| keywords = "i18n::BeamOutputMeta.keyword", |
| documentationUrl = "/pipeline/transforms/beamoutput.html") |
| public class BeamOutputMeta extends BaseTransformMeta<BeamOutput, BeamOutputData> |
| implements IBeamPipelineTransformHandler { |
| |
| @HopMetadataProperty(key = "output_location") |
| private String outputLocation; |
| |
| @HopMetadataProperty(key = "file_description_name") |
| private String fileDefinitionName; |
| |
| @HopMetadataProperty(key = "file_prefix") |
| private String filePrefix; |
| |
| @HopMetadataProperty(key = "file_suffix") |
| private String fileSuffix; |
| |
| @HopMetadataProperty private boolean windowed; |
| |
| @Override |
| public String getDialogClassName() { |
| return BeamOutputDialog.class.getName(); |
| } |
| |
| public FileDefinition loadFileDefinition(IHopMetadataProvider metadataProvider) |
| throws HopTransformException { |
| if (StringUtils.isEmpty(fileDefinitionName)) { |
| throw new HopTransformException("No file description name provided"); |
| } |
| FileDefinition fileDefinition; |
| try { |
| IHopMetadataSerializer<FileDefinition> serializer = |
| metadataProvider.getSerializer(FileDefinition.class); |
| fileDefinition = serializer.load(fileDefinitionName); |
| } catch (Exception e) { |
| throw new HopTransformException( |
| "Unable to load file description '" + fileDefinitionName + "' from the metadata", e); |
| } |
| |
| return fileDefinition; |
| } |
| |
| @Override |
| public boolean isInput() { |
| return false; |
| } |
| |
| @Override |
| public boolean isOutput() { |
| return true; |
| } |
| |
| @Override |
| public void handleTransform( |
| ILogChannel log, |
| IVariables variables, |
| String runConfigurationName, |
| IBeamPipelineEngineRunConfiguration runConfiguration, |
| String dataSamplersJson, |
| IHopMetadataProvider metadataProvider, |
| PipelineMeta pipelineMeta, |
| TransformMeta transformMeta, |
| Map<String, PCollection<HopRow>> transformCollectionMap, |
| org.apache.beam.sdk.Pipeline pipeline, |
| IRowMeta rowMeta, |
| List<TransformMeta> previousTransforms, |
| PCollection<HopRow> input, |
| String parentLogChannelId) |
| throws HopException { |
| |
| FileDefinition outputFileDefinition; |
| if (StringUtils.isEmpty(fileDefinitionName)) { |
| // Create a default file definition using standard output and sane defaults... |
| // |
| outputFileDefinition = getDefaultFileDefinition(); |
| } else { |
| outputFileDefinition = loadFileDefinition(metadataProvider); |
| } |
| |
| // Empty file definition? Add all fields in the output |
| // |
| addAllFieldsToEmptyFileDefinition(rowMeta, outputFileDefinition); |
| |
| // Apply the output transform from HopRow to PDone |
| // |
| if (rowMeta == null || rowMeta.isEmpty()) { |
| throw new HopException( |
| "No output fields found in the file definition or from previous transforms"); |
| } |
| |
| BeamOutputTransform beamOutputTransform = |
| new BeamOutputTransform( |
| transformMeta.getName(), |
| variables.resolve(outputLocation), |
| variables.resolve(filePrefix), |
| variables.resolve(fileSuffix), |
| variables.resolve(outputFileDefinition.getSeparator()), |
| variables.resolve(outputFileDefinition.getEnclosure()), |
| windowed, |
| JsonRowMeta.toJson(rowMeta)); |
| |
| // Which transform do we apply this transform to? |
| // Ignore info hops until we figure that out. |
| // |
| if (previousTransforms.size() > 1) { |
| throw new HopException("Combining data from multiple transforms is not supported yet!"); |
| } |
| TransformMeta previousTransform = previousTransforms.get(0); |
| |
| // No need to store this, it's PDone. |
| // |
| input.apply(beamOutputTransform); |
| log.logBasic( |
| "Handled transform (OUTPUT) : " |
| + transformMeta.getName() |
| + ", gets data from " |
| + previousTransform.getName()); |
| } |
| |
| private FileDefinition getDefaultFileDefinition() { |
| FileDefinition fileDefinition = new FileDefinition(); |
| |
| fileDefinition.setName("Default"); |
| fileDefinition.setEnclosure("\""); |
| fileDefinition.setSeparator(","); |
| |
| return fileDefinition; |
| } |
| |
| private void addAllFieldsToEmptyFileDefinition(IRowMeta rowMeta, FileDefinition fileDefinition) { |
| if (fileDefinition.getFieldDefinitions().isEmpty()) { |
| for (IValueMeta valueMeta : rowMeta.getValueMetaList()) { |
| fileDefinition |
| .getFieldDefinitions() |
| .add( |
| new FieldDefinition( |
| valueMeta.getName(), |
| valueMeta.getTypeDesc(), |
| valueMeta.getLength(), |
| valueMeta.getPrecision(), |
| valueMeta.getConversionMask())); |
| } |
| } |
| } |
| |
| /** |
| * Gets outputLocation |
| * |
| * @return value of outputLocation |
| */ |
| public String getOutputLocation() { |
| return outputLocation; |
| } |
| |
| /** |
| * @param outputLocation The outputLocation to set |
| */ |
| public void setOutputLocation(String outputLocation) { |
| this.outputLocation = outputLocation; |
| } |
| |
| /** |
| * Gets fileDescriptionName |
| * |
| * @return value of fileDescriptionName |
| */ |
| public String getFileDefinitionName() { |
| return fileDefinitionName; |
| } |
| |
| /** |
| * @param fileDefinitionName The fileDescriptionName to set |
| */ |
| public void setFileDefinitionName(String fileDefinitionName) { |
| this.fileDefinitionName = fileDefinitionName; |
| } |
| |
| /** |
| * Gets filePrefix |
| * |
| * @return value of filePrefix |
| */ |
| public String getFilePrefix() { |
| return filePrefix; |
| } |
| |
| /** |
| * @param filePrefix The filePrefix to set |
| */ |
| public void setFilePrefix(String filePrefix) { |
| this.filePrefix = filePrefix; |
| } |
| |
| /** |
| * Gets fileSuffix |
| * |
| * @return value of fileSuffix |
| */ |
| public String getFileSuffix() { |
| return fileSuffix; |
| } |
| |
| /** |
| * @param fileSuffix The fileSuffix to set |
| */ |
| public void setFileSuffix(String fileSuffix) { |
| this.fileSuffix = fileSuffix; |
| } |
| |
| /** |
| * Gets windowed |
| * |
| * @return value of windowed |
| */ |
| public boolean isWindowed() { |
| return windowed; |
| } |
| |
| /** |
| * @param windowed The windowed to set |
| */ |
| public void setWindowed(boolean windowed) { |
| this.windowed = windowed; |
| } |
| } |