GitHunt

AggregateMessages actor, fsm:

Lets you aggregate messages and publish the batch back
to a function you pass in.

The callback is invoked when either the number of messages
exceeds a threshold or a time interval has elapsed,
as specified by the "Rate" case class.
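
The count-or-time behaviour described above can be sketched without Akka as follows. This is a minimal illustration, not the library's implementation: the Rate fields (maxMessages, maxIntervalMillis), the BatchAggregator class and its tick method are all assumed names, and the sketch flushes as soon as the count reaches the threshold.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the "Rate" case class; the field names are assumptions.
final case class Rate(maxMessages: Int, maxIntervalMillis: Long)

// Buffers messages and hands the batch to `callback` when either limit is hit.
// Not thread-safe on its own; inside an actor, message handling is already serialized.
final class BatchAggregator[A](
    rate: Rate,
    callback: List[A] => Unit,
    clock: () => Long = () => System.currentTimeMillis()
) {
  private val buffer = ListBuffer.empty[A]
  private var windowStart = clock()

  // Add one message, then flush if the count or time limit has been reached.
  def offer(message: A): Unit = {
    buffer += message
    flushIfDue()
  }

  // Call periodically (for example from a timer) so a quiet stream still flushes.
  def tick(): Unit = flushIfDue()

  private def flushIfDue(): Unit = {
    val elapsed = clock() - windowStart
    val due = buffer.size >= rate.maxMessages || elapsed >= rate.maxIntervalMillis
    if (due) {
      if (buffer.nonEmpty) callback(buffer.toList)
      buffer.clear()
      windowStart = clock()
    }
  }
}
```

The clock is injectable only so the time-based flush can be exercised deterministically; in an actor the same check would typically be driven by a scheduled message.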

For more detail, see the source code.

With the more recent development of Alpakka, this can be done better when working with streaming data.

ConsumerKafka actor, kafka:

Lets you open a consumer in a separate actor that polls every 5 seconds.
The function passed to map is called with the result of each poll.

Supports mapping from JSON to an object.
Assumes value.serializer is set to "org.apache.kafka.common.serialization.StringSerializer".

case class MyComponent(name: String)

val topicNames: List[String] = List("Topic-to-consume-from")

val groupId = "group-id"

val kafkaConsumerProperties = new java.util.Properties()
kafkaConsumerProperties.putAll(...)

val consumer = new ConsumerKafka[MyComponent](kafkaConsumerProperties, topicNames, groupId)

consumer.map {
  components: List[MyComponent] => 
    components.map(...)
}
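
To illustrate the poll-then-map flow used above, here is a dependency-free sketch of a fixed-interval polling loop. It is not the library's code: PollingLoop, its poll and decode parameters, and pollOnce are hypothetical stand-ins for the Kafka poll and the JSON-to-object mapping.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Hypothetical helper, not part of the library: polls at a fixed interval,
// decodes each raw value, and passes the decoded batch to `handler`.
final class PollingLoop[A](
    poll: () => List[String],     // stand-in for a Kafka poll returning JSON strings
    decode: String => Option[A],  // stand-in for the JSON-to-object mapping
    handler: List[A] => Unit
) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // One polling round; values that fail to decode are dropped.
  def pollOnce(): Unit = {
    val decoded = poll().flatMap(raw => decode(raw).toList)
    if (decoded.nonEmpty) handler(decoded)
  }

  // Run pollOnce every `intervalSeconds` seconds (5 matches the description above).
  def start(intervalSeconds: Long = 5L): ScheduledFuture[_] =
    scheduler.scheduleAtFixedRate(
      new Runnable { def run(): Unit = pollOnce() },
      0L,
      intervalSeconds,
      TimeUnit.SECONDS
    )

  // Stop scheduling further polls.
  def stop(): Unit = scheduler.shutdown()
}
```

Keeping pollOnce separate from the scheduling makes a single round easy to call directly, for example in a test, without waiting on the timer.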

For more detail, see the source code.

Favor Kafka Streams now.

Created June 19, 2016
Updated May 3, 2022