Running PySpark as a Spark standalone job

This example runs a minimal Spark script that imports PySpark, initializes a SparkContext and performs a distributed calculation on a Spark cluster in standalone mode.

Who is this for?

This example is for users of a Spark cluster that has been configured in standalone mode who wish to run a PySpark job.

Before you start

Download the spark-basic.py example script to the cluster node where you submit Spark jobs.

You need Spark running with the standalone scheduler. You can install Spark using an enterprise Hadoop distribution such as Cloudera CDH or Hortonworks HDP. Some additional configuration might be necessary to use Spark in standalone mode.

Modifying the script

After downloading the spark-basic.py example script, open the file in a text editor on your cluster. Replace HEAD_NODE_HOSTNAME with the hostname or IP address of the Spark master as defined in your Hadoop configuration.

# spark-basic.py
from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
conf.setMaster('spark://HEAD_NODE_HOSTNAME:7077')
conf.setAppName('spark-basic')
sc = SparkContext(conf=conf)

def mod(x):
    import numpy as np
    return (x, np.mod(x, 2))

rdd = sc.parallelize(range(1000)).map(mod).take(10)
print rdd

Examine the contents of the spark-basic.py example script.

The first code block contains imports from PySpark.

The second code block initializes the SparkContext and sets the application name.

The third code block contains the analysis code that uses the NumPy package to calculate the modulus of a range of numbers up to 1000, then returns and prints the first 10 results.

The fourth code block runs the calculation on the Spark cluster and prints the results. The code uses the NumPy library from Anaconda on each Spark worker.

NOTE: You may need to install NumPy on the cluster nodes using adam scale -n cluster conda install numpy.

Running the job

Run the script by submitting it to your cluster for execution using spark-submit or by running this command:

$ python spark-basic.py

The output from the above command shows the first 10 values returned from the spark-basic.py script:

16/05/05 22:26:53 INFO spark.SparkContext: Running Spark version 1.6.0

[...]

16/05/05 22:27:03 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 3242 bytes)
16/05/05 22:27:04 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:46587 (size: 2.6 KB, free: 530.3 MB)
16/05/05 22:27:04 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 652 ms on localhost (1/1)
16/05/05 22:27:04 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/05/05 22:27:04 INFO scheduler.DAGScheduler: ResultStage 0 (runJob at PythonRDD.scala:393) finished in 4.558 s
16/05/05 22:27:04 INFO scheduler.DAGScheduler: Job 0 finished: runJob at PythonRDD.scala:393, took 4.951328 s
[(0, 0), (1, 1), (2, 0), (3, 1), (4, 0), (5, 1), (6, 0), (7, 1), (8, 0), (9, 1)]

Troubleshooting

If something goes wrong, see Help and support.

Further information

See the Spark and PySpark documentation: