I Flink You Freaky And I like you a lot!

With great power comes great responsibility

The hero of the day is K8S and a not so common add-on — “Metacontroller”.

Metacontroller comes out of Google. It builds on a K8S object named “CustomResourceDefinition”, or CRD for short. CRDs define new (custom) object types in K8S, allowing you to customize and extend it. Another way to extend K8S is API server aggregation, which can be even more powerful yet requires more work.

Metacontroller offers two meta objects, that is, objects that define new objects: DecoratorController, which is a great way to add functionality to existing objects (and more…), and CompositeController, which defines an object that contains/owns child resources.

To create your own CompositeController you first need to define a CRD. The CRD defines your custom object and how it will appear to users. It must declare the names (plural and singular — casing is important). It also declares the schema of the custom properties using OpenAPI (with some restrictions). It allows you to define how the object will appear in the K8S CLI as well as many other options to control its behavior on K8S. The CRD for our Flink Jobs looks something like this:
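A minimal sketch of such a manifest, written as a JavaScript object that can be serialized to JSON (kubectl accepts JSON as well as YAML). The group `example.com`, the version name, the short name and every field under `spec` are assumptions made for illustration, not the actual definition used for our jobs:

```javascript
// Sketch of a CRD for a FlinkJob kind. The group "example.com", the version
// name and all spec fields below are hypothetical.
const flinkJobCrd = {
  apiVersion: 'apiextensions.k8s.io/v1',
  kind: 'CustomResourceDefinition',
  // The CRD name must be "<plural>.<group>".
  metadata: { name: 'flinkjobs.example.com' },
  spec: {
    group: 'example.com',
    scope: 'Namespaced',
    names: {
      kind: 'FlinkJob',
      singular: 'flinkjob',
      plural: 'flinkjobs',
      shortNames: ['fj'],
    },
    versions: [{
      name: 'v1',
      served: true,
      storage: true,
      // Expose "status" as a subresource so the controller can report it.
      subresources: { status: {} },
      schema: {
        openAPIV3Schema: {
          type: 'object',
          properties: {
            spec: {
              type: 'object',
              required: ['jobManagerUrl', 'jarImage', 'className'],
              properties: {
                jobManagerUrl: { type: 'string' },
                jarImage: { type: 'string' },
                className: { type: 'string' },
                replicas: { type: 'integer', minimum: 1 },
              },
            },
            status: { type: 'object', 'x-kubernetes-preserve-unknown-fields': true },
          },
        },
      },
      // Extra columns shown by "kubectl get flinkjobs".
      additionalPrinterColumns: [
        { name: 'Replicas', type: 'integer', jsonPath: '.spec.replicas' },
      ],
    }],
  },
};

// Print the manifest as JSON so it can be piped into "kubectl apply -f -".
console.log(JSON.stringify(flinkJobCrd, null, 2));
```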

A K8S CRD manifest for a FlinkJob

The most important hook is the “sync” hook. Metacontroller calls this hook to check the status of child resources and to create or delete them when the parent object definition changes. At this time a hook can only be a webhook, which means you need to create an HTTP service to handle these calls.
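The hook is registered on the CompositeController object itself, next to the parent and child resource types. A rough sketch of what that object could look like, again as a JavaScript object; the controller name, the `example.com/v1` parent API version and the webhook URL are assumptions:

```javascript
// Sketch of a CompositeController for FlinkJob parents. The names, the parent
// group/version and the webhook URL are hypothetical.
const flinkJobController = {
  apiVersion: 'metacontroller.k8s.io/v1alpha1',
  kind: 'CompositeController',
  metadata: { name: 'flinkjob-controller' },
  spec: {
    // Let Metacontroller generate a unique label selector per parent.
    generateSelector: true,
    parentResource: { apiVersion: 'example.com/v1', resource: 'flinkjobs' },
    // Every child type the sync hook may return has to be listed here.
    childResources: [
      { apiVersion: 'apps/v1', resource: 'statefulsets', updateStrategy: { method: 'InPlace' } },
      { apiVersion: 'v1', resource: 'configmaps', updateStrategy: { method: 'InPlace' } },
    ],
    hooks: {
      // Points at the Service in front of the HTTP server that handles sync.
      sync: { webhook: { url: 'http://flinkjob-controller.metacontroller/sync' } },
    },
  },
};

console.log(JSON.stringify(flinkJobController, null, 2));
```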

Any HTTP-capable service would do here, but as we are using K8S, the best way to create this service is a Deployment/Statefulset with a Service.

We have used a NodeJS ExpressJS server deployed by a Statefulset.

The server code listens for “/sync” calls and responds by sending a JSON object containing the “status” and a list of K8S manifest objects for the child resources. The “status” can be anything, but you can look at how other K8S objects report status to keep with common practice. This is what the server code looks like:
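A minimal sketch of such a server. To keep it free of dependencies it uses Node's built-in `http` module rather than Express, and the `sync` and `createSyncServer` names as well as the fields placed in `status` are assumptions, not our production code:

```javascript
const http = require('http');

// Compute the response for one sync call. Metacontroller POSTs a JSON body
// that carries the parent object and the children it currently observes.
function sync(request) {
  const parent = request.parent;
  const observed = request.children || {};
  // Observed children are grouped by "<Kind>.<apiVersion>", then by name.
  const statefulSets = Object.values(observed['StatefulSet.apps/v1'] || {});
  const ready = statefulSets.reduce(
    (total, s) => total + ((s.status && s.status.readyReplicas) || 0), 0);
  return {
    status: { replicas: parent.spec.replicas || 1, readyReplicas: ready },
    children: [], // the desired child manifests go here
  };
}

// Build an HTTP server that answers POST /sync and rejects everything else.
function createSyncServer() {
  return http.createServer((req, res) => {
    if (req.method !== 'POST' || req.url !== '/sync') {
      res.writeHead(404);
      res.end();
      return;
    }
    let body = '';
    req.on('data', (chunk) => { body += chunk; });
    req.on('end', () => {
      try {
        const response = sync(JSON.parse(body));
        res.writeHead(200, { 'Content-Type': 'application/json' });
        res.end(JSON.stringify(response));
      } catch (err) {
        // Malformed JSON or a missing parent ends up here.
        res.writeHead(400, { 'Content-Type': 'application/json' });
        res.end(JSON.stringify({ error: err.message }));
      }
    });
  });
}
```

Calling `createSyncServer().listen(8080)` starts the service; the port is arbitrary as long as it matches the webhook URL registered with Metacontroller.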

Skeleton structure for index.js of the controller

As with all K8S manifests, Metacontroller works declaratively: your sync hook should only respond with the desired state and Metacontroller will handle the rest. You don’t need to interrogate the current state. In fact, if the same parent spec is given, your code should always return exactly the same child resource manifests.

In our case we chose to create a Statefulset to handle each job. We also create a ConfigMap that injects a startup shell script into one of the containers.

When a user creates a FlinkJob object they will define the Flink JobManager URL as well as information on where to find the class to run. To define that they will need to create a Docker image containing a JAR with the compiled code. They will also specify the replica count.

For a specific FlinkJob we will create a single Statefulset with as many replicas as specified by the FlinkJob spec. This will cause K8S to create a pod per replica. Each of those pods is responsible for a single job running on Flink. It needs to start the job when the pod is created, monitor the job and stop it when the pod is finalized.
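The mapping from a FlinkJob to its children can be sketched as a pure function of the parent object. The function name `desiredChildren`, the label keys, the `flink:latest` image and the spec field names are assumptions for illustration:

```javascript
// Build the desired children for one FlinkJob parent. This is a pure function:
// the same parent always yields exactly the same manifests.
function desiredChildren(parent) {
  const name = parent.metadata.name;
  const labels = { app: 'flinkjob', flinkjob: name }; // hypothetical label keys

  const configMap = {
    apiVersion: 'v1',
    kind: 'ConfigMap',
    metadata: { name: `${name}-scripts`, labels },
    data: { 'start.sh': '#!/bin/sh\n# startup script goes here\n' },
  };

  const statefulSet = {
    apiVersion: 'apps/v1',
    kind: 'StatefulSet',
    metadata: { name, labels },
    spec: {
      serviceName: name,
      // One pod, and therefore one Flink job, per replica.
      replicas: parent.spec.replicas || 1,
      selector: { matchLabels: labels },
      template: {
        metadata: { labels },
        spec: {
          containers: [{
            name: 'job',
            image: 'flink:latest', // placeholder image that ships the Flink CLI
            command: ['/bin/sh', '/scripts/start.sh'],
            volumeMounts: [{ name: 'scripts', mountPath: '/scripts' }],
          }],
          volumes: [{ name: 'scripts', configMap: { name: configMap.metadata.name } }],
        },
      },
    },
  };

  return [configMap, statefulSet];
}
```

Because the function never looks at observed state, calling it twice with the same parent gives identical output, which is what Metacontroller expects from a sync hook.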

To be able to read the JAR containing the code for the job, the pod runs the “jar” container. Its only purpose is to mount the JAR into a volume shared with the main “job” container.
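One way to share the JAR is sketched below. It assumes the “jar” container is modeled as an init container that copies the file into an `emptyDir` volume, that the user's image ships the JAR at `/job.jar` and contains a shell with `cp`; the helper name `podSpecWithJar` is hypothetical too:

```javascript
// Sketch of a pod spec where the "jar" container hands the JAR to the "job"
// container through a shared emptyDir volume.
function podSpecWithJar(jarImage) {
  return {
    initContainers: [{
      name: 'jar',
      image: jarImage,
      // Assumption: the image ships the JAR at /job.jar and has "sh" and "cp".
      command: ['sh', '-c', 'cp /job.jar /jar/job.jar'],
      volumeMounts: [{ name: 'jar', mountPath: '/jar' }],
    }],
    containers: [{
      name: 'job',
      image: 'flink:latest', // placeholder image that ships the Flink CLI
      // The job container only needs to read the JAR.
      volumeMounts: [{ name: 'jar', mountPath: '/jar', readOnly: true }],
    }],
    // emptyDir lives exactly as long as the pod, which is all we need here.
    volumes: [{ name: 'jar', emptyDir: {} }],
  };
}
```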

The “job” container then starts by running the flink CLI to submit the job. It will then wait until it is interrupted to stop the Job before exiting:
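A sketch of how the sync hook could build the ConfigMap holding that startup script. The environment variable names (`JOBMANAGER`, `JOB_CLASS`, `JAR_PATH`), the `/tmp/job-id` file and the way the JobID is extracted from the CLI output are assumptions; `JOBMANAGER` is expected to hold the address passed to the CLI's `-m` option:

```javascript
// Lines of the startup script. Variable names and file paths are hypothetical.
const startScript = [
  '#!/bin/sh',
  '# Submit in detached mode; the CLI prints the new JobID on submission.',
  'JOB_ID=$(flink run -d -m "$JOBMANAGER" -c "$JOB_CLASS" "$JAR_PATH" | grep -oE "[0-9a-f]{32}" | head -n 1)',
  '[ -n "$JOB_ID" ] || exit 1',
  '# Remember the JobID so other commands in the container can find the job.',
  'echo "$JOB_ID" > /tmp/job-id',
  '# When the pod is asked to stop, cancel the job and exit cleanly.',
  "trap 'flink cancel -m \"$JOBMANAGER\" \"$JOB_ID\"; exit 0' TERM INT",
  '# Sleep in the background so the shell can react to signals promptly.',
  'while true; do sleep 5 & wait $!; done',
  '',
].join('\n');

// Wrap the script in a ConfigMap that the "job" container can mount.
function scriptConfigMap(jobName) {
  return {
    apiVersion: 'v1',
    kind: 'ConfigMap',
    metadata: { name: `${jobName}-scripts` },
    data: { 'start.sh': startScript },
  };
}
```

Waiting on a background `sleep` instead of sleeping in the foreground matters: a shell only runs its trap handler once the foreground command returns, so a long foreground sleep would delay the cancel.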

This takes care of starting and stopping jobs. To handle monitoring of stopped or crashed jobs we have tried to use K8S built-in mechanisms as much as possible.

To make sure a crashed job is restarted, all we need is for K8S to replace the pod with a new one. The Statefulset handles this automatically when a pod terminates unexpectedly. To make the pod exit when the job has been terminated we used a container liveness probe. We defined the liveness probe of the “job” container to use the Flink CLI to check that the job is alive. If the probe cannot find the job in the list of running jobs it fails, and after enough consecutive failures K8S considers the container dead and restarts it, so the job is submitted again. The code that defines that looks like this:
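A sketch of such a container spec as the sync hook might emit it. The image, the environment variable names, the `/tmp/job-id` file (assumed to be written by the startup script) and the probe timings are assumptions, not our exact values:

```javascript
// Sketch of the "job" container with an exec liveness probe that asks the
// Flink CLI whether the job is still in the list of running jobs.
function jobContainer(spec) {
  return {
    name: 'job',
    image: 'flink:latest', // placeholder image that ships the Flink CLI
    command: ['/bin/sh', '/scripts/start.sh'],
    env: [
      { name: 'JOBMANAGER', value: spec.jobManagerUrl },
      { name: 'JOB_CLASS', value: spec.className },
      { name: 'JAR_PATH', value: '/jar/job.jar' },
    ],
    livenessProbe: {
      exec: {
        // Fails if the JobID file is missing or empty, or if the JobID is
        // not among the jobs reported by "flink list -r".
        command: ['/bin/sh', '-c',
          '[ -s /tmp/job-id ] && flink list -r -m "$JOBMANAGER" | grep -q "$(cat /tmp/job-id)"'],
      },
      initialDelaySeconds: 60, // give the job time to be submitted
      periodSeconds: 30,
      failureThreshold: 3, // tolerate a few transient CLI failures
    },
  };
}
```

The generous `initialDelaySeconds` is a deliberate choice in this sketch: probing before the job has been submitted would kill the container before it ever gets going.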

“job” container spec