Replacing a character in dataframe string columns in Scala
I need to remove all line delimiters (\n) from all string columns in a table.
The number of columns is not known in advance; the code should handle any table.
I wrote code that loops over all columns, checks each column's data type, and replaces the line delimiter:
import org.apache.spark.sql.functions.{col, regexp_replace}

// let's assume we already have a dataframe 'df' (declared as a var) that can contain any table
df.cache()
val dfTypes = df.dtypes
for (i <- 0 until dfTypes.length) {
  val tupCol = dfTypes(i)
  if (tupCol._2 == "StringType") {
    df.unpersist()
    df = df.withColumn(tupCol._1, regexp_replace(col(tupCol._1), "\n", " "))
    df.cache()
  }
}
df.unpersist()
The code itself works, but when I run it for ~50 tables in parallel I consistently get the following error for one random table:
18/11/20 04:31:41 WARN TaskSetManager: Lost task 9.0 in stage 6.0 (TID 29, ip-10-114-4-145.us-west-2.compute.internal, executor 1): java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:260)
at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:190)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.close(UnsafeRowSerializer.scala:96)
at org.apache.spark.storage.DiskBlockObjectWriter.commitAndGet(DiskBlockObjectWriter.scala:173)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:156)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I can run fewer or more than 50 jobs, but exactly one (random) job keeps failing.
The jobs run on an EMR cluster with the following configuration:
Master node: r4.2xlarge x 1
Core nodes: m5.2xlarge x 3
Task nodes: m5.2xlarge x (Autoscaling from 1 to 10)
I think my code consumes a lot of memory and disk space because it creates new dataframes in a loop, but I do not see any other way to process a table without knowing the number of string columns in advance.
I would appreciate suggestions on how to optimize the code.
Thanks.
scala dataframe regexp-replace
asked Nov 20 '18 at 22:36 by Dmytro
You don't need to cache after every change; remember that Spark is lazy in its processing. It doesn't really build a new DataFrame for each iteration. Instead, let Spark build the graph of all changes and then run it: I'm sure it will update all columns at once without consuming too much memory.
– Luis Miguel Mejía Suárez
Nov 20 '18 at 22:43
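A minimal sketch of what this comment suggests, with the per-iteration cache()/unpersist() calls removed (the variable name cleaned is illustrative, not from the original post): each withColumn only extends Spark's logical plan, and nothing executes until an action runs.

import org.apache.spark.sql.functions.{col, regexp_replace}

var cleaned = df
for ((name, dataType) <- df.dtypes if dataType == "StringType") {
  cleaned = cleaned.withColumn(name, regexp_replace(col(name), "\n", " "))
}
// Nothing has run yet; the whole plan executes once when an action is called,
// e.g. cleaned.count() or writing the result out.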
Thanks for your answer, Luis. I've decided to use Scala's foldLeft method, which can update all columns at once; a sketch of that approach follows below.
– Dmytro
Nov 21 '18 at 23:04
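A sketch of the foldLeft approach mentioned in the reply (the helper name stripLineDelimiters is hypothetical, not from the original code): the fold threads the DataFrame through one withColumn per string column, so Spark builds a single logical plan and no intermediate caching is needed.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, regexp_replace}

// Hypothetical helper illustrating the foldLeft approach.
def stripLineDelimiters(df: DataFrame): DataFrame =
  df.dtypes
    .collect { case (name, "StringType") => name } // keep only string columns
    .foldLeft(df) { (acc, name) =>
      acc.withColumn(name, regexp_replace(col(name), "\n", " "))
    }

// Usage: val cleaned = stripLineDelimiters(df); cleaned can then be written out directly.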