Connecting Scala microservices with ZeroMQ and Protocol Buffers

When it comes to software architecture, the hot new thing (although it is actually quite an old concept) is microservices. The idea makes sense. Cutting a large system into smaller building blocks with well-defined APIs makes each component easier to maintain. It also makes the system easier to distribute, and it helps contain overload or errors in one component so that they do not affect other parts as badly as they would in a monolithic architecture.

For a startup project I am currently involved in (check it out: venditavi), we were faced with choosing how our different components should communicate with each other. After experimenting with Thrift for some time, we settled on a combination of ZeroMQ for handling the communication and Protocol Buffers for serialization. We have been quite happy with this choice so far.

The advantages we found:
- Both support many languages (which was important for us, as we need to combine C++ and Scala services).
- They are very lightweight and efficient, each being highly specialized for its respective task.
- Both have extensive documentation.
- They are really easy to use (something I will demonstrate soon).
- We get type safe interfaces (if the language supports type safety).

A particularly big plus of ZeroMQ is that it offers simple building blocks that can be combined into very complex messaging patterns. Its very well written documentation mainly consists of a collection of such patterns for a multitude of use cases, which can then be adapted to your particular problem.

With that said, let us walk through a tutorial for a Scala service communicating via ZeroMQ and Protocol Buffers.

Welcome to Scala bank

The example will be a banking application that can automatically approve or decline customers' loan applications (obviously this is not a real application, and I would not advise anyone to handle security in such a relaxed manner).

The application will have three components: a client application used by the customer, a component that determines a customer's loan worthiness, and a component that handles customer authentication. The client collects data from the customer and sends it to the loan server. The loan server in turn asks the authentication server whether the provided credentials are valid (this process is kept simple so as not to blow the tutorial out of proportion; please put more effort into your authentication systems). If the customer has valid credentials, the loan server determines whether the customer is loan worthy and answers with an appropriate reply.

The interfaces between all components are given by Protocol Buffer messages. These will be translated into Java classes which we can use to serialize the data we want to exchange between the components. The great thing here is that the Protocol Buffer messages can be translated into classes for many languages which makes it simple to have certain components written in another language that might be more appropriate for the respective job.

We start by looking at the messages that can be exchanged with the authentication service:

package protobuf;

option java_package = "com.fdahms.scalabank.protobuf";
option java_multiple_files = true;

import "common_models.proto";

message AuthRequest {
  required Credentials credentials = 1;
}

message AuthResponse {
  required bool authorized = 1;
}

The file declares the package protobuf, which serves as the default namespace. Keep this package name simple: in many programming languages, Java-style package name train wrecks are not welcome. With the java_package option we can define a separate package name that is used only for the generated Java classes. Also note the java_multiple_files option. Without it, all classes would be wrapped in an outer class named after the file (which is not really a problem, just not my style). Finally, we import another proto file, which lets us use the messages defined there as field types in this file. We will look at common_models.proto later.

The messages we define are AuthRequest and AuthResponse. As you might have guessed, an AuthRequest is sent to the authentication service, which then answers with an AuthResponse. Each of these messages consists of a single field. The request holds a sub-message called Credentials, which contains the user's credentials. The response consists of just a boolean telling us whether the request contained valid credentials. Both fields are required, which means that building or parsing a message fails if the field is missing. Here this is the desired behavior, as the messages would not make much sense without these fields. Note, however, that a required field has to stay required forever to remain compatible across versions. When in doubt, declare a field optional.

The messages that define the interface of the loan application service look very similar to those of the authentication service. They just contain more fields for the additional information provided by the client.

package protobuf;

option java_package = "com.fdahms.scalabank.protobuf";
option java_multiple_files = true;

import "common_models.proto";

message LoanApplicationRequest {
  required Credentials credentials = 1;
  required MonetaryAmount amount = 2;

  optional bool ever_defaulted = 10;
  optional bool using_money_for_fintech = 11;
  optional bool promise_to_give_money_back = 12;
}

message LoanApplicationResponse {
  required bool approved = 1;
  optional Interest interest = 2;
}

Note that the non-essential fields of the request are optional, as we cannot know whether they will still be relevant for loan applications in the future. In Protocol Buffer definitions, each field is assigned a number by which it is identified in the serialized format. These numbers should never change, and it is a good idea not to reuse a number for a different field either.
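To see why the numbers matter, it helps to know how the binary encoding works. Each encoded field is preceded by a key that packs the field number and a wire type together as (field_number << 3) | wire_type. Wire type 0 is a varint (used for bool, int32, int64, uint64) and 2 is length-delimited (strings and embedded messages). The following minimal sketch computes such keys by hand without any protobuf library. WireKeys and its methods are hypothetical helpers for illustration only:

```scala
object WireKeys {
  // Wire types from the Protocol Buffers encoding specification
  val Varint: Int = 0          // bool, int32, int64, uint64, ...
  val LengthDelimited: Int = 2 // string, bytes, embedded messages

  // The key written in front of every field: field number and wire type packed together
  def key(fieldNumber: Int, wireType: Int): Int = (fieldNumber << 3) | wireType

  // Encodes a bool field; restricted to field numbers 1 to 15, whose key fits in one byte
  def encodeBool(fieldNumber: Int, value: Boolean): Array[Byte] = {
    require(fieldNumber >= 1 && fieldNumber <= 15, "key would need more than one byte")
    Array(key(fieldNumber, Varint).toByte, (if (value) 1 else 0).toByte)
  }
}
```

So setting ever_defaulted (field 10) to true puts just the two bytes 0x50 0x01 on the wire, and the name ever_defaulted never appears. A reader whose schema has a different field under number 10 would silently misinterpret these bytes. This is why numbers must neither change nor be reused.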

The last set of messages we look at contains the common messages, one of which is the Credentials message we saw before.

package protobuf;

option java_package = "com.fdahms.scalabank.protobuf";
option java_multiple_files = true;


message Credentials {
  required uint64 account_id = 1;
  required string password = 2;
}

message MonetaryAmount {
  required int64 cent_amount = 1;
}

message Interest {
  required int32 ppm = 1;
}

All these declarations are similar to those we saw before. Note that for MonetaryAmount and Interest I chose to represent the numeric values as integers rather than as floating point numbers. Some numbers should not lose any precision, and monetary amounts are usually among them. Binary floating point numbers are unsuitable for these, because most decimal fractions such as 0.10 cannot be represented exactly, which leads to rounding errors. Therefore the monetary amount is stored in cents and the interest rate in parts per million (9.5% = 95000 ppm). This should give us sufficient precision for all realistic use cases.
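A quick way to see the rounding problem is to add up ten payments of ten cents, once as Double and once as whole cents. This is a minimal illustration; the object and method names are made up for this example:

```scala
object CentsVsDouble {
  // Ten payments of 0.10, summed as binary floating point numbers
  def sumAsDouble: Double = List.fill(10)(0.10).sum

  // The same ten payments stored as whole cents, the way MonetaryAmount does it
  def sumAsCents: Long = List.fill(10)(10L).sum

  // Converting cents to a decimal amount only when needed stays exact
  def centsAsDecimal(cents: Long): BigDecimal = BigDecimal(cents) / 100
}
```

Here sumAsDouble yields 0.9999999999999999 rather than 1.0. The integer sum is exactly 100 cents, which converts to exactly 1.00.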

All these proto files can be translated into Java classes automatically by the Protocol Buffers compiler (protoc). We could run it manually and commit the generated code to version control, but that would be a somewhat unclean solution. The preferred way is to use an sbt plugin that performs this translation automatically at the start of compilation. It can be included by adding

addSbtPlugin("com.github.gseitz" % "sbt-protobuf" % "0.5.1")

to the project/plugins.sbt file and

import sbtprotobuf.{ProtobufPlugin=>PB}
PB.protobufSettings

to the build.sbt settings.

A nice feature of Scala is the ability to add functionality to existing classes through implicit classes. Otherwise we would be stuck with the methods that the generated classes provide. For example, we can define helper methods that translate a MonetaryAmount into a BigDecimal holding a high precision representation of the amount:

package com.fdahms.scalabank.protobuf

object MonetaryAmountExtensions {
  // Adds getBigDecimal to the generated MonetaryAmount class, e.g. 12345 cents -> 123.45
  implicit class ExtendedMonetaryAmount(val monetaryAmount: MonetaryAmount) {
    def getBigDecimal: BigDecimal = {
      BigDecimal(monetaryAmount.getCentAmount) / 100
    }
  }

  // Adds setAmount overloads to the generated MonetaryAmount.Builder
  implicit class ExtendedMonetaryAmountBuilder(val monetaryAmountBuilder: MonetaryAmount.Builder) {
    // Note: toLong truncates, so any fraction of a cent is dropped
    def setAmount(bigDecimal: BigDecimal): MonetaryAmount.Builder = {
      monetaryAmountBuilder.setCentAmount((bigDecimal * 100).toLong)
    }

    // Parses a decimal string such as "199.99"
    def setAmount(string: String): MonetaryAmount.Builder = {
      setAmount(BigDecimal(string))
    }
  }
}
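The setAmount(bigDecimal) helper truncates, because toLong simply drops any fraction of a cent. If rounding is preferred, the alternative could look like the following minimal sketch. CentConversion and its methods are hypothetical names, and half-up is just one reasonable choice of rounding mode:

```scala
import scala.math.BigDecimal.RoundingMode

object CentConversion {
  // Behaves like (bigDecimal * 100).toLong: fractions of a cent are cut off
  def truncatedCents(amount: BigDecimal): Long = (amount * 100).toLong

  // Rounds half-up to the nearest cent and throws ArithmeticException if the result overflows a Long
  def roundedCents(amount: BigDecimal): Long =
    (amount * 100).setScale(0, RoundingMode.HALF_UP).toLongExact
}
```

For an input of 19.999, truncation yields 1999 cents while rounding yields 2000. Amounts with at most two decimal places give the same result either way.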

A similar thing can be done to translate the interest rate from its parts per million representation into a more common one.
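The following sketch shows the arithmetic such an interest extension would need. It is written as plain functions so it runs without the generated classes. The object and method names are hypothetical. In the project they would presumably be wrapped in implicit classes on Interest and Interest.Builder, analogous to MonetaryAmountExtensions:

```scala
import scala.math.BigDecimal.RoundingMode

object InterestConversions {
  private val PpmPerUnit = BigDecimal(1000000)

  // 95000 ppm -> 0.095
  def ppmToFraction(ppm: Int): BigDecimal = BigDecimal(ppm) / PpmPerUnit

  // 0.095 -> 95000 ppm, rounded to the nearest ppm; throws if it does not fit into an Int
  def fractionToPpm(rate: BigDecimal): Int =
    (rate * PpmPerUnit).setScale(0, RoundingMode.HALF_UP).toIntExact

  // 95000 ppm -> 9.5, i.e. the rate in percent, handy for display
  def ppmToPercent(ppm: Int): BigDecimal = BigDecimal(ppm) / 10000
}
```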

Now that we are done with the Protocol Buffer messages, let us look at the three services of our application. As promised, they are built around ZeroMQ as the communication layer. There is a pure Java implementation of ZeroMQ, called JeroMQ, which offers the same interface and all the features one usually requires, without a large performance drawback. In most cases it is fully sufficient, and since no native library has to be installed, it makes life a bit easier.

The first service we are going to inspect performs the authentication of provided credentials.

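package com.fdahms.scalabank.authserver

import com.fdahms.scalabank.protobuf.{AuthRequest, AuthResponse}
import org.zeromq.{ZContext, ZMQ}

object AuthServer {
  val port: Int = 6221

  private lazy val zmqContext = new ZContext()

  // Toy credential store: account id -> password (never do this in production)
  private val accountsDB = Map(
    123L -> "Secret",
    321L -> "Very Secret"
  )

  def main(args: Array[String]): Unit = {
    // REP socket: strictly alternates between receiving a request and sending a reply
    val socket = zmqContext.createSocket(ZMQ.REP)
    socket.bind(s"tcp://*:${port}")
    println("Auth server started - awaiting requests")

    while (true) {
      // Block until a request arrives, then deserialize it
      val request = AuthRequest.parseFrom(socket.recv())
      println("Someone wants to authenticate for account " + request.getCredentials.getAccountId)

      val responseBuilder = AuthResponse.newBuilder
      accountsDB.get(request.getCredentials.getAccountId) match {
        case Some(pw) if pw == request.getCredentials.getPassword =>
          responseBuilder.setAuthorized(true)
        case _ =>
          responseBuilder.setAuthorized(false)
      }

      // Serialize the reply and send it back to the requester
      socket.send(responseBuilder.build.toByteArray)
      println("Keeping the world safe")
    }
  }
}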

It binds a REP socket to port 6221 and listens there for incoming requests. In a perpetual loop, it takes messages from the socket, lets Protocol Buffers parse them into AuthRequest objects, and validates them against its credentials database. The reply is constructed with the builder of AuthResponse. Since the generated message objects are immutable, a builder is used to construct new instances. Finally, the response object is serialized and sent back over the socket as the reply. Simple, isn't it?
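The validation step inside the loop does not depend on ZeroMQ at all. The following sketch shows how it could be pulled out into a pure function, so it can be unit tested without opening a socket. The name AuthCheck.isAuthorized is hypothetical:

```scala
object AuthCheck {
  // Same rule as the match in AuthServer: the account must exist and the password must match
  def isAuthorized(accounts: Map[Long, String], accountId: Long, password: String): Boolean =
    accounts.get(accountId).contains(password)
}
```

Inside the server loop, this would become responseBuilder.setAuthorized(AuthCheck.isAuthorized(accountsDB, id, password)).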

The other end of this communication could be just as simple, but there is a caveat. Network communication is notoriously unreliable, and we would like to cope with failures. It is therefore a good idea to wrap the requesting functionality in something that handles timeouts and performs retries. Here we use a pattern that the ZeroMQ people call the Lazy Pirate pattern (reliable request-reply = RRR = pirate, got it?). To make this pattern reusable, I implemented a brief Scala class:

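import org.zeromq.{ZContext, ZMQ}
import org.zeromq.ZMQ.{PollItem, Poller}

class LazyPirate(context: ZContext,
                 endpoint: String,
                 connect: Boolean = true,
                 retries: Int = 3,      // maximum number of send attempts
                 timeout: Int = 2500) { // poll timeout per attempt, in milliseconds

  // Created lazily and recreated after every timeout, because a REQ socket
  // that never got its reply cannot send another request
  private var socket: ZMQ.Socket = null

  private def initSocket: ZMQ.Socket = {
    val s = context.createSocket(ZMQ.REQ)
    assert(s != null)
    if (connect) {
      s.connect(endpoint)
    } else {
      s.bind(endpoint)
    }

    s
  }

  def reqResp(request: Array[Byte]): Option[Array[Byte]] = {
    var reply: Array[Byte] = null
    var retriesLeft = retries

    while (reply == null && retriesLeft > 0) {
      if (socket == null) socket = initSocket
      socket.send(request)

      // Wait up to `timeout` ms for the reply to become readable
      val items = Array(new PollItem(socket, Poller.POLLIN))
      val rc = ZMQ.poll(items, timeout)

      if (rc != -1 && items.head.isReadable) {
        reply = socket.recv()
      } else {
        // No reply in time: discard the stuck socket and try again with a fresh one
        retriesLeft -= 1
        context.destroySocket(socket)
        socket = null
      }
    }

    // None if every attempt timed out
    Option(reply)
  }
}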

By default, this implementation waits 2.5 seconds for a reply before declaring the connection dead. It makes up to three attempts in total before finally giving up and returning None. The loan application service uses this class to communicate with the authentication service.
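The control flow of the lazy pirate can be separated from the socket handling. The following minimal sketch expresses that loop as a generic higher-order function, under these assumptions:
- An attempt is modelled as a function returning Option, where None stands for a poll timeout.
- A failure hook stands for destroying the socket.

RetryLoop and withRetries are hypothetical names:

```scala
import scala.annotation.tailrec

object RetryLoop {
  // Calls `attempt` up to `maxAttempts` times and returns the first Some it yields.
  // In LazyPirate an attempt is "send, poll with timeout, recv"; None means the poll timed out.
  // `onFailure` runs after each failed attempt (LazyPirate destroys its socket at this point).
  def withRetries[T](maxAttempts: Int)(attempt: () => Option[T])(onFailure: () => Unit): Option[T] = {
    @tailrec
    def loop(attemptsLeft: Int): Option[T] =
      if (attemptsLeft <= 0) None
      else attempt() match {
        case success @ Some(_) => success
        case None =>
          onFailure()
          loop(attemptsLeft - 1)
      }

    loop(maxAttempts)
  }
}
```

Keeping the retry policy in one small function makes it easy to test with fake attempts, as in the checks below, while the ZeroMQ specifics stay in the class itself.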

package com.fdahms.scalabank.loanserver

import com.fdahms.scalabank.authserver.AuthServer
import com.fdahms.scalabank.protobuf.{AuthRequest, AuthResponse, Credentials, Interest,
  LoanApplicationRequest, LoanApplicationResponse}
import com.fdahms.scalabank.protobuf.MonetaryAmountExtensions._
import org.zeromq.{ZContext, ZMQ}

object LoanServer {
  val port: Int = 6220

  private lazy val zmqContext = new ZContext()
  // Reliable request-reply client for the authentication service
  private lazy val authLP = new LazyPirate(zmqContext, s"tcp://127.0.0.1:${AuthServer.port}")

  private def checkAuth(credentials: Credentials): Boolean = {
    val request = AuthRequest.newBuilder
      .setCredentials(credentials)
      .build
    // If the auth server does not answer (None), treat the credentials as invalid
    authLP.reqResp(request.toByteArray)
      .map(bytes => AuthResponse.parseFrom(bytes)) match {
      case None => false
      case Some(response) => response.getAuthorized
    }
  }

  // Returns the interest rate as a fraction (0.02 = 2%) if the loan is approved
  private def cleverAlgorithm(loanRequest: LoanApplicationRequest): Option[BigDecimal] = {
    // Check authorization first
    if (!checkAuth(loanRequest.getCredentials)) return None

    // Now do risk assessment
    if (loanRequest.getUsingMoneyForFintech) return Some(BigDecimal("0.02"))
    if (loanRequest.getEverDefaulted) return None
    if (loanRequest.getAmount.getBigDecimal < BigDecimal(20000)
        && loanRequest.getPromiseToGiveMoneyBack) return Some(BigDecimal("0.095"))
    None
  }

  def main(args: Array[String]): Unit = {
    val socket = zmqContext.createSocket(ZMQ.REP)
    socket.bind(s"tcp://*:${port}")
    println("Loan server started - awaiting requests")

    while (true) {
      val request = LoanApplicationRequest.parseFrom(socket.recv())
      println("Got an application for an amount of " + request.getAmount.getBigDecimal)

      val responseBuilder = LoanApplicationResponse.newBuilder
      cleverAlgorithm(request) match {
        case None => responseBuilder.setApproved(false)
        case Some(interest) =>
          // Convert the fraction to parts per million for the Interest message
          responseBuilder.setApproved(true)
            .setInterest(Interest.newBuilder.setPpm((interest * 1000000).toInt))
      }

      socket.send(responseBuilder.build.toByteArray)
      println("Done for now")
    }
  }
}

The service has one function for checking credentials. It uses our lazy pirate to send the credentials to the authentication service and learns whether they are valid. If the authentication service cannot be reached, the credentials are treated as invalid. The next function is the clever algorithm for determining the loan worthiness of an application, and an unauthorized account is never considered loan worthy. Finally, the main function again runs a perpetual loop: it receives applications, performs the risk assessment, and sends back an appropriate reply.
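Like the credential check, the risk assessment can be expressed as a pure function over plain data, which makes the rules easy to test without any running services. The following is a sketch under that assumption. LoanRules and its Application case class are hypothetical stand-ins for the generated LoanApplicationRequest plus the result of checkAuth:

```scala
object LoanRules {
  // Hypothetical plain-data view of a LoanApplicationRequest plus the auth result
  final case class Application(
      authorized: Boolean,
      amount: BigDecimal,
      usingMoneyForFintech: Boolean,
      everDefaulted: Boolean,
      promiseToGiveMoneyBack: Boolean)

  // Same rules, in the same order, as cleverAlgorithm: interest as a fraction, None = declined
  def interestFor(app: Application): Option[BigDecimal] =
    if (!app.authorized) None
    else if (app.usingMoneyForFintech) Some(BigDecimal("0.02"))
    else if (app.everDefaulted) None
    else if (app.amount < BigDecimal(20000) && app.promiseToGiveMoneyBack) Some(BigDecimal("0.095"))
    else None
}
```

Note that the order of the checks matters. Because the FinTech rule comes before the default check, an applicant who has defaulted before still gets 2% as long as the money goes into FinTech.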

Finally, the client has nothing new to show us:

package com.fdahms.scalabank.client

import com.fdahms.scalabank.loanserver.LoanServer
import com.fdahms.scalabank.protobuf.{Credentials, LoanApplicationRequest, LoanApplicationResponse, MonetaryAmount}
import com.fdahms.scalabank.protobuf.MonetaryAmountExtensions._
import org.zeromq.ZContext
import scala.io.StdIn.readLine

object Client {

  private lazy val loanLP = new LazyPirate(new ZContext, s"tcp://127.0.0.1:${LoanServer.port}")

  private def performApplication(application: LoanApplicationRequest): Option[LoanApplicationResponse] = {
    val message = application.toByteArray
    val response = loanLP.reqResp(message)
    // None if the loan server did not answer after all retries
    response.map(bytes => LoanApplicationResponse.parseFrom(bytes))
  }

  def main(args: Array[String]): Unit = {

    println("\nWelcome to the Scala Bank loan application client\n")

    val loanRequestBuilder = LoanApplicationRequest.newBuilder

    println("How much do you need? ")
    val amountBuilder = MonetaryAmount.newBuilder
    amountBuilder.setAmount(readLine()) // extension method from MonetaryAmountExtensions
    loanRequestBuilder.setAmount(amountBuilder)

    println("What do you need the money for? ")
    loanRequestBuilder.setUsingMoneyForFintech(readLine().contains("FinTech"))

    println("Where you ever unable to repay a loan? ")
    loanRequestBuilder.setEverDefaulted(readLine() == "Yes")

    println("Do you promise to pay us back? ")
    loanRequestBuilder.setPromiseToGiveMoneyBack(readLine() == "Yes")

    println("Your Account number: ")
    val credentialsBuilder = Credentials.newBuilder
    credentialsBuilder.setAccountId(readLine().toLong)

    println("Your password: ")
    credentialsBuilder.setPassword(readLine())
    loanRequestBuilder.setCredentials(credentialsBuilder)

    println("Let's see if we can help you with that")
    val applicationResponse = performApplication(loanRequestBuilder.build)

    applicationResponse match {
      case None => println("Could not connect to server. It's definitely not us so it must be your fault.")
      case Some(resp) if resp.getApproved() =>
        println("Lucky you!")
        // ppm / 10000 = percent, e.g. 20000 ppm -> 2.0
        println("We can offer you the loan for an interest rate of just " +
          (resp.getInterest.getPpm / 1e4)
          + "%."
        )
      case Some(_) => println("No chance my friend! Go see your local loan shark.")
    }
  }
}

Again, our friend the lazy pirate sends the request to the loan application service, and the response is then displayed to the user.

To try the entire project, open three terminals. In the first two, start the authentication service and the loan application service with sbt "runMain com.fdahms.scalabank.authserver.AuthServer" and sbt "runMain com.fdahms.scalabank.loanserver.LoanServer", respectively. Then run the client in the third terminal:

$ sbt "runMain com.fdahms.scalabank.client.Client"

Welcome to the Scala Bank loan application client

How much do you need? 
10000000
What do you need the money for? 
Something, something, FinTech
Where you ever unable to repay a loan? 
Yes
Do you promise to pay us back? 
Maybe
Your Account number: 
123
Your password: 
Secret
Let's see if we can help you with that
Lucky you!
We can offer you the loan for an interest rate of just 2.0%.

Works like a charm. Go ahead and put some stress on the system, for example by shutting down a component, and watch how the other parts keep doing their job. Of course, with a component shut down no application will ever be approved, but that is exactly what we want to happen when a problem arises in the system.

I hope you enjoyed this little introduction. The entire code of the tutorial is available on GitHub.

