spark-dev mailing list archives

From Mike Hynes <91m...@gmail.com>
Subject Re: Stages with non-arithmetic numbering & Timing metrics in event logs
Date Wed, 10 Jun 2015 05:18:53 GMT
Ahhh---forgive my typo: what I mean is,
(t2 - t1) >= (t_ser + t_deser + t_exec)
is satisfied, empirically.

On 6/10/15, Mike Hynes <91mbbh@gmail.com> wrote:
> Hi Imran,
>
> Thank you for your email.
>
> In examining the condition (t2 - t1) < (t_ser + t_deser + t_exec), I
> have found it to be true, although I have not included
> t_{wait_for_read} in this, since it has been---so far as I can
> tell---either zero or negligible compared to the task time.
>
> Thanks,
> Mike
>
> On 6/8/15, Imran Rashid <irashid@cloudera.com> wrote:
>> Hi Mike,
>>
>> all good questions, let me take a stab at answering them:
>>
>> 1. Event Logs + Stages:
>>
>> It's normal for stages to get skipped if they are shuffle map stages
>> which get read multiple times.  E.g., here's a little example program I
>> wrote earlier to demonstrate this: "d3" doesn't need to be re-shuffled
>> since each time it's read it uses the same partitioner.  So skipping
>> stages in this way is a good thing:
>>
>> val partitioner = new org.apache.spark.HashPartitioner(10)
>> val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x }.partitionBy(partitioner)
>> (0 until 5).foreach { idx =>
>>   val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }.partitionBy(partitioner)
>>   println(idx + " ---> " + otherData.join(d3).count())
>> }
>>
>> If you run this and look in the UI, you'll see that all jobs except for
>> the first one have one stage that is skipped.  You will also see this in
>> the log:
>>
>> 15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage: List(Stage 12, Stage 13)
>>
>> 15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage 13)
>>
>> Admittedly that is not very clear, but it is sort of indicating to you
>> that the DAGScheduler first created stage 12 as a necessary step, and
>> then later on changed its mind by realizing that everything it needed
>> for stage 12 already existed, so there was nothing to do.
>>
>>
>> 2. Extracting Event Log Information
>>
>> Maybe you are interested in SparkListener?  Unfortunately, I don't
>> know of a good blog post describing it; hopefully the docs are clear ...
>>
>> 3. Time Metrics in Spark Event Log
>>
>> This is a great question.  I *think* the only exception is that t_gc is
>> really overlapped with t_exec.  So I think you should really expect
>>
>> (t2 - t1) < (t_ser + t_deser + t_exec)
>>
>> I am not 100% sure about this, though.  I'd be curious whether that
>> constraint is ever violated.
>>
>>
>> As for your question on shuffle read vs. shuffle write time -- I wouldn't
>> necessarily expect the same stage to have times for both shuffle read &
>> shuffle write -- in the simplest case, you'll have shuffle write times in
>> one, and shuffle read times in the next one.  But even taking that into
>> account, there is a difference in the way they work & are measured.
>> Shuffle read operations are pipelined, and the way we measure shuffle
>> read, it's just how much time is spent *waiting* for network transfer.
>> It could be that there is no (measurable) wait time b/c the next blocks
>> are fetched before they are needed.  Shuffle writes occur in the normal
>> task execution thread, though, so we (try to) measure all of it.
>>
>>
>> On Sun, Jun 7, 2015 at 11:12 PM, Mike Hynes <91mbbh@gmail.com> wrote:
>>
>>> Hi Patrick and Akhil,
>>>
>>> Thank you both for your responses. This is a bit of an extended email,
>>> but I'd like to:
>>> 1. Answer your (Patrick) note about the "missing" stages since the IDs
>>> do (briefly) appear in the event logs
>>> 2. Ask for advice/experience with extracting information from the
>>> event logs in a columnar, delimiter-separated format.
>>> 3. Ask about the time metrics reported in the event logs; currently,
>>> the elapsed time for a task does not equal the sum of the times for
>>> its components
>>>
>>> 1. Event Logs + Stages:
>>> =========================
>>>
>>> As I said before, in the Spark logs (the log4j-configurable ones from
>>> the driver), I only see references to some stages, where the stage IDs
>>> are not arithmetically increasing. In the event logs, however, I see
>>> references to *every* stage, although not all stages have tasks
>>> associated with them.
>>>
>>> For instance, if you examine the actual stages that have tasks, you
>>> can see missing stages:
>>> # grep -E '"Event":"SparkListenerTaskEnd"' app.log \
>>> #               | grep -Eo '"Stage ID":[[:digit:]]+'  \
>>> #               | sort -n|uniq | head -n 5
>>> "Stage ID":0
>>> "Stage ID":1
>>> "Stage ID":10
>>> "Stage ID":11
>>> "Stage ID":110
>>>
>>> However, these "missing" stages *do* appear in the event logs as
>>> stage IDs in the jobs submitted, e.g.:
>>> # grep -E '"Event":"SparkListenerJobStart"' app.log \
>>> #               | grep -Eo 'Stage IDs":\[.*\]' | head -n 5
>>> Stage IDs":[0,1,2]
>>> Stage IDs":[5,3,4]
>>> Stage IDs":[6,7,8]
>>> Stage IDs":[9,10,11]
>>> Stage IDs":[12,13,14]
>>>
>>> I do not know if this amounts to a bug, since I am not familiar with
>>> the scheduler in detail. The stages have seemingly been created
>>> somewhere in the DAG, but then have no associated tasks and never
>>> appear again.
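>>>
>>> (As a sketch of how I cross-check this, assuming the standard JSON
>>> event-log format and the same field names used in the greps above,
>>> the following Python finds stage IDs that were submitted in a job but
>>> never ran a task:)

```python
import json

def skipped_stage_ids(lines):
    """Return stage IDs that appear in SparkListenerJobStart events
    but never in any SparkListenerTaskEnd event, i.e. stages that were
    created by the scheduler but ran no tasks."""
    submitted, ran = set(), set()
    for line in lines:
        event = json.loads(line)
        if event.get("Event") == "SparkListenerJobStart":
            submitted.update(event["Stage IDs"])
        elif event.get("Event") == "SparkListenerTaskEnd":
            ran.add(event["Stage ID"])
    return sorted(submitted - ran)
```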
>>>
>>> 2. Extracting Event Log Information
>>> ====================================
>>> Currently we are running scalability tests, and are finding very poor
>>> scalability for certain block matrix algorithms. I would like to have
>>> finer detail about the communication time and bandwidth when data is
>>> transferred between nodes.
>>>
>>> I would really just like to have a file with nothing but task info in
>>> a format such as:
>>> timestamp (ms), task ID, hostname, execution time (ms), GC time (ms),
>>> ...
>>> 0010294, 1, slave-1, 503, 34, ...
>>> 0010392, 2, slave-2, 543, 32, ...
>>> and similarly for jobs/stages/rdd_memory/shuffle output/etc.
>>>
>>> I have extracted the relevant time fields from the Spark event logs
>>> with a sed script, but I wonder if there is an even more expedient
>>> way. Unfortunately, I do not immediately see how to do this using the
>>> $SPARK_HOME/conf/metrics.properties file and haven't come across a
>>> blog/etc. that describes this. Could anyone please comment on whether
>>> or not a metrics configuration for this already exists?
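>>>
>>> (For what it's worth, below is a rough Python alternative to my sed
>>> script. The JSON field names---"Task Info", "Launch Time", "Executor
>>> Run Time", "JVM GC Time", etc.---are my reading of the event-log
>>> format and may need adjusting for other Spark versions:)

```python
import csv
import json

def task_rows(lines):
    """Yield one [timestamp, task ID, host, exec ms, GC ms] row per
    SparkListenerTaskEnd event in the given JSON event-log lines."""
    for line in lines:
        event = json.loads(line)
        if event.get("Event") != "SparkListenerTaskEnd":
            continue
        info = event["Task Info"]
        metrics = event.get("Task Metrics", {})
        yield [info["Launch Time"], info["Task ID"], info["Host"],
               metrics.get("Executor Run Time", 0),
               metrics.get("JVM GC Time", 0)]

def write_csv(lines, out):
    """Write the extracted task rows to `out` as comma-separated values
    with a header row."""
    writer = csv.writer(out)
    writer.writerow(["timestamp_ms", "task_id", "host", "exec_ms", "gc_ms"])
    writer.writerows(task_rows(lines))
```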
>>>
>>> 3. Time Metrics in Spark Event Log
>>> ==================================
>>> I am confused about the times reported for tasks in the event log.
>>> There are launch and finish timestamps given for each task (call them
>>> t1 and t2, respectively), as well as GC time (t_gc), execution time
>>> (t_exec), and serialization times (t_ser, t_deser). However, the times
>>> do not add up as I would have expected. I would imagine that the
>>> elapsed time t2 - t1 would be slightly larger than the sum of the
>>> component times. However, I can find many instances in the event logs
>>> where:
>>> (t2 - t1) < (t_gc + t_ser + t_deser + t_exec)
>>> The difference can be 500 ms or more, which is not negligible for my
>>> current execution times of ~5000 ms. I have attached a plot that
>>> illustrates this.
>>>
>>> Regarding this, I'd like to ask:
>>> 1. How exactly are these times being measured?
>>> 2. Should the sum of the component times equal the elapsed (clock)
>>> time for the task?
>>> 3. If not, which component(s) is (are) being excluded, and when do
>>> they occur?
>>> 4. There are occasionally reported measurements for shuffle write
>>> time, but not shuffle read time. Is there a method to determine the
>>> time required to shuffle data? Could this be done by looking at delays
>>> between the first task in a new stage and the last task in the
>>> previous stage?
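>>>
>>> (For reference, this is roughly how I compute the discrepancy per
>>> task; the metric names are my reading of the event-log JSON and may
>>> differ across Spark versions. A negative result means the component
>>> sum exceeds the elapsed time, as described above:)

```python
def timing_slack_ms(event):
    """Elapsed wall-clock time minus the sum of the reported component
    times for one SparkListenerTaskEnd event, in milliseconds."""
    info = event["Task Info"]
    metrics = event["Task Metrics"]
    elapsed = info["Finish Time"] - info["Launch Time"]
    # Note: if t_gc overlaps t_exec, GC time is double-counted here,
    # which would make the slack come out negative.
    components = (metrics.get("JVM GC Time", 0)
                  + metrics.get("Result Serialization Time", 0)
                  + metrics.get("Executor Deserialize Time", 0)
                  + metrics.get("Executor Run Time", 0))
    return elapsed - components
```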
>>>
>>> Thank you very much for your time,
>>> Mike
>>>
>>>
>>> On 6/7/15, Patrick Wendell <pwendell@gmail.com> wrote:
>>> > Hey Mike,
>>> >
>>> > Stage IDs are not guaranteed to be sequential because of the way the
>>> > DAG scheduler works (they are only guaranteed to be increasing). In
>>> > some cases stage ID numbers are skipped when stages are generated.
>>> >
>>> > Any stage/ID that appears in the Spark UI is an actual stage, so if
>>> > you see ID's in there, but they are not in the logs, then let us know
>>> > (that would be a bug).
>>> >
>>> > - Patrick
>>> >
>>> > On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das <akhil@sigmoidanalytics.com>
>>> > wrote:
>>> >> Are you seeing the same behavior on the driver UI? (that running on
>>> >> port
>>> >> 4040), If you click on the stage id header you can sort the stages
>>> >> based
>>> >> on
>>> >> IDs.
>>> >>
>>> >> Thanks
>>> >> Best Regards
>>> >>
>>> >> On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes <91mbbh@gmail.com>
>>> >> wrote:
>>> >>>
>>> >>> Hi folks,
>>> >>>
>>> >>> When I look at the output logs for an iterative Spark program, I
>>> >>> see that the stage IDs are not arithmetically numbered---that is,
>>> >>> there are gaps between stages and I might find log information
>>> >>> about Stages 0, 1, 2, 5, but not 3 or 4.
>>> >>>
>>> >>> As an example, the output from the Spark logs below shows what I
>>> >>> mean:
>>> >>>
>>> >>> # grep -rE "Stage [[:digit:]]+" spark_stderr | grep finished
>>> >>> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at blockMap.scala:1444) finished in 7.820 s:
>>> >>> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) finished in 3.874 s:
>>> >>> 18291:INFO:DAGScheduler:Stage 2 (count at blockMap.scala:1179) finished in 2.237 s:
>>> >>> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) finished in 1.749 s:
>>> >>> 21254:INFO:DAGScheduler:Stage 5 (count at blockMap.scala:1180) finished in 1.082 s:
>>> >>> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) finished in 2.078 s:
>>> >>> 24773:INFO:DAGScheduler:Stage 8 (count at blockMap.scala:1188) finished in 1.317 s:
>>> >>> 26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817) finished in 1.638 s:
>>> >>> 27228:INFO:DAGScheduler:Stage 11 (count at blockMap.scala:1189) finished in 0.732 s:
>>> >>> 27494:INFO:DAGScheduler:Stage 14 (foreach at blockMap.scala:1302) finished in 0.192 s:
>>> >>> 27709:INFO:DAGScheduler:Stage 17 (foreach at blockMap.scala:1302) finished in 0.170 s:
>>> >>> 28018:INFO:DAGScheduler:Stage 20 (count at blockMap.scala:1201) finished in 0.270 s:
>>> >>> 28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355) finished in 0.455 s:
>>> >>> 29598:INFO:DAGScheduler:Stage 24 (count at blockMap.scala:274) finished in 0.928 s:
>>> >>> 29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355) finished in 0.305 s:
>>> >>> 30390:INFO:DAGScheduler:Stage 28 (count at blockMap.scala:275) finished in 0.391 s:
>>> >>> 30452:INFO:DAGScheduler:Stage 32 (first at MatrixFactorizationModel.scala:60) finished in 0.028 s:
>>> >>> 30506:INFO:DAGScheduler:Stage 36 (first at MatrixFactorizationModel.scala:60) finished in 0.023 s:
>>> >>>
>>> >>> Can anyone comment on this being normal behavior? Is it indicative
>>> >>> of faults causing stages to be resubmitted? I also cannot find the
>>> >>> missing stages in any stage's parent List(Stage x, Stage y, ...).
>>> >>>
>>> >>> Thanks,
>>> >>> Mike
>>> >>>
>>> >>>
>>> >>> On 6/1/15, Reynold Xin <rxin@databricks.com> wrote:
>>> >>> > Thanks, René. I actually added a warning to the new JDBC
>>> >>> > reader/writer interface for 1.4.0.
>>> >>> >
>>> >>> > Even with that, I think we should support throttling JDBC;
>>> >>> > otherwise it's too convenient for our users to DOS their
>>> >>> > production database servers!
>>> >>> >
>>> >>> >
>>> >>> >   /**
>>> >>> >    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
>>> >>> >    * url named table. Partitions of the table will be retrieved in parallel
>>> >>> >    * based on the parameters passed to this function.
>>> >>> >    *
>>> >>> >    * Don't create too many partitions in parallel on a large cluster;
>>> >>> >    * otherwise Spark might crash your external database systems.
>>> >>> >    *
>>> >>> >    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
>>> >>> >    * @param table Name of the table in the external database.
>>> >>> >    * @param columnName the name of a column of integral type that will be
>>> >>> >    *                   used for partitioning.
>>> >>> >    * @param lowerBound the minimum value of `columnName` used to decide
>>> >>> >    *                   partition stride
>>> >>> >    * @param upperBound the maximum value of `columnName` used to decide
>>> >>> >    *                   partition stride
>>> >>> >    * @param numPartitions the number of partitions. The range
>>> >>> >    *                      `minValue`-`maxValue` will be split evenly into
>>> >>> >    *                      this many partitions
>>> >>> >    * @param connectionProperties JDBC database connection arguments, a list
>>> >>> >    *                             of arbitrary string tag/value. Normally at
>>> >>> >    *                             least a "user" and "password" property
>>> >>> >    *                             should be included.
>>> >>> >    *
>>> >>> >    * @since 1.4.0
>>> >>> >    */
>>> >>> >
>>> >>> >
>>> >>> > On Mon, Jun 1, 2015 at 1:54 AM, René Treffer <rtreffer@gmail.com>
>>> >>> > wrote:
>>> >>> >
>>> >>> >> Hi,
>>> >>> >>
>>> >>> >> I'm using sqlContext.jdbc(uri, table, where).map(_ =>
>>> >>> >> 1).aggregate(0)(_+_,_+_) on an interactive shell (where "where"
>>> >>> >> is an Array[String] of 32 to 48 elements).  (The code is tailored
>>> >>> >> to your db, specifically through the where conditions; I'd have
>>> >>> >> otherwise posted it.)  That should be the DataFrame API, but I'm
>>> >>> >> just trying to load everything and discard it as soon as
>>> >>> >> possible :-)
>>> >>> >>
>>> >>> >> (1) Never do a silent drop of the values by default: it kills
>>> >>> >> confidence.  An option sounds reasonable.  Some sort of insight /
>>> >>> >> log would be great.  (How many columns of what type were
>>> >>> >> truncated? why?)
>>> >>> >> Note that I could declare the field as string via JdbcDialects
>>> >>> >> (thank you guys for merging that :-) ).
>>> >>> >> I have quite bad experiences with silent drops / truncates of
>>> >>> >> columns and thus _like_ the strict way of spark.  It causes
>>> >>> >> trouble, but noticing later that your data was corrupted during
>>> >>> >> conversion is even worse.
>>> >>> >>
>>> >>> >> (2) SPARK-8004 https://issues.apache.org/jira/browse/SPARK-8004
>>> >>> >>
>>> >>> >> (3) One option would be to make it safe to use; the other option
>>> >>> >> would be to document the behavior (s.th. like "WARNING: this
>>> >>> >> method tries to load as many partitions as possible, make sure
>>> >>> >> your database can handle the load, or load them in chunks and use
>>> >>> >> union").  SPARK-8008
>>> >>> >> https://issues.apache.org/jira/browse/SPARK-8008
>>> >>> >>
>>> >>> >> Regards,
>>> >>> >>   Rene Treffer
>>> >>> >>
>>> >>> >
>>> >>> >>
>>> >>> >
>>> >>>
>>> >>>
>>> >>> --
>>> >>> Thanks,
>>> >>> Mike
>>> >>>
>>> >>> ---------------------------------------------------------------------
>>> >>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>>> >>> For additional commands, e-mail: dev-help@spark.apache.org
>>> >>>
>>> >>
>>> >
>>>
>>>
>>> --
>>> Thanks,
>>> Mike
>>>
>>>
>>>
>>
>
>
> --
> Thanks,
> Mike
>


-- 
Thanks,
Mike


