Spark: get minimum value in a column that satisfies a condition
I have a DataFrame in Spark that looks like this:
id | flag
----------
0 | true
1 | true
2 | false
3 | true
4 | true
5 | true
6 | false
7 | false
8 | true
9 | false
I want to add another column containing the current row's id if flag == false, or the id of the next row where flag == false, so the output would look like this:
id | flag | nextOrCurrentFalse
-------------------------------
0 | true | 2
1 | true | 2
2 | false | 2
3 | true | 6
4 | true | 6
5 | true | 6
6 | false | 6
7 | false | 7
8 | true | 9
9 | false | 9
I want to do this in a vectorized way (not by iterating over rows). So effectively I want the logic to be:
- For each row, find the minimum id that is greater than or equal to the current row's id and has flag == false.
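(As a sketch of one possible vectorized formulation, not from the original post: a conditional min over a forward-looking window expresses this logic directly. Note that without a partitionBy clause the window pulls all rows into a single partition.)

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, min, when}

// Frame: the current row and everything after it, ordered by id.
val w = Window.orderBy("id").rowsBetween(Window.currentRow, Window.unboundedFollowing)

// min ignores nulls, so only rows where flag == false contribute to the minimum.
val result = df.withColumn("nextOrCurrentFalse",
  min(when(!col("flag"), col("id"))).over(w))
```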
Tags: scala, apache-spark, dataframe
I always wonder about the performance of such things. How many entries do you have? I presume you mean vectorized way as in not in SQL. I would not attempt SQL here.
– thebluephantom
Nov 19 at 21:12
In addition, with partitioning aspects I suspect some errors would result.
– thebluephantom
Nov 19 at 21:14
no actually, an SQL solution would be perfect. I just don't want one that relies on iterating through the dataset (which is probably way more efficient in this case, but I don't want to do that because I want to extend this to other use cases where I can't do that).
– user3685285
Nov 19 at 21:14
Tell me how many rows?
– thebluephantom
Nov 19 at 21:14
I am using this to check how good Catalyst and such really is in the next few weeks - at scale of course.
– thebluephantom
Dec 1 at 14:04
asked Nov 19 at 19:48 by user3685285; edited Nov 19 at 21:48
3 Answers
Accepted answer (2 votes), by thebluephantom (answered Nov 20 at 20:47, edited Nov 20 at 21:21)
Having thought about scaling (though it is not clear whether Catalyst is good enough), I propose a solution that builds on one of the other answers: it could benefit from partitioning and has far less work to do, simply by thinking about the data. The point is that some pre-computation and massaging of the data can beat brute-force approaches. Your concern about the JOIN is less of an issue here, since this is now a bounded join with no massive generation of intermediate data.
Your comment about wanting a DataFrame approach is slightly off, since everything used here is a DataFrame. I think you mean that you want to loop through a DataFrame with a sub-loop and an early exit; I can find no such example, and I am not sure it fits the Spark paradigm. The same results, with less processing:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = Seq((0, true), (1, true), (2, false), (3, true), (4, true), (5, true),
  (6, false), (7, false), (8, true), (9, false)).toDF("id", "flag")

@transient val w1 = Window.orderBy("id1")

// All the ids where flag is false.
val ids = df.where("flag = false")
  .select($"id".as("id1"))

// For each false id, the previous false id (or -1 if there is none).
val ids2 = ids.select($"*", lag("id1", 1, -1).over(w1).alias("prev_id"))

// The start of the range of ids that map to this false id.
val ids3 = ids2.withColumn("prev_id1", col("prev_id") + 1).drop("prev_id")

// A bounded join: each row matches exactly one (prev_id1, id1) range.
// Less work and better performance at scale, and no grouping or min needed,
// but some understanding of the data is required.
val withNextFalse = df.join(ids3, df("id") >= ids3("prev_id1") && df("id") <= ids3("id1"))
  .select($"id", $"flag", $"id1".alias("nextOrCurrentFalse"))
  .orderBy(asc("id"))

withNextFalse.show(false)
which returns:
+---+-----+------------------+
|id |flag |nextOrCurrentFalse|
+---+-----+------------------+
|0 |true |2 |
|1 |true |2 |
|2 |false|2 |
|3 |true |6 |
|4 |true |6 |
|5 |true |6 |
|6 |false|6 |
|7 |false|7 |
|8 |true |9 |
|9 |false|9 |
+---+-----+------------------+
Answer (2 votes), by Jason (answered Nov 19 at 21:20, edited Nov 20 at 18:54 by thebluephantom)
If flag is fairly sparse, you could do it like this:
val ids = df.where("flag = false")
  .select($"id".as("id1"))

val withNextFalse = df.join(ids, df("id") <= ids("id1"))
  .groupBy("id", "flag")
  .agg("id1" -> "min")
In the first step, we make a DataFrame of the ids where flag is false. Then we join that DataFrame to the original data on the desired condition (the original id should be less than or equal to the id of a row where flag is false). To get the first such case, group by id and use agg to find the minimum value of id1 (which is the id of the next row with flag = false).
Running on your example data (and sorting on id) gives the desired output:
+---+-----+--------+
| id| flag|min(id1)|
+---+-----+--------+
| 0| true| 2|
| 1| true| 2|
| 2|false| 2|
| 3| true| 6|
| 4| true| 6|
| 5| true| 6|
| 6|false| 6|
| 7|false| 7|
| 8| true| 9|
| 9|false| 9|
+---+-----+--------+
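(If you want the output column to be named nextOrCurrentFalse as in the question, rather than min(id1), the aggregation can use an alias; this is a minor variation on the code above, assuming the same df and ids.)

```scala
import org.apache.spark.sql.functions.min

val withNextFalse = df.join(ids, df("id") <= ids("id1"))
  .groupBy("id", "flag")
  .agg(min($"id1").as("nextOrCurrentFalse"))
  .orderBy("id")
```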
This approach could run into performance trouble if the DataFrame is very large and has many rows where flag is false. If that's the case, you may be better off with an iterative solution.
Great answer! But you're right. This is a bit too resource heavy. Was hoping to find a way to do this without a join. This is something that is simple in MATLAB with smaller data sets. I really want to know how it would be done in spark with DataFrames.
– user3685285
Nov 19 at 21:24
@Jason Embellished your answer in more than one way
– thebluephantom
Nov 20 at 20:48
@thebluephantom - Great, thanks!
– Jason
Nov 20 at 21:23
It's an interesting problem that got a -2 and turned out to be easy to solve - that said I had to brood on it.
– thebluephantom
Nov 20 at 21:24
Answer (0 votes)
See the other answer, which is better, but I have left this here for SQL educational purposes, possibly.
This does what you want, but I would be keen to know what others think of this at scale. I am going to check how Catalyst handles it procedurally; I think it may mean some misses at partition boundaries, and I am keen to check that as well.
import org.apache.spark.sql.functions._
val df = Seq((0, true), (1, true), (2,false), (3, true), (4,true), (5,true), (6,false), (7,false), (8,true), (9,false)).toDF("id","flag")
df.createOrReplaceTempView("tf")
// Performance? Need to check at some stage how partitioning works in such a case.
spark.sql("CACHE TABLE tf")
val res1 = spark.sql("""
SELECT tf1.*, tf2.id as id2, tf2.flag as flag2
FROM tf tf1, tf tf2
WHERE tf2.id >= tf1.id
AND tf2.flag = false
""")
//res1.show(false)
res1.createOrReplaceTempView("res1")
spark.sql("CACHE TABLE res1")
val res2 = spark.sql(""" SELECT X.id, X.flag, X.id2
FROM (SELECT *, RANK() OVER (PARTITION BY id ORDER BY id2 ASC) as rank_val
FROM res1) X
WHERE X.rank_val = 1
ORDER BY id
""")
res2.show(false)
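(As one possible way to check what Catalyst actually does with this, as the answer mentions wanting to, the query plans can be inspected with the standard Dataset explain method.)

```scala
// Print the parsed, analyzed, optimized, and physical plans for the query.
res2.explain(true)
```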
Like the other solution better actually, but the performance issue is noted as well.
– thebluephantom
Nov 19 at 21:40
3 Answers
3
active
oldest
votes
3 Answers
3
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
2
down vote
accepted
Having thought about scaling and such - but not clear whether Catalyst is good enough - I propose a solution that builds on one of the answers that could benefit from partitioning and has far less work to do - simply by thinking about the data. It's about pre-computation and processing, the point that some massaging can beat brute force approaches. Your point on JOIN is less of an issue as this is a bounded JOIN now and no massive generation of data.
Your comment on dataframe approach is slightly jaded in that all that has surpassed here are dataframes. I think you mean that you want to loop through a Data Frame and have a sub loop with an exit. I can find no such example and in fact I am not sure it fits the SPARK paradigm. Same results gotten, with less processing:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.Window
val df = Seq((0, true), (1, true), (2,false), (3, true), (4,true), (5,true), (6,false), (7,false), (8,true), (9,false)).toDF("id","flag")
@transient val w1 = org.apache.spark.sql.expressions.Window.orderBy("id1")
val ids = df.where("flag = false")
.select($"id".as("id1"))
val ids2 = ids.select($"*", lag("id1",1,-1).over(w1).alias("prev_id"))
val ids3 = ids2.withColumn("prev_id1", col("prev_id")+1).drop("prev_id")
// Less and better performance at scale, this is better theoretically for Catalyst to bound partitions? Less work to do in any event.
// Some understanding of data required! And no grouping and min.
val withNextFalse = df.join(ids3, df("id") >= ids3("prev_id1") && df("id") <= ids3("id1"))
.select($"id", $"flag", $"id1".alias("nextOrCurrentFalse"))
.orderBy(asc("id"),asc("id"))
withNextFalse.show(false)
returns also:
+---+-----+------------------+
|id |flag |nextOrCurrentFalse|
+---+-----+------------------+
|0 |true |2 |
|1 |true |2 |
|2 |false|2 |
|3 |true |6 |
|4 |true |6 |
|5 |true |6 |
|6 |false|6 |
|7 |false|7 |
|8 |true |9 |
|9 |false|9 |
+---+-----+------------------+
add a comment |
up vote
2
down vote
accepted
Having thought about scaling and such - but not clear whether Catalyst is good enough - I propose a solution that builds on one of the answers that could benefit from partitioning and has far less work to do - simply by thinking about the data. It's about pre-computation and processing, the point that some massaging can beat brute force approaches. Your point on JOIN is less of an issue as this is a bounded JOIN now and no massive generation of data.
Your comment on dataframe approach is slightly jaded in that all that has surpassed here are dataframes. I think you mean that you want to loop through a Data Frame and have a sub loop with an exit. I can find no such example and in fact I am not sure it fits the SPARK paradigm. Same results gotten, with less processing:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.Window
val df = Seq((0, true), (1, true), (2,false), (3, true), (4,true), (5,true), (6,false), (7,false), (8,true), (9,false)).toDF("id","flag")
@transient val w1 = org.apache.spark.sql.expressions.Window.orderBy("id1")
val ids = df.where("flag = false")
.select($"id".as("id1"))
val ids2 = ids.select($"*", lag("id1",1,-1).over(w1).alias("prev_id"))
val ids3 = ids2.withColumn("prev_id1", col("prev_id")+1).drop("prev_id")
// Less and better performance at scale, this is better theoretically for Catalyst to bound partitions? Less work to do in any event.
// Some understanding of data required! And no grouping and min.
val withNextFalse = df.join(ids3, df("id") >= ids3("prev_id1") && df("id") <= ids3("id1"))
.select($"id", $"flag", $"id1".alias("nextOrCurrentFalse"))
.orderBy(asc("id"),asc("id"))
withNextFalse.show(false)
returns also:
+---+-----+------------------+
|id |flag |nextOrCurrentFalse|
+---+-----+------------------+
|0 |true |2 |
|1 |true |2 |
|2 |false|2 |
|3 |true |6 |
|4 |true |6 |
|5 |true |6 |
|6 |false|6 |
|7 |false|7 |
|8 |true |9 |
|9 |false|9 |
+---+-----+------------------+
add a comment |
up vote
2
down vote
accepted
up vote
2
down vote
accepted
Having thought about scaling and such - but not clear whether Catalyst is good enough - I propose a solution that builds on one of the answers that could benefit from partitioning and has far less work to do - simply by thinking about the data. It's about pre-computation and processing, the point that some massaging can beat brute force approaches. Your point on JOIN is less of an issue as this is a bounded JOIN now and no massive generation of data.
Your comment on dataframe approach is slightly jaded in that all that has surpassed here are dataframes. I think you mean that you want to loop through a Data Frame and have a sub loop with an exit. I can find no such example and in fact I am not sure it fits the SPARK paradigm. Same results gotten, with less processing:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.Window
val df = Seq((0, true), (1, true), (2,false), (3, true), (4,true), (5,true), (6,false), (7,false), (8,true), (9,false)).toDF("id","flag")
@transient val w1 = org.apache.spark.sql.expressions.Window.orderBy("id1")
val ids = df.where("flag = false")
.select($"id".as("id1"))
val ids2 = ids.select($"*", lag("id1",1,-1).over(w1).alias("prev_id"))
val ids3 = ids2.withColumn("prev_id1", col("prev_id")+1).drop("prev_id")
// Less and better performance at scale, this is better theoretically for Catalyst to bound partitions? Less work to do in any event.
// Some understanding of data required! And no grouping and min.
val withNextFalse = df.join(ids3, df("id") >= ids3("prev_id1") && df("id") <= ids3("id1"))
.select($"id", $"flag", $"id1".alias("nextOrCurrentFalse"))
.orderBy(asc("id"),asc("id"))
withNextFalse.show(false)
returns also:
+---+-----+------------------+
|id |flag |nextOrCurrentFalse|
+---+-----+------------------+
|0 |true |2 |
|1 |true |2 |
|2 |false|2 |
|3 |true |6 |
|4 |true |6 |
|5 |true |6 |
|6 |false|6 |
|7 |false|7 |
|8 |true |9 |
|9 |false|9 |
+---+-----+------------------+
Having thought about scaling and such - but not clear whether Catalyst is good enough - I propose a solution that builds on one of the answers that could benefit from partitioning and has far less work to do - simply by thinking about the data. It's about pre-computation and processing, the point that some massaging can beat brute force approaches. Your point on JOIN is less of an issue as this is a bounded JOIN now and no massive generation of data.
Your comment on dataframe approach is slightly jaded in that all that has surpassed here are dataframes. I think you mean that you want to loop through a Data Frame and have a sub loop with an exit. I can find no such example and in fact I am not sure it fits the SPARK paradigm. Same results gotten, with less processing:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.Window
val df = Seq((0, true), (1, true), (2,false), (3, true), (4,true), (5,true), (6,false), (7,false), (8,true), (9,false)).toDF("id","flag")
@transient val w1 = org.apache.spark.sql.expressions.Window.orderBy("id1")
val ids = df.where("flag = false")
.select($"id".as("id1"))
val ids2 = ids.select($"*", lag("id1",1,-1).over(w1).alias("prev_id"))
val ids3 = ids2.withColumn("prev_id1", col("prev_id")+1).drop("prev_id")
// Less and better performance at scale, this is better theoretically for Catalyst to bound partitions? Less work to do in any event.
// Some understanding of data required! And no grouping and min.
val withNextFalse = df.join(ids3, df("id") >= ids3("prev_id1") && df("id") <= ids3("id1"))
.select($"id", $"flag", $"id1".alias("nextOrCurrentFalse"))
.orderBy(asc("id"),asc("id"))
withNextFalse.show(false)
returns also:
+---+-----+------------------+
|id |flag |nextOrCurrentFalse|
+---+-----+------------------+
|0 |true |2 |
|1 |true |2 |
|2 |false|2 |
|3 |true |6 |
|4 |true |6 |
|5 |true |6 |
|6 |false|6 |
|7 |false|7 |
|8 |true |9 |
|9 |false|9 |
+---+-----+------------------+
edited Nov 20 at 21:21
answered Nov 20 at 20:47
thebluephantom
2,3452925
2,3452925
add a comment |
add a comment |
up vote
2
down vote
If flag
is fairly sparse, you could do it like this:
val ids = df.where("flag = false").
select($"id".as("id1"))
val withNextFalse = df.join(ids, df("id") <= ids("id1")).
groupBy("id", "flag").
agg("id1" -> "min")
In the first step, we make a dataframe of the ids where the flag is false. Then, we join that dataframe to the original data on the desired condition (the original id should be less than or equal to the id of the row where flag is false).
To get the first such case, group by id and use agg
to find the minimum value of id1
(which is the id of a row with flag = false.
Running on your example data (and sorting on id) gives the desired output:
+---+-----+--------+
| id| flag|min(id1)|
+---+-----+--------+
| 0| true| 2|
| 1| true| 2|
| 2|false| 2|
| 3| true| 6|
| 4| true| 6|
| 5| true| 6|
| 6|false| 6|
| 7|false| 7|
| 8| true| 9|
| 9|false| 9|
+---+-----+--------+
This approach could run into performance trouble if the DataFrame is very large and has many rows where the flag is False. If that's the case, you may be better off with an iterative solution.
Great answer! But you're right. This is a bit too resource heavy. Was hoping to find a way to do this without a join. This is something that is simple in MATLAB with smaller data sets. I really want to know how it would be done in spark with DataFrames.
– user3685285
Nov 19 at 21:24
@Jason Embellished your answer in more than one way
– thebluephantom
Nov 20 at 20:48
@thebluephantom - Great, thanks!
– Jason
Nov 20 at 21:23
It's an interesting problem that got a -2 and turned out to be easy to solve - that said I had to brood on it.
– thebluephantom
Nov 20 at 21:24
add a comment |
up vote
2
down vote
If flag
is fairly sparse, you could do it like this:
val ids = df.where("flag = false").
select($"id".as("id1"))
val withNextFalse = df.join(ids, df("id") <= ids("id1")).
groupBy("id", "flag").
agg("id1" -> "min")
In the first step, we make a dataframe of the ids where the flag is false. Then, we join that dataframe to the original data on the desired condition (the original id should be less than or equal to the id of the row where flag is false).
To get the first such case, group by id and use agg
to find the minimum value of id1
(which is the id of a row with flag = false.
Running on your example data (and sorting on id) gives the desired output:
+---+-----+--------+
| id| flag|min(id1)|
+---+-----+--------+
| 0| true| 2|
| 1| true| 2|
| 2|false| 2|
| 3| true| 6|
| 4| true| 6|
| 5| true| 6|
| 6|false| 6|
| 7|false| 7|
| 8| true| 9|
| 9|false| 9|
+---+-----+--------+
This approach could run into performance trouble if the DataFrame is very large and has many rows where the flag is False. If that's the case, you may be better off with an iterative solution.
Great answer! But you're right. This is a bit too resource heavy. Was hoping to find a way to do this without a join. This is something that is simple in MATLAB with smaller data sets. I really want to know how it would be done in spark with DataFrames.
– user3685285
Nov 19 at 21:24
@Jason Embellished your answer in more than one way
– thebluephantom
Nov 20 at 20:48
@thebluephantom - Great, thanks!
– Jason
Nov 20 at 21:23
It's an interesting problem that got a -2 and turned out to be easy to solve - that said I had to brood on it.
– thebluephantom
Nov 20 at 21:24
add a comment |
up vote
2
down vote
up vote
2
down vote
If flag
is fairly sparse, you could do it like this:
val ids = df.where("flag = false").
select($"id".as("id1"))
val withNextFalse = df.join(ids, df("id") <= ids("id1")).
groupBy("id", "flag").
agg("id1" -> "min")
In the first step, we make a dataframe of the ids where the flag is false. Then, we join that dataframe to the original data on the desired condition (the original id should be less than or equal to the id of the row where flag is false).
To get the first such case, group by id and use agg
to find the minimum value of id1
(which is the id of a row with flag = false.
Running on your example data (and sorting on id) gives the desired output:
+---+-----+--------+
| id| flag|min(id1)|
+---+-----+--------+
| 0| true| 2|
| 1| true| 2|
| 2|false| 2|
| 3| true| 6|
| 4| true| 6|
| 5| true| 6|
| 6|false| 6|
| 7|false| 7|
| 8| true| 9|
| 9|false| 9|
+---+-----+--------+
This approach could run into performance trouble if the DataFrame is very large and has many rows where the flag is False. If that's the case, you may be better off with an iterative solution.
If flag
is fairly sparse, you could do it like this:
val ids = df.where("flag = false").
select($"id".as("id1"))
val withNextFalse = df.join(ids, df("id") <= ids("id1")).
groupBy("id", "flag").
agg("id1" -> "min")
In the first step, we make a dataframe of the ids where the flag is false. Then, we join that dataframe to the original data on the desired condition (the original id should be less than or equal to the id of the row where flag is false).
To get the first such case, group by id and use agg
to find the minimum value of id1
(which is the id of a row with flag = false.
Running on your example data (and sorting on id) gives the desired output:
+---+-----+--------+
| id| flag|min(id1)|
+---+-----+--------+
| 0| true| 2|
| 1| true| 2|
| 2|false| 2|
| 3| true| 6|
| 4| true| 6|
| 5| true| 6|
| 6|false| 6|
| 7|false| 7|
| 8| true| 9|
| 9|false| 9|
+---+-----+--------+
This approach could run into performance trouble if the DataFrame is very large and has many rows where the flag is False. If that's the case, you may be better off with an iterative solution.
edited Nov 20 at 18:54
thebluephantom
2,3452925
2,3452925
answered Nov 19 at 21:20
Jason
614
614
Great answer! But you're right. This is a bit too resource heavy. Was hoping to find a way to do this without a join. This is something that is simple in MATLAB with smaller data sets. I really want to know how it would be done in spark with DataFrames.
– user3685285
Nov 19 at 21:24
@Jason Embellished your answer in more than one way
– thebluephantom
Nov 20 at 20:48
@thebluephantom - Great, thanks!
– Jason
Nov 20 at 21:23
It's an interesting problem that got a -2 and turned out to be easy to solve - that said I had to brood on it.
– thebluephantom
Nov 20 at 21:24
add a comment |
Great answer! But you're right. This is a bit too resource heavy. Was hoping to find a way to do this without a join. This is something that is simple in MATLAB with smaller data sets. I really want to know how it would be done in spark with DataFrames.
– user3685285
Nov 19 at 21:24
@Jason Embellished your answer in more than one way
– thebluephantom
Nov 20 at 20:48
@thebluephantom - Great, thanks!
– Jason
Nov 20 at 21:23
It's an interesting problem that got a -2 and turned out to be easy to solve - that said I had to brood on it.
– thebluephantom
Nov 20 at 21:24
Great answer! But you're right. This is a bit too resource heavy. Was hoping to find a way to do this without a join. This is something that is simple in MATLAB with smaller data sets. I really want to know how it would be done in spark with DataFrames.
– user3685285
Nov 19 at 21:24
Great answer! But you're right. This is a bit too resource heavy. Was hoping to find a way to do this without a join. This is something that is simple in MATLAB with smaller data sets. I really want to know how it would be done in spark with DataFrames.
– user3685285
Nov 19 at 21:24
@Jason Embellished your answer in more than one way
– thebluephantom
Nov 20 at 20:48
@Jason Embellished your answer in more than one way
– thebluephantom
Nov 20 at 20:48
@thebluephantom - Great, thanks!
– Jason
Nov 20 at 21:23
@thebluephantom - Great, thanks!
– Jason
Nov 20 at 21:23
It's an interesting problem that got a -2 and turned out to be easy to solve - that said I had to brood on it.
– thebluephantom
Nov 20 at 21:24
It's an interesting problem that got a -2 and turned out to be easy to solve - that said I had to brood on it.
– thebluephantom
Nov 20 at 21:24
add a comment |
up vote
0
down vote
See other answer which is better , but left this here so for SQL educational purposes - possibly.
This does what you want, but I would be keen to know what others thinks of this at scale. I am going to check Catalyst and see how it works procedurally, but I think that may mean some misses at partition bounaries, I am keen to check that as well.
import org.apache.spark.sql.functions._
val df = Seq((0, true), (1, true), (2,false), (3, true), (4,true), (5,true), (6,false), (7,false), (8,true), (9,false)).toDF("id","flag")
df.createOrReplaceTempView("tf")
// Performance? Need to check at some stage how partitioning works in such a case.
spark.sql("CACHE TABLE tf")
val res1 = spark.sql("""
SELECT tf1.*, tf2.id as id2, tf2.flag as flag2
FROM tf tf1, tf tf2
WHERE tf2.id >= tf1.id
AND tf2.flag = false
""")
//res1.show(false)
res1.createOrReplaceTempView("res1")
spark.sql("CACHE TABLE res1")
val res2 = spark.sql(""" SELECT X.id, X.flag, X.id2
FROM (SELECT *, RANK() OVER (PARTITION BY id ORDER BY id2 ASC) as rank_val
FROM res1) X
WHERE X.rank_val = 1
ORDER BY id
""")
res2.show(false)
Like the other solution better actually, but the performance issue is noted as well.
– thebluephantom
Nov 19 at 21:40
edited Nov 20 at 20:54
answered Nov 19 at 21:35
thebluephantom
Tell me how many rows?
– thebluephantom
Nov 19 at 21:14
I am using this to check how good Catalyst and such really is in the next few weeks - at scale of course.
– thebluephantom
Dec 1 at 14:04