diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java index 45c22b1f0d55..5ca6a8c4655c 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java @@ -156,6 +156,20 @@ private synchronized Table getTableIfExists(String tableId) throws IllegalStateE return table; } + private void validateTableCreationInputs(String tableName, Schema schema) { + BigQueryResourceManagerUtils.checkValidTableId(tableName); + if (schema == null) { + throw new IllegalArgumentException("A valid schema must be provided to create a table."); + } + } + + private void ensureDatasetExists() { + if (dataset == null) { + createDataset(DEFAULT_DATASET_REGION); + } + checkHasDataset(); + } + /** * Helper method for logging individual errors thrown by inserting rows to a table. This method is * used to log errors thrown by inserting certain rows when other rows were successful. @@ -235,18 +249,8 @@ public synchronized TableId createTable(String tableName, Schema schema) public synchronized TableId createTable( String tableName, Schema schema, Long expirationTimeMillis) throws BigQueryResourceManagerException { - // Check table ID - BigQueryResourceManagerUtils.checkValidTableId(tableName); - - // Check schema - if (schema == null) { - throw new IllegalArgumentException("A valid schema must be provided to create a table."); - } - // Create a default dataset if this resource manager has not already created one - if (dataset == null) { - createDataset(DEFAULT_DATASET_REGION); - } - checkHasDataset(); + validateTableCreationInputs(tableName, schema); + ensureDatasetExists(); LOG.info("Creating table using tableName '{}'.", tableName); // Create the table if it does not already exist in the dataset @@ -313,13 +317,7 @@ public synchronized TableId createTimePartitionedTable( public synchronized TableId createTimePartitionedTable( String tableName, Schema schema, TimePartitioning timePartitioning, Long expirationTimeMillis) throws BigQueryResourceManagerException { - // Check table ID - BigQueryResourceManagerUtils.checkValidTableId(tableName); - - // Check schema - if (schema == null) { - throw new IllegalArgumentException("A valid schema must be provided to create a table."); - } + validateTableCreationInputs(tableName, schema); // Check time partition details if (timePartitioning == null) { @@ -327,11 +325,7 @@ public synchronized TableId createTimePartitionedTable( "A valid TimePartition object must be provided to create a time paritioned table. Use createTable instead to create non-partitioned tables."); } - // Create a default dataset if this resource manager has not already created one - if (dataset == null) { - createDataset(DEFAULT_DATASET_REGION); - } - checkHasDataset(); + ensureDatasetExists(); LOG.info( "Creating time partitioned table using tableName '{}' on field '{}'.",