Akka Streams in Java Spring Boot!

Posted on Apr 12, 2020


Streaming data from a source to a sink is a common task in today’s data processing and data pipelining systems. Ergo, there are many streaming solutions out there, like Kafka Streams, Spark Streaming, Apache Flink, etc.

All of them, in one way or another, either need infrastructure to be set up to take full advantage of them (e.g., HDFS, a Spark cluster, a Kafka streaming setup, etc.) or need some kind of orchestration among the streaming jobs (e.g., Apache Airflow).

Akka Streams

Akka Streams stands out in this battle with the advantage of being totally application-driven. Akka Streams is built on top of Akka’s celebrated actor model (which in turn was inspired by Erlang’s actor model). Hence, Akka Streams can leverage its battle-tested resilient, elastic, message-driven, and responsive capabilities (see the Reactive Manifesto).

Problems with Akka:

  1. The Java developer community has stayed away from the “made-for and built-on Scala” Akka platform.
  2. There is not much documentation and support for the most popular Java framework, Spring.

I’m here to tell you otherwise! In spite of the lack of resources available on the internet, we can in fact do Akka Streams in Java, and do it with ease.

In this post we will build an Akka Streams application in Java, with Spring Boot! And we will analyse the out-of-the-box benefits we get by using Akka Streams. So, let’s get started…

Problem Statement

We need a simple real-time stream which consumes all the updates published on a Kafka topic and, after parsing, persists the events in a SQL Server database. And we only want to commit the Kafka offset after the record has been inserted into the database.

Let’s start a fresh Spring Boot project from Spring Initializr.

Spring Initializr

We will organise the project like the tree below, which is a pretty standard Maven directory structure (since we have chosen to use Maven here). You could also go with Gradle if you like.

.
├── pom.xml
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── lprakashv
    │   │           └── springalpakka
    │   │               ├── SpringCommandLineApplication.java
    │   │               ├── configs
    │   │               │   ├── AkkaConfig.java
    │   │               │   └── StreamConfig.java
    │   │               ├── dtos
    │   │               │   └── Event.java
    │   │               ├── services
    │   │               │   └── StreamService.java
    │   │               └── utils
    │   │                   └── StreamUtils.java
    │   └── resources
    │       ├── application.yml
    │       └── stream.conf
    └── test
        └── java
            └── com
                └── lprakashv
                    └── springalpakka

Don’t worry about the code files in the tree; we will talk about those shortly.

Add all the required dependencies:

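A plausible dependency set for this build (a sketch; the versions are assumptions from around the time of writing, so check Maven Central for current ones):

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream_2.13</artifactId>
  <version>2.6.4</version>
</dependency>
<!-- Alpakka Kafka: Kafka sources/sinks for Akka Streams -->
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-kafka_2.13</artifactId>
  <version>2.0.2</version>
</dependency>
<!-- Alpakka Slick: JDBC flows/sinks for Akka Streams -->
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-slick_2.13</artifactId>
  <version>2.0.0</version>
</dependency>
<!-- SQL Server JDBC driver -->
<dependency>
  <groupId>com.microsoft.sqlserver</groupId>
  <artifactId>mssql-jdbc</artifactId>
  <version>8.2.2.jre8</version>
</dependency>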

Set up foundational Akka configurations:

In every Akka/Akka Streams application, the very basic components needed are Akka’s ActorSystem and Materializer. These are needed for a lot of things in this ecosystem: spawning actors, creating stream components, running and materializing streams, etc.
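A minimal sketch of AkkaConfig.java (the actor system name is an arbitrary choice):

package com.lprakashv.springalpakka.configs;

import akka.actor.ActorSystem;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnClass(Source.class) // register these beans only when akka-stream is on the classpath
public class AkkaConfig {

    @Bean(destroyMethod = "terminate") // shut the actor system down with the Spring context
    public ActorSystem actorSystem() {
        return ActorSystem.create("spring-akka-stream");
    }

    @Bean
    public Materializer materializer(ActorSystem actorSystem) {
        return Materializer.createMaterializer(actorSystem);
    }
}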

In the above code, we made sure:

  1. We have only one bean instance each of ActorSystem and Materializer throughout our application.
  2. They will be instantiated only if akka.stream.javadsl.Source is on the classpath.

Our Kafka Event DTO to consume:
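A hypothetical shape for the DTO (the fields are illustrative; adapt them to your topic’s schema):

package com.lprakashv.springalpakka.dtos;

// Plain DTO representing one parsed Kafka record
public class Event {
    private long id;
    private String type;
    private String payload;

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }
}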

Let’s write our Source (Kafka): we would like to commit the offsets later, hence we use a committableSource. We will create a bean out of our committable source to be autowired into our service classes.
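A sketch of the bean in StreamConfig.java, assuming string keys and values; the bootstrap servers, group id, and topic are placeholders:

import akka.actor.ActorSystem;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.javadsl.Source;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

@Bean
public Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> committableSource(
        ActorSystem actorSystem) {
    ConsumerSettings<String, String> consumerSettings =
            ConsumerSettings.create(actorSystem, new StringDeserializer(), new StringDeserializer())
                    .withBootstrapServers("localhost:9092")
                    .withGroupId("event-stream-group")
                    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // committableSource keeps the offset with each record so we can commit after the DB write
    return Consumer.committableSource(consumerSettings, Subscriptions.topics("events"));
}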

Now let’s write our Flow (Slick): we could have created a Sink if we did not care about the persistence result and committed the Kafka offset anyway (using a plainSource). We are using a flow instead of a sink because we want to propagate the committable offset past the database stage. We will add the following code to the same config class.
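A sketch using Alpakka Slick’s flowWithPassThrough, which runs each insert and then emits the message’s committable offset downstream (the StreamUtils helpers are shown later):

import akka.NotUsed;
import akka.stream.alpakka.slick.javadsl.Slick;
import akka.stream.alpakka.slick.javadsl.SlickSession;
import akka.stream.javadsl.Flow;
import com.lprakashv.springalpakka.utils.StreamUtils;

@Bean
public SlickSession slickSession() {
    // Reads the "event-sqlserver" block from stream.conf (shown below)
    return SlickSession.forConfig("event-sqlserver");
}

@Bean
public Flow<ConsumerMessage.CommittableMessage<String, String>, ConsumerMessage.CommittableOffset, NotUsed> eventFlow(
        ActorSystem actorSystem, SlickSession slickSession) {
    return Slick.flowWithPassThrough(
            slickSession,
            actorSystem.dispatcher(),
            // build the INSERT statement for each consumed message
            message -> StreamUtils.insertEventQuery(StreamUtils.committableMessageToDTO(message)),
            // after the insert succeeds, pass the committable offset downstream
            (message, insertCount) -> message.committableOffset());
}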

You might be wondering what is in the stream.conf file, and where that committableMessageToDTO and insertEventQuery came from?

Slick needs a configuration to create a session which will execute all our DB queries. This config needs to follow the structure below in a .conf file (which is the standard way to access configs in the Typesafe/Lightbend world).

event-sqlserver {
  profile = "slick.jdbc.SQLServerProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties {
      driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      url = "your-db-url!"
      user = "your-db-user!"
      password = "your-db-pass"
    }
  }
}

The committableMessageToDTO and insertEventQuery functions convert our CommittableMessage to a DTO (our record class) and then to a SQL insert query, respectively.

We can write a Utils class with static functions to generate the SQL like:
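For instance (a sketch; the Jackson-based parsing and the table/column names are assumptions):

package com.lprakashv.springalpakka.utils;

import akka.kafka.ConsumerMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lprakashv.springalpakka.dtos.Event;

public class StreamUtils {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parse the Kafka record's JSON value into our Event DTO
    public static Event committableMessageToDTO(
            ConsumerMessage.CommittableMessage<String, String> message) {
        try {
            return MAPPER.readValue(message.record().value(), Event.class);
        } catch (java.io.IOException e) {
            throw new RuntimeException("Could not parse event: " + message.record().value(), e);
        }
    }

    // Render a plain INSERT statement; real code should escape or parameterize these values
    public static String insertEventQuery(Event event) {
        return String.format(
                "INSERT INTO events (id, type, payload) VALUES (%d, '%s', '%s')",
                event.getId(), event.getType(), event.getPayload());
    }
}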

Now, let’s piece them together and build a stream:
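A sketch of StreamService.java wiring the beans together; the buffer size, timeout, and throttle rate are arbitrary choices, and the original chain also used .mapAsync (see the list below), which this minimal version leaves out:

package com.lprakashv.springalpakka.services;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerMessage;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class StreamService {

    @Autowired private ActorSystem actorSystem;
    @Autowired private Materializer materializer;
    @Autowired private Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> committableSource;
    @Autowired private Flow<ConsumerMessage.CommittableMessage<String, String>, ConsumerMessage.CommittableOffset, NotUsed> eventFlow;

    public Consumer.DrainingControl<Done> startKafkaToDatabaseStream() {
        return committableSource
                .buffer(1000, OverflowStrategy.backpressure())      // shield the DB from bursts
                .idleTimeout(Duration.ofMinutes(5))                 // fail if the topic goes silent
                .recoverWith(TimeoutException.class,                // ...and swap in a fresh source
                        () -> committableSource.mapMaterializedValue(c -> NotUsed.getInstance()))
                .throttle(100, Duration.ofSeconds(1), 100, ThrottleMode.shaping()) // cap at ~100 msg/s
                .via(eventFlow)                                     // insert into SQL Server, emit the offset
                .toMat(Committer.sink(CommitterSettings.create(actorSystem)),
                        Consumer::createDrainingControl)            // commit offsets; keep a draining handle
                .run(materializer);
    }
}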

This is it! We can now invoke the startKafkaToDatabaseStream() method from anywhere and it will do our job.

There are a lot of notable features packed into that single “terse” chain! Let me explain each:

  1. .buffer(size, overflowStrategy): Adds a fixed-size buffer into our “flow”, which protects the downstream system from a faster upstream source. We can pick a strategy for when the buffer is full: backpressure (slows down consumption), dropHead/dropTail/dropBuffer (drop messages and don’t backpressure), or fail (fails the stream).
  2. .idleTimeout(duration): Throws a java.util.concurrent.TimeoutException when the stream is idle for the given duration, which can be handled using recover/recoverWith.
  3. .recoverWith(ThrowableClass, fallbackSourceSupplier): Whenever an exception of ThrowableClass is intercepted, the original source is replaced by the Source obtained from fallbackSourceSupplier.
  4. .throttle(elements, per, maximumBurst, mode): Sends elements downstream at a rate limited to elements/per. mode=shaping makes pauses before emitting to meet the throttle rate, while mode=enforcing fails with an exception when the upstream is faster than the throttle rate.
  5. .mapAsync(parallelism, x -> CompletionStage<value>): Processes elements asynchronously with the given parallelism, using a function that returns a CompletionStage; results are emitted downstream in the original order.
  6. CommitterSettings.create(actorSystem): Creates default committer settings for the Committer sink that commits our Kafka offsets.

The DrainingControl can be used as:
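For example (a sketch, assuming the streamService bean above and a shutdown hook):

// Keep the control around after starting the stream
Consumer.DrainingControl<Done> control = streamService.startKafkaToDatabaseStream();

// ... later, e.g. in a @PreDestroy hook: stop polling Kafka, let in-flight
// records finish and their offsets be committed, then shut the stream down
control.drainAndShutdown(actorSystem.dispatcher())
        .toCompletableFuture()
        .join();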

There are a lot of other Akka Streams features that were not in the scope of this post but are still worth exploring. Please check out the akka-stream and Alpakka documentation.

Conclusion

We saw that we can easily integrate Akka Streams into our Spring Boot projects and leverage Spring’s dependency injection to manage our Akka/stream beans.

There are a lot of streaming features, back-pressure and throttling to name a few, which would easily take up a lot of developer hours and brainpower. And even then, we would likely have missed some corner cases had we not tested thoroughly. We get those things out of the box with Akka Streams’ toolkit.

All in all, Akka Streams, along with Alpakka, is a great toolset to have in a data platform stack.

Thanks for reading!