Java source code examples: org.apache.spark.sql.sources.v2.writer.DataSourceWriter
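Each of the examples below implements WriteSupport.createWriter(...), the DataSource V2 entry point that returns the DataSourceWriter driving a batch write job. For orientation, here is a minimal sketch of a DataSourceWriter implementation against the Spark 2.4.x API; the class names (SketchWriter, SketchWriterFactory, SketchDataWriter, SketchCommitMessage) are illustrative and do not come from any of the projects quoted below.

import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Driver-side writer: the object returned from createWriter(...).
class SketchWriter implements DataSourceWriter {

  @Override
  public DataWriterFactory<InternalRow> createWriterFactory() {
    // Called on the driver; the factory is serialized and shipped to executors.
    return new SketchWriterFactory();
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // Called once on the driver after all tasks committed; publish the output atomically here.
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // Called on the driver if the job fails; clean up any partial output.
  }
}

// Executor-side factory: creates one DataWriter per partition attempt.
class SketchWriterFactory implements DataWriterFactory<InternalRow> {
  @Override
  public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
    return new SketchDataWriter();
  }
}

// Executor-side writer: receives the rows of a single partition.
class SketchDataWriter implements DataWriter<InternalRow> {
  @Override
  public void write(InternalRow record) throws IOException {
    // Buffer or send the row to the external system.
  }

  @Override
  public WriterCommitMessage commit() throws IOException {
    // Task-level commit; the message is collected and passed to SketchWriter.commit.
    return new SketchCommitMessage();
  }

  @Override
  public void abort() throws IOException {
    // Task-level abort; discard anything this task has written.
  }
}

// Marker message sent from tasks back to the driver (WriterCommitMessage is Serializable).
class SketchCommitMessage implements WriterCommitMessage {}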

Example 1
@Override
public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct, SaveMode mode,
                                               DataSourceOptions options) {
  Preconditions.checkArgument(mode == SaveMode.Append || mode == SaveMode.Overwrite,
      "Save mode %s is not supported", mode);
  Configuration conf = new Configuration(lazyBaseConf());
  Table table = getTableAndResolveHadoopConfiguration(options, conf);
  Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
  TypeUtil.validateWriteSchema(table.schema(), writeSchema, checkNullability(options), checkOrdering(options));
  SparkUtil.validatePartitionTransforms(table.spec());
  String appId = lazySparkSession().sparkContext().applicationId();
  String wapId = lazySparkSession().conf().get("spark.wap.id", null);
  boolean replacePartitions = mode == SaveMode.Overwrite;

  Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
  Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());

  return Optional.of(new Writer(
      table, io, encryptionManager, options, replacePartitions, appId, wapId, writeSchema, dsStruct));
}
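The spark.wap.id lookup above feeds Iceberg's write-audit-publish flow: when that session conf is set, the Writer tags the produced snapshot with it so the append can be audited before being published (hedged, since the Writer internals are not shown here). A minimal caller-side sketch, with a placeholder table name and WAP id:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class WapWriteExample {
  // Sets the session conf read by createWriter above, then appends.
  // "db.events" and the WAP id are placeholders, not taken from the example.
  static void stagedAppend(SparkSession spark, Dataset<Row> df) {
    spark.conf().set("spark.wap.id", "nightly-audit-001");
    df.write()
        .format("iceberg")
        .mode(SaveMode.Append)
        .save("db.events");
  }
}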
 
Example 2
@Override
public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
                                                 DataSourceOptions options) {
  Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);

  Table table = findTable(options);

  Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
  List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
  if (!errors.isEmpty()) {
    StringBuilder sb = new StringBuilder();
    sb.append("Cannot write incompatible dataframe to table with schema:\n")
        .append(table.schema()).append("\nProblems:");
    for (String error : errors) {
      sb.append("\n* ").append(error);
    }
    throw new IllegalArgumentException(sb.toString());
  }

  Optional<String> formatOption = options.get("iceberg.write.format");
  FileFormat format;
  if (formatOption.isPresent()) {
    format = FileFormat.valueOf(formatOption.get().toUpperCase(Locale.ENGLISH));
  } else {
    format = FileFormat.valueOf(table.properties()
        .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)
        .toUpperCase(Locale.ENGLISH));
  }

  return Optional.of(new Writer(table, lazyConf(), format));
}
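Here the data file format is resolved from the iceberg.write.format option first, falling back to the table's default file format property (DEFAULT_FILE_FORMAT). From the caller's side, the option is passed through DataFrameWriter; a hedged sketch with a placeholder table identifier:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class IcebergWriteFormatExample {
  // Appends df, forcing ORC data files for this write only; "db.events" is a placeholder.
  static void appendAsOrc(Dataset<Row> df) {
    df.write()
        .format("iceberg")
        .option("iceberg.write.format", "orc")  // read by createWriter above
        .mode(SaveMode.Append)                  // this example only allows Append
        .save("db.events");
  }
}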
 
Example 3
@Override
public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
    SaveMode mode, DataSourceOptions options) {
  Map<String, String> params = getOptions(options);
  String stagingDirPrefix = HWConf.LOAD_STAGING_DIR.getFromOptionsMap(params);
  Path path = new Path(stagingDirPrefix);
  Configuration conf = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
  return Optional.of(getDataSourceWriter(jobId, schema, path, params, conf));
}
 
Example 4
/**
 * Spark calls this to create the writer. The data source options are used
 * in the same way as above.
 * @param jobId unique ID for this write job
 * @param schema schema of the DataFrame being written
 * @param mode save mode requested by the caller
 * @param options data source options supplied by the caller
 * @return the writer to use, or empty if no write is needed (e.g. SaveMode.Ignore
 *         when the table already exists)
 */
@Override
public Optional<DataSourceWriter> createWriter(
        String jobId, StructType schema, SaveMode mode, DataSourceOptions options)
{
    // TODO: need to distinguish between creating the table for the first time
    // TODO: (just validate schema and create) vs appending (compare schema)

    // TODO: log JobId here and elsewhere when the partitionId etc. are logged

    String host = options.get("host").orElse("localhost");
    int port = options.getInt("port", -1);
    String table = options.get("table").orElse("unknownTable"); // TODO: throw
    int partitions = Integer.parseInt(options.get("partitions").orElse("0"));

    edb.common.Schema dbSchema = DBClientWrapper.sparkToDbSchema(schema);

    boolean truncateOnCommit = false;

    DBClientWrapper db = new DBClientWrapper(host, port);
    db.connect();
    if (db.tableExists(table)) {
        switch (mode) {
            case ErrorIfExists: {
                // check existence and throw if needed
                throw new RuntimeException("data already exists");
            }
            case Append: {
                // just check schema compatibility
                try {
                    Schema actualSchema = db.getDBSchema(table);
                    if (!dbSchema.isCompatible(actualSchema)) {
                        throw new RuntimeException("Appending to table with incompatible schema");
                    }
                } catch (UnknownTableException ute) {
                    throw new RuntimeException(ute);
                }
                break;
            }
            case Overwrite: {
                // two options if table exists: truncate it now or truncate it later
                truncateOnCommit = true;
                break;
            }
            case Ignore: {
                // check existence and declare victory
                return Optional.empty();
            }
            default:
        }
    } else {
        db.createTable(table, dbSchema);
    }

    return Optional.of(new Writer(host, port, table, partitions, dbSchema, truncateOnCommit));
}
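The host, port, table, and partitions options read above come straight from DataFrameWriter options. A hedged usage sketch; the source class name and all option values are placeholders, not taken from the example:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class ExampleDbWrite {
  // Routes the write through the WriteSupport implementation above.
  static void writeToExampleDb(Dataset<Row> df) {
    df.write()
        .format("com.example.spark.ExampleDbSource")  // placeholder for the DataSourceV2 class
        .option("host", "localhost")
        .option("port", "50051")
        .option("table", "people")
        .option("partitions", "4")                    // parsed with Integer.parseInt above
        .mode(SaveMode.Append)
        .save();
  }
}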
 
Example 5
@Override
public Optional<DataSourceWriter> createWriter(final String jobId, final StructType schema, final SaveMode mode,
  final DataSourceOptions options) {
  return Optional.of(createDataSourceWriter(jobId, schema, options));
}
 
Example 6
protected DataSourceWriter getDataSourceWriter(String jobId, StructType schema,
    Path path, Map<String, String> options, Configuration conf) {
  return new HiveWarehouseDataSourceWriter(options, jobId, schema, path, conf);
}
 
Example 7
@Override
protected DataSourceWriter getDataSourceWriter(String jobId, StructType schema,
    Path path, Map<String, String> options, Configuration conf) {
  return new MockWriteSupport.MockHiveWarehouseDataSourceWriter(options, jobId, schema, path, conf);
}