Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change array_agg to return null on no input rather than empty list #11299

Merged
merged 13 commits into from
Jul 10, 2024
Prev Previous commit
Next Next commit
return null
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 committed Jul 6, 2024
commit 13aaf09c05e96e654a0a1ed891744008d4bc5b0e
14 changes: 6 additions & 8 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1978,16 +1978,14 @@ impl ScalarValue {
Self::new_list(values, data_type, true)
}

/// Create empty ListArray with specific data type
/// Create a null-valued ListArray with the specified data type
///
/// This is different from `new_list(&[], data_type, nullable)`: here there is no inner array at all.
///
/// - new_empty_list(i32): `ListArray[]`
///
/// - new_list(&[], i32, nullable): `ListArray[Int32Array[]]`,
pub fn new_empty_list(data_type: DataType, nullable: bool) -> Self {
/// - new_null_list(i32, nullable, 1): `ListArray[NULL]`
pub fn new_null_list(data_type: DataType, nullable: bool, null_len: usize) -> Self {
let data_type = DataType::List(Field::new_list_field(data_type, nullable).into());
Self::List(Arc::new(ListArray::from(ArrayData::new_empty(&data_type))))
Self::List(Arc::new(ListArray::from(ArrayData::new_null(
&data_type, null_len,
))))
}

/// Converts `IntoIterator<Item = ScalarValue>` where each element has type corresponding to
Expand Down
7 changes: 4 additions & 3 deletions datafusion/physical-expr/src/aggregate/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl AggregateExpr for ArrayAgg {
&self.name,
// This should be the same as return type of AggregateFunction::ArrayAgg
Field::new("item", self.input_data_type.clone(), self.nullable),
false,
true,
))
}

Expand All @@ -86,7 +86,7 @@ impl AggregateExpr for ArrayAgg {
Ok(vec![Field::new_list(
format_state_name(&self.name, "array_agg"),
Field::new("item", self.input_data_type.clone(), self.nullable),
false,
true,
)])
}

Expand Down Expand Up @@ -167,9 +167,10 @@ impl Accumulator for ArrayAggAccumulator {
self.values.iter().map(|a| a.as_ref()).collect();

if element_arrays.is_empty() {
return Ok(ScalarValue::new_empty_list(
return Ok(ScalarValue::new_null_list(
self.datatype.clone(),
self.nullable,
1,
));
}

Expand Down
7 changes: 4 additions & 3 deletions datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl AggregateExpr for DistinctArrayAgg {
&self.name,
// This should be the same as return type of AggregateFunction::ArrayAgg
Field::new("item", self.input_data_type.clone(), self.nullable),
false,
true,
))
}

Expand All @@ -90,7 +90,7 @@ impl AggregateExpr for DistinctArrayAgg {
Ok(vec![Field::new_list(
format_state_name(&self.name, "distinct_array_agg"),
Field::new("item", self.input_data_type.clone(), self.nullable),
false,
true,
)])
}

Expand Down Expand Up @@ -166,9 +166,10 @@ impl Accumulator for DistinctArrayAggAccumulator {
fn evaluate(&mut self) -> Result<ScalarValue> {
let values: Vec<ScalarValue> = self.values.iter().cloned().collect();
if values.is_empty() {
return Ok(ScalarValue::new_empty_list(
return Ok(ScalarValue::new_null_list(
self.datatype.clone(),
self.nullable,
1,
));
}
let arr = ScalarValue::new_list(&values, &self.datatype, self.nullable);
Expand Down
90 changes: 33 additions & 57 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1694,6 +1694,7 @@ SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT
query ?
SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 LIMIT 0) test
----
NULL

# csv_query_array_agg_one
query ?
Expand Down Expand Up @@ -1752,25 +1753,12 @@ NULL 4 29 1.260869565217 123 -117 23
NULL 5 -194 -13.857142857143 118 -101 14
NULL NULL 781 7.81 125 -117 100

# TODO: array_agg_distinct output is non-deterministic -- rewrite with array_sort(list_sort)
# unnest is also not available, so manually unnesting via CROSS JOIN
# additional count(1) forces array_agg_distinct instead of array_agg over aggregated by c2 data
#
# select with count to force the array_agg_distinct function, since a single distinct expression is converted to a GROUP BY by the optimizer
# csv_query_array_agg_distinct
query error DataFusion error: External error: Arrow error: Invalid argument error: all columns in a record batch must have the same length
WITH indices AS (
SELECT 1 AS idx UNION ALL
SELECT 2 AS idx UNION ALL
SELECT 3 AS idx UNION ALL
SELECT 4 AS idx UNION ALL
SELECT 5 AS idx
)
SELECT data.arr[indices.idx] as element, array_length(data.arr) as array_len, dummy
FROM (
SELECT array_agg(distinct c2) as arr, count(1) as dummy FROM aggregate_test_100
) data
CROSS JOIN indices
ORDER BY 1
query ?I
SELECT array_sort(array_agg(distinct c2)), count(1) FROM aggregate_test_100
----
[1, 2, 3, 4, 5] 100

# aggregate_time_min_and_max
query TT
Expand Down Expand Up @@ -2754,66 +2742,51 @@ create table t(a int, b float, c bigint) as values (1, 1.2, 2);
query ?
select array_agg(a) from t where a > 2;
----
NULL

query ?
select array_agg(b) from t where b > 3.1;
----
NULL

query ?
select array_agg(c) from t where c > 3;
----
NULL

query ?I
select array_agg(c), count(1) from t where c > 3;
----
NULL 0

# returns 0 rows if group by is applied
query ?
select array_agg(a) from t where a > 3 group by a;
----

query TT
explain select array_agg(a) from t where a > 3 group by a;
query ?I
select array_agg(a), count(1) from t where a > 3 group by a;
----
logical_plan
01)Projection: ARRAY_AGG(t.a)
02)--Aggregate: groupBy=[[t.a]], aggr=[[ARRAY_AGG(t.a)]]
03)----Filter: t.a > Int32(3)
04)------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[ARRAY_AGG(t.a)@1 as ARRAY_AGG(t.a)]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------FilterExec: a@0 > 3
09)----------------MemoryExec: partitions=1, partition_sizes=[1]

# TODO: Expect no row, but got empty list
# TODO: Expect NULL, got empty list
query ?
select array_agg(distinct a) from t where a > 3;
alamb marked this conversation as resolved.
Show resolved Hide resolved
----
[]

query TT
explain select array_agg(distinct a) from t where a > 3;
query ?I
select array_agg(distinct a), count(1) from t where a > 3;
----
NULL 0

# returns 0 rows if group by is applied
query ?
select array_agg(distinct a) from t where a > 3 group by a;
----

query ?I
select array_agg(distinct a), count(1) from t where a > 3 group by a;
----
logical_plan
01)Projection: ARRAY_AGG(alias1) AS ARRAY_AGG(DISTINCT t.a)
02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(alias1)]]
03)----Aggregate: groupBy=[[t.a AS alias1]], aggr=[[]]
04)------Filter: t.a > Int32(3)
05)--------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[ARRAY_AGG(alias1)@0 as ARRAY_AGG(DISTINCT t.a)]
02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(alias1)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(alias1)]
05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=4
08)--------------AggregateExec: mode=Partial, gby=[a@0 as alias1], aggr=[]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------FilterExec: a@0 > 3
12)----------------------MemoryExec: partitions=1, partition_sizes=[1]

statement ok
drop table t;
Expand All @@ -2825,14 +2798,17 @@ create table t(a int, b float, c bigint);
query ?
select array_agg(a) from t;
----
NULL

query ?
select array_agg(b) from t;
----
NULL

query ?
select array_agg(c) from t;
----
NULL

statement ok
drop table t;
Expand Down
Loading