Asynchronous Spark Jobs Using Apache Livy – A Primer

At zeotap, a large amount of structured and unstructured data from data partners is transformed and converted into easily queryable, standardized data sets, which are fanned out to a variety of destinations. For this processing, the transformation jobs use Apache Spark as the distributed computing framework, with a fair share of them being batch processing jobs. Batch processing is defined as non-continuous processing of data. A lot of the batch processing done at zeotap regularly processes files at a given frequency, with the input data being gathered in between the processing jobs.

We currently use two different types of systems to launch and manage these Spark jobs.

  • Apache Oozie
  • Apache Livy

An introduction to Oozie, and how we use it, has been given here. In this article, we will be focusing on Livy, and how we use it at zeotap.

Oozie vs Livy

Oozie is a workflow management system, which allows for launching and scheduling various MapReduce jobs. A workflow is defined as a Directed Acyclic Graph (DAG) of actions (which can include Spark batch jobs), which is defined in an XML file.

Livy, on the other hand, is a REST interface to a Spark cluster, which allows for launching and tracking of individual Spark jobs, by directly submitting snippets of Spark code or precompiled jars.

Both these systems can be used to launch and manage Spark jobs, but go about it in very different ways. Where can we use Oozie, and where can we use Livy?

Oozie

Oozie can be used when the processing flow involves multiple steps, each of which is dependent on the previous one. This is why a DAG is required to launch Oozie workflows. A simple example of such a dependency would be running a script to check whether the expected input files are present before the actual processing takes place on those input files.

When Oozie launches a Spark job, it first launches an 'oozie-launcher' container on a core node of the cluster, which in turn launches the actual Spark job. Oozie uses this oozie-launcher container to track and wait for the Spark job's processing.

Pros:

  • Workflow Management – Oozie supports coordinator and workflow management. It is a more generic framework than Livy (which is used only for launching and managing Spark jobs specifically), supporting the running and scheduling of workflows.
  • Versatile – Oozie supports much more than just Spark jobs, and is used for managing a broad variety of Hadoop jobs, such as Pig, Hive, Sqoop and DistCp, to name a few. This gives Oozie a large variety of use cases.

Cons:

  • Cluster Choke – Oozie launches an oozie-launcher container (Fig. 2) for each job it launches. When multiple jobs are launched concurrently, there is a possibility that only the oozie-launchers are running on the cluster, and none of the Spark jobs. Those oozie-launcher containers are waiting for their respective Spark jobs to complete, but none of the actual Spark jobs have been launched. This means the cluster is choked with oozie-launchers.
  • Jar Hell – A pro for Oozie, its versatility in launching multiple types of Hadoop jobs, also turns out to be a con. Each type of job requires its own jars and library versions. This can cause library jar and classpath issues, where incompatible versions of a library cause job failures if not properly set up.
  • The above point can also make migrating between or upgrading to newer Spark versions a more involved process, as jar version conflicts are bound to crop up when upgrading, and they are frequently difficult to resolve.
  • Workflow Maintenance – You need to write workflows to launch even the simplest of jobs. (Fig. 1)

Livy

Livy can be used to launch 'asynchronous' Spark jobs which are not dependent on, and also do not have any jobs dependent on them. By 'asynchronous', we mean that these jobs can be launched at any time, and we do not need to wait for the response of the job.

Since jobs launched through Livy are asynchronous, they can simply be launched by a parent processing workflow, which can then proceed on its own without waiting for a response from the job. Such jobs often produce results that are themselves consumed asynchronously. Some examples are:

  1. Data quality reporting jobs, involving stats such as row counts, column cardinality, etc., which write to a database that is then consumed for dashboard generation.
  2. The last jobs of a pipeline, such as jobs to prepare and export data to an external system.

But this does not mean that Livy's features cannot be used to orchestrate workflows. We can still wait for a job's response by simply polling the API. Indeed, using this, at zeotap we have leveraged Livy to launch and monitor Spark jobs, and built one part of a decision-making and orchestration framework, which supports workflow management, similar to Oozie.
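As a minimal sketch of such polling (assuming a batch job with ID 42, a Livy server on localhost:8998, which is Livy's default port, and jq available for parsing the JSON responses):

# Poll the batch state until it reaches a terminal state.
BATCH_ID=42
LIVY_URL=http://localhost:8998
while true; do
  STATE=$(curl -s "${LIVY_URL}/batches/${BATCH_ID}/state" | jq -r '.state')
  echo "Batch ${BATCH_ID} is ${STATE}"
  case "${STATE}" in
    success|error|dead|killed) break ;;
  esac
  sleep 30
done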

Pros:

  • Simple HTTP interface to launch, interact with and track Spark jobs.
  • Unlikely to cause any jar issues, as it is pre-bundled with the requisite jars, and just having Spark set up correctly on the cluster is enough.
  • Easy migration between Spark versions. If the job runs on the newer Spark version, Livy will launch it.
  • No need to write additional workflow files and properties to launch jobs.
  • Will not choke clusters with additional tracking jobs (like 'oozie-launcher' – Fig. 2, 3).

Cons:

  • Less versatile than Oozie for launching jobs – it supports only Spark jobs.
  • Not a workflow management system. Purely used to launch and track single Spark jobs. But a workflow management system can be built around it.

Livy Features

Spark allows for two types of data analysis:

  1. Interactive: Using spark-shell and pyspark. This launch mode allows for commands to be submitted at runtime.
  2. Batch: Using spark-submit to launch an application on a cluster, with pre-compiled code, without any interaction at run time.

Livy supports both these modes through its REST interface as well.

Interactive / Session

In Interactive mode (or Session mode, as Livy calls it), a session first needs to be started, using a POST call to the Livy server. This will start an interactive shell on the cluster for you, similar to logging into the cluster yourself and starting a spark-shell.

Batch

In Batch mode, the code to be executed needs to already be pre-written in a file; this could be a Python file or a compiled jar, which can then be stored on the cluster and used to launch the job.
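For example, a batch job can be submitted with a POST to the /batches endpoint. The following is a sketch: the host, jar path, class name and argument are placeholders for your own setup, not values from this article.

curl -X POST \
  http://localhost:8998/batches \
  -H 'Content-Type: application/json' \
  -d '{
    "file" : "hdfs:///jars/my-spark-job.jar",
    "className" : "com.example.MySparkJob",
    "args" : ["2024-01-01"],
    "driverMemory" : "1g",
    "executorMemory" : "1g",
    "numExecutors" : 2,
    "name" : "my-batch-job"
  }'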

Create Session:

curl -X POST \
  http:///sessions \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{
    "driverMemory" : "1g",
    "executorMemory" : "1g",
    "numExecutors" : 2,
    "name" : "livy",
    "conf" : {
      "spark.test.conf" : "conf1"
    }
  }'

Response:

{
  "id": 0,
  "appId": null,
  "owner": null,
  "proxyUser": null,
  "state": "starting",
  "kind": "shared",
  "appInfo": {
    "driverLogUrl": null,
    "sparkUiUrl": null
  },
  "log": [
    "stdout: ",
    "\nstderr: ",
    "\nYARN Diagnostics: "
  ]
}

In both Session and Batch mode, on launching the Spark job, Livy responds with an integer ID which can be used for tracking the state of the session/batch.

Check Session State:

curl -X GET \
  http:///sessions/1/state

Response:

{
  "id": 1,
  "state": "idle"
}
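Once a session reaches the idle state, code snippets can be submitted to it as statements, and their output fetched back, over the same REST interface. A sketch follows; the host, session ID and code snippet are placeholders:

curl -X POST \
  http://localhost:8998/sessions/1/statements \
  -H 'Content-Type: application/json' \
  -d '{ "kind" : "spark", "code" : "spark.range(100).count()" }'

# Livy replies with a statement ID, whose state and result can then be polled:
curl -X GET \
  http://localhost:8998/sessions/1/statements/0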

Setting Up Livy

Setting up Livy on a Hadoop cluster is fairly straightforward. The prerequisites to run Livy are that the SPARK_HOME and HADOOP_CONF_DIR environment variables need to be set on the master node.
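For example (the paths below are placeholders for wherever Spark and the Hadoop configuration live on your master node):

export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf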

If these are set, then out of the box, after extracting the Livy package to the installation folder, we can run it directly, using:

./bin/livy-server start

Configuration

By default, Livy uses the configuration files present in the conf directory inside the install directory. This can be changed by setting the LIVY_CONF_DIR environment variable before running.

The configurations mentioned below need to be added to a file livy.conf in the configuration directory.

livy.spark.master: What Spark master Livy sessions should use.

By default, it is set to local. Set it to yarn to launch jobs using the Hadoop YARN Resource Manager.

livy.spark.deploy-mode: What Spark deploy mode Livy sessions should use.

It is strongly recommended to use cluster deploy mode when using Livy with YARN. This ensures that the host running the Livy server doesn't become overloaded when multiple sessions/jobs are running.

livy.server.yarn.app-lookup-timeout: How long Livy will wait and try to launch a session/job. If the cluster is busy, Livy might not be able to launch the job within this time limit. Increase this for a larger tolerance of a busy cluster.

livy.server.recovery.mode: Recovery mode of Livy. Possible values:

off: Default. Turns off recovery. Every time Livy shuts down, it stops and forgets all sessions.

recovery: Livy persists the session info to the state store. When Livy restarts, it recovers previous sessions from the state store.

It is strongly recommended to set this to recovery. By default, it is in off mode. If set to off, whenever Livy shuts down or crashes, all sessions and jobs launched by Livy are forgotten. Furthermore, if any session is currently running, Livy will shut it down as well.

If set to recovery, ensure that the following two configs are also set up:

livy.server.recovery.state-store: Where Livy should store the state for recovery. Possible values:

(empty): Default. State store disabled.

filesystem: Store state on a file system.

zookeeper: Store state in a ZooKeeper instance.

livy.server.recovery.state-store.url: For the filesystem state store, the path of the state store directory, e.g. file:///tmp/livy or hdfs:///.

For the zookeeper state store, the address of the ZooKeeper servers, e.g. host1:port1,host2:port2.

At zeotap, we use the filesystem state store on the local file system of the master. Livy does not go down very often, and even if it does, storing the state on the local filesystem proves to be reliable enough for our use cases.

The above configurations should be enough to allow for a stable Livy environment. The following configurations are required only if querying Livy and tracking the state of Spark jobs through the API is required.

livy.server.session.state-retain.sec: How long the finished state of a session should be retained by Livy to be available for querying. By default, it is 10 minutes. Applies to both batches and sessions.

livy.server.session.timeout: This applies only to sessions (and not batches) launched on Livy. It is how long Livy will wait before timing out an idle session. By default, 1 hour.
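Putting these together, a minimal livy.conf reflecting the recommendations above might look like the following sketch (the timeout values and the state store path are illustrative assumptions, not defaults):

# Launch sessions/batches on YARN, in cluster deploy mode
livy.spark.master = yarn
livy.spark.deploy-mode = cluster

# Be more tolerant of a busy cluster when launching jobs
livy.server.yarn.app-lookup-timeout = 300s

# Recover sessions and batches after a Livy restart, using a local filesystem state store
livy.server.recovery.mode = recovery
livy.server.recovery.state-store = filesystem
livy.server.recovery.state-store.url = file:///var/livy/recovery

# Keep finished job states queryable for longer, and time out idle sessions after an hour
livy.server.session.state-retain.sec = 1800s
livy.server.session.timeout = 1h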

Note: If you are planning to use Apache Livy on newer versions of Amazon EMR, it comes pre-packaged, and only the configurations need to be set.

Job Tracking using Livy – Caveats

A few things to note about querying Apache Livy to track job states:

  • The UI currently shows only the 100 oldest jobs/sessions within the state-retain timeout. The other jobs will not be visible on the UI, but can be queried through the REST API (see the example after this list).
  • Livy does job tracking through integer job IDs. This could lead to mix-ups if Livy shuts down and restarts without recovery being set up, as the job IDs reset to 0 again.
  • The logs which Livy returns from its APIs are usually not useful for debugging. Very limited information is available, normally just the tail end of the logs from the job. Debugging will often involve looking into the YARN application logs instead.
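For example, jobs that have fallen off the UI can still be listed through the REST API using its from and size parameters (the host is a placeholder):

# List up to 200 of the retained sessions, starting from the oldest
curl -X GET 'http://localhost:8998/sessions?from=0&size=200'

# The same pagination works for batches
curl -X GET 'http://localhost:8998/batches?from=0&size=200'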

Apache Livy and Oozie are just two of the technologies used to build the data platform at zeotap. Leveraging these two, and others, we have built a code-driven workflow management system used across multiple products here. A blog about this control plane framework will follow shortly!