Spark: get the minimum value in a column that satisfies a condition























I have a DataFrame in Spark that looks like this:



id |  flag
----------
0 | true
1 | true
2 | false
3 | true
4 | true
5 | true
6 | false
7 | false
8 | true
9 | false


I want to add another column containing the current row's id if its flag is false, or otherwise the id of the next row whose flag is false, so the output would look like this:



id |  flag | nextOrCurrentFalse
-------------------------------
0 | true | 2
1 | true | 2
2 | false | 2
3 | true | 6
4 | true | 6
5 | true | 6
6 | false | 6
7 | false | 7
8 | true | 9
9 | false | 9


I want to do this in a vectorized way (without iterating row by row). Effectively, I want the logic to be:




  • For each row, get the minimum id that is greater than or equal to the current row's id and has flag == false.
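As a plain-Scala reference for the rule above (a local sketch for checking results, not Spark code and not meant for large data), the following assumes the rows fit in a `Seq` of `(id, flag)` pairs. The object name is hypothetical, and returning `None` when no later false row exists is an assumption, since the question does not cover that case.

```scala
// Plain-Scala reference for the rule (runs locally, no Spark involved).
object NextFalseReference {
  // For each (id, flag): the smallest id >= this id whose flag is false.
  // None when no such row exists (an assumption; the question does not cover that case).
  def nextOrCurrentFalse(rows: Seq[(Int, Boolean)]): Seq[(Int, Boolean, Option[Int])] = {
    val falseIds = rows.collect { case (id, false) => id }
    rows.map { case (id, flag) =>
      val candidates = falseIds.filter(_ >= id)
      (id, flag, if (candidates.isEmpty) None else Some(candidates.min))
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq((0, true), (1, true), (2, false), (3, true), (4, true),
                   (5, true), (6, false), (7, false), (8, true), (9, false))
    nextOrCurrentFalse(rows).foreach { case (id, flag, next) =>
      println(s"$id | $flag | ${next.getOrElse("null")}")
    }
  }
}
```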










scala apache-spark dataframe
asked Nov 19 at 19:48 by user3685285 (edited Nov 19 at 21:48)

  • I always wonder about the performance of such things. How many entries do you have? I presume you mean vectorized way as in not in SQL. I would not attempt SQL here.
    – thebluephantom
    Nov 19 at 21:12










  • In addition, with partitioning aspects I suspect some errors would result.
    – thebluephantom
    Nov 19 at 21:14










  • no actually, an SQL solution would be perfect. I just don't want one that relies on iterating through the dataset (which is probably way more efficient in this case, but I don't want to do that because I want to extend this to other use cases where I can't do that).
    – user3685285
    Nov 19 at 21:14










  • Tell me how many rows?
    – thebluephantom
    Nov 19 at 21:14






  • I am using this to check how good Catalyst and such really is in the next few weeks - at scale of course.
    – thebluephantom
    Dec 1 at 14:04












3 Answers
    If false flags are fairly sparse, you could do it like this:



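    import spark.implicits._  // for $"..." (spark-shell imports this automatically)

    // Ids of the rows where flag is false
    val ids = df.where("flag = false").
      select($"id".as("id1"))

    // Pair each row with every false id at or after it, then keep the smallest one
    val withNextFalse = df.join(ids, df("id") <= ids("id1")).
      groupBy("id", "flag").
      agg("id1" -> "min")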


    In the first step, we make a dataframe of the ids where the flag is false. Then, we join that dataframe to the original data on the desired condition (the original id should be less than or equal to the id of the row where flag is false).



    To get the first such case, group by id and use agg to find the minimum value of id1 (which is the id of a row with flag = false).



    Running on your example data (and sorting on id) gives the desired output:



    +---+-----+--------+
    | id| flag|min(id1)|
    +---+-----+--------+
    |  0| true|       2|
    |  1| true|       2|
    |  2|false|       2|
    |  3| true|       6|
    |  4| true|       6|
    |  5| true|       6|
    |  6|false|       6|
    |  7|false|       7|
    |  8| true|       9|
    |  9|false|       9|
    +---+-----+--------+


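    This approach could run into performance trouble if the DataFrame is very large and has many rows where the flag is false, because the non-equi join pairs every row with every false row at or after it before the minimum is taken. If that's the case, you may be better off with an iterative solution.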
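To make the cost concrete, here is a plain-Scala emulation of the two steps on a local collection (an illustration only, not how Spark executes the join; the object and method names are hypothetical). It counts the intermediate pairs: for the 10 example rows the join produces 28 pairs, and when all n flags are false it produces n(n+1)/2, i.e. 500,500 pairs for 1,000 rows.

```scala
// Local emulation of:
//   df.join(ids, df("id") <= ids("id1")).groupBy("id", "flag").agg("id1" -> "min")
object InequalityJoinCost {
  type Row = (Int, Boolean)

  // Step 1: every (row, falseId) pair with row id <= falseId, like the non-equi join.
  def joinPairs(rows: Seq[Row]): Seq[(Row, Int)] = {
    val falseIds = rows.collect { case (id, false) => id }
    for {
      row <- rows
      id1 <- falseIds
      if row._1 <= id1
    } yield (row, id1)
  }

  // Step 2: group by the original row and keep the smallest false id.
  def minPerRow(pairs: Seq[(Row, Int)]): Map[Row, Int] =
    pairs.groupBy(_._1).map { case (row, ps) => row -> ps.map(_._2).min }

  def main(args: Array[String]): Unit = {
    val rows = Seq((0, true), (1, true), (2, false), (3, true), (4, true),
                   (5, true), (6, false), (7, false), (8, true), (9, false))
    val pairs = joinPairs(rows)
    println(s"input rows: ${rows.size}, joined pairs: ${pairs.size}")
    minPerRow(pairs).toSeq.sortBy(_._1._1).foreach { case ((id, flag), next) =>
      println(s"$id | $flag | $next")
    }
    // Worst case for this approach: every flag is false.
    val allFalse = (0 until 1000).map(i => (i, false))
    println(s"all-false 1000 rows -> ${joinPairs(allFalse).size} pairs")
  }
}
```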





























    • Great answer! But you're right. This is a bit too resource heavy. Was hoping to find a way to do this without a join. This is something that is simple in MATLAB with smaller data sets. I really want to know how it would be done in spark with DataFrames.
      – user3685285
      Nov 19 at 21:24










    • @Jason Embellished your answer in more than one way
      – thebluephantom
      Nov 20 at 20:48










    • @thebluephantom - Great, thanks!
      – Jason
      Nov 20 at 21:23










    • It's an interesting problem that got a -2 and turned out to be easy to solve - that said I had to brood on it.
      – thebluephantom
      Nov 20 at 21:24
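On the join-free question raised in these comments: one approach that none of the answers uses is a window aggregate over the current row and all following rows. This is a minimal sketch assuming Spark 2.1 or later and a single global ordering by `id`. A window with `orderBy` but no `partitionBy` moves all rows into one partition (Spark logs a warning about this), so it avoids the join but does not scale out either. The object name and the local `SparkSession` are only there to make the example self-contained.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, min, when}

object NextFalseWindow {
  // Smallest id >= the current row's id whose flag is false, or null if there is none.
  def withNextFalse(df: DataFrame): DataFrame = {
    // Frame: the current row plus every following row, ordered by id.
    // With no partitionBy, Spark moves all rows into a single partition.
    val forward = Window.orderBy("id")
      .rowsBetween(Window.currentRow, Window.unboundedFollowing)
    // when(...) without otherwise() is null for rows whose flag is true,
    // and min ignores nulls, so only false rows are candidates.
    df.withColumn("nextOrCurrentFalse", min(when(!col("flag"), col("id"))).over(forward))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("next-false").getOrCreate()
    import spark.implicits._

    val df = Seq((0, true), (1, true), (2, false), (3, true), (4, true),
                 (5, true), (6, false), (7, false), (8, true), (9, false)).toDF("id", "flag")

    withNextFalse(df).orderBy("id").show(false)
    spark.stop()
  }
}
```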































    See the other answer, which is better; I have left this one here for SQL educational purposes, possibly.



    This does what you want, but I would be keen to know what others think of it at scale. I am going to check Catalyst and see how it works procedurally; I suspect there may be some misses at partition boundaries, and I am keen to check that as well.



    import org.apache.spark.sql.functions._
    import spark.implicits._  // needed for toDF (spark-shell imports this automatically)

    val df = Seq((0, true), (1, true), (2,false), (3, true), (4,true), (5,true), (6,false), (7,false), (8,true), (9,false)).toDF("id","flag")
    df.createOrReplaceTempView("tf")

    // Performance? Need to check at some stage how partitioning works in such a case.
    spark.sql("CACHE TABLE tf")

    // Self-join: pair every row with each false row at or after it
    val res1 = spark.sql("""
    SELECT tf1.*, tf2.id as id2, tf2.flag as flag2
    FROM tf tf1, tf tf2
    WHERE tf2.id >= tf1.id
    AND tf2.flag = false
    """)

    //res1.show(false)
    res1.createOrReplaceTempView("res1")
    spark.sql("CACHE TABLE res1")

    // Per id, keep the candidate with the smallest id2 (rank 1)
    val res2 = spark.sql(""" SELECT X.id, X.flag, X.id2
    FROM (SELECT *, RANK() OVER (PARTITION BY id ORDER BY id2 ASC) as rank_val
    FROM res1) X
    WHERE X.rank_val = 1
    ORDER BY id
    """)

    res2.show(false)




























    • Like the other solution better actually, but the performance issue is noted as well.
      – thebluephantom
      Nov 19 at 21:40
        Accepted answer






        Having thought about scaling, and without being sure whether Catalyst is good enough here, I propose a solution that builds on one of the other answers. It could benefit from partitioning and has far less work to do, simply because it takes the shape of the data into account: some pre-computation and massaging can beat a brute-force approach. Your concern about the JOIN is less of an issue now, because this is a bounded JOIN that matches each row to at most one range, so there is no massive generation of intermediate data.
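Locally, the pre-computation amounts to turning the sorted false ids into non-overlapping ranges. A plain-Scala sketch (hypothetical names, not Spark code, assuming non-negative integer ids): for the example, false ids 2, 6, 7, 9 give the ranges (0, 2), (3, 6), (7, 7), (8, 9), so each of the ten rows falls into exactly one range, compared with 28 pairs from the unbounded `id <= id1` join.

```scala
// Local sketch of the pre-computed ranges (no Spark involved).
object BoundedRanges {
  // Each false id f owns the range [previous false id + 1, f]; the first range starts
  // at -1 + 1 = 0, mirroring lag("id1", 1, -1) + 1. Assumes ids are non-negative.
  def ranges(falseIds: Seq[Int]): Seq[(Int, Int)] = {
    val sorted = falseIds.sorted
    if (sorted.isEmpty) Seq.empty
    else (-1 +: sorted.init).zip(sorted).map { case (prev, id1) => (prev + 1, id1) }
  }

  // The false id whose range contains `id`; None for rows after the last false id.
  def lookup(id: Int, rs: Seq[(Int, Int)]): Option[Int] =
    rs.collectFirst { case (lo, hi) if id >= lo && id <= hi => hi }

  def main(args: Array[String]): Unit = {
    val rows = Seq((0, true), (1, true), (2, false), (3, true), (4, true),
                   (5, true), (6, false), (7, false), (8, true), (9, false))
    val rs = ranges(rows.collect { case (id, false) => id })
    println(rs) // the (lo, hi) ranges
    // Each row lands in at most one range, so the join yields at most one row per input row.
    rows.foreach { case (id, flag) =>
      println(s"$id | $flag | ${lookup(id, rs).getOrElse("null")}")
    }
  }
}
```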



        Your comment about wanting a DataFrame approach is a little off the mark, since everything proposed here already uses DataFrames. I think you mean that you want to loop through a DataFrame with an inner loop that exits early. I can find no example of that, and I am not sure it fits the Spark paradigm. The following gets the same result with less processing:



        import org.apache.spark.sql.functions._
        import org.apache.spark.sql.expressions.Window
        import spark.implicits._

        // In spark-shell, paste this with :paste so lines starting with "." are read as continuations.
        val df = Seq((0, true), (1, true), (2, false), (3, true), (4, true),
          (5, true), (6, false), (7, false), (8, true), (9, false)).toDF("id", "flag")

        // Only the false ids can ever be an answer.
        val ids = df.where("flag = false")
          .select($"id".as("id1"))

        // Each false id owns the range that starts just after the previous false id
        // (lag defaults to -1, so the first range starts at 0; this assumes non-negative ids).
        // Note: a window with no partitionBy moves all of `ids` into a single partition.
        val w1 = Window.orderBy("id1")
        val ids3 = ids.withColumn("prev_id1", lag("id1", 1, -1).over(w1) + 1)

        // Bounded range join: each row matches at most one range, so no grouping and min.
        // Some understanding of the data is required: rows after the last false id
        // match no range and are dropped by this inner join.
        val withNextFalse = df.join(ids3, df("id") >= ids3("prev_id1") && df("id") <= ids3("id1"))
          .select($"id", $"flag", $"id1".alias("nextOrCurrentFalse"))
          .orderBy(asc("id"))

        withNextFalse.show(false)


        This also returns:



        +---+-----+------------------+
        |id |flag |nextOrCurrentFalse|
        +---+-----+------------------+
        |0  |true |2                 |
        |1  |true |2                 |
        |2  |false|2                 |
        |3  |true |6                 |
        |4  |true |6                 |
        |5  |true |6                 |
        |6  |false|6                 |
        |7  |false|7                 |
        |8  |true |9                 |
        |9  |false|9                 |
        +---+-----+------------------+
        answered Nov 20 at 20:47 by thebluephantom (edited Nov 20 at 21:21)
























            up vote
            2
            down vote













            If flag is fairly sparse, you could do it like this:



            val ids = df.where("flag = false"). 
            select($"id".as("id1"))

            val withNextFalse = df.join(ids, df("id") <= ids("id1")).
            groupBy("id", "flag").
            agg("id1" -> "min")


            In the first step, we make a dataframe of the ids where the flag is false. Then, we join that dataframe to the original data on the desired condition (the original id should be less than or equal to the id of the row where flag is false).



            To get the first such case, group by id and use agg to find the minimum value of id1 (which is the id of a row with flag = false.



            Running on your example data (and sorting on id) gives the desired output:



            +---+-----+--------+
            | id| flag|min(id1)|
            +---+-----+--------+
            | 0| true| 2|
            | 1| true| 2|
            | 2|false| 2|
            | 3| true| 6|
            | 4| true| 6|
            | 5| true| 6|
            | 6|false| 6|
            | 7|false| 7|
            | 8| true| 9|
            | 9|false| 9|
            +---+-----+--------+


            This approach could run into performance trouble if the DataFrame is very large and has many rows where the flag is False. If that's the case, you may be better off with an iterative solution.






            share|improve this answer























            • Great answer! But you're right. This is a bit too resource heavy. Was hoping to find a way to do this without a join. This is something that is simple in MATLAB with smaller data sets. I really want to know how it would be done in spark with DataFrames.
              – user3685285
              Nov 19 at 21:24










            • @Jason Embellished your answer in more than one way
              – thebluephantom
              Nov 20 at 20:48










            • @thebluephantom - Great, thanks!
              – Jason
              Nov 20 at 21:23










            • It's an interesting problem that got a -2 and turned out to be easy to solve - that said I had to brood on it.
              – thebluephantom
              Nov 20 at 21:24















            up vote
            2
            down vote













            If flag is fairly sparse, you could do it like this:



            val ids = df.where("flag = false"). 
            select($"id".as("id1"))

            val withNextFalse = df.join(ids, df("id") <= ids("id1")).
            groupBy("id", "flag").
            agg("id1" -> "min")


            In the first step, we make a dataframe of the ids where the flag is false. Then, we join that dataframe to the original data on the desired condition (the original id should be less than or equal to the id of the row where flag is false).



            To get the first such case, group by id and use agg to find the minimum value of id1 (which is the id of a row with flag = false.



            Running on your example data (and sorting on id) gives the desired output:



            +---+-----+--------+
            | id| flag|min(id1)|
            +---+-----+--------+
            | 0| true| 2|
            | 1| true| 2|
            | 2|false| 2|
            | 3| true| 6|
            | 4| true| 6|
            | 5| true| 6|
            | 6|false| 6|
            | 7|false| 7|
            | 8| true| 9|
            | 9|false| 9|
            +---+-----+--------+


            This approach could run into performance trouble if the DataFrame is very large and has many rows where the flag is False. If that's the case, you may be better off with an iterative solution.






            share|improve this answer























            • Great answer! But you're right. This is a bit too resource heavy. Was hoping to find a way to do this without a join. This is something that is simple in MATLAB with smaller data sets. I really want to know how it would be done in spark with DataFrames.
              – user3685285
              Nov 19 at 21:24










            • @Jason Embellished your answer in more than one way
              – thebluephantom
              Nov 20 at 20:48










            • @thebluephantom - Great, thanks!
              – Jason
              Nov 20 at 21:23










            • It's an interesting problem that got a -2 and turned out to be easy to solve - that said I had to brood on it.
              – thebluephantom
              Nov 20 at 21:24













            edited Nov 20 at 18:54









            thebluephantom











            answered Nov 19 at 21:20









            Jason


            See the other answer, which is better; I have left this one here for SQL educational purposes.



            This does what you want, but I would be keen to know what others think of it at scale. I am going to check Catalyst and see how it works procedurally; I suspect there may be some misses at partition boundaries, and I am keen to check that as well.



            import org.apache.spark.sql.functions._
            import spark.implicits._ // needed for toDF; assumes a SparkSession named spark (already in scope in spark-shell)

            val df = Seq((0, true), (1, true), (2, false), (3, true), (4, true), (5, true), (6, false), (7, false), (8, true), (9, false)).toDF("id", "flag")
            df.createOrReplaceTempView("tf")

            // Performance? Need to check at some stage how partitioning works in such a case.
            spark.sql("CACHE TABLE tf")

            // Step 1: pair every row with each row at or after it whose flag is false.
            val res1 = spark.sql("""
            SELECT tf1.*, tf2.id AS id2, tf2.flag AS flag2
            FROM tf tf1, tf tf2
            WHERE tf2.id >= tf1.id
            AND tf2.flag = false
            """)

            //res1.show(false)
            res1.createOrReplaceTempView("res1")
            spark.sql("CACHE TABLE res1")

            // Step 2: for each id, keep only the candidate with the smallest id2 (rank 1).
            val res2 = spark.sql("""
            SELECT X.id, X.flag, X.id2
            FROM (SELECT *, RANK() OVER (PARTITION BY id ORDER BY id2 ASC) AS rank_val
            FROM res1) X
            WHERE X.rank_val = 1
            ORDER BY id
            """)

            res2.show(false)
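To make the cost of this approach concrete, here is a rough model of the two SQL steps using plain Scala collections instead of Spark. It is a sketch for reasoning about the data, not how Spark actually executes the query, and the names SelfJoinSketch, joined and firstFalse are made up for illustration. joined mirrors res1: every (id, flag, id2) triple where id2 >= id and row id2 has flag = false. firstFalse mirrors res2: since ids are unique there are no ties on id2, so keeping the RANK() = 1 row per id is the same as taking the minimum id2 per id.

```scala
object SelfJoinSketch {
  // Sample data from the question: (id, flag).
  val rows: Seq[(Int, Boolean)] = Seq((0, true), (1, true), (2, false), (3, true), (4, true),
    (5, true), (6, false), (7, false), (8, true), (9, false))

  // Mirrors res1: pair each row with every row at or after it whose flag is false.
  def joined(data: Seq[(Int, Boolean)]): Seq[(Int, Boolean, Int)] =
    for {
      (id, flag) <- data
      (id2, flag2) <- data
      if id2 >= id && !flag2
    } yield (id, flag, id2)

  // Mirrors res2: per id keep the candidate with the smallest id2, then sort by id.
  def firstFalse(pairs: Seq[(Int, Boolean, Int)]): Seq[(Int, Boolean, Int)] =
    pairs.groupBy(_._1).values.map(_.minBy(_._3)).toSeq.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val pairs = joined(rows)
    println(s"input rows: ${rows.size}, joined rows: ${pairs.size}")
    firstFalse(pairs).foreach { case (id, flag, id2) => println(s"$id | $flag | $id2") }
  }
}
```

On the 10 sample rows the intermediate join already has 28 rows, and that count grows with the number of rows times the number of false rows. The sketch also shows that a row with no false at or after it produces no pairs, so it drops out of the inner-join result, as it would in the SQL version.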





            • Like the other solution better actually, but the performance issue is noted as well.
              – thebluephantom
              Nov 19 at 21:40















            edited Nov 20 at 20:54

























            answered Nov 19 at 21:35









            thebluephantom
