Replacing a character in dataframe string columns in Scala












I have a task to remove all line delimiters (\n) from all string columns in a table.
The number of table columns is unknown, so the code should process any table.



I wrote code that loops over all columns, checks each column's data type, and replaces the line delimiter in string columns:



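  // Assume we already have a DataFrame 'df' (declared as a var) that can contain any table
  import org.apache.spark.sql.functions.{col, regexp_replace}

  df.cache()
  val dfTypes = df.dtypes  // Array of (columnName, dataTypeName) pairs
  for (i <- 0 to (dfTypes.length - 1)) {
    val tupCol = dfTypes(i)
    if (tupCol._2 == "StringType") {
      // Braces keep all three statements inside the string-column check
      df.unpersist()
      df = df.withColumn(tupCol._1, regexp_replace(col(tupCol._1), "\n", " "))
      df.cache()
    }
  }
  df.unpersist()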


The code itself works fine, but when I run this code for ~50 tables in parallel I constantly get the following error for one random table:



18/11/20 04:31:41 WARN TaskSetManager: Lost task 9.0 in stage 6.0 (TID 29, ip-10-114-4-145.us-west-2.compute.internal, executor 1): java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:260)
at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:190)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.close(UnsafeRowSerializer.scala:96)
at org.apache.spark.storage.DiskBlockObjectWriter.commitAndGet(DiskBlockObjectWriter.scala:173)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:156)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Whether I run fewer or more than 50 jobs, exactly one (random) job keeps failing.



The jobs run on an EMR cluster with the following configuration:

Master node: r4.2xlarge x 1
Core nodes: m5.2xlarge x 3
Task nodes: m5.2xlarge x (autoscaling from 1 to 10)



I think my code consumes a lot of memory and disk space because it creates new dataframes in a loop, but I do not see any other way to process a table without knowing the number of string columns in advance.
I need a suggestion on how to optimize the code.
Thanks.










  • You don't need to cache after every change; remember that Spark evaluates lazily. It doesn't need to materialize a new DataFrame on each iteration. Instead, let Spark build the graph of all changes and then run it. I'm sure it will update all columns at once, without consuming too much memory.

    – Luis Miguel Mejía Suárez
    Nov 20 '18 at 22:43













  • Thanks for your answer, Luis. I've decided to use Scala's foldLeft method, which can update all columns at once.

    – Dmytro
    Nov 21 '18 at 23:04
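
The foldLeft approach mentioned in the comments could look roughly like the sketch below. It is not the asker's final code: the names StripNewlines and stripNewlines are hypothetical, and it assumes Spark 2.x or later and column names without dots or backticks. No cache() or unpersist() calls are made, so Spark only records the column expressions and evaluates them when an action runs.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, regexp_replace}
import org.apache.spark.sql.types.StringType

// Hypothetical helper object, not part of the original post.
object StripNewlines {

  // Replaces "\n" with a space in every string column.
  // Non-string columns are left untouched, and nothing is cached.
  def stripNewlines(df: DataFrame): DataFrame =
    df.schema.fields
      .filter(_.dataType == StringType)   // keep only string columns
      .map(_.name)
      .foldLeft(df) { (acc, name) =>
        // withColumn with an existing name replaces that column in place
        acc.withColumn(name, regexp_replace(col(name), "\n", " "))
      }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; on a cluster the session already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("strip-newlines")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "first\nsecond", "x\ny")).toDF("id", "s1", "s2")
    stripNewlines(df).show(truncate = false)

    spark.stop()
  }
}
```

Because the fold only chains lazy transformations, the number of string columns does not need to be known in advance, and no intermediate DataFrame is persisted between iterations.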
















scala dataframe regexp-replace
















asked Nov 20 '18 at 22:36 by Dmytro
























