| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.utilities; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hudi.AvroConversionUtils; |
| import org.apache.hudi.client.SparkRDDWriteClient; |
| import org.apache.hudi.client.WriteStatus; |
| import org.apache.hudi.client.common.HoodieSparkEngineContext; |
| import org.apache.hudi.common.config.DFSPropertiesConfiguration; |
| import org.apache.hudi.common.config.TypedProperties; |
| import org.apache.hudi.common.util.Option; |
| import org.apache.hudi.common.util.ReflectionUtils; |
| import org.apache.hudi.common.util.StringUtils; |
| import org.apache.hudi.common.util.ValidationUtils; |
| import org.apache.hudi.config.HoodieCompactionConfig; |
| import org.apache.hudi.config.HoodieIndexConfig; |
| import org.apache.hudi.config.HoodieWriteConfig; |
| import org.apache.hudi.exception.HoodieException; |
| import org.apache.hudi.exception.HoodieIOException; |
| import org.apache.hudi.index.HoodieIndex; |
| import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; |
| import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; |
| import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; |
| import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; |
| import org.apache.hudi.utilities.schema.SchemaPostProcessor; |
| import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config; |
| import org.apache.hudi.utilities.schema.SchemaProvider; |
| import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor; |
| import org.apache.hudi.utilities.sources.AvroKafkaSource; |
| import org.apache.hudi.utilities.sources.JsonKafkaSource; |
| import org.apache.hudi.utilities.sources.Source; |
| import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; |
| import org.apache.hudi.utilities.transform.ChainedTransformer; |
| import org.apache.hudi.utilities.transform.Transformer; |
| |
| import org.apache.avro.Schema; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.log4j.LogManager; |
| import org.apache.log4j.Logger; |
| import org.apache.spark.Accumulator; |
| import org.apache.spark.SparkConf; |
| import org.apache.spark.api.java.JavaRDD; |
| import org.apache.spark.api.java.JavaSparkContext; |
| import org.apache.spark.launcher.SparkLauncher; |
| import org.apache.spark.sql.SparkSession; |
| import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry; |
| import org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper; |
| import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions; |
| import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils; |
| import org.apache.spark.sql.jdbc.JdbcDialect; |
| import org.apache.spark.sql.jdbc.JdbcDialects; |
| import org.apache.spark.sql.types.StructType; |
| |
| import java.io.BufferedReader; |
| import java.io.IOException; |
| import java.io.StringReader; |
| import java.nio.ByteBuffer; |
import java.nio.charset.StandardCharsets;
| import java.sql.Connection; |
| import java.sql.Driver; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Properties; |
| |
| /** |
 * Collection of helper methods used across the Hudi utilities.
| */ |
| public class UtilHelpers { |
| private static final Logger LOG = LogManager.getLogger(UtilHelpers.class); |
| |
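  /**
   * Reflectively instantiates the configured {@link Source}. Kafka sources receive the
   * delta streamer metrics through a five-argument constructor; all other sources are
   * created through the four-argument constructor.
   *
   * <p>Usage sketch (illustrative; assumes an initialized JavaSparkContext, SparkSession,
   * schema provider and metrics):
   * <pre>{@code
   * Source source = UtilHelpers.createSource(JsonKafkaSource.class.getName(),
   *     props, jssc, sparkSession, schemaProvider, metrics);
   * }</pre>
   */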
| public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc, |
| SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) throws IOException { |
| |
| try { |
| if (JsonKafkaSource.class.getName().equals(sourceClass) |
| || AvroKafkaSource.class.getName().equals(sourceClass)) { |
| return (Source) ReflectionUtils.loadClass(sourceClass, |
| new Class<?>[]{TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieDeltaStreamerMetrics.class}, cfg, |
| jssc, sparkSession, schemaProvider, metrics); |
| } |
| |
| return (Source) ReflectionUtils.loadClass(sourceClass, |
| new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg, |
| jssc, sparkSession, schemaProvider); |
| } catch (Throwable e) { |
| throw new IOException("Could not load source class " + sourceClass, e); |
| } |
| } |
| |
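  /**
   * Reflectively instantiates the configured {@link SchemaProvider}, or returns null when
   * no class name is given.
   *
   * <p>Usage sketch (illustrative; FilebasedSchemaProvider ships with hudi-utilities, but
   * any provider class can be configured):
   * <pre>{@code
   * SchemaProvider provider = UtilHelpers.createSchemaProvider(
   *     "org.apache.hudi.utilities.schema.FilebasedSchemaProvider", props, jssc);
   * }</pre>
   */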
| public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg, |
| JavaSparkContext jssc) throws IOException { |
| try { |
| return StringUtils.isNullOrEmpty(schemaProviderClass) ? null |
| : (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc); |
| } catch (Throwable e) { |
| throw new IOException("Could not load schema provider class " + schemaProviderClass, e); |
| } |
| } |
| |
| public static SchemaPostProcessor createSchemaPostProcessor( |
| String schemaPostProcessorClass, TypedProperties cfg, JavaSparkContext jssc) { |
    return StringUtils.isNullOrEmpty(schemaPostProcessorClass)
| ? null |
| : (SchemaPostProcessor) ReflectionUtils.loadClass(schemaPostProcessorClass, cfg, jssc); |
| } |
| |
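  /**
   * Chains the given transformer classes into a single {@link ChainedTransformer}, or
   * returns {@link Option#empty()} when no class names are configured.
   *
   * <p>Usage sketch (illustrative; SqlQueryBasedTransformer ships with hudi-utilities):
   * <pre>{@code
   * Option<Transformer> transformer = UtilHelpers.createTransformer(Arrays.asList(
   *     "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer"));
   * }</pre>
   */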
| public static Option<Transformer> createTransformer(List<String> classNames) throws IOException { |
| try { |
| List<Transformer> transformers = new ArrayList<>(); |
| for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) { |
| transformers.add(ReflectionUtils.loadClass(className)); |
| } |
| return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers)); |
| } catch (Throwable e) { |
| throw new IOException("Could not load transformer class(es) " + classNames, e); |
| } |
| } |
| |
| public static InitialCheckPointProvider createInitialCheckpointProvider( |
| String className, TypedProperties props) throws IOException { |
| try { |
| return (InitialCheckPointProvider) ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props); |
| } catch (Throwable e) { |
| throw new IOException("Could not load initial checkpoint provider class " + className, e); |
| } |
| } |
| |
| /** |
| * |
| */ |
| public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) { |
| DFSPropertiesConfiguration conf; |
| try { |
| conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath); |
| } catch (Exception e) { |
| conf = new DFSPropertiesConfiguration(); |
| LOG.warn("Unexpected error read props file at :" + cfgPath, e); |
| } |
| |
| try { |
| if (!overriddenProps.isEmpty()) { |
| LOG.info("Adding overridden properties to file properties."); |
| conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps)))); |
| } |
| } catch (IOException ioe) { |
| throw new HoodieIOException("Unexpected error adding config overrides", ioe); |
| } |
| |
| return conf; |
| } |
| |
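  /**
   * Converts a list of {@code key=value} strings into {@link TypedProperties}.
   *
   * <p>Usage sketch (illustrative):
   * <pre>{@code
   * TypedProperties props = UtilHelpers.buildProperties(Arrays.asList(
   *     "hoodie.datasource.write.recordkey.field=key",
   *     "hoodie.datasource.write.partitionpath.field=dt"));
   * }</pre>
   */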
| public static TypedProperties buildProperties(List<String> props) { |
| TypedProperties properties = new TypedProperties(); |
| props.forEach(x -> { |
      String[] kv = x.split("=", 2);
| ValidationUtils.checkArgument(kv.length == 2); |
| properties.setProperty(kv[0], kv[1]); |
| }); |
| return properties; |
| } |
| |
| public static void validateAndAddProperties(String[] configs, SparkLauncher sparkLauncher) { |
| Arrays.stream(configs).filter(config -> config.contains("=") && config.split("=").length == 2).forEach(sparkLauncher::addAppArgs); |
| } |
| |
| /** |
   * Reads a schema definition from a file and returns its contents as a string.
| * |
   * @param fs         file system to read from
   * @param schemaFile path to the schema file
   * @return the schema file contents as a string
| */ |
| public static String parseSchema(FileSystem fs, String schemaFile) throws Exception { |
| // Read schema file. |
| Path p = new Path(schemaFile); |
| if (!fs.exists(p)) { |
| throw new Exception(String.format("Could not find - %s - schema file.", schemaFile)); |
| } |
| long len = fs.getFileStatus(p).getLen(); |
| ByteBuffer buf = ByteBuffer.allocate((int) len); |
| try (FSDataInputStream inputStream = fs.open(p)) { |
| inputStream.readFully(0, buf.array(), 0, buf.array().length); |
| } |
    return new String(buf.array(), StandardCharsets.UTF_8);
| } |
| |
| private static SparkConf buildSparkConf(String appName, String defaultMaster) { |
| return buildSparkConf(appName, defaultMaster, new HashMap<>()); |
| } |
| |
| private static SparkConf buildSparkConf(String appName, String defaultMaster, Map<String, String> additionalConfigs) { |
| final SparkConf sparkConf = new SparkConf().setAppName(appName); |
| String master = sparkConf.get("spark.master", defaultMaster); |
| sparkConf.setMaster(master); |
| if (master.startsWith("yarn")) { |
| sparkConf.set("spark.eventLog.overwrite", "true"); |
| sparkConf.set("spark.eventLog.enabled", "true"); |
| } |
| sparkConf.setIfMissing("spark.driver.maxResultSize", "2g"); |
| sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); |
| sparkConf.set("spark.hadoop.mapred.output.compress", "true"); |
| sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); |
| sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); |
| sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); |
| |
| additionalConfigs.forEach(sparkConf::set); |
| return SparkRDDWriteClient.registerClasses(sparkConf); |
| } |
| |
| public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map<String, String> configs) { |
| return new JavaSparkContext(buildSparkConf(appName, defaultMaster, configs)); |
| } |
| |
| public static JavaSparkContext buildSparkContext(String appName, String defaultMaster) { |
| return new JavaSparkContext(buildSparkConf(appName, defaultMaster)); |
| } |
| |
| /** |
| * Build Spark Context for ingestion/compaction. |
| * |
   * @return a {@link JavaSparkContext} configured with the given executor memory
| */ |
| public static JavaSparkContext buildSparkContext(String appName, String sparkMaster, String sparkMemory) { |
| SparkConf sparkConf = buildSparkConf(appName, sparkMaster); |
| sparkConf.set("spark.executor.memory", sparkMemory); |
| return new JavaSparkContext(sparkConf); |
| } |
| |
| /** |
| * Build Hoodie write client. |
| * |
   * @param jsc                     Java Spark Context
   * @param basePath                Base path of the Hudi table
   * @param schemaStr               Schema string
   * @param parallelism             Parallelism for upsert/insert/bulk-insert/delete
   * @param compactionStrategyClass Optional compaction strategy class
   * @param properties              Additional write configs
| */ |
| public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr, |
| int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) { |
| HoodieCompactionConfig compactionConfig = compactionStrategyClass |
| .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false) |
| .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build()) |
| .orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build()); |
| HoodieWriteConfig config = |
| HoodieWriteConfig.newBuilder().withPath(basePath) |
| .withParallelism(parallelism, parallelism) |
| .withBulkInsertParallelism(parallelism) |
| .withDeleteParallelism(parallelism) |
| .withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig) |
| .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) |
| .withProps(properties).build(); |
| return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config); |
| } |
| |
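  /**
   * Counts errored write statuses and logs the outcome; returns 0 when the write
   * succeeded fully, -1 otherwise.
   *
   * <p>Usage sketch (illustrative):
   * <pre>{@code
   * JavaRDD<WriteStatus> statuses = client.upsert(records, instantTime);
   * int exitCode = UtilHelpers.handleErrors(jsc, instantTime, statuses);
   * }</pre>
   */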
| public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) { |
| Accumulator<Integer> errors = jsc.accumulator(0); |
| writeResponse.foreach(writeStatus -> { |
| if (writeStatus.hasErrors()) { |
| errors.add(1); |
| LOG.error(String.format("Error processing records :writeStatus:%s", writeStatus.getStat().toString())); |
| } |
| }); |
| if (errors.value() == 0) { |
| LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime)); |
| return 0; |
| } |
| LOG.error(String.format("Import failed with %d errors.", errors.value())); |
| return -1; |
| } |
| |
| /** |
| * Returns a factory for creating connections to the given JDBC URL. |
| * |
| * @param options - JDBC options that contains url, table and other information. |
| * @return |
| * @throws SQLException if the driver could not open a JDBC connection. |
| */ |
| private static Connection createConnectionFactory(Map<String, String> options) throws SQLException { |
| String driverClass = options.get(JDBCOptions.JDBC_DRIVER_CLASS()); |
| DriverRegistry.register(driverClass); |
| Enumeration<Driver> drivers = DriverManager.getDrivers(); |
| Driver driver = null; |
| while (drivers.hasMoreElements()) { |
| Driver d = drivers.nextElement(); |
| if (d instanceof DriverWrapper) { |
| if (((DriverWrapper) d).wrapped().getClass().getCanonicalName().equals(driverClass)) { |
| driver = d; |
| } |
| } else if (d.getClass().getCanonicalName().equals(driverClass)) { |
| driver = d; |
| } |
| if (driver != null) { |
| break; |
| } |
| } |
| |
| Objects.requireNonNull(driver, String.format("Did not find registered driver with class %s", driverClass)); |
| |
| Properties properties = new Properties(); |
| properties.putAll(options); |
| Connection connect; |
| String url = options.get(JDBCOptions.JDBC_URL()); |
| connect = driver.connect(url, properties); |
| Objects.requireNonNull(connect, String.format("The driver could not open a JDBC connection. Check the URL: %s", url)); |
| return connect; |
| } |
| |
| /** |
| * Returns true if the table already exists in the JDBC database. |
| */ |
  private static boolean tableExists(Connection conn, Map<String, String> options) {
| JdbcDialect dialect = JdbcDialects.get(options.get(JDBCOptions.JDBC_URL())); |
| try (PreparedStatement statement = conn.prepareStatement(dialect.getTableExistsQuery(options.get(JDBCOptions.JDBC_TABLE_NAME())))) { |
| statement.setQueryTimeout(Integer.parseInt(options.get(JDBCOptions.JDBC_QUERY_TIMEOUT()))); |
| statement.executeQuery(); |
| } catch (SQLException e) { |
| return false; |
| } |
| return true; |
| } |
| |
| /*** |
| * call spark function get the schema through jdbc. |
| * The code logic implementation refers to spark 2.4.x and spark 3.x. |
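   *
   * <p>Usage sketch (illustrative; the option values are assumptions):
   * <pre>{@code
   * Map<String, String> options = new HashMap<>();
   * options.put(JDBCOptions.JDBC_URL(), "jdbc:mysql://host:3306/db");
   * options.put(JDBCOptions.JDBC_TABLE_NAME(), "trips");
   * options.put(JDBCOptions.JDBC_DRIVER_CLASS(), "com.mysql.jdbc.Driver");
   * options.put(JDBCOptions.JDBC_QUERY_TIMEOUT(), "30");
   * options.put("nullable", "true");
   * Schema avroSchema = UtilHelpers.getJDBCSchema(options);
   * }</pre>
   *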
   * @param options JDBC options containing the url, table name, driver class, etc.
   * @return the Avro schema of the table
   * @throws Exception if the connection cannot be opened or the schema query fails
| */ |
  public static Schema getJDBCSchema(Map<String, String> options) throws Exception {
    String url = options.get(JDBCOptions.JDBC_URL());
    String table = options.get(JDBCOptions.JDBC_TABLE_NAME());
    // Use try-with-resources so the connection is always closed.
    try (Connection conn = createConnectionFactory(options)) {
      if (!tableExists(conn, options)) {
        throw new HoodieException(String.format("%s table does not exist!", table));
      }
      JdbcDialect dialect = JdbcDialects.get(url);
      try (PreparedStatement statement = conn.prepareStatement(dialect.getSchemaQuery(table))) {
        statement.setQueryTimeout(Integer.parseInt(options.get(JDBCOptions.JDBC_QUERY_TIMEOUT())));
        try (ResultSet rs = statement.executeQuery()) {
          boolean nullable = Boolean.parseBoolean(options.get("nullable"));
          StructType structType = JdbcUtils.getSchema(rs, dialect, nullable);
          return AvroConversionUtils.convertStructTypeToAvroSchema(structType, table, "hoodie." + table);
        }
      }
    }
  }
| |
| public static DFSPathSelector createSourceSelector(TypedProperties props, |
| Configuration conf) throws IOException { |
| String sourceSelectorClass = |
| props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, DFSPathSelector.class.getName()); |
| try { |
| DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass, |
| new Class<?>[]{TypedProperties.class, Configuration.class}, |
| props, conf); |
| |
| LOG.info("Using path selector " + selector.getClass().getName()); |
| return selector; |
| } catch (Throwable e) { |
| throw new IOException("Could not load source selector class " + sourceSelectorClass, e); |
| } |
| } |
| |
| public static SchemaProvider getOriginalSchemaProvider(SchemaProvider schemaProvider) { |
| SchemaProvider originalProvider = schemaProvider; |
| if (schemaProvider instanceof SchemaProviderWithPostProcessor) { |
| originalProvider = ((SchemaProviderWithPostProcessor) schemaProvider).getOriginalSchemaProvider(); |
| } else if (schemaProvider instanceof DelegatingSchemaProvider) { |
| originalProvider = ((DelegatingSchemaProvider) schemaProvider).getSourceSchemaProvider(); |
| } |
| return originalProvider; |
| } |
| |
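  /**
   * Wraps the given schema provider so that the optionally configured schema post
   * processor is applied on top of it. Providers that are already wrapped are
   * returned as-is.
   *
   * <p>Usage sketch (illustrative; the post processor class is an assumption):
   * <pre>{@code
   * cfg.setProperty(SchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_PROP,
   *     "org.example.MySchemaPostProcessor");
   * SchemaProviderWithPostProcessor wrapped =
   *     UtilHelpers.wrapSchemaProviderWithPostProcessor(provider, cfg, jssc);
   * }</pre>
   */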
| public static SchemaProviderWithPostProcessor wrapSchemaProviderWithPostProcessor(SchemaProvider provider, |
| TypedProperties cfg, JavaSparkContext jssc) { |
| |
| if (provider == null) { |
| return null; |
| } |
| |
| if (provider instanceof SchemaProviderWithPostProcessor) { |
      return (SchemaProviderWithPostProcessor) provider;
| } |
| String schemaPostProcessorClass = cfg.getString(Config.SCHEMA_POST_PROCESSOR_PROP, null); |
| return new SchemaProviderWithPostProcessor(provider, |
| Option.ofNullable(createSchemaPostProcessor(schemaPostProcessorClass, cfg, jssc))); |
| } |
| |
| public static SchemaProvider createRowBasedSchemaProvider(StructType structType, |
| TypedProperties cfg, JavaSparkContext jssc) { |
| SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType); |
| return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc); |
| } |
| } |