Apache spark examples java

Overview

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

This guide shows each of these features in each of Spark’s supported languages. It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.

Linking with Spark

Spark 2.0.0-preview is built and distributed to work with Scala 2.11 by default. (Spark can be built to work with other versions of Scala, too.) To write applications in Scala, you will need to use a compatible Scala version (e.g. 2.11.X).

To write a Spark application, you need to add a Maven dependency on Spark. Spark is available through Maven Central at:

groupId = org.apache.spark artifactId = spark-core_2.11 version = 2.0.0-preview 

In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS.

groupId = org.apache.hadoop artifactId = hadoop-client version =

Finally, you need to import some Spark classes into your program. Add the following lines:

Читайте также:  Php utf 8 class

Accumulators in the Spark UI

While this code used the built-in support for accumulators of type Int, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a “zero value” for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

object VectorAccumulatorParam extends AccumulatorParam[Vector]  def zero(initialValue: Vector): Vector =  Vector.zeros(initialValue.size) > def addInPlace(v1: Vector, v2: Vector): Vector =  v1 += v2 > > // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(. ))(VectorAccumulatorParam)

In Scala, Spark also supports the more general Accumulable interface to accumulate data where the resulting type is not the same as the elements added (e.g. build a list by collecting together elements), and the SparkContext.accumulableCollection method for accumulating common Scala collection types.

AccumulatorInteger> accum = sc.accumulator(0); sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); // . // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s accum.value(); // returns 10

While this code used the built-in support for accumulators of type Integer, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a “zero value” for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

class VectorAccumulatorParam implements AccumulatorParamVector>  public Vector zero(Vector initialValue)  return Vector.zeros(initialValue.size()); > public Vector addInPlace(Vector v1, Vector v2)  v1.addInPlace(v2); return v1; > > // Then, create an Accumulator of this type: AccumulatorVector> vecAccum = sc.accumulator(new Vector(. ), new VectorAccumulatorParam());

In Java, Spark also supports the more general Accumulable interface to accumulate data where the resulting type is not the same as the elements added (e.g. build a list by collecting together elements).

>>> accum = sc.accumulator(0) Accumulatorid=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) . 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value 10

While this code used the built-in support for accumulators of type Int, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a “zero value” for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

class VectorAccumulatorParam(AccumulatorParam): def zero(self, initialValue): return Vector.zeros(initialValue.size) def addInPlace(self, v1, v2): v1 += v2 return v1 # Then, create an Accumulator of this type: vecAccum = sc.accumulator(Vector(. ), VectorAccumulatorParam())

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map() . The below code fragment demonstrates this property:

val accum = sc.accumulator(0) data.map  x => accum += x; x > // Here, accum is still 0 because no actions have caused the map operation to be computed.
AccumulatorInteger> accum = sc.accumulator(0); data.map(x ->  accum.add(x); return f(x); >); // Here, accum is still 0 because no actions have caused the `map` to be computed.
accum = sc.accumulator(0) def g(x): accum.add(x) return f(x) data.map(g) # Here, accum is still 0 because no actions have caused the `map` to be computed.

Deploying to a Cluster

The application submission guide describes how to submit applications to a cluster. In short, once you package your application into a JAR (for Java/Scala) or a set of .py or .zip files (for Python), the bin/spark-submit script lets you submit it to any supported cluster manager.

Launching Spark jobs from Java / Scala

The org.apache.spark.launcher package provides classes for launching Spark jobs as child processes using a simple Java API.

Unit Testing

Spark is friendly to unit testing with any popular unit test framework. Simply create a SparkContext in your test with the master URL set to local , run your operations, and then call SparkContext.stop() to tear it down. Make sure you stop the context within a finally block or the test framework’s tearDown method, as Spark does not support two contexts running concurrently in the same program.

Migrating from pre-1.0 Versions of Spark

Spark 1.0 freezes the API of Spark Core for the 1.X series, in that any API available today that is not marked “experimental” or “developer API” will be supported in future versions. The only change for Scala users is that the grouping operations, e.g. groupByKey , cogroup and join , have changed from returning (Key, Seq[Value]) pairs to (Key, Iterable[Value]) .

Spark 1.0 freezes the API of Spark Core for the 1.X series, in that any API available today that is not marked “experimental” or “developer API” will be supported in future versions. Several changes were made to the Java API:

  • The Function classes in org.apache.spark.api.java.function became interfaces in 1.0, meaning that old code that extends Function should implement Function instead.
  • New variants of the map transformations, like mapToPair and mapToDouble , were added to create RDDs of special data types.
  • Grouping operations like groupByKey , cogroup and join have changed from returning (Key, List) pairs to (Key, Iterable) .

Spark 1.0 freezes the API of Spark Core for the 1.X series, in that any API available today that is not marked “experimental” or “developer API” will be supported in future versions. The only change for Python users is that the grouping operations, e.g. groupByKey , cogroup and join , have changed from returning (key, list of values) pairs to (key, iterable of values).

Migration guides are also available for Spark Streaming, MLlib and GraphX.

Where to Go from Here

You can see some example Spark programs on the Spark website. In addition, Spark includes several samples in the examples directory (Scala, Java, Python, R). You can run Java and Scala examples by passing the class name to Spark’s bin/run-example script; for instance:

For Python examples, use spark-submit instead:

./bin/spark-submit examples/src/main/python/pi.py 

For R examples, use spark-submit instead:

./bin/spark-submit examples/src/main/r/dataframe.R 

For help on optimizing your programs, the configuration and tuning guides provide information on best practices. They are especially important for making sure that your data is stored in memory in an efficient format. For help on deploying, the cluster mode overview describes the components involved in distributed operation and supported cluster managers.

Finally, full API documentation is available in Scala, Java, Python and R.

Источник

Оцените статью