Change array_agg to return null on no input rather than empty list #11299

Merged
merged 13 commits on Jul 10, 2024
change array_agg semantics for empty result
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
jayzhan211 committed Jul 6, 2024
commit 8e67fcc4d1d6b67729804825f8f9e46397fdebff
12 changes: 12 additions & 0 deletions datafusion/common/src/scalar/mod.rs
@@ -1978,6 +1978,18 @@ impl ScalarValue {
Self::new_list(values, data_type, true)
}

/// Create an empty ListArray with the specified element data type
///
/// This differs from `new_list(&[], data_type, nullable)`: the result has no
/// inner array at all (zero list entries), rather than a single entry holding an empty array.
///
/// - new_empty_list(i32): `ListArray[]`
///
/// - new_list(&[], i32, nullable): `ListArray[Int32Array[]]`
pub fn new_empty_list(data_type: DataType, nullable: bool) -> Self {
let data_type = DataType::List(Field::new_list_field(data_type, nullable).into());
Self::List(Arc::new(ListArray::from(ArrayData::new_empty(&data_type))))
}

/// Converts `IntoIterator<Item = ScalarValue>` where each element has type corresponding to
/// `data_type`, to a [`ListArray`].
///
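For reference, here is a minimal sketch of the shape difference described by the new doc comment. It relies only on the signatures visible in this diff (`new_empty_list(DataType, bool) -> ScalarValue` and `new_list(&[ScalarValue], &DataType, bool) -> Arc<ListArray>`); the `main` wrapper, crate paths, and the asserted lengths (taken from the doc comment's `ListArray[]` vs `ListArray[Int32Array[]]` examples) are illustrative assumptions, not part of the change:

```rust
use arrow::array::Array;
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;

fn main() {
    // Zero list entries: `ListArray[]`. This is what the accumulators below
    // now return when they received no input at all.
    let empty = ScalarValue::new_empty_list(DataType::Int32, true);
    if let ScalarValue::List(arr) = &empty {
        assert_eq!(arr.len(), 0);
    }

    // One list entry holding an empty Int32Array: `ListArray[Int32Array[]]`,
    // i.e. the shape produced by the previous empty-input branch.
    let one_empty_entry = ScalarValue::List(ScalarValue::new_list(
        &[],
        &DataType::Int32,
        true,
    ));
    if let ScalarValue::List(arr) = &one_empty_entry {
        assert_eq!(arr.len(), 1);
    }
}
```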
6 changes: 4 additions & 2 deletions datafusion/physical-expr/src/aggregate/array_agg.rs
@@ -167,8 +167,10 @@ impl Accumulator for ArrayAggAccumulator {
self.values.iter().map(|a| a.as_ref()).collect();

if element_arrays.is_empty() {
let arr = ScalarValue::new_list(&[], &self.datatype, self.nullable);
return Ok(ScalarValue::List(arr));
return Ok(ScalarValue::new_empty_list(
self.datatype.clone(),
self.nullable,
));
}

let concated_array = arrow::compute::concat(&element_arrays)?;
6 changes: 6 additions & 0 deletions datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
@@ -165,6 +165,12 @@ impl Accumulator for DistinctArrayAggAccumulator {

fn evaluate(&mut self) -> Result<ScalarValue> {
let values: Vec<ScalarValue> = self.values.iter().cloned().collect();
if values.is_empty() {
return Ok(ScalarValue::new_empty_list(
self.datatype.clone(),
self.nullable,
));
}
let arr = ScalarValue::new_list(&values, &self.datatype, self.nullable);
Ok(ScalarValue::List(arr))
}
115 changes: 93 additions & 22 deletions datafusion/sqllogictest/test_files/aggregate.slt
Expand Up @@ -1694,7 +1694,6 @@ SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT
query ?
SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 LIMIT 0) test
----
[]

# csv_query_array_agg_one
query ?
@@ -1758,7 +1757,7 @@ NULL NULL 781 7.81 125 -117 100
# additional count(1) forces array_agg_distinct instead of array_agg over aggregated by c2 data
#
# csv_query_array_agg_distinct
query III
query error DataFusion error: External error: Arrow error: Invalid argument error: all columns in a record batch must have the same length
WITH indices AS (
SELECT 1 AS idx UNION ALL
SELECT 2 AS idx UNION ALL
@@ -1772,12 +1771,6 @@ FROM (
) data
CROSS JOIN indices
ORDER BY 1
----
1 5 100
2 5 100
3 5 100
4 5 100
5 5 100

# aggregate_time_min_and_max
query TT
@@ -2732,6 +2725,16 @@ SELECT COUNT(DISTINCT c1) FROM test

# TODO: aggregate_with_alias

# test_approx_percentile_cont_decimal_support
query TI
SELECT c1, approx_percentile_cont(c2, cast(0.85 as decimal(10,2))) apc FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
----
a 4
b 5
c 4
d 4
e 4

# array_agg_zero
query ?
SELECT ARRAY_AGG([])
@@ -2744,28 +2747,96 @@ SELECT ARRAY_AGG([1])
----
[[1]]

# test_approx_percentile_cont_decimal_support
query TI
SELECT c1, approx_percentile_cont(c2, cast(0.85 as decimal(10,2))) apc FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
# test array_agg with no row qualified
statement ok
create table t(a int, b float, c bigint) as values (1, 1.2, 2);

query ?
select array_agg(a) from t where a > 2;
----
a 4
b 5
c 4
d 4
e 4

query ?
select array_agg(b) from t where b > 3.1;
----

# array_agg_zero
query ?
SELECT ARRAY_AGG([]);
select array_agg(c) from t where c > 3;
----
[[]]

# array_agg_one
query ?
SELECT ARRAY_AGG([1]);
select array_agg(a) from t where a > 3 group by a;
----
[[1]]

query TT
explain select array_agg(a) from t where a > 3 group by a;
----
logical_plan
01)Projection: ARRAY_AGG(t.a)
02)--Aggregate: groupBy=[[t.a]], aggr=[[ARRAY_AGG(t.a)]]
03)----Filter: t.a > Int32(3)
04)------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[ARRAY_AGG(t.a)@1 as ARRAY_AGG(t.a)]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------FilterExec: a@0 > 3
09)----------------MemoryExec: partitions=1, partition_sizes=[1]

# TODO: Expect no row, but got empty list
query ?
select array_agg(distinct a) from t where a > 3;
----
[]

query TT
explain select array_agg(distinct a) from t where a > 3;
----
logical_plan
01)Projection: ARRAY_AGG(alias1) AS ARRAY_AGG(DISTINCT t.a)
02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(alias1)]]
03)----Aggregate: groupBy=[[t.a AS alias1]], aggr=[[]]
04)------Filter: t.a > Int32(3)
05)--------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[ARRAY_AGG(alias1)@0 as ARRAY_AGG(DISTINCT t.a)]
02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(alias1)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(alias1)]
05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=4
08)--------------AggregateExec: mode=Partial, gby=[a@0 as alias1], aggr=[]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------FilterExec: a@0 > 3
12)----------------------MemoryExec: partitions=1, partition_sizes=[1]

statement ok
drop table t;

# test with no values
Review comment (Member): add an array_agg(distinct ...) case on the empty table

statement ok
create table t(a int, b float, c bigint);

query ?
select array_agg(a) from t;
----

query ?
select array_agg(b) from t;
----

query ?
select array_agg(c) from t;
----

statement ok
drop table t;
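To see the new empty-input behaviour outside of sqllogictest, a small end-to-end sketch along these lines could be used. It assumes DataFusion's public `SessionContext`/`DataFrame` API and a `tokio` runtime (neither is part of this PR), and it prints whatever `array_agg` produces for the empty table rather than asserting a specific rendering:

```rust
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Same schema as the sqllogictest above: a table with no rows.
    ctx.sql("CREATE TABLE t(a INT, b FLOAT, c BIGINT)")
        .await?
        .collect()
        .await?;

    // With this PR, array_agg over no input should no longer surface as `[]`.
    let batches = ctx
        .sql("SELECT array_agg(a) FROM t")
        .await?
        .collect()
        .await?;

    println!("{}", pretty_format_batches(&batches)?);
    Ok(())
}
```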


# array_agg_i32
statement ok