Saturday 6 August 2016

Reactive Micro Services with Kafka

In this post I would like to share the design behind the project:

https://github.com/patricknoir/reactive-system

The basic concept is to create server applications able to expose services using the Request/Response pattern based on message exchange rather than synchronous HTTP calls.

Using message exchange rather than synchronous communication makes components less coupled and opens the door to implementing Resiliency and Elasticity according to the Reactive Principles.
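To make the request/response idea concrete, here is a minimal in-memory sketch with no Kafka involved. The names Request, Response, correlationId and PendingRequests are hypothetical and are not taken from the library; the sketch only assumes that each request carries an identifier which the response echoes back, so that a client can match replies that arrive asynchronously.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object CorrelationSketch {
  // Hypothetical message shapes, not the library's envelopes.
  final case class Request(correlationId: String, destination: String, payload: String)
  final case class Response(correlationId: String, payload: String)

  // Client side: remembers which requests are still waiting for a reply.
  final class PendingRequests {
    private val pending = TrieMap.empty[String, Promise[String]]

    // Register a request and hand back a Future completed when the reply arrives.
    def register(request: Request): Future[String] = {
      val promise = Promise[String]()
      pending.put(request.correlationId, promise)
      promise.future
    }

    // Called for every message read from the response channel.
    // Returns false when no caller is waiting for this id.
    def complete(response: Response): Boolean =
      pending.remove(response.correlationId) match {
        case Some(promise) => promise.trySuccess(response.payload)
        case None          => false
      }
  }

  // Server side stand-in: turns a request into a response with the same id.
  def echoServer(request: Request): Response =
    Response(request.correlationId, s"echoing: ${request.payload}")

  def demo(): String = {
    val client  = new PendingRequests
    val request = Request("req-1", "echo", "hello")
    val reply   = client.register(request)
    client.complete(echoServer(request)) // in a real system this hop crosses a broker
    Await.result(reply, 1.second)
  }
}
```

Because the caller only holds a Future, it does not need to know where or when the reply is produced, which is what keeps the two sides loosely coupled.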

The idea is quite simple:

To create a server application that exposes services through message exchange with the Kafka-Reactive-Service library, you need to create a Reactive System.

Reactive System


A Reactive System is nothing more than a wrapper around a simple Akka Streams runnable graph.


The Kafka-Reactive-Library provides a basic implementation of each stage of that graph:
  • Source[KafkaRequestEnvelope, _] 
  • ReactiveRoute
  • Sink[Future[KafkaResponseEnvelope], _]
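The three stages above can be pictured with a plain-Scala model that uses no Akka at all. It is only a sketch: RequestEnvelope, ResponseEnvelope and MiniSystem are hypothetical stand-ins for the library's KafkaRequestEnvelope, KafkaResponseEnvelope and ReactiveSystem, whose real fields are not shown in this post. An Iterator plays the source, a function plays the route, and a callback plays the sink.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PipelineSketch {
  // Hypothetical envelopes; the library's real ones carry Kafka metadata.
  final case class RequestEnvelope(destination: String, payload: String)
  final case class ResponseEnvelope(payload: String)

  // Source -> route -> sink, mirroring the shape of the runnable graph.
  final case class MiniSystem(
      source: Iterator[RequestEnvelope],
      route: RequestEnvelope => Future[ResponseEnvelope],
      sink: Future[ResponseEnvelope] => Unit
  ) {
    // Pull each request from the source, route it, and push the result to the sink.
    def run(): Unit = source.map(route).foreach(sink)
  }

  def demo(): List[String] = {
    val received = ListBuffer.empty[String]
    val system = MiniSystem(
      source = Iterator(RequestEnvelope("echo", "a"), RequestEnvelope("echo", "b")),
      route  = req => Future(ResponseEnvelope(s"echoing: ${req.payload}")),
      // The sink receives a Future, as in Sink[Future[KafkaResponseEnvelope], _];
      // blocking here is only to keep the demo deterministic.
      sink   = fut => { received += Await.result(fut, 1.second).payload; () }
    )
    system.run()
    received.toList
  }
}
```

In the real library the stages are Akka Streams components, so back-pressure and materialization are handled by Akka rather than by a simple loop like this one.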

The most important component in the graph is the ReactiveRoute. A route is simply a map from service URL to service implementation:

ReactiveRoute(services: Map[String, ReactiveService[_, _]])
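A route of this kind can be sketched as a thin wrapper around a Map. The MiniRoute type, its handle method and the ~ operator below are illustrative assumptions modelled on the signature above; handlers are simplified to String => Future[String], whereas the library's services are typed and serialized.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RouteSketch {
  type Handler = String => Future[String]

  // Hypothetical simplification of ReactiveRoute: service id -> handler.
  final case class MiniRoute(services: Map[String, Handler]) {
    // Combine two routes; on a duplicate id the right-hand route wins.
    def ~(other: MiniRoute): MiniRoute = MiniRoute(services ++ other.services)

    // Look up the service by id and apply it, or fail the Future if it is unknown.
    def handle(serviceId: String, payload: String): Future[String] =
      services.get(serviceId) match {
        case Some(handler) => handler(payload)
        case None =>
          Future.failed(new NoSuchElementException(s"unknown service: $serviceId"))
      }
  }

  val route: MiniRoute =
    MiniRoute(Map("echo" -> ((in: String) => Future.successful(s"echoing: $in")))) ~
      MiniRoute(Map("reverse" -> ((in: String) => Future.successful(in.reverse))))

  // Blocking helper, for demonstration only.
  def call(serviceId: String, payload: String): String =
    Await.result(route.handle(serviceId, payload), 1.second)
}
```

Resolving a request is then a single map lookup on the destination carried by the incoming message, which is why routes compose so cheaply.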




Reactive Service


case class ReactiveService[-In: ReactiveDeserializer, +Out: ReactiveSerializer](id: String)(f: In => Future[Error Xor Out]) {
  ...
}

The Reactive Service is a wrapper around a function In => Future[Error Xor Out], where the In type must have a ReactiveDeserializer and the Out type a ReactiveSerializer.
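The role of the two context bounds can be illustrated with a small stand-alone sketch. Decoder, Encoder and MiniService are hypothetical stand-ins for ReactiveDeserializer, ReactiveSerializer and ReactiveService, the payload is assumed to be a String, and the standard library's Either replaces Cats' Xor with a String as the error type. Variance annotations are left out for brevity.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object ServiceSketch {
  // Hypothetical type classes standing in for ReactiveDeserializer / ReactiveSerializer.
  trait Decoder[A] { def decode(raw: String): Either[String, A] }
  trait Encoder[A] { def encode(value: A): String }

  implicit val stringDecoder: Decoder[String] = new Decoder[String] {
    def decode(raw: String): Either[String, String] = Right(raw)
  }
  implicit val intEncoder: Encoder[Int] = new Encoder[Int] {
    def encode(value: Int): String = value.toString
  }

  // The context bounds require a Decoder[In] and an Encoder[Out] to be in scope.
  final class MiniService[In: Decoder, Out: Encoder](val id: String)(
      f: In => Future[Either[String, Out]]
  ) {
    // Decode the raw payload, run the business function, encode the result.
    def apply(raw: String): Future[Either[String, String]] =
      implicitly[Decoder[In]].decode(raw) match {
        case Left(error) => Future.successful(Left(error))
        case Right(in)   => f(in).map(_.map(out => implicitly[Encoder[Out]].encode(out)))
      }
  }

  val parseInt: MiniService[String, Int] =
    new MiniService[String, Int]("parseInt")(in =>
      Future(Try(in.toInt).toEither.left.map(_.getMessage))
    )

  // Blocking helper, for demonstration only.
  def call(raw: String): Either[String, String] = Await.result(parseInt(raw), 1.second)
}
```

The business function only ever sees typed values; the wire format is handled entirely at the edges by the two type class instances.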

Reactive Route


To create a ReactiveRoute, the library offers a DSL that resembles the Akka HTTP one:


import ReactiveRoute._
val route: ReactiveRoute = request.aSync("echo") { (in: String) => 
    s"echoing: $in" 
  } ~
  request.aSync[String, Int]("size") { in =>
    in.length
  } ~
  request.sync[String, String]("reverse") { in => in.reverse }


From the request.aSync/sync methods you can call anything you need to implement your business logic, exactly as you would in an Akka HTTP route.
request.aSync expresses a function that will be executed asynchronously, while request.sync runs on the same thread that the ReactiveRoute uses to resolve the service.
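The threading difference can be shown with a small stand-alone sketch. The sync and aSync helpers below are hypothetical and only mirror the behaviour described above; they are not the library's implementation. The demo records which thread each handler runs on.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SyncAsyncSketch {
  // sync: evaluate f immediately on the caller's thread, then wrap the result.
  def sync[In, Out](f: In => Out): In => Future[Out] =
    in => Future.successful(f(in))

  // aSync: schedule f on the given execution context.
  def aSync[In, Out](f: In => Out)(implicit ec: ExecutionContext): In => Future[Out] =
    in => Future(f(in))

  // Returns (sync ran on the caller thread, aSync ran on the caller thread).
  def demo(): (Boolean, Boolean) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val caller = Thread.currentThread().getName
      val threadName: String => String = _ => Thread.currentThread().getName
      val syncThread  = Await.result(sync(threadName)("x"), 1.second)
      val asyncThread = Await.result(aSync(threadName).apply("x"), 1.second)
      (syncThread == caller, asyncThread == caller)
    } finally pool.shutdown()
  }
}
```

A sync handler is therefore best kept short and non-blocking, since it occupies the thread that resolves the route, while anything slow or blocking belongs in an aSync handler.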

Note: it is perfectly legitimate for a request.sync/aSync handler to return a Xor[Error, A] (Xor from the Typelevel Cats library), where A is the return type of your function (request.sync[_, A]).

OK, so now we just need to put all the pieces together and create our server application:

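import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
import cats.data.Xor
import scala.concurrent.Future
import org.patricknoir.kafka.reactive.server.ReactiveRoute._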

  implicit val system: ActorSystem = ...
  import system.dispatcher

  val source: Source[KafkaRequestEnvelope, _] = 
    ReactiveKafkaSource.create(
      "simpleServiceInbound", 
      Set("localhost:9092"), 
      "client1", "group1")
  val route = request.sync[String, String]("echo") { in =>
      "echoing: " + in
    } ~ request.aSync[String, Int]("length") { in =>
      in.length
    } ~ request[String, Int]("parseInt") { in => Future {
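        // Xor.fromTry takes a Try, so wrap the parse; the Throwable on the
        // left may still need mapping to the library's Error type
        Xor.fromTry(scala.util.Try(in.toInt))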
      }
    }

  val sink: Sink[Future[KafkaResponseEnvelope], _] = 
    ReactiveKafkaSink.create(Set("localhost:9092"))

  val reactiveSys = ReactiveSystem(source, route, sink)

  reactiveSys.run()

And voilà, we now have a running server that exposes the echo, length and parseInt services through Kafka message exchange. In the next post we will explore the client side of the library and how to consume those services.