Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions docs/content/spark/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,62 @@ CREATE TABLE my_table_all (
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;
```

### Replace Table

Paimon supports preserving snapshot history for Spark `REPLACE TABLE` from **Spark 3.4**.

```sql
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING
) TBLPROPERTIES (
'primary-key' = 'user_id',
'bucket' = '2'
);

INSERT INTO my_table VALUES (1, 10, 'pv');

REPLACE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
category STRING
) TBLPROPERTIES (
'primary-key' = 'user_id',
'bucket' = '4'
);
```

In Paimon, this is not an atomic replacement. Paimon changes Spark's drop+create replace path to
truncate the current table and commit a new schema, while preserving the table location and snapshot
history. The current table becomes empty and uses the new schema, but old snapshots can still be
queried by time travel.

```sql
SELECT * FROM my_table;

SELECT * FROM my_table VERSION AS OF 1;
```

`REPLACE TABLE` requires the table to exist. If the table does not exist, use
`CREATE OR REPLACE TABLE` instead.

`REPLACE TABLE` does not accept `AS SELECT`. To replace a table and populate it with query results,
use `CREATE OR REPLACE TABLE ... AS SELECT`.

```sql
CREATE OR REPLACE TABLE my_table
TBLPROPERTIES (
'primary-key' = 'user_id',
'bucket' = '4'
)
AS SELECT user_id, item_id, behavior FROM source_table;
```

When the existing table and target table use different table types,
uses its fallback drop+create behavior instead of snapshot-preserving replace
behavior.

### Create Table Like

A new table can be created from an existing source table. Available from **Spark 3.4**.
Expand Down
19 changes: 19 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.paimon.rest.requests.RegisterTableRequest;
import org.apache.paimon.rest.requests.RenameBranchRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.requests.ReplaceTableRequest;
import org.apache.paimon.rest.requests.ResetConsumerRequest;
import org.apache.paimon.rest.requests.RollbackSchemaRequest;
import org.apache.paimon.rest.requests.RollbackTableRequest;
Expand Down Expand Up @@ -778,6 +779,24 @@ public void alterTable(Identifier identifier, List<SchemaChange> changes) {
restAuthFunction);
}

/**
* Replace table.
*
* @param identifier database name and table name.
* @param schema schema to replace table
* @throws NoSuchResourceException Exception thrown on HTTP 404 means the table not exists
* @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for
* this table
*/
public void replaceTable(Identifier identifier, Schema schema) {
ReplaceTableRequest request = new ReplaceTableRequest(schema);
client.post(
resourcePaths.replaceTable(
identifier.getDatabaseName(), identifier.getObjectName()),
request,
restAuthFunction);
}

/**
* Auth table query.
*
Expand Down
11 changes: 11 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ public String renameTable() {
return SLASH.join(V1, prefix, TABLES, "rename");
}

public String replaceTable(String databaseName, String objectName) {
return SLASH.join(
V1,
prefix,
DATABASES,
encodeString(databaseName),
TABLES,
encodeString(objectName),
"replace");
}

public String commitTable(String databaseName, String objectName) {
return SLASH.join(
V1,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest.requests;

import org.apache.paimon.rest.RESTRequest;
import org.apache.paimon.schema.Schema;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

/** Request for replacing table. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class ReplaceTableRequest implements RESTRequest {

private static final String FIELD_SCHEMA = "schema";

@JsonProperty(FIELD_SCHEMA)
private final Schema schema;

@JsonCreator
public ReplaceTableRequest(@JsonProperty(FIELD_SCHEMA) Schema schema) {
this.schema = schema;
}

@JsonGetter(FIELD_SCHEMA)
public Schema getSchema() {
return schema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.PagedList;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TableType;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
Expand All @@ -40,6 +41,7 @@
import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.SnapshotNotExistException;

Expand Down Expand Up @@ -495,6 +497,94 @@ public void alterTable(
protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException;

@Override
public void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists)
throws TableNotExistException {
checkNotBranch(identifier, "replaceTable");
checkNotSystemTable(identifier, "replaceTable");
validateCreateTable(newSchema, false);
validateCustomTablePath(newSchema.options());
copyTableDefaultOptions(newSchema.options());

Table existing;
try {
existing = getTable(identifier);
} catch (TableNotExistException e) {
if (ignoreIfNotExists) {
return;
}
throw e;
}

TableType targetTableType = Options.fromMap(newSchema.options()).get(TYPE);
if (!(existing instanceof FileStoreTable) || !targetTableType.equals(TableType.TABLE)) {
dropAndCreateTable(identifier, newSchema);
return;
}

// todo: support this
List<String> oldPartitionKeys = ((FileStoreTable) existing).schema().partitionKeys();
List<String> newPartitionKeys = newSchema.partitionKeys();
if (!Objects.equals(oldPartitionKeys, newPartitionKeys)) {
throw new UnsupportedOperationException(
"replaceTable does not support changing partition keys (old="
+ oldPartitionKeys
+ ", new="
+ newPartitionKeys
+ "). Drop and re-create the table instead.");
}

replaceTableImpl(identifier, (FileStoreTable) existing, newSchema);
}

private void dropAndCreateTable(Identifier identifier, Schema newSchema)
throws TableNotExistException {
dropTable(identifier, false);
try {
createTable(identifier, newSchema, false);
} catch (TableAlreadyExistException | DatabaseNotExistException e) {
throw new RuntimeException(e);
}
}

/** Truncate visible data first, then append the new schema. Non-atomic on failure. */
protected void replaceTableImpl(
Identifier identifier, FileStoreTable existingTable, Schema newSchema)
throws TableNotExistException {
truncateTable(existingTable);
try {
appendNewSchema(existingTable, newSchema);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/** Append a new schema (id = latest + 1) via atomic CAS. Returns the new schema id. */
protected long appendNewSchema(FileStoreTable existingTable, Schema newSchema)
throws Exception {
SchemaManager sm = existingTable.schemaManager();
while (true) {
TableSchema latest = sm.latestOrThrow("Cannot replace: schema chain is empty.");
TableSchema staged = TableSchema.create(latest.id() + 1, newSchema);
if (sm.commit(staged)) {
return staged.id();
}
}
}

protected void truncateTable(FileStoreTable existingTable) {
try (TableCommitImpl commit =
existingTable.newCommit("replace-table-" + java.util.UUID.randomUUID())) {
commit.truncateTable();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
return CatalogUtils.loadTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
Expand Down Expand Up @@ -224,6 +225,13 @@ public void alterTable(
invalidateTable(identifier);
}

@Override
public void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists)
throws TableNotExistException {
super.replaceTable(identifier, newSchema, ignoreIfNotExists);
invalidateTable(identifier);
}

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
Table table = tableCache.getIfPresent(identifier);
Expand Down
19 changes: 19 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,25 @@ default void alterTable(Identifier identifier, SchemaChange change, boolean igno
alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists);
}

/**
* Replace an existing table with a new {@link Schema}.
*
* <p>For compatible FileStore tables, it truncates visible data and appends a new schema to the
* schema chain, preserving old schemas and snapshots for time travel. Other table kinds may
* fall back to drop-and-create behavior.
*
* @param identifier path of the table to be replaced
* @param newSchema the new {@link Schema}
* @param ignoreIfNotExists if true, do nothing when the table does not exist
* @throws TableNotExistException if the table does not exist and {@code ignoreIfNotExists} is
* false
*/
default void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists)
throws TableNotExistException {
throw new UnsupportedOperationException(
"Catalog " + getClass().getName() + " does not support replaceTable.");
}

// ======================= partition methods ===============================

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ public void alterTable(
wrapped.alterTable(identifier, changes, ignoreIfNotExists);
}

@Override
public void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists)
throws TableNotExistException {
wrapped.replaceTable(identifier, newSchema, ignoreIfNotExists);
}

@Override
public void registerTable(Identifier identifier, String path)
throws TableAlreadyExistException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -179,6 +180,19 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
}
}

@Override
protected void replaceTableImpl(
Identifier identifier, FileStoreTable existingTable, Schema newSchema) {
truncateTable(existingTable);
try {
runWithLock(identifier, () -> appendNewSchema(existingTable, newSchema));
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

protected static <T> T uncheck(Callable<T> callable) {
try {
return callable.call();
Expand Down
22 changes: 22 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,28 @@ public void alterTable(
}
}

@Override
public void replaceTable(Identifier identifier, Schema newSchema, boolean ignoreIfNotExists)
throws TableNotExistException {
checkNotBranch(identifier, "replaceTable");
checkNotSystemTable(identifier, "replaceTable");
validateCreateTable(newSchema, dataTokenEnabled);
try {
tableDefaultOptions.forEach(newSchema.options()::putIfAbsent);
api.replaceTable(identifier, newSchema);
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(identifier);
}
} catch (NotImplementedException e) {
throw new UnsupportedOperationException(e.getMessage());
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
} catch (BadRequestException e) {
throw new IllegalArgumentException(e.getMessage());
}
}

@Override
public TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List<String> select)
throws TableNotExistException {
Expand Down
Loading