Java source code examples: org.apache.spark.sql.sources.v2.writer.DataSourceWriter
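All of the examples below implement createWriter(...) from Spark's DataSourceV2 WriteSupport mix-in (Spark 2.3/2.4); the DataSourceWriter they return coordinates the distributed write. For orientation, here is a minimal no-op sketch of that contract, assuming Spark 2.4 signatures (InternalRow rows, createDataWriter(int, long, long)); the Noop* names are illustrative and appear in none of the examples.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Spark calls createWriterFactory() on the driver, ships the factory to the
// executors, runs one DataWriter per partition, gathers the resulting
// WriterCommitMessages, and finally calls commit() or abort() on the driver.
public class NoopDataSourceWriter implements DataSourceWriter {

  @Override
  public DataWriterFactory<InternalRow> createWriterFactory() {
    return new NoopDataWriterFactory();  // serialized and sent to executors
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // Publish the job's output atomically, e.g. commit a transaction or
    // rename a staging directory; `messages` holds one entry per task.
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // Roll back any partial output left behind by the tasks.
  }

  static class NoopDataWriterFactory implements DataWriterFactory<InternalRow> {
    @Override
    public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
      return new NoopDataWriter();
    }
  }

  static class NoopDataWriter implements DataWriter<InternalRow> {
    private long count = 0;

    @Override
    public void write(InternalRow record) {
      count++;  // a real writer would buffer or stream the row out here
    }

    @Override
    public WriterCommitMessage commit() {
      return new NoopCommitMessage(count);  // sent back to the driver
    }

    @Override
    public void abort() {
      // Discard anything written so far for this task.
    }
  }

  static class NoopCommitMessage implements WriterCommitMessage {
    final long rowsWritten;

    NoopCommitMessage(long rowsWritten) {
      this.rowsWritten = rowsWritten;
    }
  }
}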
Example 1
@Override
public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct, SaveMode mode,
                                               DataSourceOptions options) {
  Preconditions.checkArgument(mode == SaveMode.Append || mode == SaveMode.Overwrite,
      "Save mode %s is not supported", mode);
  Configuration conf = new Configuration(lazyBaseConf());
  Table table = getTableAndResolveHadoopConfiguration(options, conf);

  // Validate the incoming Spark schema against the table schema before writing.
  Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
  TypeUtil.validateWriteSchema(table.schema(), writeSchema, checkNullability(options), checkOrdering(options));
  SparkUtil.validatePartitionTransforms(table.spec());

  String appId = lazySparkSession().sparkContext().applicationId();
  String wapId = lazySparkSession().conf().get("spark.wap.id", null);
  boolean replacePartitions = mode == SaveMode.Overwrite;

  // Broadcast the file IO and encryption manager so executor tasks can reuse them.
  Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
  Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption());
  return Optional.of(new Writer(
      table, io, encryptionManager, options, replacePartitions, appId, wapId, writeSchema, dsStruct));
}
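Example 1 validates the schema, then hands broadcast state to a Writer. A hypothetical caller-side snippet that would route into this createWriter follows; the format name, paths, and the spark.wap.id value are illustrative (the conf key itself is the one the method reads above).

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class IcebergWriteDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("iceberg-write-demo").getOrCreate();

    // Optional write-audit-publish staging ID; createWriter reads it as "spark.wap.id".
    spark.conf().set("spark.wap.id", "audit-123");

    Dataset<Row> df = spark.read().parquet("/tmp/source");  // hypothetical input path
    df.write()
        .format("iceberg")
        .mode(SaveMode.Append)  // Append or Overwrite, per the precondition above
        .save("/tmp/warehouse/db/events");  // hypothetical table location
  }
}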
Example 2
@Override
public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
                                               DataSourceOptions options) {
  Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
  Table table = findTable(options);

  // Check that the dataframe's schema can be written into the table's schema.
  Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
  List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
  if (!errors.isEmpty()) {
    StringBuilder sb = new StringBuilder();
    sb.append("Cannot write incompatible dataframe to table with schema:\n")
        .append(table.schema()).append("\nProblems:");
    for (String error : errors) {
      sb.append("\n* ").append(error);
    }
    throw new IllegalArgumentException(sb.toString());
  }

  // Resolve the file format: the write option wins, then the table property default.
  Optional<String> formatOption = options.get("iceberg.write.format");
  FileFormat format;
  if (formatOption.isPresent()) {
    format = FileFormat.valueOf(formatOption.get().toUpperCase(Locale.ENGLISH));
  } else {
    format = FileFormat.valueOf(table.properties()
        .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)
        .toUpperCase(Locale.ENGLISH));
  }

  return Optional.of(new Writer(table, lazyConf(), format));
}
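In Example 2 the file format comes from the iceberg.write.format option when present, otherwise from the table's default-format property (DEFAULT_FILE_FORMAT, falling back to DEFAULT_FILE_FORMAT_DEFAULT). A hypothetical fragment, reusing spark and df from the previous snippet, that exercises the option branch:

// The explicit option overrides the table's default file format property;
// the value is parsed above via FileFormat.valueOf("ORC").
df.write()
    .format("iceberg")
    .option("iceberg.write.format", "orc")
    .mode(SaveMode.Append)
    .save("/tmp/warehouse/db/events");  // hypothetical table location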
Example 3
@Override
public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
                                               SaveMode mode, DataSourceOptions options) {
  Map<String, String> params = getOptions(options);
  String stagingDirPrefix = HWConf.LOAD_STAGING_DIR.getFromOptionsMap(params);
  Path path = new Path(stagingDirPrefix);
  Configuration conf = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
  return Optional.of(getDataSourceWriter(jobId, schema, path, params, conf));
}
Example 4
/**
 * Spark calls this to create the writer. The data source options are used
 * in the same way as above.
 * @param jobId a unique ID for this write job
 * @param schema the schema of the data to be written
 * @param mode the requested save mode
 * @param options the data source options
 * @return a writer, or empty if the write can be skipped
 */
@Override
public Optional<DataSourceWriter> createWriter(
    String jobId, StructType schema, SaveMode mode, DataSourceOptions options)
{
  // TODO: need to distinguish between creating the table for the first time
  // TODO: (just validate schema and create) vs appending (compare schema)
  // TODO: log jobId here and elsewhere when the partitionId etc. are logged
  String host = options.get("host").orElse("localhost");
  int port = options.getInt("port", -1);
  String table = options.get("table").orElse("unknownTable"); // TODO: throw
  int partitions = Integer.parseInt(options.get("partitions").orElse("0"));
  edb.common.Schema dbSchema = DBClientWrapper.sparkToDbSchema(schema);
  boolean truncateOnCommit = false;

  DBClientWrapper db = new DBClientWrapper(host, port);
  db.connect();
  if (db.tableExists(table)) {
    switch (mode) {
      case ErrorIfExists: {
        // check existence and throw if needed
        throw new RuntimeException("data already exists");
      }
      case Append: {
        // just check schema compatibility
        try {
          Schema actualSchema = db.getDBSchema(table);
          if (!dbSchema.isCompatible(actualSchema)) {
            throw new RuntimeException("Appending to table with incompatible schema");
          }
        } catch (UnknownTableException ute) {
          throw new RuntimeException(ute);
        }
        break;
      }
      case Overwrite: {
        // two options if table exists: truncate it now or truncate it later
        truncateOnCommit = true;
        break;
      }
      case Ignore: {
        // check existence and declare victory
        return Optional.empty();
      }
      default:
        break;
    }
  } else {
    db.createTable(table, dbSchema);
  }
  return Optional.of(new Writer(host, port, table, partitions, dbSchema, truncateOnCommit));
}
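Example 4 pulls its connection settings straight from the options map and branches on the SaveMode. A hypothetical caller-side write that supplies those options; the provider class name and input path are illustrative.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class ExternalDbWriteDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("edb-write-demo").getOrCreate();
    Dataset<Row> df = spark.read().json("/tmp/people.json");  // hypothetical input

    df.write()
        .format("com.example.edb.DefaultSource")  // hypothetical provider class
        .option("host", "localhost")   // read by options.get("host")
        .option("port", "50051")       // read by options.getInt("port", -1)
        .option("table", "people")
        .option("partitions", "4")
        .mode(SaveMode.Overwrite)      // sets truncateOnCommit = true above
        .save();
  }
}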
Example 5
@Override
public Optional<DataSourceWriter> createWriter(final String jobId, final StructType schema, final SaveMode mode,
                                               final DataSourceOptions options) {
  // Delegates construction; note that the SaveMode is not consulted here.
  return Optional.of(createDataSourceWriter(jobId, schema, options));
}
Example 6
protected DataSourceWriter getDataSourceWriter(String jobId, StructType schema,
                                               Path path, Map<String, String> options, Configuration conf) {
  return new HiveWarehouseDataSourceWriter(options, jobId, schema, path, conf);
}
Example 7
@Override
protected DataSourceWriter getDataSourceWriter(String jobId, StructType schema,
                                               Path path, Map<String, String> options, Configuration conf) {
  // Test override of Example 6's factory method, returning a mock writer.
  return new MockWriteSupport.MockHiveWarehouseDataSourceWriter(options, jobId, schema, path, conf);
}