
[SPARK-12133][STREAMING] Streaming dynamic allocation #12154

Closed
wants to merge 7 commits into master from the streaming-dynamic-allocation branch

Conversation

@tdas (Contributor) commented Apr 4, 2016

What changes were proposed in this pull request?

Added a new Executor Allocation Manager for the Streaming scheduler for doing Streaming Dynamic Allocation.

How was this patch tested?

Unit tests and cluster tests.
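For readers landing on this PR, here is a minimal configuration sketch of how the feature would be switched on. The minExecutors/maxExecutors keys are the ones quoted later in this review; the enabled flags and the concrete values are illustrative assumptions, not part of this patch.

```scala
import org.apache.spark.SparkConf

// Hedged sketch: the min/max keys below are quoted in this review; the
// enabled flags and the numeric values are assumptions for illustration.
val conf = new SparkConf()
  .setAppName("StreamingDynamicAllocationExample")
  // Core dynamic allocation is assumed to stay off, since the streaming
  // manager requests and kills executors itself.
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.streaming.dynamicAllocation.enabled", "true")
  .set("spark.streaming.dynamicAllocation.minExecutors", "2")
  .set("spark.streaming.dynamicAllocation.maxExecutors", "10")
// Pass `conf` to StreamingContext as usual.
```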

@tdas changed the title from Streaming dynamic allocation to [SPARK-XXX][STREAMING] Streaming dynamic allocation Apr 4, 2016
@tdas (Contributor, Author) commented Apr 4, 2016

@andrewor14 (Contributor) commented

12133

@tdas changed the title from [SPARK-XXX][STREAMING] Streaming dynamic allocation to [SPARK-12133][STREAMING] Streaming dynamic allocation Apr 4, 2016
@@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
+import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener}
Contributor

Is the ExecutorAllocationManager import necessary? It doesn't seem to be referenced here.

Contributor Author

I just pushed some more changes. It's now needed.

@tdas force-pushed the streaming-dynamic-allocation branch from e4df62f to 0c6d94b on April 4, 2016 22:43
@SparkQA commented Apr 4, 2016

Test build #54893 has finished for PR 12154 at commit 81ad1dd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA commented Apr 5, 2016

Test build #54905 has finished for PR 12154 at commit 0c6d94b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA commented Apr 5, 2016

Test build #2750 has finished for PR 12154 at commit 0c6d94b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@@ -1360,6 +1360,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
listenerBus.addListener(listener)
}

private[spark] override def getExecutorIds(): Seq[String] = {

A newbie question: if a method has no side effects and just returns a value, does the Spark code style suggest removing the parentheses from the method declaration?

Contributor Author

Changed.
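As background for the question above, here is a tiny hypothetical example (not code from this patch) of the Scala convention: a parameterless method with no side effects drops the parentheses, while one that performs work keeps them.

```scala
// Hypothetical illustration of the parentheses convention, not from this PR.
class ExecutorTracker {
  private var ids: Seq[String] = Seq.empty

  // Pure accessor, no side effects: declared (and called) without parentheses.
  def executorIds: Seq[String] = ids

  // Mutates state, so it keeps the empty parameter list: tracker.refresh()
  def refresh(): Unit = {
    ids = ids.distinct // stand-in for fetching the current executor list
  }
}
```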

@andrewor14 (Contributor)

@tdas Looks great. I think you could add more comments in the code but the rest is pretty good.


val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"

val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
Contributor

Can we derive these two configurations, minExecutors and maxExecutors, from Spark's ExecutorAllocationManager?

Basically, is there any semantic difference between the min and max executors here and those in Spark's dynamic allocation?

Contributor Author

It is very confusing if configs inside spark.streaming.dynamicAllocation.* depend on configs in spark.dynamicAllocation.*. That is very non-intuitive and defeats the whole purpose of scoping config names with dots.
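To make the scoping argument concrete, a sketch of how each feature would read only keys under its own namespace; the key names are the ones quoted in the diff above, while the default values are assumptions for illustration.

```scala
import org.apache.spark.SparkConf

// Illustrative only: key names come from the snippet above, default values
// are made up for this sketch.
val conf = new SparkConf()
val streamingMin = conf.getInt("spark.streaming.dynamicAllocation.minExecutors", 1)
val streamingMax = conf.getInt("spark.streaming.dynamicAllocation.maxExecutors", Int.MaxValue)
// Core dynamic allocation keeps its own, independently scoped keys:
//   spark.dynamicAllocation.minExecutors / spark.dynamicAllocation.maxExecutors
```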


@tdas Is there any particular reason why initExecutors is not supported in streaming.dynamicAllocation?


@tdas @andrewor14 I also have to ask: is there any reason initExecutors is not supported for streaming with dynamic allocation? I'm having issues with my application because it needs a minimum executor count to start behaving well with the Kinesis stream.

@tdas (Contributor, Author) commented Apr 6, 2016

@andrewor14 Updated. Please take a look.

@SparkQA commented Apr 6, 2016

Test build #2759 has started for PR 12154 at commit 3b501a0.

@tdas force-pushed the streaming-dynamic-allocation branch from 3b501a0 to 0598c85 on April 6, 2016 18:42
@andrewor14 (Contributor)
LGTM

logInfo(s"Requested total $targetTotalExecutors executors")
}

/** Kill a executors that is not running a receiver */
Contributor

grammar
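For context, a hedged sketch of the scale-down step the doc comment above describes: pick an executor that is not hosting a receiver and release it. The helper below is hypothetical, and ExecutorAllocationClient with its getExecutorIds()/killExecutor() methods is a private[spark] internal, so this only mirrors the shape of the logic rather than the actual patch.

```scala
// Hypothetical sketch, not the actual patch: release one executor that is
// not currently running a receiver.
def killNonReceiverExecutor(
    client: org.apache.spark.ExecutorAllocationClient,
    receiverExecutors: Set[String]): Unit = {
  // All known executors minus those hosting receivers are safe candidates.
  val candidates = client.getExecutorIds().filterNot(receiverExecutors.contains)
  candidates.headOption.foreach(client.killExecutor)
}
```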

@tdas force-pushed the streaming-dynamic-allocation branch from 0598c85 to ce36c76 on April 6, 2016 19:31
@SparkQA commented Apr 6, 2016

Test build #55136 has finished for PR 12154 at commit 0598c85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA commented Apr 6, 2016

Test build #55140 has finished for PR 12154 at commit ce36c76.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@andrewor14 (Contributor)
Merged into master, thanks.

@asfgit closed this in 9af5423 Apr 6, 2016
@jayv (Contributor) commented Apr 20, 2016

@tdas or @andrewor14, does this depend on any 2.0 APIs? I would like to backport this to 1.5 or 1.6 if possible.

I need to run multiple concurrent streaming jobs on Mesos.

@andrewor14 (Contributor)
@jayv big new features like this are never backported into older branches.

@jayv (Contributor) commented Apr 20, 2016

I understand that, but I want to port this feature to our internal custom 1.6 build, if it's not too much trouble.

@andrewor14 (Contributor)
I see. I don't believe this depends on new APIs. You may have some difficulty backporting big patches into 1.6 in general, however.

jayv pushed a commit to jayv/spark that referenced this pull request Apr 21, 2016
Add missing API to support backport of SPARK-12133

Author:    Tathagata Das <tathagata.das1565@gmail.com>
Author:    Jo Voordeckers <jo.voordeckers@gmail.com>
@sansagara
Is there a way to specify the initial executors?

@sugix commented Apr 21, 2018

@tdas - Why can't we see this in the documentation? I am also not sure whether AWS EMR supports this feature.

Labels: None yet
10 participants