-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-12133][STREAMING] Streaming dynamic allocation #12154
Conversation
12133 |
@@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel | |||
import org.apache.spark.streaming.StreamingContextState._ | |||
import org.apache.spark.streaming.dstream._ | |||
import org.apache.spark.streaming.receiver.Receiver | |||
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener} | |||
import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the ExecutorAllocationManager
import necessary? It doesn't seem to be referenced here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just pushed some more changes. Its now needed.
e4df62f
to
0c6d94b
Compare
Test build #54893 has finished for PR 12154 at commit
|
Test build #54905 has finished for PR 12154 at commit
|
Test build #2750 has finished for PR 12154 at commit
|
@@ -1360,6 +1360,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli | |||
listenerBus.addListener(listener) | |||
} | |||
|
|||
private[spark] override def getExecutorIds(): Seq[String] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A newbie question: if a method has no side effect and return values, do code standard in spark suggest to remove parenthesis in method declaration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
@tdas Looks great. I think you could add more comments in the code but the rest is pretty good. |
|
||
val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors" | ||
|
||
val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use these two configurations minExecutors
and maxExecutors
derived from Spark ExecutorAllocationManager
?
Basically is there any semantic difference for min and max executors between here and Spark's dynamic allocation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is very confusing if configs inside spark.streaming.dynamicAllocation.*
depends on configs in spark.dynamicAllocation.*
. Very non intuitive and defeats the whole purpose of having config names be scoped with .
s
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tdas Is there any particular reason, why initExecutors is not supported in streaming.dynamicAllocation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tdas @andrewor14 I also have to ask: Any reason initExecutors
is not supported for streaming with dynamic allocation? I'm having issues with my application because it needs a minimum executors count to start behaving good with the Kinesis stream.
@andrewor14 Updated. Please take a look. |
Test build #2759 has started for PR 12154 at commit |
3b501a0
to
0598c85
Compare
LGTM |
logInfo(s"Requested total $targetTotalExecutors executors") | ||
} | ||
|
||
/** Kill a executors that is not running a receiver */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
grammar
0598c85
to
ce36c76
Compare
Test build #55136 has finished for PR 12154 at commit
|
Test build #55140 has finished for PR 12154 at commit
|
Merged into master thanks. |
@tdas or @andrewor14 does this depend on any 2.0 APIs, I would like to backport this to 1.5 or 1.6 if possible. need to run multiple concurrent streaming jobs on mesos |
@jayv big new features like this are never backported into older branches. |
I understand that, but I want to port this feature to our internal custom 1.6 build, if it's not too much trouble. |
I see. I don't believe this depends on new APIs. You may have some difficulty just backporting into 1.6 in general for big patches, however. |
Add missing API to support backport of SPARK-12133 Author: Tathagata Das <tathagata.das1565@gmail.com> Author: Jo Voordeckers <jo.voordeckers@gmail.com>
Is there a way to specify the Initial executors? |
@tdas - Why we cannot see this in the documentation and I am not sure if AWS EMR supports this feature? |
What changes were proposed in this pull request?
Added a new Executor Allocation Manager for the Streaming scheduler for doing Streaming Dynamic Allocation.
How was this patch tested
Unit tests, and cluster tests.