I have been playing with Akka Streams and Akka Http to create a flow to get some data from a public Rest endpoint and deserialize the json using Json4s.
Since there are not that many examples yet, and documentation only has a few examples, I’m sharing my little app here.
Default Akka Http only supports Spray Json, but fortunately Heiko already created a small akka-http-json library for Json4s or Play Json.
Here’s is small code sample on how to create a Akka Streams Flow and run it. This was just to test the calling of the Rest endpoint and deserialise the result json into a case class. Next step is then to extend the flow to do something useful with the retrieved data. I’ll put putting it into a time series database called Prometheus, and maybe also into Mongo.
package enphase
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.{DefaultFormats, Formats, Serialization, jackson}
import scala.concurrent.{Await, Future}
/**
* Enphase API Client which gets Enphase data and put those into InfluxDB
*
* - Start with HTTP GET request to Enphase API.
* - Transform response into json
* - Transform json into time series data
* - Put time series data into InfluxDB using HTTP POST request
*/
object Client extends App with Json4sSupport {
val systemId = 999999 // replace with your system id
val apiKey = "replace-with-your-api-key"
val userId = "replace-with-your-user-id"
val systemSummaryUrl = s"""/api/v2/systems/$systemId/summary?key=$apiKey&user_id=$userId"""
println(s"Getting from: $systemSummaryUrl")
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val formats: Formats = DefaultFormats
implicit val jacksonSerialization: Serialization = jackson.Serialization
import concurrent.ExecutionContext.Implicits.global
val httpClient = Http().outgoingConnectionTls(host = "api.enphaseenergy.com")
private val flow: Future[SystemSummary] = Source.single(HttpRequest(uri = Uri(systemSummaryUrl)))
.via(httpClient)
.mapAsync(1)(response => Unmarshal(response.entity).to[SystemSummary])
.runWith(Sink.head)
import concurrent.duration._
val start = System.currentTimeMillis()
val result = Await.result(flow, 15 seconds)
val end = System.currentTimeMillis()
println(s"Result in ${end-start} millis: $result")
}
/**
* Entity for system summary json:
* {
* "current_power": 3322,
* "energy_lifetime": 19050353,
* "energy_today": 25639,
* "last_report_at": 1380632700,
* "modules": 31,
* "operational_at": 1201362300,
* "size_w": 5250,
* "source": "microinverters",
* "status": "normal",
* "summary_date": "2014-01-06",
* "system_id": 123
* }
*/
case class SystemSummary(system_id: Int, summary_date: String, status: String, source: String,
size_w: Int, operational_at: Long, modules: Int, last_report_at: Long,
energy_today: Int, energy_lifetime: Long, current_power: Int)
At first I could not get Heiko’s Unmarchallers working and I wrote my own Unmarshaller which is not that difficult looking at some other implementations. The problem was a very vage error saying something was missing, but not exactly what. Today I figured out, it was just missing one of the required implicit arguments, the Json4s Serializers, and then it all worked nicely.
But here’s is how to implement a custom Unmarshaller which unmarshalls a HttpResponse instance:
implicit def responseUnmarshaller[T : Manifest]: FromResponseUnmarshaller[T] = {
import concurrent.duration._
import enphase.json.Json4sProtocol._
import org.json4s.jackson.Serialization._
new Unmarshaller[HttpResponse, T] {
override def apply(resp: HttpResponse)(implicit ec: ExecutionContext): Future[T] = {
resp.entity.withContentType(ContentTypes.`application/json`)
.toStrict(1 second)
.map(_.data)
.map(_.decodeString(resp.entity.contentType.charset.value))
.map(json => { println(s"Deserialized to: $json"); json })
.map(json => read[T](json))
}
}
}
The only change in the application needed to use this unmarshaller is to replace the ‘mapAsync’ line with:
.mapAsync(1)(Unmarshal(_).to[SystemSummary])
The project build.sbt contains these dependencies:
scalaVersion := "2.11.6"
libraryDependencies ++= Seq(
"com.typesafe.akka" % "akka-http-experimental_2.11" % "1.0-RC4",
"de.heikoseeberger" %% "akka-http-json4s" % "0.9.1",
"org.json4s" %% "json4s-jackson" % "3.2.11",
"org.scalatest" % "scalatest_2.11" % "2.2.4" % "test"
)
Happy Akka-ing!