Skip to content

Commit

Permalink
Revert join types to be compatible with datafusion. (#27)
Browse files Browse the repository at this point in the history
* Revert "Use dolomite join type (#25)"
  • Loading branch information
liurenjie1024 committed Jun 8, 2023
1 parent dc68de1 commit cf884ae
Show file tree
Hide file tree
Showing 16 changed files with 25 additions and 95 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ Cargo.lock
**/*.rs.bk

.idea
.DS_Store
8 changes: 3 additions & 5 deletions datafusion-dolomite-integration/src/conversion/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ fn plan_node_to_df_logical_plan(plan_node: &PlanNode) -> DolomiteResult<LogicalP
right: Arc::new(inputs.remove(0)),
on: expr_to_df_join_condition(join.expr())?,
filter: None,
join_type: join.join_type().try_into()?,
join_type: join.join_type(),
join_constraint: JoinConstraint::On,
schema: Arc::new(plan_node.logical_prop().unwrap().schema().clone()),
null_equals_null: true,
Expand Down Expand Up @@ -126,10 +126,8 @@ fn df_logical_plan_to_plan_node(
})
.reduce(and)
.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
let operator = LogicalOperator::LogicalJoin(Join::new(
join.join_type.try_into()?,
join_cond,
));
let operator =
LogicalOperator::LogicalJoin(Join::new(join.join_type, join_cond));
let inputs = vec![
df_logical_plan_to_plan_node(&join.left, id_gen)?,
df_logical_plan_to_plan_node(&join.right, id_gen)?,
Expand Down
2 changes: 1 addition & 1 deletion datafusion-dolomite-integration/src/conversion/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn plan_node_to_df_physical_plan<'a>(
physical_right,
join_on,
None,
&hash_join.join_type().try_into()?,
&hash_join.join_type(),
PartitionMode::CollectLeft,
&true,
)?) as Arc<dyn ExecutionPlan>)
Expand Down
4 changes: 2 additions & 2 deletions dolomite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ enum_dispatch = "0.3"
log = "^0.4"
enumset = "1"
itertools = "0.10"
enum-as-inner = "0.4"
enum-as-inner = "0.5.1"
derive_more = "0.99"
petgraph = "0.6"
anyhow = {version = "1", features = ["backtrace"] }
smallvec = "1"
prettytable-rs = "^0.8"
prettytable-rs = "0.10"
lazy_static = "1"
strum = "0.24"
strum_macros = "0.24"
Expand Down
4 changes: 2 additions & 2 deletions dolomite/src/cascades/binding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ mod tests {
LogicalJoin, LogicalLimit, LogicalProjection, LogicalScan,
};
use crate::operator::Operator::Logical;
use crate::operator::{Join, JoinType, Limit, Projection, TableScan};
use crate::operator::{Join, Limit, Projection, TableScan};
use crate::optimizer::OptimizerContext;
use crate::plan::{LogicalPlanBuilder, Plan};
use crate::properties::PhysicalPropertySet;
Expand All @@ -141,7 +141,7 @@ mod tests {
use crate::utils::TreeBuilder;
use datafusion::logical_expr::binary_expr;
use datafusion::logical_expr::Operator::Eq;
use datafusion::prelude::col;
use datafusion::prelude::{col, JoinType};

fn create_optimizer(plan: Plan) -> CascadesOptimizer {
CascadesOptimizer {
Expand Down
5 changes: 3 additions & 2 deletions dolomite/src/cascades/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,15 +852,16 @@ pub(super) struct WinnerInput {
mod tests {

use datafusion::logical_expr::{binary_expr, col};
use datafusion::prelude::JoinType;

use crate::cascades::memo::Memo;
use crate::operator::LogicalOperator::{
LogicalJoin, LogicalLimit, LogicalProjection, LogicalScan,
};
use crate::operator::Operator::Logical;
use crate::operator::{
Join, JoinType, Limit as LimitOp, Projection as ProjectionOp,
TableScan as TableScanOp, TableScan,
Join, Limit as LimitOp, Projection as ProjectionOp, TableScan as TableScanOp,
TableScan,
};
use crate::plan::LogicalPlanBuilder;
use datafusion::logical_plan::Operator::Eq;
Expand Down
2 changes: 1 addition & 1 deletion dolomite/src/cascades/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ mod tests {
use crate::cascades::CascadesOptimizer;

use crate::cost::CostModel;
use crate::operator::JoinType;
use crate::optimizer::{Optimizer, OptimizerContext};
use crate::plan::{LogicalPlanBuilder, PhysicalPlanBuilder};
use crate::properties::PhysicalPropertySet;
use crate::rules::{CommutateJoinRule, Join2HashJoinRule, Scan2TableScanRule};
use datafusion::logical_expr::{binary_expr, col};
use datafusion::logical_plan::JoinType;
use datafusion::logical_plan::Operator::Eq;

#[test]
Expand Down
3 changes: 2 additions & 1 deletion dolomite/src/cascades/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,13 +622,14 @@ mod tests {
use crate::cascades::task::{ApplyRule, Task};
use crate::cascades::{CascadesOptimizer, GroupId};
use crate::cost::INF;
use crate::operator::Join;
use crate::operator::LogicalOperator::LogicalJoin;
use crate::operator::Operator::Logical;
use crate::operator::{Join, JoinType};
use crate::plan::LogicalPlanBuilder;
use crate::properties::PhysicalPropertySet;
use crate::rules::CommutateJoinRule;
use datafusion::logical_expr::binary_expr;
use datafusion::logical_plan::JoinType;
use datafusion::logical_plan::Operator::Eq;
use datafusion::prelude::col;

Expand Down
1 change: 0 additions & 1 deletion dolomite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
//! international conference on Management of data (pp. 337-348).
//! 4. Columbia Project, https://github.com/yongwen/columbia

#[macro_use]
extern crate prettytable;
#[macro_use]
extern crate lazy_static;
Expand Down
67 changes: 1 addition & 66 deletions dolomite/src/operator/join.rs
Original file line number Diff line number Diff line change
@@ -1,79 +1,14 @@
use anyhow::bail;
use datafusion::prelude::JoinType;
use std::fmt::Formatter;

use crate::error::DolomiteResult;
use crate::operator::JoinDirection::{Left, Right};
use crate::operator::JoinType::{Anti, FullOuter, Inner, Outer, Semi};
use crate::operator::{
DerivePropContext, DerivePropResult, DisplayFields, OperatorTrait,
PhysicalOperatorTrait,
};
use crate::optimizer::{OptExpr, OptGroup, Optimizer};
use crate::properties::{LogicalProperty, PhysicalPropertySet};
use crate::Expr;
use datafusion::prelude::JoinType as DFJoinType;

/// Side that a directional [`JoinType`] variant applies to.
///
/// Carried by the `Outer`, `Anti`, `Semi`, `Marker` and `Single` variants of
/// [`JoinType`] to distinguish their left and right forms.
#[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)]
pub enum JoinDirection {
/// The left form of the join, e.g. `Outer(Left)` is a left outer join.
Left,
/// The right form of the join, e.g. `Outer(Right)` is a right outer join.
Right,
}

/// Join type used by dolomite join operators.
///
/// `Inner` and `FullOuter` are undirected; every other variant carries a
/// [`JoinDirection`] selecting its left or right form.
#[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)]
pub enum JoinType {
/// Inner join.
Inner,
/// Full outer join.
FullOuter,
/// Left or right outer join.
Outer(JoinDirection),
/// Left or right anti join.
Anti(JoinDirection),
/// Left or right semi join.
Semi(JoinDirection),
/// Left or right marker join.
///
/// Outputs one boolean column to indicate whether joined. For more details please refer to
/// [The Complete Story of Joins (in HyPer)](https://www.btw2017.informatik.uni-stuttgart.de/slidesandpapers/F1-10-37/paper_web.pdf)
Marker(JoinDirection),
/// Left or right single join.
///
/// Output exactly one joined row. For more details please refer to
/// [The Complete Story of Joins (in HyPer)](https://www.btw2017.informatik.uni-stuttgart.de/slidesandpapers/F1-10-37/paper_web.pdf)
Single(JoinDirection),
}

/// Converts a datafusion join type into the dolomite [`JoinType`].
///
/// Every arm below produces a value, so this conversion always returns `Ok`
/// for the variants listed; the `Result` return type comes from the
/// `TryFrom` trait.
impl TryFrom<DFJoinType> for JoinType {
type Error = anyhow::Error;

fn try_from(df_join_type: DFJoinType) -> Result<Self, Self::Error> {
Ok(match df_join_type {
DFJoinType::Inner => Inner,
DFJoinType::Left => Outer(Left),
DFJoinType::Right => Outer(Right),
DFJoinType::Full => FullOuter,
// Datafusion's anti and semi joins carry no direction here; they are
// mapped to the left form.
DFJoinType::Anti => Anti(Left),
DFJoinType::Semi => Semi(Left),
})
}
}

/// Converts a dolomite [`JoinType`] into the datafusion join type.
///
/// # Errors
///
/// Returns an error for every variant without an explicit arm below:
/// `Anti(Right)`, `Semi(Right)`, and all `Marker` and `Single` joins.
impl TryFrom<JoinType> for DFJoinType {
type Error = anyhow::Error;

fn try_from(join_type: JoinType) -> Result<Self, Self::Error> {
match join_type {
Inner => Ok(DFJoinType::Inner),
FullOuter => Ok(DFJoinType::Full),
Outer(Left) => Ok(DFJoinType::Left),
Outer(Right) => Ok(DFJoinType::Right),
// Only the left forms of anti and semi joins are mapped.
Anti(Left) => Ok(DFJoinType::Anti),
Semi(Left) => Ok(DFJoinType::Semi),
// Everything else has no mapping here and is reported as an error.
t => bail!(
"Unable to convert dolomite join type {:?} to datafusion join type",
t
),
}
}
}

/// Logical join operator.
#[derive(Clone, Debug, Hash, PartialEq)]
Expand Down
2 changes: 1 addition & 1 deletion dolomite/src/plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ pub fn explain_to_string(plan: &Plan) -> std::io::Result<String> {

#[cfg(test)]
mod tests {
use crate::operator::JoinType;
use crate::plan::explain::explain_to_string;
use crate::plan::{LogicalPlanBuilder, PhysicalPlanBuilder};
use datafusion::logical_expr::{binary_expr, col};
use datafusion::logical_plan::Operator::Eq;
use datafusion::prelude::JoinType;

#[test]
fn test_explain_logical_plan() {
Expand Down
4 changes: 2 additions & 2 deletions dolomite/src/plan/logical.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::operator::LogicalOperator::{LogicalJoin, LogicalProjection, LogicalScan};
use crate::operator::Operator::Logical;
use crate::operator::{Join, JoinType, Limit, LogicalOperator, Projection, TableScan};
use crate::operator::{Join, Limit, LogicalOperator, Projection, TableScan};
use crate::plan::{Plan, PlanNode, PlanNodeId, PlanNodeRef};
use datafusion::prelude::Expr;
use datafusion::prelude::{Expr, JoinType};
use std::sync::Arc;

pub struct LogicalPlanBuilder {
Expand Down
3 changes: 2 additions & 1 deletion dolomite/src/plan/physical.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::operator::Operator::Physical;
use crate::operator::PhysicalOperator::{PhysicalHashJoin, PhysicalTableScan};
use crate::operator::{Join, JoinType, TableScan};
use crate::operator::{Join, TableScan};
use crate::plan::{Plan, PlanNode, PlanNodeId, PlanNodeRef};
use datafusion::logical_plan::JoinType;
use datafusion::prelude::Expr;
use std::sync::Arc;

Expand Down
10 changes: 2 additions & 8 deletions dolomite/src/properties/distribution.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::properties::DistributionSpec::Random;
use datafusion::prelude::Column;

use crate::properties::PhysicalProp;

#[derive(Hash, Debug, Clone, Eq, PartialEq)]
#[derive(Hash, Debug, Clone, Eq, PartialEq, Default)]
pub enum DistributionSpec {
/// The data set is not partitioned and has only one partition.
Singleton,
/// The data set is partitioned according to hash values of columns.
Hashed(Vec<Column>),
/// The data set has several partitions, but the partitioning doesn't follow any rule.
#[default]
Random,
}

Expand All @@ -18,9 +18,3 @@ impl PhysicalProp for DistributionSpec {
true
}
}

impl Default for DistributionSpec {
fn default() -> Self {
Random
}
}
2 changes: 1 addition & 1 deletion dolomite/src/properties/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use datafusion::logical_plan::DFSchema;

#[derive(Clone, PartialEq, Eq, Debug)]
#[derive(Clone, PartialEq, Debug)]
pub struct LogicalProperty {
schema: Arc<DFSchema>,
}
Expand Down
2 changes: 1 addition & 1 deletion dolomite/src/rules/join.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::bail;
use datafusion::prelude::JoinType;

use crate::error::DolomiteResult;
use crate::operator::JoinType;
use crate::operator::LogicalOperator::LogicalJoin;
use crate::operator::Operator;
use crate::operator::Operator::{Logical, Physical};
Expand Down

0 comments on commit cf884ae

Please sign in to comment.