Pekko Extension libraries
A set of extension libraries for Pekko.
Getting Started
All libraries require the same initial setup. Add the sbt plugin:
addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")
Set the dependency:
libraryDependencies += "com.evolution" %% "pekko-extension-<name>" % "<version>"
Extensions
pekko-extension-serialization
TODO add description!
pekko-extension-pubsub
Type-safe layer for DistributedPubSubMediator.
trait PubSub[F[_]] {
  def publish[A: Topic: ToBytes](
    msg: A,
    sender: Option[ActorRef] = None,
    sendToEachGroup: Boolean = false
  ): F[Unit]
  def subscribe[A: Topic: FromBytes: ClassTag](
    group: Option[String] = None)(
    onMsg: OnMsg[F, A]
  ): Resource[F, Unit]
  def topics(timeout: FiniteDuration = 3.seconds): F[Set[String]]
}
To serialize and deserialize messages yourself, which offloads Pekko remoting and improves throughput,
see DistributedPubSubMediatorSerializing.scala.
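To illustrate what "type-safe" means here, the following is a toy, single-JVM sketch in which the topic is resolved from the message type instead of being passed as a string. It is not the library's implementation: `TypedPubSubSketch`, `InMemoryPubSub` and this simplified `Topic` are hypothetical stand-ins, there is no cluster, no effect type and no serialization, and unsubscribing is modelled as a plain function where the real API returns a `Resource`.

```scala
import scala.collection.mutable

// Toy model only: every name here is an illustrative stand-in, not the library API.
object TypedPubSubSketch {

  /** Ties a message type A to a topic name, so callers never pass topic strings around. */
  final case class Topic[A](name: String)

  final class InMemoryPubSub {
    private val handlers = mutable.Map.empty[String, Vector[Any => Unit]]

    /** Registers a handler for messages of type A and returns a function that unsubscribes it. */
    def subscribe[A](onMsg: A => Unit)(implicit topic: Topic[A]): () => Unit = {
      val handler: Any => Unit = msg => onMsg(msg.asInstanceOf[A])
      handlers.update(topic.name, handlers.getOrElse(topic.name, Vector.empty) :+ handler)
      () => handlers.update(topic.name, handlers.getOrElse(topic.name, Vector.empty).filterNot(_ eq handler))
    }

    /** Delivers the message to every handler registered for the topic of A. */
    def publish[A](msg: A)(implicit topic: Topic[A]): Unit =
      handlers.getOrElse(topic.name, Vector.empty).foreach(_.apply(msg))

    /** Names of topics that currently have at least one subscriber. */
    def topics: Set[String] =
      handlers.collect { case (name, hs) if hs.nonEmpty => name }.toSet
  }
}
```

Because the `Topic[A]` instance is looked up implicitly, publishing a message type that has no topic defined fails at compile time rather than at runtime.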
Set of pekko-extension-test libraries
These two libraries were created to provide a set of tests to be used in projects dependent on Pekko
libraries.
For instance, to prevent the following "surprise" at runtime:
java.lang.IllegalStateException: You are using version 1.2.0 of Pekko HTTP, but it appears you (perhaps indirectly) also depend on older versions of related artifacts. You can solve this by adding an explicit dependency on version 1.2.0 of the [pekko-http, pekko-http-testkit] artifacts to your project. Here's a complete collection of detected artifacts: (1.1.0, [pekko-http, pekko-http-testkit]), (1.2.0, [pekko-http-core, pekko-parsing]). See also: https://pekko.apache.org/docs/pekko/current/common/binary-compatibility-rules.html#mixed-versioning-is-not-allowed
at org.apache.pekko.util.ManifestInfo.checkSameVersion(ManifestInfo.scala:188)
at org.apache.pekko.util.ManifestInfo.checkSameVersion(ManifestInfo.scala:166)
at org.apache.pekko.http.scaladsl.HttpExt.<init>(Http.scala:89)
at org.apache.pekko.http.scaladsl.Http$.createExtension(Http.scala:1140)
at org.apache.pekko.http.scaladsl.Http$.createExtension(Http.scala:871)
at org.apache.pekko.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:1175)
at org.apache.pekko.actor.ExtensionId.apply(Extension.scala:87)
at org.apache.pekko.actor.ExtensionId.apply$(Extension.scala:86)
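The idea behind such a test can be sketched in plain Scala: group the detected artifacts by version and fail when more than one version shows up. This is only an illustration of the check, not the code of the test libraries; `SameVersionCheck`, `Artifact` and the message format are hypothetical.

```scala
// Hypothetical stand-in types; not the API of the pekko-extension-test libraries.
object SameVersionCheck {

  final case class Artifact(name: String, version: String)

  /** Groups artifact names by version; more than one key means versions are mixed. */
  def byVersion(artifacts: Seq[Artifact]): Map[String, List[String]] =
    artifacts
      .groupBy(_.version)
      .map { case (version, as) => version -> as.map(_.name).sorted.toList }

  /** Returns an error message when the artifacts do not all share one version. */
  def check(artifacts: Seq[Artifact]): Either[String, Unit] = {
    val grouped = byVersion(artifacts)
    if (grouped.size <= 1) Right(())
    else {
      val details = grouped.toList
        .sortBy { case (version, _) => version }
        .map { case (version, names) => s"($version, [${names.mkString(", ")}])" }
        .mkString(", ")
      Left(s"Mixed versions detected: $details")
    }
  }
}
```

Running such a check in a unit test surfaces the mismatch at build time instead of when the actor system starts.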
pekko-extension-test-actor
For pekko-actor: tests that all Pekko modules are of the same version.
Set up the dependency in Test scope:
libraryDependencies += "com.evolution" %% "pekko-extension-test-actor" % "<version>" % Test
And add the following test to your project:
import com.evolution.pekkotest.PekkoActorSuite
class PekkoActorTest extends PekkoActorSuite
pekko-extension-test-http
For pekko-http: tests that all pekko-http modules are of the same version.
Set up the dependency in Test scope:
libraryDependencies += "com.evolution" %% "pekko-extension-test-http" % "<version>" % Test
And add the following test to your project:
import com.evolution.pekkotest.PekkoHttpSuite
class PekkoHttpTest extends PekkoHttpSuite
pekko-extension-distributed-data-tools
SafeReplicator is a type-safe API for the Distributed Data replicator:
trait SafeReplicator[F[_], A <: ReplicatedData] {
  def get(implicit consistency: ReadConsistency): F[Option[A]]
  def update(modify: Option[A] => A)(implicit consistency: WriteConsistency): F[Unit]
  def delete(implicit consistency: WriteConsistency): F[Boolean]
  def subscribe(
    onStop: F[Unit],
    onChanged: A => F[Unit])(implicit
    factory: ActorRefFactory,
    executor: ExecutionContext
  ): Resource[F, Unit]
  def flushChanges: F[Unit]
}
pekko-extension-sharding-strategy
Alternative to org.apache.pekko.cluster.sharding.ShardCoordinator.ShardAllocationStrategy.
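As a rough picture of what an allocation strategy decides, here is a minimal least-shards sketch in plain Scala. It makes several assumptions: regions and shards are modelled as strings (in Pekko they are actor references and shard ids), the functions are pure rather than effectful, and the `threshold` parameter and the "move half of the difference" rule are illustrative choices, not the library's behaviour.

```scala
// Simplified stand-ins: Region, Shard and Allocation are plain aliases for illustration.
object LeastShardsSketch {

  type Region = String
  type Shard = String
  type Allocation = Map[Region, IndexedSeq[Shard]]

  /** Picks the region currently hosting the fewest shards; ties are broken by region name. */
  def allocate(requester: Region, shard: Shard, current: Allocation): Option[Region] =
    if (current.isEmpty) None
    else Some(current.toList.sortBy { case (region, shards) => (shards.size, region) }.head._1)

  /** Suggests shards to move off the busiest region when it leads the least busy one by more than `threshold`. */
  def rebalance(current: Allocation, inProgress: Set[Shard], threshold: Int): List[Shard] =
    if (current.isEmpty || inProgress.nonEmpty) Nil // wait until earlier rebalances finish
    else {
      val sorted = current.toList.sortBy { case (region, shards) => (shards.size, region) }
      val (_, fewest) = sorted.head
      val (_, most) = sorted.last
      val diff = most.size - fewest.size
      if (diff > threshold) most.take(diff / 2).toList else Nil
    }
}
```

Returning an empty list while a rebalance is in progress keeps the sketch conservative: it never proposes new moves before the previous ones have settled.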
API
trait ShardingStrategy[F[_]] {
  def allocate(requester: Region, shard: Shard, current: Allocation): F[Option[Region]]
  def rebalance(current: Allocation, inProgress: Set[Shard]): F[List[Shard]]
}
Syntax
val strategy = LeastShardsStrategy()
  .filterShards(...)
  .filterRegions(...)
  .rebalanceThreshold(10)
  .takeShards(10)
  .shardRebalanceCooldown(1.minute)
  .logging(...)
  .toAllocationStrategy()
Set of pekko-extension-tools libraries
pekko-extension-tools-test
TODO add description!
pekko-extension-tools-util
TODO add description!
pekko-extension-tools-serialization
TODO add description!
pekko-extension-tools-persistence
TODO add description!
pekko-extension-tools-cluster
TODO add description!
pekko-extension-tools-instrumentation
TODO add description!
TODO do we need umbrella lib pekko-extension-tools?
pekko-extension-conhub
ConHub is a distributed registry used to manage WebSocket connections across the nodes of an application.
It lets you send a serializable message to one or many connections while hiding the complexity of the distributed system.
In short: the user provides lookup criteria and a message, and ConHub routes the message to the physical
instances of the matching connections.
Usage example:
type Connection = ??? // type representing physical connection
final case class Msg(bytes: Array[Byte]) // serializable
final case class Envelope(lookup: LookupById, msg: Msg)
final case class LookupById(id: String)
val conHub: ConHub[String, LookupById, Connection, Envelope] = ???
conHub ! Envelope(LookupById("testId"), Msg(Array(…)))
Set of pekko-extension-effect libraries
This project aims to build a bridge between Pekko and pure functional code based
on cats-effect.
pekko-extension-effect-actor
Covered ("classic", not the "typed" kind of actors!):
Tell.scala
Represents ActorRef.tell:
trait Tell[F[_], -A] {
  def apply(a: A, sender: Option[ActorRef] = None): F[Unit]
}
Ask.scala
Represents ActorRef.ask pattern:
trait Ask[F[_], -A, B] {
  def apply(msg: A, timeout: FiniteDuration, sender: Option[ActorRef]): F[B]
}
Reply.scala
Represents a reply pattern: sender() ! reply:
trait Reply[F[_], -A] {
  def apply(msg: A): F[Unit]
}
Receive.scala
This is what you need to implement instead of the familiar new Actor { ... }:
trait Receive[F[_], -A, B] {
  def apply(msg: A): F[B]
  def timeout: F[B]
}
ActorOf.scala
Constructs Actor.scala out of receive: ActorCtx[F] => Resource[F, Receive[F, Any]].
ActorCtx.scala
Wraps ActorContext:
trait ActorCtx[F[_]] {
  def self: ActorRef
  def parent: ActorRef
  def executor: ExecutionContextExecutor
  def setReceiveTimeout(timeout: Duration): F[Unit]
  def child(name: String): F[Option[ActorRef]]
  def children: F[List[ActorRef]]
  def actorRefFactory: ActorRefFactory
  def watch[A](actorRef: ActorRef, msg: A): F[Unit]
  def unwatch(actorRef: ActorRef): F[Unit]
  def stop: F[Unit]
}
pekko-extension-effect-persistence
PersistentActorOf.scala
Constructs PersistentActor.scala out of eventSourcedOf: ActorCtx[F] => F[EventSourced[F, S, E, C]]
EventSourced.scala
Describes the lifecycle of an entity with regard to event sourcing. The phases are: Started, Recovering, Receiving and Termination.
trait EventSourced[F[_], S, E, C] {
  def eventSourcedId: EventSourcedId
  def recovery: Recovery
  def pluginIds: PluginIds
  def start: Resource[F, RecoveryStarted[F, S, E, C]]
}
RecoveryStarted.scala
Describes the start of the recovery phase:
trait RecoveryStarted[F[_], S, E, C] {
  def apply(
    seqNr: SeqNr,
    snapshotOffer: Option[SnapshotOffer[S]]
  ): Resource[F, Recovering[F, S, E, C]]
}
Recovering.scala
Describes the recovery phase:
trait Recovering[F[_], S, E, C] {
  def replay: Resource[F, Replay[F, E]]
  def completed(
    seqNr: SeqNr,
    journaller: Journaller[F, E],
    snapshotter: Snapshotter[F, S]
  ): Resource[F, Receive[F, C]]
}
Replay.scala
Used during recovery to replay events
trait Replay[F[_], A] {
  def apply(seqNr: SeqNr, event: A): F[Unit]
}
Journaller.scala
Describes communication with the underlying journal:
trait Journaller[F[_], -A] {
  def append: Append[F, A]
  def deleteTo: DeleteEventsTo[F]
}
Snapshotter.scala
Describes communication with the underlying snapshot storage:
/**
 * Describes communication with underlying snapshot storage
 *
 * @tparam A - snapshot
 */
trait Snapshotter[F[_], -A] {
  def save(seqNr: SeqNr, snapshot: A): F[F[Instant]]
  def delete(seqNr: SeqNr): F[F[Unit]]
  def delete(criteria: SnapshotSelectionCriteria): F[F[Unit]]
}
pekko-extension-effect-testkit
TODO add description!
pekko-extension-effect-actor-tests
TODO add description!
pekko-extension-effect-persistence-api
TODO add description!
pekko-extension-effect-persistence
TODO add description!
pekko-extension-effect-cluster
TODO add description!
pekko-extension-effect-cluster-sharding
TODO add description!
pekko-extension-effect-eventsourcing
This is the main runtime/queue where all actions against your state are processed in the required event-sourcing sequence:
- validate and finalize events
- append events to journal
- publish changed state
- execute side effects
It is optimized for maximum throughput, so different steps of different actions may be executed in parallel, and events
may be stored in batches.
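One way to picture a queue that fixes the processing order at submission time, while letting callers wait for completion separately, is a nested effect. The sketch below uses `scala.concurrent.Future` as a stand-in for `F` and runs tasks strictly one after another, so it shows only the ordering idea and none of the parallelism or batching. `EnqueueThenCompleteSketch` and `SequentialQueue` are hypothetical names, not part of the library.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

// Sketch only: Future stands in for F, and SequentialQueue is a hypothetical name.
object EnqueueThenCompleteSketch {

  final class SequentialQueue(implicit ec: ExecutionContext) {
    // Completes once every task enqueued so far has finished.
    private var tail: Future[Unit] = Future.unit

    /** Outer Future: the task has taken its place in the queue.
      * Inner Future: the task has run and produced its result.
      */
    def apply[A](task: () => A): Future[Future[A]] = synchronized {
      val result = Promise[A]()
      // Run the task after all earlier ones; a failing task fails only its own result.
      tail = tail.map { _ =>
        result.complete(Try(task()))
        ()
      }
      Future.successful(result.future)
    }
  }
}
```

A caller that only needs its place in line can stop at the outer `Future`; a caller that needs the result flattens the two layers and waits for the inner one.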
trait Engine[F[_], S, E] {
  def state: F[State[S]]
  /**
   * @return Outer F[_] is about `load` being enqueued, this immediately provides order guarantees
   *         Inner F[_] is about a `load` being completed
   */
  def apply[A](load: F[Validate[F, S, E, A]]): F[F[A]]
}
Library mappings pekko to akka
| pekko | akka | migrated from version |
|---|---|---|
| pekko-extension-serialization | akka-serialization | 1.1.0 |
| pekko-extension-pubsub | pubsub | 10.0.0 |
| pekko-extension-test-actor | akka-test | 0.3.0 |
| pekko-extension-test-http | akka-test | 0.3.0 |
| pekko-extension-distributed-data-tools | ddata-tools | 3.1.0 |
| pekko-extension-sharding-strategy | sharding-strategy | 3.0.2 |
| pekko-extension-tools-test | akka-tools | 3.3.13 |
| pekko-extension-tools-util | akka-tools | 3.3.13 |
| pekko-extension-tools-serialization | akka-tools | 3.3.13 |
| pekko-extension-tools-persistence | akka-tools | 3.3.13 |
| pekko-extension-tools-cluster | akka-tools | 3.3.13 |
| pekko-extension-tools-instrumentation | akka-tools | 3.3.13 |
| pekko-extension-conhub | conhub | 3.0.0 |
| pekko-extension-effect-actor | akka-effect | 4.1.10 |
| pekko-extension-effect-testkit | akka-effect | 4.1.10 |
| pekko-extension-effect-actor-tests | akka-effect | 4.1.10 |
| pekko-extension-effect-persistence-api | akka-effect | 4.1.10 |
| pekko-extension-effect-persistence | akka-effect | 4.1.10 |
| pekko-extension-effect-cluster | akka-effect | 4.1.10 |
| pekko-extension-effect-cluster-sharding | akka-effect | 4.1.10 |
| pekko-extension-effect-eventsourcing | akka-effect | 4.1.10 |