PySpark Dataframe create new column based on function return value
I've a dataframe and I want to add a new column based on a value returned by a function. The parameters to this functions are four columns from the same dataframe. This one and this one are somewhat similar to what I want to but doesn't answer my question.
Here is my data frame (there are more columns then these four)
+ ------ + ------ + ------ + ------ +
| lat1 | lng1 | lat2 | lng2 |
+ ------ + ------ + ------ + ------ +
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
+ ------ + ------ + ------ + ------ +
and I want to add another column "distance" which is the total distance between the two location points(latitude/longitude). I've a function which takes four location points as arguments and returns the difference as Float.
def get_distance(lat_1, lng_1, lat_2, lng_2):
d_lat = lat_2 - lat_1
d_lng = lng_2 - lng_1
temp = (
math.sin(d_lat / 2) ** 2
+ math.cos(lat_1)
* math.cos(lat_2)
* math.sin(d_lng / 2) ** 2
)
return 6367.0 * (2 * math.asin(math.sqrt(temp)))
Here is my attempt which resulted in an error and I'm not sure about this approach either, its based on the other questions I've already mentioned.
udf_func = udf(lambda lat_1, lng_1, lat_2, lng_2: get_distance(lat_1, lng_1, lat_2, lng_2), returnType=FloatType())
df1 = df.withColumn('difference', udf_func(df.lat1, df_lng1, df.lat2, df.lng2))
df_subset1.show()
Here is the error stack trace
An error occurred while calling o1300.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 4 times, most recent failure: Lost task 0.3 in stage 50.0 (TID 341, data05.dac.local, executor 255): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
process()
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<stdin>", line 2, in <lambda>
File "<stdin>", line 5, in get_distance
TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withNewExecutionId(Dataset.scala:2788)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at sun.reflect.GeneratedMethodAccessor94.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
process()
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<stdin>", line 2, in <lambda>
File "<stdin>", line 5, in get_distance
TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Please guide.
python apache-spark dataframe pyspark calculated-columns
add a comment |
I've a dataframe and I want to add a new column based on a value returned by a function. The parameters to this functions are four columns from the same dataframe. This one and this one are somewhat similar to what I want to but doesn't answer my question.
Here is my data frame (there are more columns then these four)
+ ------ + ------ + ------ + ------ +
| lat1 | lng1 | lat2 | lng2 |
+ ------ + ------ + ------ + ------ +
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
+ ------ + ------ + ------ + ------ +
and I want to add another column "distance" which is the total distance between the two location points(latitude/longitude). I've a function which takes four location points as arguments and returns the difference as Float.
def get_distance(lat_1, lng_1, lat_2, lng_2):
d_lat = lat_2 - lat_1
d_lng = lng_2 - lng_1
temp = (
math.sin(d_lat / 2) ** 2
+ math.cos(lat_1)
* math.cos(lat_2)
* math.sin(d_lng / 2) ** 2
)
return 6367.0 * (2 * math.asin(math.sqrt(temp)))
Here is my attempt which resulted in an error and I'm not sure about this approach either, its based on the other questions I've already mentioned.
udf_func = udf(lambda lat_1, lng_1, lat_2, lng_2: get_distance(lat_1, lng_1, lat_2, lng_2), returnType=FloatType())
df1 = df.withColumn('difference', udf_func(df.lat1, df_lng1, df.lat2, df.lng2))
df_subset1.show()
Here is the error stack trace
An error occurred while calling o1300.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 4 times, most recent failure: Lost task 0.3 in stage 50.0 (TID 341, data05.dac.local, executor 255): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
process()
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<stdin>", line 2, in <lambda>
File "<stdin>", line 5, in get_distance
TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withNewExecutionId(Dataset.scala:2788)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at sun.reflect.GeneratedMethodAccessor94.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
process()
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<stdin>", line 2, in <lambda>
File "<stdin>", line 5, in get_distance
TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Please guide.
python apache-spark dataframe pyspark calculated-columns
add a comment |
I've a dataframe and I want to add a new column based on a value returned by a function. The parameters to this functions are four columns from the same dataframe. This one and this one are somewhat similar to what I want to but doesn't answer my question.
Here is my data frame (there are more columns then these four)
+ ------ + ------ + ------ + ------ +
| lat1 | lng1 | lat2 | lng2 |
+ ------ + ------ + ------ + ------ +
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
+ ------ + ------ + ------ + ------ +
and I want to add another column "distance" which is the total distance between the two location points(latitude/longitude). I've a function which takes four location points as arguments and returns the difference as Float.
def get_distance(lat_1, lng_1, lat_2, lng_2):
d_lat = lat_2 - lat_1
d_lng = lng_2 - lng_1
temp = (
math.sin(d_lat / 2) ** 2
+ math.cos(lat_1)
* math.cos(lat_2)
* math.sin(d_lng / 2) ** 2
)
return 6367.0 * (2 * math.asin(math.sqrt(temp)))
Here is my attempt which resulted in an error and I'm not sure about this approach either, its based on the other questions I've already mentioned.
udf_func = udf(lambda lat_1, lng_1, lat_2, lng_2: get_distance(lat_1, lng_1, lat_2, lng_2), returnType=FloatType())
df1 = df.withColumn('difference', udf_func(df.lat1, df_lng1, df.lat2, df.lng2))
df_subset1.show()
Here is the error stack trace
An error occurred while calling o1300.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 4 times, most recent failure: Lost task 0.3 in stage 50.0 (TID 341, data05.dac.local, executor 255): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
process()
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<stdin>", line 2, in <lambda>
File "<stdin>", line 5, in get_distance
TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withNewExecutionId(Dataset.scala:2788)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at sun.reflect.GeneratedMethodAccessor94.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
process()
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<stdin>", line 2, in <lambda>
File "<stdin>", line 5, in get_distance
TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Please guide.
python apache-spark dataframe pyspark calculated-columns
I've a dataframe and I want to add a new column based on a value returned by a function. The parameters to this functions are four columns from the same dataframe. This one and this one are somewhat similar to what I want to but doesn't answer my question.
Here is my data frame (there are more columns then these four)
+ ------ + ------ + ------ + ------ +
| lat1 | lng1 | lat2 | lng2 |
+ ------ + ------ + ------ + ------ +
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
+ ------ + ------ + ------ + ------ +
and I want to add another column "distance" which is the total distance between the two location points(latitude/longitude). I've a function which takes four location points as arguments and returns the difference as Float.
def get_distance(lat_1, lng_1, lat_2, lng_2):
d_lat = lat_2 - lat_1
d_lng = lng_2 - lng_1
temp = (
math.sin(d_lat / 2) ** 2
+ math.cos(lat_1)
* math.cos(lat_2)
* math.sin(d_lng / 2) ** 2
)
return 6367.0 * (2 * math.asin(math.sqrt(temp)))
Here is my attempt which resulted in an error and I'm not sure about this approach either, its based on the other questions I've already mentioned.
udf_func = udf(lambda lat_1, lng_1, lat_2, lng_2: get_distance(lat_1, lng_1, lat_2, lng_2), returnType=FloatType())
df1 = df.withColumn('difference', udf_func(df.lat1, df_lng1, df.lat2, df.lng2))
df_subset1.show()
Here is the error stack trace
An error occurred while calling o1300.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 4 times, most recent failure: Lost task 0.3 in stage 50.0 (TID 341, data05.dac.local, executor 255): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
process()
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<stdin>", line 2, in <lambda>
File "<stdin>", line 5, in get_distance
TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withNewExecutionId(Dataset.scala:2788)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at sun.reflect.GeneratedMethodAccessor94.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
process()
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<stdin>", line 2, in <lambda>
File "<stdin>", line 5, in get_distance
TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Please guide.
python apache-spark dataframe pyspark calculated-columns
python apache-spark dataframe pyspark calculated-columns
edited Nov 22 '18 at 1:22
Ali
asked Nov 22 '18 at 0:52
AliAli
2,68582345
2,68582345
add a comment |
add a comment |
2 Answers
2
active
oldest
votes
The stacktrace part about unicode suggests that the type of the column is StringType since you can't subtract two strings. You can check using df.printSchema()
.
If you convert all your lats and longs to floats (eg float(lat1)
) prior to any calculation your udf should execute fine.
Sir, you are right.
– cph_sto
Nov 22 '18 at 9:26
add a comment |
Let me rewrite it, so that people can understand the context. There are 2 steps -
1.The DataFrame
which was orignally created, was having it's columns in String
format, so calculations can't be done on that. Therefore, as a first step, we must convert all 4 columns into Float
.
2.Apply UDF
on this DataFrame
to create a new column distance
.
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
df = sqlContext.createDataFrame([('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
('-32.92','151.80','-32.89','151.71'),], ("lat1", "lng1", "lat2","lng2"))
print('Original Schema - columns imported as "String"')
df.printSchema() #All colums are Strings.
# Converting String based numbers into float.
df = df.withColumn('lat1', df.lat1.cast("float"))
.withColumn('lng1', df.lng1.cast("float"))
.withColumn('lat2', df.lat2.cast("float"))
.withColumn('lng2', df.lng2.cast("float"))
print('Schema after converting "String" to "Float"')
df.printSchema() #All columns are float now.
df.show()
#Function defined by user, to calculate distance between two points on the globe.
def get_distance(lat_1, lng_1, lat_2, lng_2):
d_lat = lat_2 - lat_1
d_lng = lng_2 - lng_1
temp = (
math.sin(d_lat / 2) ** 2
+ math.cos(lat_1)
* math.cos(lat_2)
* math.sin(d_lng / 2) ** 2
)
return 6367.0 * (2 * math.asin(math.sqrt(temp)))
udf_func = udf(get_distance,FloatType()) #Creating a 'User Defined Function' to calculate distance between two points.
df = df.withColumn("distance",udf_func(df.lat1, df.lng1, df.lat2, df.lng2)) #Creating column "distance" based on function 'get_distance'
df.show()
Output:
Original Schema - columns imported as "String"
root
|-- lat1: string (nullable = true)
|-- lng1: string (nullable = true)
|-- lat2: string (nullable = true)
|-- lng2: string (nullable = true)
Schema after converting "String" to "Float"
root
|-- lat1: float (nullable = true)
|-- lng1: float (nullable = true)
|-- lat2: float (nullable = true)
|-- lng2: float (nullable = true)
+------+-----+------+------+
| lat1| lng1| lat2| lng2|
+------+-----+------+------+
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
+------+-----+------+------+
+------+-----+------+------+---------+
| lat1| lng1| lat2| lng2| distance|
+------+-----+------+------+---------+
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
+------+-----+------+------+---------+
Code works perfectly now.
1
While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value.
– Nic3500
Nov 22 '18 at 3:29
Sir, I have made the changes. It should be pretty clear now.
– cph_sto
Nov 22 '18 at 8:55
Thank you very much for your effort - The problem was only with types of the columns. The string columns must be converted to float.
– Ali
Nov 25 '18 at 11:07
Well, your problem stands solved. I am happy.
– cph_sto
Nov 25 '18 at 11:10
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53422473%2fpyspark-dataframe-create-new-column-based-on-function-return-value%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
The stacktrace part about unicode suggests that the type of the column is StringType since you can't subtract two strings. You can check using df.printSchema()
.
If you convert all your lats and longs to floats (eg float(lat1)
) prior to any calculation your udf should execute fine.
Sir, you are right.
– cph_sto
Nov 22 '18 at 9:26
add a comment |
The stacktrace part about unicode suggests that the type of the column is StringType since you can't subtract two strings. You can check using df.printSchema()
.
If you convert all your lats and longs to floats (eg float(lat1)
) prior to any calculation your udf should execute fine.
Sir, you are right.
– cph_sto
Nov 22 '18 at 9:26
add a comment |
The stacktrace part about unicode suggests that the type of the column is StringType since you can't subtract two strings. You can check using df.printSchema()
.
If you convert all your lats and longs to floats (eg float(lat1)
) prior to any calculation your udf should execute fine.
The stacktrace part about unicode suggests that the type of the column is StringType since you can't subtract two strings. You can check using df.printSchema()
.
If you convert all your lats and longs to floats (eg float(lat1)
) prior to any calculation your udf should execute fine.
answered Nov 22 '18 at 5:08
ayplamayplam
855415
855415
Sir, you are right.
– cph_sto
Nov 22 '18 at 9:26
add a comment |
Sir, you are right.
– cph_sto
Nov 22 '18 at 9:26
Sir, you are right.
– cph_sto
Nov 22 '18 at 9:26
Sir, you are right.
– cph_sto
Nov 22 '18 at 9:26
add a comment |
Let me rewrite it, so that people can understand the context. There are 2 steps -
1.The DataFrame
which was orignally created, was having it's columns in String
format, so calculations can't be done on that. Therefore, as a first step, we must convert all 4 columns into Float
.
2.Apply UDF
on this DataFrame
to create a new column distance
.
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
df = sqlContext.createDataFrame([('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
('-32.92','151.80','-32.89','151.71'),], ("lat1", "lng1", "lat2","lng2"))
print('Original Schema - columns imported as "String"')
df.printSchema() #All colums are Strings.
# Converting String based numbers into float.
df = df.withColumn('lat1', df.lat1.cast("float"))
.withColumn('lng1', df.lng1.cast("float"))
.withColumn('lat2', df.lat2.cast("float"))
.withColumn('lng2', df.lng2.cast("float"))
print('Schema after converting "String" to "Float"')
df.printSchema() #All columns are float now.
df.show()
#Function defined by user, to calculate distance between two points on the globe.
def get_distance(lat_1, lng_1, lat_2, lng_2):
d_lat = lat_2 - lat_1
d_lng = lng_2 - lng_1
temp = (
math.sin(d_lat / 2) ** 2
+ math.cos(lat_1)
* math.cos(lat_2)
* math.sin(d_lng / 2) ** 2
)
return 6367.0 * (2 * math.asin(math.sqrt(temp)))
udf_func = udf(get_distance,FloatType()) #Creating a 'User Defined Function' to calculate distance between two points.
df = df.withColumn("distance",udf_func(df.lat1, df.lng1, df.lat2, df.lng2)) #Creating column "distance" based on function 'get_distance'
df.show()
Output:
Original Schema - columns imported as "String"
root
|-- lat1: string (nullable = true)
|-- lng1: string (nullable = true)
|-- lat2: string (nullable = true)
|-- lng2: string (nullable = true)
Schema after converting "String" to "Float"
root
|-- lat1: float (nullable = true)
|-- lng1: float (nullable = true)
|-- lat2: float (nullable = true)
|-- lng2: float (nullable = true)
+------+-----+------+------+
| lat1| lng1| lat2| lng2|
+------+-----+------+------+
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
+------+-----+------+------+
+------+-----+------+------+---------+
| lat1| lng1| lat2| lng2| distance|
+------+-----+------+------+---------+
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
+------+-----+------+------+---------+
Code works perfectly now.
1
While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value.
– Nic3500
Nov 22 '18 at 3:29
Sir, I have made the changes. It should be pretty clear now.
– cph_sto
Nov 22 '18 at 8:55
Thank you very much for your effort - The problem was only with types of the columns. The string columns must be converted to float.
– Ali
Nov 25 '18 at 11:07
Well, your problem stands solved. I am happy.
– cph_sto
Nov 25 '18 at 11:10
add a comment |
Let me rewrite it, so that people can understand the context. There are 2 steps -
1.The DataFrame
which was orignally created, was having it's columns in String
format, so calculations can't be done on that. Therefore, as a first step, we must convert all 4 columns into Float
.
2.Apply UDF
on this DataFrame
to create a new column distance
.
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
df = sqlContext.createDataFrame([('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
('-32.92','151.80','-32.89','151.71'),], ("lat1", "lng1", "lat2","lng2"))
print('Original Schema - columns imported as "String"')
df.printSchema() #All colums are Strings.
# Converting String based numbers into float.
df = df.withColumn('lat1', df.lat1.cast("float"))
.withColumn('lng1', df.lng1.cast("float"))
.withColumn('lat2', df.lat2.cast("float"))
.withColumn('lng2', df.lng2.cast("float"))
print('Schema after converting "String" to "Float"')
df.printSchema() #All columns are float now.
df.show()
#Function defined by user, to calculate distance between two points on the globe.
def get_distance(lat_1, lng_1, lat_2, lng_2):
d_lat = lat_2 - lat_1
d_lng = lng_2 - lng_1
temp = (
math.sin(d_lat / 2) ** 2
+ math.cos(lat_1)
* math.cos(lat_2)
* math.sin(d_lng / 2) ** 2
)
return 6367.0 * (2 * math.asin(math.sqrt(temp)))
udf_func = udf(get_distance,FloatType()) #Creating a 'User Defined Function' to calculate distance between two points.
df = df.withColumn("distance",udf_func(df.lat1, df.lng1, df.lat2, df.lng2)) #Creating column "distance" based on function 'get_distance'
df.show()
Output:
Original Schema - columns imported as "String"
root
|-- lat1: string (nullable = true)
|-- lng1: string (nullable = true)
|-- lat2: string (nullable = true)
|-- lng2: string (nullable = true)
Schema after converting "String" to "Float"
root
|-- lat1: float (nullable = true)
|-- lng1: float (nullable = true)
|-- lat2: float (nullable = true)
|-- lng2: float (nullable = true)
+------+-----+------+------+
| lat1| lng1| lat2| lng2|
+------+-----+------+------+
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
+------+-----+------+------+
+------+-----+------+------+---------+
| lat1| lng1| lat2| lng2| distance|
+------+-----+------+------+---------+
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
+------+-----+------+------+---------+
Code works perfectly now.
1
While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value.
– Nic3500
Nov 22 '18 at 3:29
Sir, I have made the changes. It should be pretty clear now.
– cph_sto
Nov 22 '18 at 8:55
Thank you very much for your effort - The problem was only with types of the columns. The string columns must be converted to float.
– Ali
Nov 25 '18 at 11:07
Well, your problem stands solved. I am happy.
– cph_sto
Nov 25 '18 at 11:10
add a comment |
Let me rewrite it, so that people can understand the context. There are 2 steps -
1.The DataFrame
which was orignally created, was having it's columns in String
format, so calculations can't be done on that. Therefore, as a first step, we must convert all 4 columns into Float
.
2.Apply UDF
on this DataFrame
to create a new column distance
.
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
df = sqlContext.createDataFrame([('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
('-32.92','151.80','-32.89','151.71'),], ("lat1", "lng1", "lat2","lng2"))
print('Original Schema - columns imported as "String"')
df.printSchema() #All colums are Strings.
# Converting String based numbers into float.
df = df.withColumn('lat1', df.lat1.cast("float"))
.withColumn('lng1', df.lng1.cast("float"))
.withColumn('lat2', df.lat2.cast("float"))
.withColumn('lng2', df.lng2.cast("float"))
print('Schema after converting "String" to "Float"')
df.printSchema() #All columns are float now.
df.show()
#Function defined by user, to calculate distance between two points on the globe.
def get_distance(lat_1, lng_1, lat_2, lng_2):
d_lat = lat_2 - lat_1
d_lng = lng_2 - lng_1
temp = (
math.sin(d_lat / 2) ** 2
+ math.cos(lat_1)
* math.cos(lat_2)
* math.sin(d_lng / 2) ** 2
)
return 6367.0 * (2 * math.asin(math.sqrt(temp)))
udf_func = udf(get_distance,FloatType()) #Creating a 'User Defined Function' to calculate distance between two points.
df = df.withColumn("distance",udf_func(df.lat1, df.lng1, df.lat2, df.lng2)) #Creating column "distance" based on function 'get_distance'
df.show()
Output:
Original Schema - columns imported as "String"
root
|-- lat1: string (nullable = true)
|-- lng1: string (nullable = true)
|-- lat2: string (nullable = true)
|-- lng2: string (nullable = true)
Schema after converting "String" to "Float"
root
|-- lat1: float (nullable = true)
|-- lng1: float (nullable = true)
|-- lat2: float (nullable = true)
|-- lng2: float (nullable = true)
+------+-----+------+------+
| lat1| lng1| lat2| lng2|
+------+-----+------+------+
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
+------+-----+------+------+
+------+-----+------+------+---------+
| lat1| lng1| lat2| lng2| distance|
+------+-----+------+------+---------+
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
+------+-----+------+------+---------+
Code works perfectly now.
Let me rewrite it, so that people can understand the context. There are 2 steps -
1.The DataFrame
which was orignally created, was having it's columns in String
format, so calculations can't be done on that. Therefore, as a first step, we must convert all 4 columns into Float
.
2.Apply UDF
on this DataFrame
to create a new column distance
.
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
df = sqlContext.createDataFrame([('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
('-32.92','151.80','-32.89','151.71'),], ("lat1", "lng1", "lat2","lng2"))
print('Original Schema - columns imported as "String"')
df.printSchema() #All colums are Strings.
# Converting String based numbers into float.
df = df.withColumn('lat1', df.lat1.cast("float"))
.withColumn('lng1', df.lng1.cast("float"))
.withColumn('lat2', df.lat2.cast("float"))
.withColumn('lng2', df.lng2.cast("float"))
print('Schema after converting "String" to "Float"')
df.printSchema() #All columns are float now.
df.show()
#Function defined by user, to calculate distance between two points on the globe.
def get_distance(lat_1, lng_1, lat_2, lng_2):
d_lat = lat_2 - lat_1
d_lng = lng_2 - lng_1
temp = (
math.sin(d_lat / 2) ** 2
+ math.cos(lat_1)
* math.cos(lat_2)
* math.sin(d_lng / 2) ** 2
)
return 6367.0 * (2 * math.asin(math.sqrt(temp)))
udf_func = udf(get_distance,FloatType()) #Creating a 'User Defined Function' to calculate distance between two points.
df = df.withColumn("distance",udf_func(df.lat1, df.lng1, df.lat2, df.lng2)) #Creating column "distance" based on function 'get_distance'
df.show()
Output:
Original Schema - columns imported as "String"
root
|-- lat1: string (nullable = true)
|-- lng1: string (nullable = true)
|-- lat2: string (nullable = true)
|-- lng2: string (nullable = true)
Schema after converting "String" to "Float"
root
|-- lat1: float (nullable = true)
|-- lng1: float (nullable = true)
|-- lat2: float (nullable = true)
|-- lng2: float (nullable = true)
+------+-----+------+------+
| lat1| lng1| lat2| lng2|
+------+-----+------+------+
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
|-32.92|151.8|-32.89|151.71|
+------+-----+------+------+
+------+-----+------+------+---------+
| lat1| lng1| lat2| lng2| distance|
+------+-----+------+------+---------+
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
|-32.92|151.8|-32.89|151.71|196.45587|
+------+-----+------+------+---------+
Code works perfectly now.
edited Nov 22 '18 at 9:09
answered Nov 22 '18 at 1:24
cph_stocph_sto
2,1142421
2,1142421
1
While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value.
– Nic3500
Nov 22 '18 at 3:29
Sir, I have made the changes. It should be pretty clear now.
– cph_sto
Nov 22 '18 at 8:55
Thank you very much for your effort - The problem was only with types of the columns. The string columns must be converted to float.
– Ali
Nov 25 '18 at 11:07
Well, your problem stands solved. I am happy.
– cph_sto
Nov 25 '18 at 11:10
add a comment |
1
While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value.
– Nic3500
Nov 22 '18 at 3:29
Sir, I have made the changes. It should be pretty clear now.
– cph_sto
Nov 22 '18 at 8:55
Thank you very much for your effort - The problem was only with types of the columns. The string columns must be converted to float.
– Ali
Nov 25 '18 at 11:07
Well, your problem stands solved. I am happy.
– cph_sto
Nov 25 '18 at 11:10
1
1
While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value.
– Nic3500
Nov 22 '18 at 3:29
While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value.
– Nic3500
Nov 22 '18 at 3:29
Sir, I have made the changes. It should be pretty clear now.
– cph_sto
Nov 22 '18 at 8:55
Sir, I have made the changes. It should be pretty clear now.
– cph_sto
Nov 22 '18 at 8:55
Thank you very much for your effort - The problem was only with types of the columns. The string columns must be converted to float.
– Ali
Nov 25 '18 at 11:07
Thank you very much for your effort - The problem was only with types of the columns. The string columns must be converted to float.
– Ali
Nov 25 '18 at 11:07
Well, your problem stands solved. I am happy.
– cph_sto
Nov 25 '18 at 11:10
Well, your problem stands solved. I am happy.
– cph_sto
Nov 25 '18 at 11:10
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53422473%2fpyspark-dataframe-create-new-column-based-on-function-return-value%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown