Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35804][table-planner] Fix incorrect calc merge during decorrelate phase #25068

Merged
merged 7 commits into from
Jul 14, 2024

Conversation

zhaorongsheng
Copy link
Contributor

@zhaorongsheng zhaorongsheng commented Jul 10, 2024

What is the purpose of the change

Like the same issue in FLINK-30841.

Take one test as example:

@Test
def testCalcMergeWithNonDeterministicExpr3(): Unit =
{ 
  val sqlUdtfQuery = "SELECT a, b, len FROM MyTable, LATERAL TABLE (length_udtf(c)) AS T(len)" 
  val sqlView1Query = "SELECT a, b, len " + s"FROM ($sqlUdtfQuery) t JOIN MyTable_Join t2 " + "ON t.a = t2.d" 
  val view1 = util.tableEnv.sqlQuery(sqlView1Query) 
  util.tableEnv.createTemporaryView("View1", view1) 
  val sqlView2Query = "SELECT random_udf(b) AS r FROM View1" 
  val view2 = util.tableEnv.sqlQuery(sqlView2Query) 
  util.tableEnv.createTemporaryView("View2", view2) 
  val sqlQuery = "SELECT r FROM View2 WHERE r > 10" 
  util.verifyRelPlan(sqlQuery) 
}

optimized plan will be wrong:

Calc(select=[random_udf(b) AS r])
+- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, b], where=[>(random_udf(b), 10)])
: +- Correlate(invocation=[length_udtf($cor0.c)], correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER EXPR$0)], joinType=[INNER])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[d]])
+- Calc(select=[d])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable_Join, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])

the expected plan is:

Calc(select=[r], where=[>(r, 10)])
+- Calc(select=[random_udf(b) AS r])
+-Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
:+-Calc(select=[a, b])
:+-Correlate(invocation=[length_udtf($cor0.c)], correlate=[table(length_udtf($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, INTEGER EXPR$0)], joinType=[INNER])
:+-LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+-Exchange(distribution=[hash[d]])
+-Calc(select=[d])
+-LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])

Brief change log

change rule related to this problem in RelDecorrelator

Verifying this change

Add new plan test in CalcTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
@flinkbot
Copy link
Collaborator

flinkbot commented Jul 10, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build
@zhaorongsheng zhaorongsheng changed the title [FLINK-35804][table-planner] Fix incorrect calc merge to avoid wrong plans Jul 10, 2024
@zhaorongsheng
Copy link
Contributor Author

@lincoln-lil @godfreyhe Would you mind reviewing this pr? Thanks~

@zhaorongsheng
Copy link
Contributor Author

@flinkbot Could someone review this pr? Thanks~

Copy link
Contributor

@lincoln-lil lincoln-lil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhaorongsheng Thanks for fixing this! The commit msg can be "Fix incorrect calc merge during decorrelate phase" since it happens in RelDecorrelator.

@@ -207,4 +210,19 @@ class CalcTest extends TableTestBase {
val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10"
util.verifyRelPlan(sqlQuery)
}

@Test
def testCalcMergeWithNonDeterministicExpr3(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use a minimal case to reproduce the error, e.g.,

  @Test
  def testCalcMergeWithCorrelate(): Unit = {
    util.addTemporarySystemFunction("str_split", new StringSplit())
    val sqlQuery =
      """
        |
        |SELECT a, r FROM (
        | SELECT a, random_udf(b) r FROM (
        |  select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
        | ) t
        |)
        |WHERE r > 10
        |""".stripMargin
    util.verifyRelPlan(sqlQuery)
  }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. The code and commit msg has been updated. Could you please check it again? Thanks~

@zhaorongsheng zhaorongsheng changed the title [FLINK-35804][table-planner] Fix incorrect calc merge to avoid wrong plans about udtf+join+udf Jul 12, 2024
Copy link
Contributor

@lincoln-lil lincoln-lil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Copy link
Contributor

@lincoln-lil lincoln-lil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhaorongsheng Thanks for the updating! Just one minor comment before merging.

@lincoln-lil lincoln-lil merged commit a021082 into apache:master Jul 14, 2024
@lincoln-lil
Copy link
Contributor

@zhaorongsheng Could you also cherry pick this patch into release-1.20 branch?

@lincoln-lil
Copy link
Contributor

@zhaorongsheng Could you also cherry pick this patch into release-1.20 branch?

I've created the backport pr #25084 to catch up 1.20's rc1 building.

@zhaorongsheng
Copy link
Contributor Author

@zhaorongsheng Could you also cherry pick this patch into release-1.20 branch?

I've created the backport pr #25084 to catch up 1.20's rc1 building.

OK!

snuyanzin pushed a commit to snuyanzin/flink that referenced this pull request Jul 21, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment