PySpark Dataframe create new column based on function return value












I have a DataFrame and I want to add a new column based on the value returned by a function. The parameters to this function are four columns from the same DataFrame. This one and this one are somewhat similar to what I want, but they don't answer my question.



Here is my DataFrame (there are more columns than these four):



+--------+--------+--------+--------+
| lat1   | lng1   | lat2   | lng2   |
+--------+--------+--------+--------+
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
| -32.92 | 151.80 | -32.89 | 151.71 |
+--------+--------+--------+--------+


I want to add another column, "distance", which is the total distance between the two location points (latitude/longitude). I have a function which takes the four coordinates as arguments and returns the distance as a float.



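import math

def get_distance(lat_1, lng_1, lat_2, lng_2):
    # Haversine formula; math.sin and math.cos expect angles in radians.
    d_lat = lat_2 - lat_1
    d_lng = lng_2 - lng_1

    temp = (
        math.sin(d_lat / 2) ** 2
        + math.cos(lat_1)
        * math.cos(lat_2)
        * math.sin(d_lng / 2) ** 2
    )

    # 6367.0 is an approximate Earth radius in km, so the result is in km.
    return 6367.0 * (2 * math.asin(math.sqrt(temp)))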
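
As a quick sanity check outside Spark, the function can be called directly on plain Python numbers. The sketch below assumes the coordinates are in degrees, as the sample rows suggest, and converts them to radians first, because `math.sin` and `math.cos` work in radians. The wrapper name `get_distance_deg` is hypothetical and not part of the original code.

```python
import math

def get_distance(lat_1, lng_1, lat_2, lng_2):
    """Haversine distance in km; all arguments are angles in radians."""
    d_lat = lat_2 - lat_1
    d_lng = lng_2 - lng_1
    temp = (
        math.sin(d_lat / 2) ** 2
        + math.cos(lat_1) * math.cos(lat_2) * math.sin(d_lng / 2) ** 2
    )
    return 6367.0 * (2 * math.asin(math.sqrt(temp)))

def get_distance_deg(lat_1, lng_1, lat_2, lng_2):
    """Hypothetical wrapper: accepts degrees (numbers or numeric strings)."""
    # float() also tolerates values that arrive as strings.
    radians = [math.radians(float(v)) for v in (lat_1, lng_1, lat_2, lng_2)]
    return get_distance(*radians)

# One row from the sample data; the result is in kilometres.
km = get_distance_deg(-32.92, 151.80, -32.89, 151.71)
print(round(km, 2))
```

Because the wrapper converts each argument with `float()`, it returns the same value whether the coordinates arrive as numbers or as numeric strings.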


Here is my attempt, which resulted in an error. I'm not sure about this approach either; it's based on the other questions I've already mentioned.



from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

udf_func = udf(lambda lat_1, lng_1, lat_2, lng_2: get_distance(lat_1, lng_1, lat_2, lng_2), returnType=FloatType())

df1 = df.withColumn('difference', udf_func(df.lat1, df.lng1, df.lat2, df.lng2))
df1.show()


Here is the error stack trace



An error occurred while calling o1300.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 4 times, most recent failure: Lost task 0.3 in stage 50.0 (TID 341, data05.dac.local, executor 255): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
process()
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<stdin>", line 2, in <lambda>
File "<stdin>", line 5, in get_distance
TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withNewExecutionId(Dataset.scala:2788)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at sun.reflect.GeneratedMethodAccessor94.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
process()
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<stdin>", line 2, in <lambda>
File "<stdin>", line 5, in get_distance
TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
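
The `TypeError` at the bottom of the Python traceback can be reproduced without Spark, which hints that the UDF is receiving the column values as strings rather than numbers. Below is a minimal sketch, assuming the four columns were loaded as strings (for example, from a file read without a schema). The helper name `subtract` is hypothetical, and under Python 3 the message names `str` instead of `unicode`.

```python
def subtract(a, b):
    # Same kind of operation as the first line of get_distance.
    return b - a

try:
    subtract("-32.92", "-32.89")
except TypeError as exc:
    # Strings do not support the "-" operator.
    print("strings fail:", exc)

# Converting to float first makes the subtraction well-defined.
print(subtract(float("-32.92"), float("-32.89")))
```

If that is the cause, `df.printSchema()` would show the columns as `string`, and casting them to a numeric type before calling the UDF (for example with `df.lat1.cast('double')`) or converting inside the function would be the thing to try.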


Please guide.










share|improve this question





























    1















    I've a dataframe and I want to add a new column based on a value returned by a function. The parameters to this functions are four columns from the same dataframe. This one and this one are somewhat similar to what I want to but doesn't answer my question.



    Here is my data frame (there are more columns then these four)



     + ------ + ------ + ------ + ------ +
    | lat1 | lng1 | lat2 | lng2 |
    + ------ + ------ + ------ + ------ +
    | -32.92 | 151.80 | -32.89 | 151.71 |
    | -32.92 | 151.80 | -32.89 | 151.71 |
    | -32.92 | 151.80 | -32.89 | 151.71 |
    | -32.92 | 151.80 | -32.89 | 151.71 |
    | -32.92 | 151.80 | -32.89 | 151.71 |
    + ------ + ------ + ------ + ------ +


    and I want to add another column "distance" which is the total distance between the two location points(latitude/longitude). I've a function which takes four location points as arguments and returns the difference as Float.



    def get_distance(lat_1, lng_1, lat_2, lng_2): 
    d_lat = lat_2 - lat_1
    d_lng = lng_2 - lng_1

    temp = (
    math.sin(d_lat / 2) ** 2
    + math.cos(lat_1)
    * math.cos(lat_2)
    * math.sin(d_lng / 2) ** 2
    )

    return 6367.0 * (2 * math.asin(math.sqrt(temp)))


    Here is my attempt which resulted in an error and I'm not sure about this approach either, its based on the other questions I've already mentioned.



    udf_func = udf(lambda lat_1, lng_1, lat_2, lng_2: get_distance(lat_1, lng_1, lat_2, lng_2), returnType=FloatType())

    df1 = df.withColumn('difference', udf_func(df.lat1, df_lng1, df.lat2, df.lng2))
    df_subset1.show()


    Here is the error stack trace



    An error occurred while calling o1300.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 4 times, most recent failure: Lost task 0.3 in stage 50.0 (TID 341, data05.dac.local, executor 255): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
    File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
    process()
    File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
    File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
    func = lambda _, it: map(mapper, it)
    File "<string>", line 1, in <lambda>
    File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
    File "<stdin>", line 2, in <lambda>
    File "<stdin>", line 5, in get_distance
    TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withNewExecutionId(Dataset.scala:2788)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
    at sun.reflect.GeneratedMethodAccessor94.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
    File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
    process()
    File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
    File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
    func = lambda _, it: map(mapper, it)
    File "<string>", line 1, in <lambda>
    File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
    File "<stdin>", line 2, in <lambda>
    File "<stdin>", line 5, in get_distance
    TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more


    Please guide.










    share|improve this question



























      1












      1








      1








      I've a dataframe and I want to add a new column based on a value returned by a function. The parameters to this functions are four columns from the same dataframe. This one and this one are somewhat similar to what I want to but doesn't answer my question.



      Here is my data frame (there are more columns then these four)



       + ------ + ------ + ------ + ------ +
      | lat1 | lng1 | lat2 | lng2 |
      + ------ + ------ + ------ + ------ +
      | -32.92 | 151.80 | -32.89 | 151.71 |
      | -32.92 | 151.80 | -32.89 | 151.71 |
      | -32.92 | 151.80 | -32.89 | 151.71 |
      | -32.92 | 151.80 | -32.89 | 151.71 |
      | -32.92 | 151.80 | -32.89 | 151.71 |
      + ------ + ------ + ------ + ------ +


      and I want to add another column "distance" which is the total distance between the two location points(latitude/longitude). I've a function which takes four location points as arguments and returns the difference as Float.



      def get_distance(lat_1, lng_1, lat_2, lng_2): 
      d_lat = lat_2 - lat_1
      d_lng = lng_2 - lng_1

      temp = (
      math.sin(d_lat / 2) ** 2
      + math.cos(lat_1)
      * math.cos(lat_2)
      * math.sin(d_lng / 2) ** 2
      )

      return 6367.0 * (2 * math.asin(math.sqrt(temp)))


      Here is my attempt which resulted in an error and I'm not sure about this approach either, its based on the other questions I've already mentioned.



      udf_func = udf(lambda lat_1, lng_1, lat_2, lng_2: get_distance(lat_1, lng_1, lat_2, lng_2), returnType=FloatType())

      df1 = df.withColumn('difference', udf_func(df.lat1, df_lng1, df.lat2, df.lng2))
      df_subset1.show()


      Here is the error stack trace



      An error occurred while calling o1300.showString.
      : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 4 times, most recent failure: Lost task 0.3 in stage 50.0 (TID 341, data05.dac.local, executor 255): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
      process()
      File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
      serializer.dump_stream(func(split_index, iterator), outfile)
      File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
      func = lambda _, it: map(mapper, it)
      File "<string>", line 1, in <lambda>
      File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
      return lambda *a: f(*a)
      File "<stdin>", line 2, in <lambda>
      File "<stdin>", line 5, in get_distance
      TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
      at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
      at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
      at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
      at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
      at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Driver stacktrace:
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
      at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
      at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
      at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
      at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
      at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withNewExecutionId(Dataset.scala:2788)
      at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
      at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
      at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
      at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
      at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
      at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
      at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
      at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
      at sun.reflect.GeneratedMethodAccessor94.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      at py4j.Gateway.invoke(Gateway.java:280)
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at py4j.commands.CallCommand.execute(CallCommand.java:79)
      at py4j.GatewayConnection.run(GatewayConnection.java:214)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 171, in main
      process()
      File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 166, in process
      serializer.dump_stream(func(split_index, iterator), outfile)
      File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 103, in <lambda>
      func = lambda _, it: map(mapper, it)
      File "<string>", line 1, in <lambda>
      File "/hadoop/log/yarn/local/usercache/baiga/appcache/application_1541820416317_0349/container_e122_1541820416317_0349_01_000269/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
      return lambda *a: f(*a)
      File "<stdin>", line 2, in <lambda>
      File "<stdin>", line 5, in get_distance
      TypeError: unsupported operand type(s) for -: 'unicode' and 'unicode'
      at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
      at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
      at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
      at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
      at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      ... 1 more


      Please guide.










      python apache-spark dataframe pyspark calculated-columns






      edited Nov 22 '18 at 1:22 by Ali

      asked Nov 22 '18 at 0:52 by Ali
























          2 Answers
          3














          The "unsupported operand type(s) for -: 'unicode' and 'unicode'" line in the stack trace suggests that the columns are of StringType, since Python can't subtract two strings. You can check with df.printSchema().
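A quick way to see why the subtraction fails, as a minimal sketch in plain Python with no Spark involved: values from a StringType column reach the UDF as strings, and Python defines no `-` for two strings. The helper name `try_subtract` is made up for this illustration. Under Python 3 the message names 'str', where the Python 2 trace in the question names 'unicode'.

```python
def try_subtract(a, b):
    """Return a - b, or the TypeError message if the subtraction fails."""
    try:
        return a - b
    except TypeError as exc:
        return str(exc)


# Strings, as a StringType column would deliver them to the UDF.
message = try_subtract("-32.89", "-32.92")
print(message)

# Floats subtract without any problem.
difference = try_subtract(-32.89, -32.92)
print(difference)
```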



          If you convert all your latitudes and longitudes to floats (e.g. float(lat1)) before doing any calculation, your UDF should execute fine.
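The float() suggestion can be sketched in plain Python as follows. The distance formula is copied from the question; the wrapper name `get_distance_from_strings` and its None handling are assumptions made for this illustration (a null column value reaches a Python UDF as None, which float() would reject).

```python
import math


def get_distance(lat_1, lng_1, lat_2, lng_2):
    """Distance formula as posted in the question (inputs used as-is)."""
    d_lat = lat_2 - lat_1
    d_lng = lng_2 - lng_1
    temp = (
        math.sin(d_lat / 2) ** 2
        + math.cos(lat_1) * math.cos(lat_2) * math.sin(d_lng / 2) ** 2
    )
    return 6367.0 * (2 * math.asin(math.sqrt(temp)))


def get_distance_from_strings(lat_1, lng_1, lat_2, lng_2):
    """Hypothetical wrapper: coerce string inputs to float, pass None through."""
    values = (lat_1, lng_1, lat_2, lng_2)
    if any(v is None for v in values):
        return None  # a null in any column yields a null distance
    return get_distance(*(float(v) for v in values))


distance = get_distance_from_strings('-32.92', '151.80', '-32.89', '151.71')
print(distance)
```

Wrapped with udf(get_distance_from_strings, FloatType()), this version should accept the string columns directly. Casting the columns in the DataFrame itself is the alternative, and it also corrects the schema for any later calculations.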






          • Sir, you are right.

            – cph_sto
            Nov 22 '18 at 9:26



















          4














          Let me rewrite it so that people can understand the context. There are two steps:

          1. The DataFrame as originally created has its columns in String format, so calculations can't be done on them. Therefore, as a first step, we must convert all four columns to Float.

          2. Apply a UDF to this DataFrame to create a new column, distance.



          import math
          from pyspark.sql.functions import udf
          from pyspark.sql.types import FloatType

          # Sample data: all four columns are created as strings.
          df = sqlContext.createDataFrame(
              [('-32.92', '151.80', '-32.89', '151.71'),
               ('-32.92', '151.80', '-32.89', '151.71'),
               ('-32.92', '151.80', '-32.89', '151.71'),
               ('-32.92', '151.80', '-32.89', '151.71'),
               ('-32.92', '151.80', '-32.89', '151.71')],
              ("lat1", "lng1", "lat2", "lng2"))
          print('Original Schema - columns imported as "String"')
          df.printSchema()  # All columns are strings.

          # Convert the string-based numbers to float.
          # The parentheses let the chained calls span several lines.
          df = (df.withColumn('lat1', df.lat1.cast("float"))
                  .withColumn('lng1', df.lng1.cast("float"))
                  .withColumn('lat2', df.lat2.cast("float"))
                  .withColumn('lng2', df.lng2.cast("float")))

          print('Schema after converting "String" to "Float"')
          df.printSchema()  # All columns are float now.
          df.show()

          # User-defined function to calculate the distance between two points on the globe.
          def get_distance(lat_1, lng_1, lat_2, lng_2):
              d_lat = lat_2 - lat_1
              d_lng = lng_2 - lng_1

              temp = (
                  math.sin(d_lat / 2) ** 2
                  + math.cos(lat_1)
                  * math.cos(lat_2)
                  * math.sin(d_lng / 2) ** 2
              )
              return 6367.0 * (2 * math.asin(math.sqrt(temp)))

          # Wrap get_distance as a Spark UDF that returns a float.
          udf_func = udf(get_distance, FloatType())
          # Create the column "distance" from the four coordinate columns.
          df = df.withColumn("distance", udf_func(df.lat1, df.lng1, df.lat2, df.lng2))
          df.show()


          Output:



          Original Schema - columns imported as "String"
          root
          |-- lat1: string (nullable = true)
          |-- lng1: string (nullable = true)
          |-- lat2: string (nullable = true)
          |-- lng2: string (nullable = true)

          Schema after converting "String" to "Float"
          root
          |-- lat1: float (nullable = true)
          |-- lng1: float (nullable = true)
          |-- lat2: float (nullable = true)
          |-- lng2: float (nullable = true)

          +------+-----+------+------+
          |  lat1| lng1|  lat2|  lng2|
          +------+-----+------+------+
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          +------+-----+------+------+

          +------+-----+------+------+---------+
          |  lat1| lng1|  lat2|  lng2| distance|
          +------+-----+------+------+---------+
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          +------+-----+------+------+---------+


          Code works perfectly now.
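One caveat worth checking against your own data, offered as a sketch and not as part of the original answer: math.sin and math.cos expect radians, while the columns appear to hold decimal degrees, and the two sample points are only about 9 km apart, not 196 km. The variant below assumes the inputs are decimal degrees and converts them with math.radians first. The name `get_distance_km` is made up for this illustration, and the 6367.0 km radius is kept from the question.

```python
import math


def get_distance_km(lat_1, lng_1, lat_2, lng_2):
    """Haversine distance in km, assuming the inputs are decimal degrees."""
    # Convert degrees to radians, since the math trig functions work in radians.
    lat_1, lng_1, lat_2, lng_2 = map(math.radians, (lat_1, lng_1, lat_2, lng_2))
    d_lat = lat_2 - lat_1
    d_lng = lng_2 - lng_1
    temp = (
        math.sin(d_lat / 2) ** 2
        + math.cos(lat_1) * math.cos(lat_2) * math.sin(d_lng / 2) ** 2
    )
    return 6367.0 * (2 * math.asin(math.sqrt(temp)))


distance_km = get_distance_km(-32.92, 151.80, -32.89, 151.71)
print(round(distance_km, 2))
```

This function can be passed to udf(...) in the same way as get_distance, since it takes the same four arguments and returns a Python float.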






          • While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value.

            – Nic3500
            Nov 22 '18 at 3:29











          • Sir, I have made the changes. It should be pretty clear now.

            – cph_sto
            Nov 22 '18 at 8:55











          • Thank you very much for your effort - The problem was only with types of the columns. The string columns must be converted to float.

            – Ali
            Nov 25 '18 at 11:07











          • Well, your problem stands solved. I am happy.

            – cph_sto
            Nov 25 '18 at 11:10











          First answer (3 votes): answered Nov 22 '18 at 5:08 by ayplam













          • Sir, you are right.

            – cph_sto
            Nov 22 '18 at 9:26



















          • Sir, you are right.

            – cph_sto
            Nov 22 '18 at 9:26

















          Sir, you are right.

          – cph_sto
          Nov 22 '18 at 9:26





          Sir, you are right.

          – cph_sto
          Nov 22 '18 at 9:26













          4














          Let me rewrite it, so that people can understand the context. There are 2 steps -



          1.The DataFrame which was orignally created, was having it's columns in String format, so calculations can't be done on that. Therefore, as a first step, we must convert all 4 columns into Float.



          2.Apply UDF on this DataFrame to create a new column distance.



          import math
          from pyspark.sql.functions import udf
          from pyspark.sql.types import FloatType
          df = sqlContext.createDataFrame([('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
          ('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
          ('-32.92','151.80','-32.89','151.71'),], ("lat1", "lng1", "lat2","lng2"))
          print('Original Schema - columns imported as "String"')
          df.printSchema() #All colums are Strings.
          # Converting String based numbers into float.
          df = df.withColumn('lat1', df.lat1.cast("float"))
          .withColumn('lng1', df.lng1.cast("float"))
          .withColumn('lat2', df.lat2.cast("float"))
          .withColumn('lng2', df.lng2.cast("float"))

          print('Schema after converting "String" to "Float"')
          df.printSchema() #All columns are float now.
          df.show()
          #Function defined by user, to calculate distance between two points on the globe.
          def get_distance(lat_1, lng_1, lat_2, lng_2):
          d_lat = lat_2 - lat_1
          d_lng = lng_2 - lng_1

          temp = (
          math.sin(d_lat / 2) ** 2
          + math.cos(lat_1)
          * math.cos(lat_2)
          * math.sin(d_lng / 2) ** 2
          )
          return 6367.0 * (2 * math.asin(math.sqrt(temp)))

          udf_func = udf(get_distance,FloatType()) #Creating a 'User Defined Function' to calculate distance between two points.
          df = df.withColumn("distance",udf_func(df.lat1, df.lng1, df.lat2, df.lng2)) #Creating column "distance" based on function 'get_distance'
          df.show()


          Output:



          Original Schema - columns imported as "String"
          root
          |-- lat1: string (nullable = true)
          |-- lng1: string (nullable = true)
          |-- lat2: string (nullable = true)
          |-- lng2: string (nullable = true)

          Schema after converting "String" to "Float"
          root
          |-- lat1: float (nullable = true)
          |-- lng1: float (nullable = true)
          |-- lat2: float (nullable = true)
          |-- lng2: float (nullable = true)

          +------+-----+------+------+
          | lat1| lng1| lat2| lng2|
          +------+-----+------+------+
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          +------+-----+------+------+

          +------+-----+------+------+---------+
          | lat1| lng1| lat2| lng2| distance|
          +------+-----+------+------+---------+
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          +------+-----+------+------+---------+


          Code works perfectly now.






          share|improve this answer





















          • 1





            While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value.

            – Nic3500
            Nov 22 '18 at 3:29











          • Sir, I have made the changes. It should be pretty clear now.

            – cph_sto
            Nov 22 '18 at 8:55











          • Thank you very much for your effort - The problem was only with types of the columns. The string columns must be converted to float.

            – Ali
            Nov 25 '18 at 11:07











          • Well, your problem stands solved. I am happy.

            – cph_sto
            Nov 25 '18 at 11:10
















          4














          Let me rewrite it, so that people can understand the context. There are 2 steps -



          1.The DataFrame which was orignally created, was having it's columns in String format, so calculations can't be done on that. Therefore, as a first step, we must convert all 4 columns into Float.



          2.Apply UDF on this DataFrame to create a new column distance.



          import math
          from pyspark.sql.functions import udf
          from pyspark.sql.types import FloatType
          df = sqlContext.createDataFrame([('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
          ('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
          ('-32.92','151.80','-32.89','151.71'),], ("lat1", "lng1", "lat2","lng2"))
          print('Original Schema - columns imported as "String"')
          df.printSchema() #All colums are Strings.
          # Converting String based numbers into float.
          df = df.withColumn('lat1', df.lat1.cast("float"))
          .withColumn('lng1', df.lng1.cast("float"))
          .withColumn('lat2', df.lat2.cast("float"))
          .withColumn('lng2', df.lng2.cast("float"))

          print('Schema after converting "String" to "Float"')
          df.printSchema() #All columns are float now.
          df.show()
          #Function defined by user, to calculate distance between two points on the globe.
          def get_distance(lat_1, lng_1, lat_2, lng_2):
          d_lat = lat_2 - lat_1
          d_lng = lng_2 - lng_1

          temp = (
          math.sin(d_lat / 2) ** 2
          + math.cos(lat_1)
          * math.cos(lat_2)
          * math.sin(d_lng / 2) ** 2
          )
          return 6367.0 * (2 * math.asin(math.sqrt(temp)))

          udf_func = udf(get_distance,FloatType()) #Creating a 'User Defined Function' to calculate distance between two points.
          df = df.withColumn("distance",udf_func(df.lat1, df.lng1, df.lat2, df.lng2)) #Creating column "distance" based on function 'get_distance'
          df.show()


          Output:



          Original Schema - columns imported as "String"
          root
          |-- lat1: string (nullable = true)
          |-- lng1: string (nullable = true)
          |-- lat2: string (nullable = true)
          |-- lng2: string (nullable = true)

          Schema after converting "String" to "Float"
          root
          |-- lat1: float (nullable = true)
          |-- lng1: float (nullable = true)
          |-- lat2: float (nullable = true)
          |-- lng2: float (nullable = true)

          +------+-----+------+------+
          | lat1| lng1| lat2| lng2|
          +------+-----+------+------+
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          +------+-----+------+------+

          +------+-----+------+------+---------+
          | lat1| lng1| lat2| lng2| distance|
          +------+-----+------+------+---------+
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          +------+-----+------+------+---------+


          Code works perfectly now.






          share|improve this answer





















          • 1





            While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value.

            – Nic3500
            Nov 22 '18 at 3:29











          • Sir, I have made the changes. It should be pretty clear now.

            – cph_sto
            Nov 22 '18 at 8:55











          • Thank you very much for your effort - The problem was only with types of the columns. The string columns must be converted to float.

            – Ali
            Nov 25 '18 at 11:07











          • Well, your problem stands solved. I am happy.

            – cph_sto
            Nov 25 '18 at 11:10














          4












          4








          4







          Let me rewrite it, so that people can understand the context. There are 2 steps -



          1.The DataFrame which was orignally created, was having it's columns in String format, so calculations can't be done on that. Therefore, as a first step, we must convert all 4 columns into Float.



          2.Apply UDF on this DataFrame to create a new column distance.



          import math
          from pyspark.sql.functions import udf
          from pyspark.sql.types import FloatType

          # Sample data; every column is created as a string.
          df = sqlContext.createDataFrame([('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
                                           ('-32.92','151.80','-32.89','151.71'),('-32.92','151.80','-32.89','151.71'),
                                           ('-32.92','151.80','-32.89','151.71'),], ("lat1", "lng1", "lat2","lng2"))
          print('Original Schema - columns imported as "String"')
          df.printSchema()  # All columns are strings.

          # Convert the string-based numbers to float. The chained calls are wrapped
          # in parentheses so that the statement can span several lines.
          df = (df.withColumn('lat1', df.lat1.cast("float"))
                  .withColumn('lng1', df.lng1.cast("float"))
                  .withColumn('lat2', df.lat2.cast("float"))
                  .withColumn('lng2', df.lng2.cast("float")))

          print('Schema after converting "String" to "Float"')
          df.printSchema()  # All columns are float now.
          df.show()

          # User-defined function to calculate the distance between two points on the globe.
          def get_distance(lat_1, lng_1, lat_2, lng_2):
              d_lat = lat_2 - lat_1
              d_lng = lng_2 - lng_1

              temp = (
                  math.sin(d_lat / 2) ** 2
                  + math.cos(lat_1)
                  * math.cos(lat_2)
                  * math.sin(d_lng / 2) ** 2
              )
              return 6367.0 * (2 * math.asin(math.sqrt(temp)))

          # Wrap get_distance as a Spark UDF that returns a float.
          udf_func = udf(get_distance, FloatType())
          # Create the column "distance" from the four coordinate columns.
          df = df.withColumn("distance", udf_func(df.lat1, df.lng1, df.lat2, df.lng2))
          df.show()


          Output:



          Original Schema - columns imported as "String"
          root
          |-- lat1: string (nullable = true)
          |-- lng1: string (nullable = true)
          |-- lat2: string (nullable = true)
          |-- lng2: string (nullable = true)

          Schema after converting "String" to "Float"
          root
          |-- lat1: float (nullable = true)
          |-- lng1: float (nullable = true)
          |-- lat2: float (nullable = true)
          |-- lng2: float (nullable = true)

          +------+-----+------+------+
          |  lat1| lng1|  lat2|  lng2|
          +------+-----+------+------+
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          |-32.92|151.8|-32.89|151.71|
          +------+-----+------+------+

          +------+-----+------+------+---------+
          |  lat1| lng1|  lat2|  lng2| distance|
          +------+-----+------+------+---------+
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          |-32.92|151.8|-32.89|151.71|196.45587|
          +------+-----+------+------+---------+


          Code works perfectly now.
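
One caveat worth checking against your own data: Python's math.sin and math.cos take radians, while the sample values look like latitude/longitude in degrees. If that is the case, the inputs would need converting before the haversine formula is applied. Below is a hedged sketch of that variant. The name haversine_km is made up for this illustration, and the radius 6367.0 is simply the constant from the question.

```python
import math

EARTH_RADIUS_KM = 6367.0  # same constant as in the question's function

def haversine_km(lat_1, lng_1, lat_2, lng_2):
    """Great-circle distance in km, assuming all four inputs are in degrees."""
    # Convert degrees to radians, because math.sin/math.cos expect radians.
    lat_1, lng_1, lat_2, lng_2 = map(math.radians, (lat_1, lng_1, lat_2, lng_2))
    d_lat = lat_2 - lat_1
    d_lng = lng_2 - lng_1
    temp = (
        math.sin(d_lat / 2) ** 2
        + math.cos(lat_1) * math.cos(lat_2) * math.sin(d_lng / 2) ** 2
    )
    return EARTH_RADIUS_KM * (2 * math.asin(math.sqrt(temp)))

# The sample row from the question, interpreted as degrees.
sample_km = haversine_km(-32.92, 151.80, -32.89, 151.71)
```

For the sample row this gives a distance of roughly 9 km rather than about 196, so it is worth confirming which unit your columns are in. The function can be passed to udf(..., FloatType()) in the same way as get_distance.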





















          edited Nov 22 '18 at 9:09

          answered Nov 22 '18 at 1:24

          cph_sto

























