
Saturday, 6 August 2016

Reactive Micro Services with Kafka

In this post I would like to share the design behind the project:

https://github.com/patricknoir/reactive-system

The basic concept is to create server applications able to expose services using the Request/Response pattern based on message exchange rather than synchronous HTTP calls.

Using message exchange rather than synchronous communication makes components less coupled and opens the door to resilience and elasticity, in line with the Reactive principles.

The idea is quite simple:

To create a server application that exposes services over Kafka message exchange with the Kafka-Reactive-Service library, you need to create a Reactive System.

Reactive System


A Reactive System is nothing more than a wrapper around a simple Akka Streams runnable graph.


The Kafka-Reactive-Library provides a basic implementation of each of the following:
  • Source[KafkaRequestEnvelope, _] 
  • ReactiveRoute
  • Sink[Future[KafkaResponseEnvelope], _]

The most important component in the graph is the ReactiveRoute. A route is simply a map from service URL to service implementation:

ReactiveRoute(services: Map[String, ReactiveService[_, _]])
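
To make the "route is a map" idea concrete, here is a minimal, self-contained sketch. It is not the library's implementation: ToyService, ToyRoute, service and handle are hypothetical names, plain String payloads stand in for the Kafka envelopes, and Either stands in for Xor.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ToyRouteSketch {
  // Hypothetical stand-in for a service: an id plus an asynchronous handler.
  final case class ToyService(id: String, f: String => Future[Either[String, String]])

  // Hypothetical stand-in for a route: a map from service id to service.
  final case class ToyRoute(services: Map[String, ToyService]) {
    // Combine two routes by merging their maps (right-hand side wins on clashes).
    def ~(other: ToyRoute): ToyRoute = ToyRoute(services ++ other.services)

    // Look the service up by id and invoke it; unknown ids yield a Left.
    def handle(id: String, payload: String): Future[Either[String, String]] =
      services.get(id) match {
        case Some(svc) => svc.f(payload)
        case None      => Future.successful(Left(s"unknown service: $id"))
      }
  }

  // Build a single-entry route from a plain function, run asynchronously.
  def service(id: String)(f: String => String)(implicit ec: ExecutionContext): ToyRoute =
    ToyRoute(Map(id -> ToyService(id, in => Future(Right(f(in))))))

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val route = service("echo")(in => s"echoing: $in") ~ service("reverse")(_.reverse)
    println(Await.result(route.handle("reverse", "abc"), 1.second)) // Right(cba)
    println(Await.result(route.handle("missing", "abc"), 1.second)) // Left(unknown service: missing)
  }
}
```

The point of the sketch is only that composing routes with ~ amounts to merging maps, and resolving a request amounts to a lookup by service id.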




Reactive Service


case class ReactiveService[-In: ReactiveDeserializer, +Out: ReactiveSerializer](id: String)(f: In => Future[Error Xor Out]) {
  ...
}

A Reactive Service wraps a function In => Future[Error Xor Out], where the In type must have a ReactiveDeserializer and the Out type a ReactiveSerializer.

Reactive Route


To create a ReactiveRoute, the library offers a DSL that resembles the Akka HTTP one:


import ReactiveRoute._
val route: ReactiveRoute = request.aSync("echo") { (in: String) => 
    s"echoing: $in" 
  } ~
  request.aSync[String, Int]("size") { in =>
    in.length
  } ~
  request.sync[String, String]("reverse") { in => in.reverse }


Inside the request.aSync/sync blocks you can call anything you need to implement your business logic, exactly as you would in an Akka HTTP route.
request.aSync expresses a function that will be executed asynchronously, while request.sync runs on the same thread the ReactiveRoute uses to resolve the service.

Note: it is perfectly legitimate for a request.sync/aSync block to return an Xor[Error, A] (Xor from the Typelevel Cats library), where A is the return type of your function (request.sync[_, A]).

OK, so now we just need to put all the pieces together and create our server application:

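import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import cats.data.Xor
import scala.concurrent.Future
import org.patricknoir.kafka.reactive.server.ReactiveRoute._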

  implicit val system: ActorSystem = ...
  import system.dispatcher

  val source: Source[KafkaRequestEnvelope, _] = 
    ReactiveKafkaSource.create(
      "simpleServiceInbound", 
      Set("localhost:9092"), 
      "client1", "group1")
  val route = request.sync[String, String]("echo") { in =>
      "echoing: " + in
    } ~ request.aSync[String, Int]("length") { in =>
      in.length
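    } ~ request[String, Int]("parseInt") { in => Future {
        // Xor.fromTry expects a scala.util.Try, so the parse is wrapped in one.
        // The left side is a Throwable here; map it if the route expects the library's Error type.
        Xor.fromTry(scala.util.Try(in.toInt))
      }
    }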

  val sink: Sink[Future[KafkaResponseEnvelope], _] = 
    ReactiveKafkaSink.create(Set("localhost:9092"))

  val reactiveSys = ReactiveSystem(source, route, sink)

  reactiveSys.run()

And voilà, we now have a server running and exposing the echo/length/parseInt services through Kafka message exchange. In the next post we will explore the client side of the library and how to consume those services.



Sunday, 18 October 2015

Typeclass in Scala and Haskell

In this post I would like to present an example of Typeclass in Scala and compare it with the equivalent Haskell syntax.

The purpose of this article is to show that Scala inherits many elements and much of its syntax from FP languages such as Haskell, and is not just a Java++ with nicer lambda expressions.

In this exercise we would like to implement a simple method:

Scala

def totalPrice[P:Priceable](ps:List[P]):Double // Scala

Haskell

totalPrice :: Priceable p => [p] -> Double -- Haskell

To compile the method we need a definition for Priceable. We want Priceable to be a typeclass: every priceable entity must provide a method that extracts its price as a Double.

Scala:

trait Priceable[-A] {
  def extractPrice(a:A):Double
}

Haskell:

class Priceable a where
  extractPrice :: a -> Double


Now that we have defined the typeclass, we can go back to our original method and provide the implementation:

Scala

package object exercise {
  def totalPrice[P:Priceable](ps:List[P]):Double = 
              ps.map(implicitly[Priceable[P]].extractPrice).reduce(_+_)
}

Haskell

totalPrice ps = foldr1 (+) (map extractPrice ps)
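
One caveat applies to both versions: reduce in Scala and foldr1 in Haskell are partial and fail on an empty list. Below is a small sketch of a total variant, assuming 0.0 is the desired result when there is nothing to price. The names totalPriceSafe and doublePriceable are mine and exist only for this illustration; the Haskell counterpart would be sum (map extractPrice ps).

```scala
object TotalPriceSketch {
  trait Priceable[-A] {
    def extractPrice(a: A): Double
  }

  // Same contract as totalPrice, but folding from 0.0 so an empty list is fine.
  def totalPriceSafe[P: Priceable](ps: List[P]): Double =
    ps.foldLeft(0.0)((acc, p) => acc + implicitly[Priceable[P]].extractPrice(p))

  // Throwaway instance used only to exercise the function: a Double prices itself.
  implicit val doublePriceable: Priceable[Double] = new Priceable[Double] {
    def extractPrice(a: Double): Double = a
  }

  def main(args: Array[String]): Unit = {
    println(totalPriceSafe(List(1.5, 2.5)))      // 4.0
    println(totalPriceSafe(List.empty[Double]))  // 0.0
  }
}
```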

Finally we can create our entity Book:

Scala

case class Book(title:String, author:String, price:Double)

Haskell

data Book = Book { title::String, author::String, price::Double }

and make it a member of Priceable:

Scala


object Book {
  implicit val bookPriceable: Priceable[Book] = new Priceable[Book] {
    def extractPrice(book:Book):Double = book.price
  }
}

Haskell


instance Priceable Book where
  extractPrice book = price book


Now let's create a few books and use the totalPrice function:

Scala


val book1 = Book("The Fellowship of the Ring", "J. R. R. Tolkien", 31.45)
val book2 = Book("The Da Vinci Code", "Dan Brown", 22.55)
val book3 = Book("Timeline", "Michael Crichton", 21.50)

val books = List(book1, book2, book3)

totalPrice(books) // res0 = 75.50

Haskell


book1 = Book "The Fellowship of the Ring" "J. R. R. Tolkien" 31.45
book2 = Book "The Da Vinci Code" "Dan Brown" 22.55
book3 = Book "Timeline" "Michael Crichton" 21.50

books = [book1, book2, book3]

totalPrice books -- res0 = 75.50


In Scala we can also wrap the List[P:Priceable] in an implicit class to get a more dot-oriented notation:


implicit class PriceableOp[P:Priceable](ps:List[P]) {
  lazy val totalPrice = exercise.totalPrice(ps)
}


So now it is possible to write:


val book1 = Book("The Fellowship of the Ring", "J. R. R. Tolkien", 31.45)
val book2 = Book("The Da Vinci Code", "Dan Brown", 22.55)
val book3 = Book("Timeline", "Michael Crichton", 21.50)

val books = List(book1, book2, book3)

val result = books.totalPrice
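
The payoff of the typeclass approach is that totalPrice never has to change when a new entity comes along. As a sketch, here is a second instance for a hypothetical PenPack type (not part of the original exercise), priced as unit price times quantity; sum is used instead of reduce only to keep the example total.

```scala
object PriceableDemo {
  trait Priceable[-A] {
    def extractPrice(a: A): Double
  }

  def totalPrice[P: Priceable](ps: List[P]): Double =
    ps.map(implicitly[Priceable[P]].extractPrice).sum

  case class Book(title: String, author: String, price: Double)
  object Book {
    implicit val bookPriceable: Priceable[Book] = new Priceable[Book] {
      def extractPrice(book: Book): Double = book.price
    }
  }

  // Hypothetical second entity: its price is derived, not stored.
  case class PenPack(unitPrice: Double, quantity: Int)
  object PenPack {
    implicit val penPackPriceable: Priceable[PenPack] = new Priceable[PenPack] {
      def extractPrice(pack: PenPack): Double = pack.unitPrice * pack.quantity
    }
  }

  def main(args: Array[String]): Unit = {
    println(totalPrice(List(PenPack(1.5, 4))))                                    // 6.0
    println(totalPrice(List(Book("A", "Someone", 10.0), Book("B", "Else", 2.5)))) // 12.5
  }
}
```

The instance lives in the companion object of PenPack, so the compiler finds it without any import, just as it does for Book.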

If you want to learn more about the use of implicits in Scala, here is a post on method extension strategies via implicits:

http://patricknoir.blogspot.com.es/2014/12/method-extension-in-scala-with-implicit.html

Wednesday, 14 October 2015

Big Data in Action

I recently came back from San Francisco, where I presented the project I'm currently leading. The platform is called Omnia and is designed to be a distributed, reactive data platform.


Here is a webinar I gave back in June/15 for Typesafe:



Note: we are hiring :-) so if you are looking for a job, contact me.

Wednesday, 17 December 2014

The power of Lambda-Type in Higher kinded types

In the previous posts we explored type classes: we defined the Semigroup type class in this post and then we exploited the State Monad.

I would like to start this post from the definition we gave of the State Monad:


  trait State[S, A] {
    def apply(s:S): (A, S)
    def map[B](f:A=>B): State[S, B] = State { s =>
        val (a, s2) = this(s)
        (f(a), s2)
    }
    def flatMap[B](f:A=>State[S, B]): State[S, B] = State { s=>
      val (a, s2) = this(s)
      f(a)(s2)
    }
  }
  
  object State {
    def apply[S, A](f: S=>(A, S)) = new State[S, A] {
      def apply(s:S) = f(s)
    } 
  }

Without going into Category Theory, let's understand what makes the above trait a Monad.

A Monad is essentially a container of other types that defines some transformation methods between the different contained types.
Because this can get quite confusing in words, let's use code to explain it:

  trait Monad[M[A], A] {
    def map[B](f: A=>B): M[B]
    def flatMap[B](f: A=>M[B]): M[B]
  }

Quite interesting... A Monad is nothing more than a trait that defines map and flatMap across container types (if you want to dig into the math behind it, Category Theory is what you need to look for).

I would like now to re-define State as a member of Monad, something like: Monad[State[S, A], A] ...

Let's try it:


  trait Monad[M[A], A] {
    def map[B](f: A=>B): M[B]
    def flatMap[B](f: A=>M[B]): M[B]
  }
  
  trait State[S, A] {
    def apply(s:S): (A, S)
  }
  
  object State {
    def apply[S, A](f: S=>(A, S)) = new State[S, A] {
      def apply(s:S) = f(s)
    } 
  }

OK, so now we have moved the map and flatMap methods out of State and created a new trait for Monad. We now want State to be a member of Monad so that it gets map and flatMap.
Ideally we would like to do something like this:


  object Monad {
    implicit def stateMonad[S, A](sm:State[S, A]) = new Monad[State[S, A], A] {
      def map[B](f:A=>B) = State[S, B] { s => 
        val(a, s2) = sm(s)
        (f(a), s2)
      }
      
      def flatMap[B](f: A=>State[S,B]) = State[S, B] { s =>
       val (a, s2) = sm(s)
       f(a)(s2)
      }
    }
  }

The sad part of the above code is that line 2 (the implicit def stateMonad line) will NOT COMPILE!

In fact the Monad trait accepts a type constructor with a single parameter, M[_], and not one with two, M[_, _]!

Obviously I could change the Monad trait definition, but then it would no longer work for containers with a single type parameter, and so on...

We need to do some magic and transform State[S, A] into something with only one type parameter, but without losing generality.

That's where we use lambda types. Have a look at the code below:

  object Monad {
    implicit def stateMonad[S, A](sm:State[S, A]) = new Monad[({type λ[A]=State[S,A]})#λ, A] {
      def map[B](f:A=>B) = State[S, B] { s => 
        val(a, s2) = sm(s)
        (f(a), s2)
      }
      
      def flatMap[B](f: A=>State[S,B]) = State[S, B] { s =>
       val (a, s2) = sm(s)
       f(a)(s2)
      }
    }
  }

I know what you are thinking right now... WTF... At least that was my reaction the first time I saw lambda types.

What's going on in line 2 (the implicit def stateMonad line)?
I had to do 2 things:

  1. Transform State[S, A] into a type with a single parameter: λ[A]
  2. Not lose the constraint on type S, by using a sort of closure in the type definition.

So what we have said is that we want λ[A] to be an alias for State[S, A], and because λ[A] has the shape M[_] the compiler does not complain. Note that I couldn't define the type λ in a separate expression, because the definition of λ uses the type S. If you remember our talk about closures on function parameters here, we see the same trick used on types. Isn't that cool?
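
The same type-lambda trick shows up in the more common typeclass encoding, where the instance is parameterised only by the type constructor. The following is a sketch under my own assumptions, not code from the earlier posts: Functor and stateFunctor are names I introduce here, and the lambda is called L rather than λ to keep the source ASCII-only. In Scala 3 the same idea can be written with the native syntax [A] =>> State[S, A].

```scala
object TypeLambdaSketch {
  trait State[S, A] {
    def apply(s: S): (A, S)
  }

  object State {
    def apply[S, A](f: S => (A, S)): State[S, A] = new State[S, A] {
      def apply(s: S): (A, S) = f(s)
    }
  }

  // A typeclass over type constructors that take exactly one parameter.
  trait Functor[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }

  // S is fixed by the enclosing method; the type lambda leaves only A open,
  // so State "with S already applied" has the F[_] shape Functor asks for.
  implicit def stateFunctor[S]: Functor[({ type L[A] = State[S, A] })#L] =
    new Functor[({ type L[A] = State[S, A] })#L] {
      def map[A, B](fa: State[S, A])(f: A => B): State[S, B] = State[S, B] { s =>
        val (a, s2) = fa(s)
        (f(a), s2)
      }
    }

  def main(args: Array[String]): Unit = {
    // A state that returns the counter as text and then increments it.
    val next = State[Int, String](n => (n.toString, n + 1))
    val digits = stateFunctor[Int].map(next)(_.length)
    println(digits(41)) // (2,42)
  }
}
```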

Saturday, 13 December 2014

Demystify the State Monad with Scala 2/2

In the previous post we explored how to define a Stack data structure, and we defined methods to push to and pop from it. All the methods accept the current stack and return a tuple containing the result of the operation and the new stack. We defined this kind of signature:

type State[S, A] = S => (A, S)

So here is where we are:

package fp

object StackApp extends App {

  type Stack[A] = List[A]
  type State[S, A] = S => (A, S)
  
  def push[A](a:A) : State[Stack[A], Unit] = stack => ((), a :: stack)
  
  def pop[A]: State[Stack[A], Option[A]] = {
    case a :: tail => (Some(a), tail)
    case Nil       => (None, Nil)
  }
  
  def popPairs[A]: State[Stack[A],(Option[A], Option[A])] = stack => {
    val (opt1, stack1) = pop[A](stack)
    val (opt2, stack2) = pop[A](stack1)
    ((opt1, opt2), stack2)
  }
}
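
To see the manual threading in action, here is a short usage sketch of push and pop with the alias-based State. The object name StackAliasDemo and the concrete values are mine; the definitions are the ones above.

```scala
object StackAliasDemo {
  type Stack[A] = List[A]
  type State[S, A] = S => (A, S)

  def push[A](a: A): State[Stack[A], Unit] = stack => ((), a :: stack)

  def pop[A]: State[Stack[A], Option[A]] = {
    case a :: tail => (Some(a), tail)
    case Nil       => (None, Nil)
  }

  def main(args: Array[String]): Unit = {
    // Every step returns a new stack that must be handed to the next step.
    val (_, s1)     = push(1)(Nil)   // s1 == List(1)
    val (_, s2)     = push(2)(s1)    // s2 == List(2, 1)
    val (top, s3)   = pop[Int](s2)   // top == Some(2), s3 == List(1)
    val (next, s4)  = pop[Int](s3)   // next == Some(1), s4 == Nil
    val (empty, s5) = pop[Int](s4)   // empty == None, s5 == Nil
    println((top, next, empty, s5))  // (Some(2),Some(1),None,List())
  }
}
```

Passing s1 where s2 was intended would still compile, which is exactly the kind of mistake the rest of the post sets out to make impossible.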

Now that we have defined State, our goal is to get rid of the expressions inside popPairs, where we have to carefully pass the right stack instance to each expression.

To do that we need methods to compose different states together, so we are going to introduce the 2 key methods: map and flatMap.


def map[S, A, B](sa:State[S, A])(f:A=>B): State[S, B] = ???
def flatMap[S, A, B](sa:State[S, A])(f:A=>State[S, B]): State[S, B] = ???

OK, let's see how we can implement these methods.
Let's start with map. The first thing to consider is the return type: it has to be a State[S, B], so in essence it has to be a function of this shape:
Given an initial state, return a tuple of the form (b, newState)


def map[S, A, B](sa:State[S, A])(f:A=>B): State[S, B] = state => {
  ...
  (b, newState)
}

Let's look at the full implementation and comment on it:


def map[S, A, B](sa:State[S, A])(f:A=>B): State[S, B] = state => {
    val (a, newState) = sa(state)
    (f(a), newState)
  }

We simply apply sa to the input state; the result, in line 2, is a tuple with the result a: A and a newState: S.
We can then apply f: A => B to a and return the new tuple (f(a), newState).
The result is a function that takes a state as input and returns a tuple (b: B, newState: S), respecting the State[S, B] signature.
Note: the high level of abstraction in this function is interesting. map is a high-level function: we are not reasoning about concrete implementations, and writing it becomes more like a game of matching the types. It is surprising how few implementations of map are possible; the type signature restricts them.
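
As a quick illustration of map on the alias-based State, the sketch below transforms the result of pop while leaving the stack handling untouched. describeTop is a hypothetical helper introduced only for this example.

```scala
object MapDemo {
  type Stack[A] = List[A]
  type State[S, A] = S => (A, S)

  def pop[A]: State[Stack[A], Option[A]] = {
    case a :: tail => (Some(a), tail)
    case Nil       => (None, Nil)
  }

  def map[S, A, B](sa: State[S, A])(f: A => B): State[S, B] = state => {
    val (a, newState) = sa(state)
    (f(a), newState)
  }

  // Pop the top element and describe it; map only touches the result,
  // the new stack produced by pop is passed through unchanged.
  val describeTop: State[Stack[Int], String] =
    map(pop[Int])(opt => opt.fold("empty")(n => s"top was $n"))

  def main(args: Array[String]): Unit = {
    println(describeTop(List(7, 8))) // (top was 7,List(8))
    println(describeTop(Nil))        // (empty,List())
  }
}
```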
OK, let's move on to flatMap. This method is very important for composability.
The only difference in the signature compared to map is that the function f returns a State[S, B] directly instead of a plain B.
Why is this so important?
Well, let's assume we only have the map method, and we call map inside another map:


map(dummyState) { stack1 => 
    ...
    map(dummyState2) {
      ...
      (b, finalState)
    }
  }

Well, by nesting one map inside the other, the final return type will be:

State[S, State[S, B]]

which is not exactly what we wanted. The idea is to flatten the inner State[S, B] so that the final result is just
State[S, B].
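
The nesting problem is easy to reproduce with concrete types. In this sketch (the names nested and runNested are mine) two pops are combined with map only, and the result type shows the State inside a State; getting the pair out requires running twice and threading the stack by hand again.

```scala
object NestedMapDemo {
  type Stack[A] = List[A]
  type State[S, A] = S => (A, S)

  def pop[A]: State[Stack[A], Option[A]] = {
    case a :: tail => (Some(a), tail)
    case Nil       => (None, Nil)
  }

  def map[S, A, B](sa: State[S, A])(f: A => B): State[S, B] = state => {
    val (a, newState) = sa(state)
    (f(a), newState)
  }

  // map inside map: the outer State yields another State instead of a pair.
  val nested: State[Stack[Int], State[Stack[Int], (Option[Int], Option[Int])]] =
    map(pop[Int])(opt1 => map(pop[Int])(opt2 => (opt1, opt2)))

  // Running it means unwrapping two layers and passing the stack along manually.
  def runNested(stack: Stack[Int]): ((Option[Int], Option[Int]), Stack[Int]) = {
    val (inner, stack1) = nested(stack)
    inner(stack1)
  }

  def main(args: Array[String]): Unit =
    println(runNested(List(1, 2, 3))) // ((Some(1),Some(2)),List(3))
}
```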

To do that we need a new function that performs a map operation and then flattens the result, which is why we call it flatMap:


def flatMap[S, A, B](sa:State[S, A])(f:A=>State[S, B]): State[S, B] = state => {
    val (a, newState) = sa(state)
    f(a)(newState)
  }

Here we go: as with map, we apply sa to the state, then we take the value a and pass it to the function f.
This time f returns a new State[S, B], which is not yet what we need, because we have to return the tuple (b: B, s: S). But since f(a): State[S, B] is itself a function from S, we can just apply it to newState: S to get the desired result in line 3.

Now that we have flatMap and map we can use them in popPairs:


def popPairs[A]: State[Stack[A],(Option[A], Option[A])] =
    flatMap(pop[A]) ( opt1 => map(pop[A]) ( opt2 => (opt1, opt2) ) )

Here we go: with flatMap and map we manipulate the State obtained from the pop call, and we no longer have to thread the stacks through by hand.
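
For completeness, here is a self-contained sketch that puts the alias-based pieces together and runs popPairs on a couple of sample stacks. The object name and the inputs are mine; the definitions are those given above.

```scala
object FlatMapDemo {
  type Stack[A] = List[A]
  type State[S, A] = S => (A, S)

  def pop[A]: State[Stack[A], Option[A]] = {
    case a :: tail => (Some(a), tail)
    case Nil       => (None, Nil)
  }

  def map[S, A, B](sa: State[S, A])(f: A => B): State[S, B] = state => {
    val (a, newState) = sa(state)
    (f(a), newState)
  }

  def flatMap[S, A, B](sa: State[S, A])(f: A => State[S, B]): State[S, B] = state => {
    val (a, newState) = sa(state)
    f(a)(newState) // run the next step on the state produced by the first one
  }

  // No stack variable appears here: flatMap and map do the threading.
  def popPairs[A]: State[Stack[A], (Option[A], Option[A])] =
    flatMap(pop[A])(opt1 => map(pop[A])(opt2 => (opt1, opt2)))

  def main(args: Array[String]): Unit = {
    println(popPairs[Int](List(1, 2, 3))) // ((Some(1),Some(2)),List(3))
    println(popPairs[Int](List(1)))       // ((Some(1),None),List())
  }
}
```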

Let's try to improve the syntax.
The next step is to promote State[S, A] from a simple alias to a proper trait, with the map and flatMap methods as part of it.


trait State[S, A] {
    def apply(s:S): (A, S)
    def map[B](f:A =>B): State[S, B]
    def flatMap[B](f:A=>State[S, B]): State[S, B]
  }

Now let's define a companion object for State with a factory method; we will also define the map and flatMap implementations.


trait State[S, A] {
    def apply(s:S): (A, S)
    def map[B](f:A =>B): State[S, B] = State { s => 
     val (a, newState) = this(s)
     (f(a), newState)
    }
    def flatMap[B](f:A=>State[S, B]): State[S, B] = State { s => 
     val (a, newState) = this(s)
     f(a)(newState)
    }
  }
  
  object State {
    def apply[S, A](r: S => (A, S)): State[S, A] = new State[S, A] {
      def apply(s:S) = r(s)
    }
  }

So now map and flatMap are methods of the State trait, and the companion object State has a factory method that creates a new State from a function of type S => (A, S).

Now our code looks like this:


package fp

object StackApp extends App {

  type Stack[A] = List[A]
  
  trait State[S, A] {
    def apply(s:S): (A, S)
    def map[B](f:A =>B): State[S, B] = State { s => 
     val (a, newState) = this(s)
     (f(a), newState)
    }
    def flatMap[B](f:A=>State[S, B]): State[S, B] = State { s => 
     val (a, newState) = this(s)
     f(a)(newState)
    }
  }
  
  object State {
    def apply[S, A](r: S => (A, S)): State[S, A] = new State[S, A] {
      def apply(s:S) = r(s)
    }
  }
  
  def push[A](a:A) : State[Stack[A], Unit] = State { stack => ((), a :: stack) }
  
  def pop[A]: State[Stack[A], Option[A]] = State {
    case a :: tail => (Some(a), tail)
    case Nil       => (None, Nil)
  }
  
  def popPairs[A]: State[Stack[A],(Option[A], Option[A])] =
    pop[A].flatMap(opt1 => pop[A].map(opt2 => (opt1, opt2) ))
    
}

Fantastic: now our popPairs uses dot notation and looks closer to the object-oriented programming style, but it is still not optimal.

In Scala, every expression built by composition (flatMap of flatMap of ... map) can be translated into a for comprehension.


def popPairs[A]: State[Stack[A],(Option[A], Option[A])] =
  pop[A].flatMap(opt1 => pop[A].map(opt2 => (opt1, opt2) ))

/**
 * IS EQUIVALENT TO :
 */
    
def popPairs[A]: State[Stack[A], (Option[A], Option[A])] = for {
  opt1 <- pop[A]
  opt2 <- pop[A]
} yield (opt1, opt2)

The second version looks similar to imperative code, but it is really syntactic sugar for the first version.
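
The desugaring also works with longer chains. The sketch below, with names of my own choosing (program, desugared), pushes two values and pops them back as a pair, once as a for comprehension and once as the equivalent flatMap/map chain, and runs both on an empty stack.

```scala
object StateForDemo {
  type Stack[A] = List[A]

  trait State[S, A] {
    def apply(s: S): (A, S)
    def map[B](f: A => B): State[S, B] = State[S, B] { s =>
      val (a, newState) = this(s)
      (f(a), newState)
    }
    def flatMap[B](f: A => State[S, B]): State[S, B] = State[S, B] { s =>
      val (a, newState) = this(s)
      f(a)(newState)
    }
  }

  object State {
    def apply[S, A](r: S => (A, S)): State[S, A] = new State[S, A] {
      def apply(s: S): (A, S) = r(s)
    }
  }

  def push[A](a: A): State[Stack[A], Unit] =
    State[Stack[A], Unit] { stack => ((), a :: stack) }

  def pop[A]: State[Stack[A], Option[A]] = State[Stack[A], Option[A]] {
    case a :: tail => (Some(a), tail)
    case Nil       => (None, Nil)
  }

  def popPairs[A]: State[Stack[A], (Option[A], Option[A])] = for {
    opt1 <- pop[A]
    opt2 <- pop[A]
  } yield (opt1, opt2)

  // Push two values, then pop them back; no stack is passed around by hand.
  val program: State[Stack[Int], (Option[Int], Option[Int])] = for {
    _    <- push(1)
    _    <- push(2)
    pair <- popPairs[Int]
  } yield pair

  // The same program written as the chain the compiler generates.
  val desugared: State[Stack[Int], (Option[Int], Option[Int])] =
    push(1).flatMap(_ => push(2).flatMap(_ => popPairs[Int].map(pair => pair)))

  def main(args: Array[String]): Unit = {
    println(program(Nil))   // ((Some(2),Some(1)),List())
    println(desugared(Nil)) // ((Some(2),Some(1)),List())
  }
}
```

Nothing happens until the State value is applied to an initial stack, so program can be defined once and run against as many different stacks as needed.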

So finally our code is:


package fp

object StackApp extends App {

  type Stack[A] = List[A]
  
  trait State[S, A] {
    def apply(s:S): (A, S)
    def map[B](f:A =>B): State[S, B] = State { s => 
     val (a, newState) = this(s)
     (f(a), newState)
    }
    def flatMap[B](f:A=>State[S, B]): State[S, B] = State { s => 
     val (a, newState) = this(s)
     f(a)(newState)
    }
  }
  
  object State {
    def apply[S, A](r: S => (A, S)): State[S, A] = new State[S, A] {
      def apply(s:S) = r(s)
    }
  }
  
  def push[A](a:A) : State[Stack[A], Unit] = State { stack => ((), a :: stack) }
  
  def pop[A]: State[Stack[A], Option[A]] = State {
    case a :: tail => (Some(a), tail)
    case Nil       => (None, Nil)
  }
  
  def popPairs[A]: State[Stack[A],(Option[A], Option[A])] = for {
    opt1 <- pop[A]
    opt2 <- pop[A]
  } yield (opt1, opt2)
  
}

As you can see, we are no longer dealing with the states in popPairs, and the code is quite intuitive and close to how it would look in imperative style.

The benefit of using State[S, A] is that none of these functions mutates the state: they push the state modification up to the very top of your call stack, so all these functions are PURE. This gives your program an important property, referential transparency, which in a distributed system can be extended to location transparency. That is very important for systems that require high availability and fault tolerance, aspects we will cover in the next post, when we use the State monad to implement a key-value store.