Java source code examples: com.google.api.services.bigquery.model.TableReference
Example 1
private Table table(String defaultProjectId, Optional<DatasetReference> defaultDataset, TableConfig config)
{
    Optional<String> datasetId = config.dataset().or(defaultDataset.transform(DatasetReference::getDatasetId));
    if (!datasetId.isPresent()) {
        throw new ConfigException("Bad table reference or configuration: Missing 'dataset'");
    }
    return new Table()
            .setTableReference(new TableReference()
                    .setProjectId(config.project().or(defaultProjectId))
                    .setDatasetId(datasetId.get())
                    .setTableId(config.id()))
            .setSchema(config.schema().orNull())
            .setFriendlyName(config.friendly_name().orNull())
            .setExpirationTime(config.expiration_time()
                    .transform(p -> p.getTimestamp().toInstant(request.getTimeZone()).toEpochMilli()).orNull())
            .setTimePartitioning(config.time_partitioning().orNull())
            .setView(config.view().orNull());
}
Example 2
public static <T> BigQueryStorageTableSource<T> create(
    ValueProvider<TableReference> tableRefProvider,
    @Nullable TableReadOptions readOptions,
    @Nullable ValueProvider<List<String>> selectedFields,
    @Nullable ValueProvider<String> rowRestriction,
    SerializableFunction<SchemaAndRecord, T> parseFn,
    Coder<T> outputCoder,
    BigQueryServices bqServices) {
  return new BigQueryStorageTableSource<>(
      tableRefProvider,
      readOptions,
      selectedFields,
      rowRestriction,
      parseFn,
      outputCoder,
      bqServices);
}
Example 3
@Test
public void testCreateTempTableReference() {
  String projectId = "this-is-my-project";
  String jobUuid = "this-is-my-job";
  TableReference noDataset =
      BigQueryHelpers.createTempTableReference(projectId, jobUuid, Optional.empty());
  assertEquals(noDataset.getProjectId(), projectId);
  assertEquals(noDataset.getDatasetId(), "temp_dataset_" + jobUuid);
  assertEquals(noDataset.getTableId(), "temp_table_" + jobUuid);
  Optional<String> dataset = Optional.ofNullable("my-tmp-dataset");
  TableReference tempTableReference =
      BigQueryHelpers.createTempTableReference(projectId, jobUuid, dataset);
  assertEquals(tempTableReference.getProjectId(), noDataset.getProjectId());
  assertEquals(tempTableReference.getDatasetId(), dataset.get());
  assertEquals(tempTableReference.getTableId(), noDataset.getTableId());
  assertEquals(dataset.get(), noDataset.setDatasetId(dataset.get()).getDatasetId());
}
Example 4
private static void updateTable(Bigquery bigquery, Table table) throws IOException {
  TableReference ref = table.getTableReference();
  try {
    bigquery
        .tables()
        .update(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), table)
        .execute();
  } catch (GoogleJsonResponseException e) {
    if (e.getDetails() != null && e.getDetails().getCode() == 404) {
      bigquery.tables().insert(ref.getProjectId(), ref.getDatasetId(), table).execute();
    } else {
      logger.atWarning().withCause(e).log(
          "UpdateSnapshotViewAction failed, caught exception %s", e.getDetails());
    }
  }
}
Example 5
@Test
public void testCreateTableSucceeds() throws IOException {
  TableReference ref =
      new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
  Table testTable = new Table().setTableReference(ref);
  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(200);
  when(response.getContent()).thenReturn(toStream(testTable));
  BigQueryServicesImpl.DatasetServiceImpl services =
      new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
  Table ret =
      services.tryCreateTable(
          testTable, new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF), Sleeper.DEFAULT);
  assertEquals(testTable, ret);
  verify(response, times(1)).getStatusCode();
  verify(response, times(1)).getContent();
  verify(response, times(1)).getContentType();
}
Example 6
@Test
public void testIsTableEmptyThrows() throws Exception {
  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(401);
  TableReference tableRef =
      new TableReference()
          .setProjectId("projectId")
          .setDatasetId("datasetId")
          .setTableId("tableId");
  BigQueryServicesImpl.DatasetServiceImpl datasetService =
      new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
  thrown.expect(IOException.class);
  thrown.expectMessage(String.format("Unable to list table data: %s", tableRef.getTableId()));
  datasetService.isTableEmpty(tableRef, BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
}
Example 7
/**
 * Parses a string into a TableReference; projectId may be omitted if the caller defines a
 * "default" project; in such a case, getProjectId() of the returned TableReference will
 * return null.
 *
 * @param tableRefString A string of the form [projectId]:[datasetId].[tableId].
 * @return a TableReference with the parsed components.
 */
public static TableReference parseTableReference(String tableRefString) {
  // Logic mirrored from cloud/helix/clients/cli/bigquery_client.py.
  TableReference tableRef = new TableReference();
  int projectIdEnd = tableRefString.lastIndexOf(':');
  String datasetAndTableString = tableRefString;
  if (projectIdEnd != -1) {
    tableRef.setProjectId(tableRefString.substring(0, projectIdEnd));
    // Omit the ':' from the remaining datasetId.tableId substring.
    datasetAndTableString = tableRefString.substring(projectIdEnd + 1);
  }
  Preconditions.checkArgument(datasetAndTableString.matches(DATASET_AND_TABLE_REGEX),
      "Invalid datasetAndTableString '%s'; must match regex '%s'.",
      datasetAndTableString, DATASET_AND_TABLE_REGEX);
  List<String> idParts = DOT_SPLITTER.splitToList(datasetAndTableString);
  tableRef.setDatasetId(idParts.get(0));
  tableRef.setTableId(idParts.get(1));
  return tableRef;
}
Example 8
static void verifyTableNotExistOrEmpty(DatasetService datasetService, TableReference tableRef) {
  try {
    if (datasetService.getTable(tableRef) != null) {
      checkState(
          datasetService.isTableEmpty(tableRef),
          "BigQuery table is not empty: %s.",
          toTableSpec(tableRef));
    }
  } catch (IOException | InterruptedException e) {
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
    throw new RuntimeException(
        "unable to confirm BigQuery table emptiness for table " + toTableSpec(tableRef), e);
  }
}
Example 9
static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
  try {
    datasetService.getDataset(table.getProjectId(), table.getDatasetId());
  } catch (Exception e) {
    ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
    if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
      throw new IllegalArgumentException(
          String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", toTableSpec(table)), e);
    } else if (e instanceof RuntimeException) {
      throw (RuntimeException) e;
    } else {
      throw new RuntimeException(
          String.format(
              UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", toTableSpec(table)),
          e);
    }
  }
}
Example 10
static void verifyTablePresence(DatasetService datasetService, TableReference table) {
  try {
    datasetService.getTable(table);
  } catch (Exception e) {
    ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
    if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
      throw new IllegalArgumentException(
          String.format(RESOURCE_NOT_FOUND_ERROR, "table", toTableSpec(table)), e);
    } else if (e instanceof RuntimeException) {
      throw (RuntimeException) e;
    } else {
      throw new RuntimeException(
          String.format(
              UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table", toTableSpec(table)),
          e);
    }
  }
}
Example 11
private JobStatus runExtractJob(Job job, JobConfigurationExtract extract)
    throws InterruptedException, IOException {
  TableReference sourceTable = extract.getSourceTable();
  List<TableRow> rows =
      datasetService.getAllRows(
          sourceTable.getProjectId(), sourceTable.getDatasetId(), sourceTable.getTableId());
  TableSchema schema = datasetService.getTable(sourceTable).getSchema();
  List<Long> destinationFileCounts = Lists.newArrayList();
  for (String destination : extract.getDestinationUris()) {
    destinationFileCounts.add(writeRows(sourceTable.getTableId(), rows, schema, destination));
  }
  job.setStatistics(
      new JobStatistics()
          .setExtract(new JobStatistics4().setDestinationUriFileCounts(destinationFileCounts)));
  return new JobStatus().setState("DONE");
}
Example 12
/**
 * Updates the specified Bigquery table to reflect the metadata from the input.
 *
 * <p>Returns the input DestinationTable. If the specified table does not already exist, it will
 * be inserted into the dataset.
 *
 * <p>Clients can call this function directly to update a table on demand, or can pass it to
 * Futures.transform() to update a table produced as the asynchronous result of a load or query
 * job (e.g. to add a description to it).
 */
private DestinationTable updateTable(final DestinationTable destinationTable) {
  Table table = destinationTable.getTable();
  TableReference ref = table.getTableReference();
  try {
    if (checkTableExists(ref.getDatasetId(), ref.getTableId())) {
      // Make sure to use patch() rather than update(). The former changes only those properties
      // which are specified, while the latter would change everything, blanking out unspecified
      // properties.
      bigquery
          .tables()
          .patch(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), table)
          .execute();
    } else {
      bigquery.tables().insert(ref.getProjectId(), ref.getDatasetId(), table).execute();
    }
    return destinationTable;
  } catch (IOException e) {
    throw BigqueryJobFailureException.create(e);
  }
}
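A minimal sketch of the Futures.transform() pattern the Javadoc mentions, assuming Guava's ListenableFuture utilities (com.google.common.util.concurrent); tableFuture and the choice of executor are assumptions, not code from the original project:

// Hypothetical: tableFuture is the asynchronous result of a load or query job
// in the enclosing class.
ListenableFuture<DestinationTable> tableFuture = startLoadJob();
ListenableFuture<DestinationTable> updated =
    Futures.transform(tableFuture, this::updateTable, MoreExecutors.directExecutor());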
Example 13
@Override
public void validate(PipelineOptions pipelineOptions) {
  BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class);
  // The user specified a table.
  if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {
    TableReference table = getTableWithDefaultProject(options).get();
    DatasetService datasetService = getBigQueryServices().getDatasetService(options);
    // Check for destination table presence and emptiness for early failure notification.
    // Note that a presence check can fail when the table or dataset is created by an earlier
    // stage of the pipeline. For these cases the #withoutValidation method can be used to
    // disable the check.
    BigQueryHelpers.verifyDatasetPresence(datasetService, table);
    if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
      BigQueryHelpers.verifyTablePresence(datasetService, table);
    }
    if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
      BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
    }
  }
}
Example 14
/**
 * Returns the table to write, or {@code null} if writing with {@code tableFunction}.
 *
 * <p>If the table's project is not specified, use the executing project.
 */
@Nullable
ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) {
  ValueProvider<TableReference> table = getTable();
  if (table == null) {
    return table;
  }
  if (!table.isAccessible()) {
    LOG.info(
        "Using a dynamic value for table input. This must contain a project"
            + " in the table reference: {}",
        table);
    return table;
  }
  if (Strings.isNullOrEmpty(table.get().getProjectId())) {
    // If user does not specify a project we assume the table to be located in
    // the default project.
    TableReference tableRef = table.get();
    tableRef.setProjectId(bqOptions.getProject());
    return NestedValueProvider.of(
        StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
        new JsonTableRefToTableRef());
  }
  return table;
}
Example 15
/**
 * Runs a federated import job on BigQuery for the data in the output path in addition to calling
 * the delegate's commitJob.
 */
@Override
public void commitJob(JobContext context) throws IOException {
  super.commitJob(context);
  // Get the destination configuration information.
  Configuration conf = context.getConfiguration();
  TableReference destTable = BigQueryOutputConfiguration.getTableReference(conf);
  String jobProjectId = BigQueryOutputConfiguration.getJobProjectId(conf);
  Optional<BigQueryTableSchema> destSchema = BigQueryOutputConfiguration.getTableSchema(conf);
  BigQueryFileFormat outputFileFormat = BigQueryOutputConfiguration.getFileFormat(conf);
  List<String> sourceUris = getOutputFileURIs();
  getBigQueryHelper()
      .importFederatedFromGcs(
          jobProjectId,
          destTable,
          destSchema.isPresent() ? destSchema.get().get() : null,
          outputFileFormat,
          sourceUris);
}
Example 16
@BeforeClass
public static void setupTestEnvironment() throws Exception {
  PipelineOptionsFactory.register(BigQueryToTableOptions.class);
  project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
  // Create one BQ dataset for all test cases.
  BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID);
  // Create table and insert data for new type query test cases.
  BQ_CLIENT.createNewTable(
      project,
      BIG_QUERY_DATASET_ID,
      new Table()
          .setSchema(BigQueryToTableIT.NEW_TYPES_QUERY_TABLE_SCHEMA)
          .setTableReference(
              new TableReference()
                  .setTableId(BigQueryToTableIT.NEW_TYPES_QUERY_TABLE_NAME)
                  .setDatasetId(BIG_QUERY_DATASET_ID)
                  .setProjectId(project)));
  BQ_CLIENT.insertDataToTable(
      project,
      BIG_QUERY_DATASET_ID,
      BigQueryToTableIT.NEW_TYPES_QUERY_TABLE_NAME,
      BigQueryToTableIT.NEW_TYPES_QUERY_TABLE_DATA);
}
Example 17
@Before
public void setUp() throws Exception {
  MockitoAnnotations.initMocks(this);
  // Create table reference.
  tableRef =
      new TableReference()
          .setProjectId(dataProjectId)
          .setDatasetId(datasetId)
          .setTableId(tableId);
  Table table =
      new Table()
          .setTableReference(tableRef)
          .setLocation("test_location")
          .setNumRows(BigInteger.valueOf(23))
          .setNumBytes(3L * 128 * 1024 * 1024);
  when(bqHelper.getTable(any(TableReference.class))).thenReturn(table);
  config = new JobConf();
  config.set(BigQueryConfiguration.PROJECT_ID.getKey(), jobProjectId);
  config.set(BigQueryConfiguration.INPUT_PROJECT_ID.getKey(), dataProjectId);
  config.set(BigQueryConfiguration.INPUT_DATASET_ID.getKey(), datasetId);
  config.set(BigQueryConfiguration.INPUT_TABLE_ID.getKey(), tableId);
  config.set(MRJobConfig.NUM_MAPS, "3");
  config.set(BigQueryConfiguration.SKEW_LIMIT.getKey(), "1.2");
  config.set(BigQueryConfiguration.SQL_FILTER.getKey(), "foo == 0");
  config.set(BigQueryConfiguration.SELECTED_FIELDS.getKey(), "foo,bar");
  input = new TestDirectBigQueryInputFormat();
}
Example 18
/**
 * Given a KV containing a destination and a message, return the message content as a {@link
 * TableRow} ready to pass to {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO}.
 */
public TableRow kvToTableRow(KV<TableDestination, PubsubMessage> kv) {
  if (format == null) {
    format = createFormat();
  }
  final TableReference ref = kv.getKey().getTableReference();
  final TableId tableId = TableId.of(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
  final PubsubMessage message = kv.getValue();
  return Json.asTableRow(format.apply(tableId, message.getAttributeMap(), message.getPayload()));
}
Example 19
private static Export constructExport(
    Configuration configuration,
    ExportFileFormat format,
    String exportPath,
    BigQueryHelper bigQueryHelper,
    InputFormat<LongWritable, Text> delegateInputFormat)
    throws IOException {
  logger.atFine().log("constructExport() with export path %s", exportPath);
  // Extract relevant configuration settings.
  Map<String, String> mandatoryConfig =
      getMandatoryConfig(configuration, MANDATORY_CONFIG_PROPERTIES_INPUT);
  String jobProjectId = mandatoryConfig.get(PROJECT_ID.getKey());
  String inputProjectId = mandatoryConfig.get(INPUT_PROJECT_ID.getKey());
  String datasetId = mandatoryConfig.get(INPUT_DATASET_ID.getKey());
  String tableName = mandatoryConfig.get(INPUT_TABLE_ID.getKey());
  TableReference exportTableReference = new TableReference()
      .setDatasetId(datasetId)
      .setProjectId(inputProjectId)
      .setTableId(tableName);
  Table table = bigQueryHelper.getTable(exportTableReference);
  if (EXTERNAL_TABLE_TYPE.equals(table.getType())) {
    logger.atInfo().log("Table is already external, so skipping export");
    return new NoopFederatedExportToCloudStorage(
        configuration, format, bigQueryHelper, jobProjectId, table, delegateInputFormat);
  }
  return new UnshardedExportToCloudStorage(
      configuration,
      exportPath,
      format,
      bigQueryHelper,
      jobProjectId,
      table,
      delegateInputFormat);
}
Example 20
@Override
public TableDestination getTable(String targetTable) {
  String changelogTableName = getBigQueryTableName(targetTable, true);
  TableReference tableRef = new TableReference()
      .setTableId(changelogTableName)
      .setDatasetId(changeLogDataset)
      .setProjectId(gcpProjectId);
  String description = String.format("Changelog Table for %s", targetTable);
  return new TableDestination(tableRef, description);
}
Example 21
/**
 * Generates a {@link BigQueryInsertError} with the {@link GenericRecord} and error message.
 *
 * @param record payload to be used for the test
 * @param errorMessage error message for the test
 */
private static BigQueryInsertError getBigQueryInsertError(
    GenericRecord record, String errorMessage) {
  Row beamRow =
      AvroUtils.toBeamRowStrict(record, AvroUtils.toBeamSchema(record.getSchema()));
  TableRow tableRow = BigQueryUtils.toTableRow(beamRow);
  TableReference tableReference = new TableReference();
  return new BigQueryInsertError(tableRow.clone(), getInsertErrors(errorMessage), tableReference);
}
Example 22
/** Tests that {@link BigQueryServicesImpl} retries quota rate limited attempts. */
@Test
public void testCreateTableRetry() throws IOException {
  TableReference ref =
      new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
  Table testTable = new Table().setTableReference(ref);
  // First response is 403 rate limited, second response has valid payload.
  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(403).thenReturn(200);
  when(response.getContent())
      .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
      .thenReturn(toStream(testTable));
  BigQueryServicesImpl.DatasetServiceImpl services =
      new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
  Table ret =
      services.tryCreateTable(
          testTable, new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF), Sleeper.DEFAULT);
  assertEquals(testTable, ret);
  verify(response, times(2)).getStatusCode();
  verify(response, times(2)).getContent();
  verify(response, times(2)).getContentType();
  verifyNotNull(ret.getTableReference());
  expectedLogs.verifyInfo(
      "Quota limit reached when creating table project:dataset.table, "
          + "retrying up to 5.0 minutes");
}
Example 23
private static TableReference getWebResourceTableReference(IndexerPipelineOptions options) {
  TableReference tableRef = new TableReference();
  tableRef.setProjectId(options.getProject());
  tableRef.setDatasetId(options.getBigQueryDataset());
  tableRef.setTableId(IndexerPipelineUtils.WEBRESOURCE_TABLE);
  return tableRef;
}
Example 24
private static TableReference getSentimentTableReference(IndexerPipelineOptions options) {
  TableReference tableRef = new TableReference();
  tableRef.setProjectId(options.getProject());
  tableRef.setDatasetId(options.getBigQueryDataset());
  tableRef.setTableId(IndexerPipelineUtils.SENTIMENT_TABLE);
  return tableRef;
}
Example 25
/** Utility to construct an output table reference. */
static TableReference getTable(String projectId, String datasetId, String tableName) {
  TableReference table = new TableReference();
  table.setDatasetId(datasetId);
  table.setProjectId(projectId);
  table.setTableId(tableName);
  return table;
}
Example 26
/**
 * Tests the cleanupJob method of GsonBigQueryInputFormat with no intermediate delete.
 */
@Test
public void testCleanupJobWithNoIntermediateDelete() throws IOException {
  config.setBoolean(BigQueryConfiguration.DELETE_EXPORT_FILES_FROM_GCS.getKey(), true);
  when(mockBigQueryHelper.getTable(any(TableReference.class))).thenReturn(new Table());
  Path tempPath = new Path(BigQueryConfiguration.TEMP_GCS_PATH.get(config, config::get));
  FileSystem fs = tempPath.getFileSystem(config);
  fs.mkdirs(tempPath);
  Path dataFile = new Path(tempPath.toString() + "/data-00000.json");
  fs.createNewFile(dataFile);
  assertThat(fs.exists(tempPath)).isTrue();
  assertThat(fs.exists(dataFile)).isTrue();
  // Run method and verify calls.
  GsonBigQueryInputFormat.cleanupJob(mockBigQueryHelper, config);
  assertThat(!fs.exists(tempPath)).isTrue();
  assertThat(!fs.exists(dataFile)).isTrue();
  verify(mockBigQueryHelper, times(1)).getTable(eq(tableRef));
  verifyNoMoreInteractions(mockBigquery, mockBigqueryTables);
}
Example 27
private void setupBigQueryTable(
    String projectId, String datasetId, String tableId, TableSchema schema) throws IOException {
  if (bigQueryClient == null) {
    bigQueryClient = newBigQueryClient(options.as(BigQueryOptions.class)).build();
  }
  Datasets datasetService = bigQueryClient.datasets();
  if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
    Dataset newDataset =
        new Dataset()
            .setDatasetReference(
                new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
    datasetService.insert(projectId, newDataset).execute();
  }
  Tables tableService = bigQueryClient.tables();
  Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
  if (table == null) {
    Table newTable =
        new Table()
            .setSchema(schema)
            .setTableReference(
                new TableReference()
                    .setProjectId(projectId)
                    .setDatasetId(datasetId)
                    .setTableId(tableId));
    tableService.insert(projectId, datasetId, newTable).execute();
  } else if (!table.getSchema().equals(schema)) {
    throw new RuntimeException(
        "Table exists and schemas do not match, expecting: "
            + schema.toPrettyString()
            + ", actual: "
            + table.getSchema().toPrettyString());
  }
}
Example 28
@BeforeClass
public static void prepareDatasetAndDataTables() throws Exception {
  BIGQUERY_CLIENT.createNewDataset(PROJECT_ID, DATASET_ID);
  TableSchema dataTableSchema =
      new TableSchema()
          .setFields(
              Collections.singletonList(
                  new TableFieldSchema().setName(DATA_FIELD_NAME).setType(DATA_FIELD_TYPE)));
  Table dataTableNonEmpty =
      new Table()
          .setSchema(dataTableSchema)
          .setTableReference(
              new TableReference()
                  .setProjectId(PROJECT_ID)
                  .setDatasetId(DATASET_ID)
                  .setTableId(DATA_TABLE_ID_NON_EMPTY));
  BIGQUERY_CLIENT.createNewTable(PROJECT_ID, DATASET_ID, dataTableNonEmpty);
  // Prepopulates dataTableNonEmpty with TEST_DATA.
  List<Map<String, Object>> rows =
      TEST_DATA.stream()
          .map(v -> Collections.singletonMap(DATA_FIELD_NAME, (Object) v))
          .collect(Collectors.toList());
  BIGQUERY_CLIENT.insertDataToTable(PROJECT_ID, DATASET_ID, DATA_TABLE_ID_NON_EMPTY, rows);
  Table dataTableEmpty =
      new Table()
          .setSchema(dataTableSchema)
          .setTableReference(
              new TableReference()
                  .setProjectId(PROJECT_ID)
                  .setDatasetId(DATASET_ID)
                  .setTableId(DATA_TABLE_ID_EMPTY));
  BIGQUERY_CLIENT.createNewTable(PROJECT_ID, DATASET_ID, dataTableEmpty);
}
Example 29
@Test
public void testRead() throws Exception {
  TableReference bqTable = bigQuery.tableReference();
  // Streaming inserts do not work with DIRECT_READ mode, there is a several hour lag.
  PCollection<Row> data =
      writePipeline.apply(Create.of(row(1, "name1"), row(2, "name2"), row(3, "name3")));
  data.apply(
      BigQueryIO.<Row>write()
          .withSchema(BigQueryUtils.toTableSchema(ID_NAME_SCHEMA))
          .withFormatFunction(BigQueryUtils.toTableRow())
          .withMethod(Method.FILE_LOADS)
          .to(bqTable));
  writePipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  String tableId =
      String.format(
          "bigquery.`table`.`%s`.`%s`.`%s`",
          bqTable.getProjectId(), bqTable.getDatasetId(), bqTable.getTableId());
  readPipeline
      .getOptions()
      .as(BeamSqlPipelineOptions.class)
      .setPlannerName(queryPlanner.getCanonicalName());
  try (DataCatalogTableProvider tableProvider =
      DataCatalogTableProvider.create(
          readPipeline.getOptions().as(DataCatalogPipelineOptions.class))) {
    PCollection<Row> result =
        readPipeline.apply(
            "query",
            SqlTransform.query("SELECT id, name FROM " + tableId)
                .withDefaultTableProvider("datacatalog", tableProvider));
    PAssert.that(result).containsInAnyOrder(row(1, "name1"), row(2, "name2"), row(3, "name3"));
    readPipeline.run().waitUntilFinish(Duration.standardMinutes(2));
  }
}
Example 30
public TableDestination(
    TableReference tableReference,
    @Nullable String tableDescription,
    @Nullable String jsonTimePartitioning,
    @Nullable String jsonClustering) {
  this(
      BigQueryHelpers.toTableSpec(tableReference),
      tableDescription,
      jsonTimePartitioning,
      jsonClustering);
}