How to Run Apache Flink Effectively On YARN?

Apache Flink is a framework to write distributed realtime data processing applications in Java/Scala/Python. Uber, Netflix, Disney and other major companies use Flink for a variety of purposes. It was recently bought by Alibaba in a multi-million dollar deal.

Flink is inspired by Google’s Dataflow model. According to this model, the events are processed as soon as they are received rather than grouping them into a batch. This enables the framework to scale easily and effectively for millions of events per second.

You can explore and read more on Flink on their official website. In this article, we’ll be exploring how Flink runs on top of yarn.

Flink on top of YARN

A Flink application consists of two major unit- one Jobmanager and multiple Taskmanagers. If you are familiar with Apache Spark, Jobmanager and Taskmanagers are equivalent to Driver and Executors.

Jobmanager handles coordination among TaskManagers. It assigns operations to them and distributes the data according to the parallelism. On the other hand, Taskmanagers are the processes on which actual computations happen such as map, reduce, joins etc.

Below is a typical bash command used to run a Flink job on YARN –

./bin/flink run -m yarn-cluster -d -yn 4 -ys 3 -ytm 4096m -yjm 2048m WordCount.jar

In the above command we are telling Flink to start the job on yarn cluster. YARN should assign 4 JVM containers for TaskManager with 4GB memory each and having 3 slots. Slots are analogous to JVM threads but they offer memory isolation. For example, in the above scenario 4GB of memory will be distributed equally to each slot and they can’t access each other’s memory. They don’t offer CPU isolation though.

After you execute the command, following takes place-

  1. Wordcount.jar and all the files from Flink installation lib/ directory are copied to HDFS of the cluster. This enables all the nodes of the cluster to download the files on their local disk.
  2. A request is issued to YARN to start Application Master on one of the node. The Application Master is where the Jobmanager runs.
  3. Application Master requests resources from the YARN Resource Manager.
  4. Once the resources are available Application Master deploys TaskManager JVMs on available nodes of the cluster.
  5. When all Taskmanagers are healthy, JobManager starts assigning subtasks to each slot.

What happens when one of the TaskManager crashes?

Whenever Jobmanager detects an unhealthy TaskManager, it restarts the whole application and resumes processing data from the last state. This is only done when checkpointing is enabled. When disabled, the job shuts down.

The number of restarts are also limited by the property yarn.maximum-failed-containers. It’s default value is equal to the number of requested TaskManagers. This means the application will tolerate only these many failures after which it’ll shutdown. To avoid this you can set the value of config to -1.

What happens when the Jobmanager itself is unhealthy?

If the Jobmanager is unhealthy, Yarn will fail the application.

To avoid this scenario, you can run the Jobmanager in HA (High-availability) mode. In this mode, the JobManager preserves it’s state in HDFS and saves a pointer to that state in zookeeper. On crash, YARN will launch a new Application Master which will deploy the job again and resume from the last state.

To enable HA, you need the add the following configs in flink-conf.yaml.

This config dictates Flink to use zookeeper for coordination when JobManager restarts. Jobmanager will store its current state in storageDir and pointer to that state in zookeeper root path. Yarn can restart JobManager at most yarn.application-attempts-1 times.

The number of restarts of Application Master is also limited by YARN property in yarn-site.xml which is by default 2. You can set this to -1 for infinite attempts as well although not advised.

How to see the logs of the application?

Logs can be seen either by opening the taskmanager.log files on individual nodes of the cluster or by running the command

yarn logs -applicationId appId123

The issue with latter is that logs are only available once the job has ended. 
Flink uses log4j for logging so you can also use KafkaAppender to send logs to Kafka and store them in HDFS/S3 to analyse in realtime. To enable this you can simply add the following to your file inside Flink’s lib folder.

If you want to explore more on this topic, you can refer to the links below:

  1. Official documentation
  2. The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing
  3. Hadoop YARN Tutorial — Learn the Fundamentals of YARN Architecture