Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#6361] feat(paimon): Support specifying primary keys when creating Paimon tables via Flink #6362

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.compress.utils.Lists;
Expand All @@ -40,6 +41,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
Expand Down Expand Up @@ -75,7 +77,11 @@
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;

/**
* The BaseCatalog that provides a default implementation for all methods in the {@link
Expand Down Expand Up @@ -276,8 +282,21 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
propertiesConverter.toGravitinoTableProperties(table.getOptions());
Transform[] partitions =
partitionConverter.toGravitinoPartitions(((CatalogTable) table).getPartitionKeys());

try {
catalog().asTableCatalog().createTable(identifier, columns, comment, properties, partitions);

Index[] indices = getGrivatinoIndeics(resolvedTable);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use getGravitinoIndices? (the current name misspells both "Gravitino" and "Indices")

catalog()
.asTableCatalog()
.createTable(
identifier,
columns,
comment,
properties,
partitions,
Distributions.NONE,
new SortOrder[0],
indices);
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e);
} catch (TableAlreadyExistsException e) {
Expand All @@ -289,6 +308,20 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

/**
 * Converts the primary-key constraint of a resolved Flink table (if any) into Gravitino
 * index metadata.
 *
 * <p>NOTE(review): the method name has typos — should be {@code getGravitinoIndices};
 * rename together with its call site in {@code createTable}.
 *
 * @param resolvedTable the resolved Flink catalog table being created
 * @return a single-element array holding a PRIMARY index when the table declares a
 *     primary key, otherwise an empty array
 */
private static Index[] getGrivatinoIndeics(ResolvedCatalogBaseTable<?> resolvedTable) {
  Optional<UniqueConstraint> primaryKey = resolvedTable.getResolvedSchema().getPrimaryKey();
  // Avoid the Optional -> null -> null-check anti-pattern; branch on presence directly.
  if (!primaryKey.isPresent()) {
    return new Index[0];
  }
  // Gravitino index field names are two-dimensional: one String[] path per key column.
  // Flink primary-key columns are always top-level, so each path has exactly one element.
  String[][] primaryFields =
      primaryKey.get().getColumns().stream()
          .map(column -> new String[] {column})
          .toArray(String[][]::new);
  return new Index[] {Indexes.primary("primary", primaryFields)};
}

/**
* The method only is used to change the comments. To alter columns, use the other alterTable API
* and provide a list of TableChanges.
Expand Down Expand Up @@ -521,12 +554,30 @@ protected CatalogBaseTable toFlinkTable(Table table) {
.column(column.name(), column.nullable() ? flinkType.nullable() : flinkType.notNull())
.withComment(column.comment());
}
handleFlinkPrimaryKey(table, builder);
Map<String, String> flinkTableProperties =
propertiesConverter.toFlinkTableProperties(table.properties());
List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
}

private static void handleFlinkPrimaryKey(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you use String getFlinkPrimaryKey(Table table) to return the primary field name, builder could use field name to build primary key.

Table table, org.apache.flink.table.api.Schema.Builder builder) {
List<Index> primaryKeyList =
Arrays.stream(table.index())
.filter(index -> index.type() == Index.IndexType.PRIMARY_KEY)
.collect(Collectors.toList());
if (primaryKeyList.isEmpty()) {
return;
}
Preconditions.checkArgument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add the check for the size of filednames too?

primaryKeyList.size() == 1, "More than one primary key is not supported.");
builder.primaryKey(
Arrays.stream(primaryKeyList.get(0).fieldNames())
.map(fieldNames -> fieldNames[0])
.collect(Collectors.toList()));
}

private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) {
return Column.of(
column.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.types.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -80,6 +81,10 @@ protected boolean supportGetSchemaWithoutCommentAndOption() {

protected abstract boolean supportDropCascade();

/**
 * Whether the catalog under test supports creating tables with a primary key.
 *
 * <p>Subclasses for catalogs without primary-key support override this to return {@code
 * false}, which skips the primary-key test via {@code @EnabledIf}.
 */
protected boolean supportsPrimaryKey() {
return true;
}

@Test
public void testCreateSchema() {
doWithCatalog(
Expand Down Expand Up @@ -280,6 +285,91 @@ public void testCreateSimpleTable() {
supportDropCascade());
}

@Test
@EnabledIf("supportsPrimaryKey")
public void testCreateTableWithPrimaryKey() {
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
String databaseName = "test_create_table_with_primary_key_db";
String tableName = "test_create_primary_key_table";
String comment = "test comment";
String key = "test key";
String value = "test value";

doWithSchema(
currentCatalog(),
databaseName,
catalog -> {
sql(
"CREATE TABLE %s "
+ "(aa int, "
+ " bb int,"
+ " cc int,"
+ " PRIMARY KEY (aa,bb) NOT ENFORCED"
+ ")"
+ " COMMENT '%s' WITH ("
+ "'%s' = '%s')",
tableName, comment, key, value);
Table table =
catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
Assertions.assertEquals(1, table.index().length);
Index index = table.index()[0];
Assertions.assertEquals("aa", index.fieldNames()[0][0]);
Assertions.assertEquals("bb", index.fieldNames()[1][0]);

TestUtils.assertTableResult(
sql("INSERT INTO %s VALUES(1,2,3)", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(-1));
TestUtils.assertTableResult(
sql("SELECT count(*) num FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(1));
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 3));

TestUtils.assertTableResult(
sql("INSERT INTO %s VALUES(1,2,4)", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(-1));
TestUtils.assertTableResult(
sql("SELECT count(*) num FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(1));
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4));

TestUtils.assertTableResult(
sql("INSERT INTO %s VALUES(1,3,4)", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(-1));
TestUtils.assertTableResult(
sql("SELECT count(*) num FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(2));
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(1, 2, 4),
Row.of(1, 3, 4));

TestUtils.assertTableResult(
sql("INSERT INTO %s VALUES(2,2,4)", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(-1));
TestUtils.assertTableResult(
sql("SELECT count(*) num FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(3));
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(1, 2, 4),
Row.of(1, 3, 4),
Row.of(2, 2, 4));
},
true,
supportDropCascade());
}

@Test
@EnabledIf("supportTableOperation")
public void testListTables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {

private static org.apache.gravitino.Catalog hiveCatalog;

@Override
protected boolean supportsPrimaryKey() {
// Primary keys are not supported for this Hive catalog path — skip the
// primary-key integration test.
return false;
}

@BeforeAll
void hiveStartUp() {
initDefaultHiveCatalog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT {

private static org.apache.gravitino.Catalog icebergCatalog;

@Override
protected boolean supportsPrimaryKey() {
// Primary keys are not supported for this Iceberg catalog path — skip the
// primary-key integration test.
return false;
}

@BeforeAll
public void before() {
Preconditions.checkNotNull(metalake);
Expand Down
Loading