[Feature Request] Support for Spark Connect (aka Delta Connect) #3240

Open · allisonport-db opened this issue Jun 7, 2024 · 0 comments
Labels: enhancement (New feature or request)
Feature request

Which Delta project/connector is this regarding?

  • [x] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Overview

Spark Connect is a new initiative in Apache Spark that adds a decoupled client-server infrastructure, allowing Spark applications to connect remotely to a Spark server and run SQL / DataFrame operations. We want to develop what we're calling "Delta Connect" so that Delta operations can also be run from applications in this client-server mode.
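For context, here is a minimal sketch of what this client-server mode looks like from the Python side (assuming a Spark Connect server is already listening on localhost:15002; the query is purely illustrative):

from pyspark.sql import SparkSession

# Connect to a remote Spark Connect server instead of creating a local session.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# DataFrame / SQL operations are sent to the server and executed there.
spark.sql("SELECT 1 AS id").show()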

Further details

These are the critical user journeys (CUJs) we would like to support:

Server

The server is packaged into the io.delta:delta-spark-connect-server_2.13 package; installing this package automatically installs the io.delta:delta-spark-connect-common_2.13 package as a dependency.

sbin/start-connect-server.sh \
  --packages org.apache.spark:spark-connect_2.13:4.0.0,io.delta:delta-spark-connect-server_2.13:4.0.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Scala Client

The client is packaged into the io.delta:delta-spark-connect-client_2.13 package; installing this package automatically installs the io.delta:delta-spark-connect-common_2.13 package as a dependency.

export SPARK_REMOTE="sc://localhost:15002"
spark-connect-repl --packages io.delta:delta-spark-connect-client_2.13:4.0.0

The delta-spark-connect-client_2.13 package uses the exact same class and package names as the delta-spark_2.13 package. Therefore the exact same code can be used as before.

import io.delta.tables._
import org.apache.spark.sql.functions._

// spark is a remote SparkSession connected to the Spark Connect server.
val deltaTable = DeltaTable.forName(spark, "my_table")

// Add 100 to the id of every row with an even id.
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))

Python Client

The Delta Connect Python client is included in the same PyPI package as Delta Spark.

pip install pyspark==4.0.0
pip install delta-spark==4.0.0

Usage is no different from the classic mode. We just need to pass a remote SparkSession (instead of a local one) to the DeltaTable API.

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# DeltaTable works against the remote session just like a classic local one.
deltaTable = DeltaTable.forName(spark, "my_table")

# Add 100 to the id of every row with an even id.
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
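
As a further illustration of the intended parity with the classic API, a sketch of other Delta operations over the same remote session (this assumes the same DeltaTable surface is eventually supported over Connect; the table name is illustrative):

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

deltaTable = DeltaTable.forName(spark, "my_table")

# Delete rows matching a predicate, then inspect the latest table version.
deltaTable.delete(col("id") > 1000)
deltaTable.history(1).show()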