[Improve]Schema change parses ddl sql using jsqlparser framework #422
Thanks to @DongLiang-0 for his contribution. I suggest controlling this through a switch here.
...rc/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
if (org.apache.commons.lang3.StringUtils.isEmpty(schemaChangeMode)) {
    return this;
}
Which schemaChangeMode is used when it is empty?
@@ -90,6 +92,7 @@ public static void main(String[] args) throws Exception {
    .setTableConfig(tableConfig)
    .setCreateTableOnly(false)
    .setNewSchemaChange(useNewSchemaChange)
    .setSchemaChangeMode(schemaChangeMode)
What will happen if this is not set?
Same as above.
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserManager.java
protected void extractSourceConnector(JsonNode record) {
    if (Objects.isNull(sourceConnector)) {
        sourceConnector =
Will sourceConnector be null?
Yes. sourceConnector is null when the program starts, because it is extracted from the schema change data.
} else {
    LOG.info("Unsupported event type {}", eventType);
It seems that this branch is missing?
    }
    List<String> ddlList = tryParserAlterDDLs(recordRoot);
    status = executeAlterDDLs(ddlList, recordRoot, dorisTableTuple, status);
}
Missing an else?
Same as above.
...c/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserService.java
if (columnSpecs.contains("default")) {
    int defaultIndex = columnSpecs.indexOf("default");
    defaultValue = removeQuotes(columnSpecs.get(defaultIndex + 1));
} else if (columnSpecs.contains("DEFAULT")) {
    int defaultIndex = columnSpecs.indexOf("DEFAULT");
    defaultValue = removeQuotes(columnSpecs.get(defaultIndex + 1));
Will there be any problem with this check?
private String extractComment(List<String> columnSpecs) {
    String comment = null;
    if (columnSpecs.contains("comment")) {
        int commentIndex = columnSpecs.indexOf("comment");
        comment = removeQuotes(columnSpecs.get(commentIndex + 1));
    }
    if (columnSpecs.contains("COMMENT")) {
        int commentIndex = columnSpecs.indexOf("COMMENT");
        comment = removeQuotes(columnSpecs.get(commentIndex + 1));
    }
    return comment;
}
Same as above. Does JSqlParser have any suitable methods to directly obtain the default value and comment?
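One concern with the two checks above is that they match only the all-lowercase and all-uppercase spellings, so a mixed-case keyword such as Default would be missed, and a keyword in the last position would make get(index + 1) throw. A minimal sketch of a case-insensitive lookup, assuming the column specs arrive as a flat list of tokens as in the snippets above. ColumnSpecLookup, valueAfter, and stripQuotes are hypothetical names for illustration, not part of the connector or of JSqlParser:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper (not connector code): case-insensitive keyword lookup in column specs. */
final class ColumnSpecLookup {
    private ColumnSpecLookup() {}

    /**
     * Returns the token that follows the first occurrence of the keyword,
     * compared without regard to letter case, with surrounding quotes removed.
     * Returns empty if the keyword is absent or is the last token.
     */
    static Optional<String> valueAfter(List<String> columnSpecs, String keyword) {
        if (columnSpecs == null) {
            return Optional.empty();
        }
        // Stop one short of the end so that get(i + 1) is always in range.
        for (int i = 0; i < columnSpecs.size() - 1; i++) {
            if (keyword.equalsIgnoreCase(columnSpecs.get(i))) {
                return Optional.of(stripQuotes(columnSpecs.get(i + 1)));
            }
        }
        return Optional.empty();
    }

    /** Removes one pair of matching single or double quotes, if present. */
    static String stripQuotes(String token) {
        if (token.length() >= 2) {
            char first = token.charAt(0);
            char last = token.charAt(token.length() - 1);
            if (first == last && (first == '\'' || first == '"')) {
                return token.substring(1, token.length() - 1);
            }
        }
        return token;
    }

    public static void main(String[] args) {
        List<String> specs =
                Arrays.asList("NOT", "NULL", "Default", "'abc'", "Comment", "'user name'");
        System.out.println(valueAfter(specs, "default").orElse(null));
        System.out.println(valueAfter(specs, "comment").orElse(null));
    }
}
```

With a helper like this, the default and comment extraction would each reduce to a single call, and both keywords would share one code path.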
...doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
LGTM
Proposed changes
Problem Summary:
Introduce the com.github.jsqlparser framework to parse DDL SQL statements when the schema changes. The original approach infers the columns that need to be added, deleted, or modified in a change by comparing against the last schema structure. In some special scenarios, this approach has bugs.
After the jsqlparser framework is introduced, the content of a schema change can be parsed from the DDL passed by the upstream flink-cdc. This is very accurate and does not need to rely on the last schema.
At the same time, we also deprecated org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl, which uses regular expressions to match DDL. It is recommended to use org.apache.doris.flink.sink.writer.serializer.jsondebezium.SQLParserService instead.
Through schema-change-mode, you can specify which way to parse DDL:
sql_parser: use JSQLParser to parse DDL statements;
debezium_structure: infer schema changes by parsing the upstream Debezium structure.
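The two schema-change-mode values can be modeled as an enum so that an empty or unknown value is handled in one place. This is a rough sketch under stated assumptions: SchemaChangeModeSketch and fromOption are hypothetical names, and falling back to debezium_structure when the option is empty is an assumption made for illustration, since the description does not state which mode applies by default.

```java
import java.util.Locale;

/** Hypothetical sketch; the names and the default are assumptions, not the connector's code. */
enum SchemaChangeModeSketch {
    DEBEZIUM_STRUCTURE("debezium_structure"),
    SQL_PARSER("sql_parser");

    private final String optionValue;

    SchemaChangeModeSketch(String optionValue) {
        this.optionValue = optionValue;
    }

    String optionValue() {
        return optionValue;
    }

    /** Maps the raw option string to a mode, ignoring case and surrounding whitespace. */
    static SchemaChangeModeSketch fromOption(String value) {
        if (value == null || value.trim().isEmpty()) {
            // Assumed default for this sketch only.
            return DEBEZIUM_STRUCTURE;
        }
        String normalized = value.trim().toLowerCase(Locale.ROOT);
        for (SchemaChangeModeSketch mode : values()) {
            if (mode.optionValue.equals(normalized)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unsupported schema-change-mode: " + value);
    }

    public static void main(String[] args) {
        System.out.println(fromOption("sql_parser"));
        System.out.println(fromOption(""));
    }
}
```

Rejecting unknown values with an exception, rather than silently falling back, makes a misspelled option visible at job startup.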