Welcome to MLink Developer Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Explicit throughput limiting on part of an akka stream

I have a flow in our system which reads some elements from SQS (using Alpakka) and does some preprocessing (~10 stages, normally < 1 minute in total). Then, the prepared element is sent to the main processing (a single stage, taking a few minutes). The whole thing runs on AWS/K8s, and we'd like to scale out when the SQS queue grows above a certain threshold. The issue is that the SQS queue takes a long time to grow, since a lot of elements are "idling" in-process: they have finished preprocessing but are still waiting for the main processing.

We can't externalize the preprocessing to a separate queue, since its outcome can't survive a serialization/deserialization round trip. Also, this service and the "main" processor are tightly coupled (this service runs as main's sidecar) and can't be scaled independently.

The preprocessing stages are technically .mapAsyncUnordered, but the whole pipeline is already kept very slim (stream stages as well as SQS batches/buffers).

We tried lowering the inter-stage buffer (akka.stream.materializer.max-input-buffer-size), but that only gives an indirect benefit, not direct control (and, for my taste anyway, it is too internal to be mucking with).

I tried implementing a "gate" wrapper which would limit the number of elements allowed inside an arbitrary Flow, looking something like:

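import akka.NotUsed
import akka.stream.scaladsl.Flow

// InputGate / OutputGate are the custom gate stages (implementation elided)
class LimitingGate[T, U](originalFlow: Flow[T, U, _], maxInFlight: Int) {
  private def in: InputGate[T] = ???
  private def out: OutputGate[U] = ???

  def gatedFlow: Flow[T, U, NotUsed] = Flow[T].via(in).via(originalFlow).via(out)
}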

The in and out gates communicate via callbacks to do the throttling.

The implementation partially works (stream termination is giving me a hard time), but it feels like the wrong way to go about achieving the actual goal.
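One way the callback-based gates could share state is a single counter of in-flight elements. The following is a minimal, stdlib-only sketch under assumptions: `InFlightCounter`, `tryEnter`, `exit` and `onPermitFreed` are hypothetical names, not an Akka API. In a real GraphStage the callback would presumably be wrapped with `getAsyncCallback` so the input gate reacts on its own stage. It does not address stream termination.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec

// Hypothetical shared state for the question's InputGate/OutputGate pair (not an Akka API).
final class InFlightCounter(maxInFlight: Int, onPermitFreed: () => Unit) {
  require(maxInFlight > 0)
  private val inFlight = new AtomicInteger(0)

  /** Input gate: returns true if one more element may enter originalFlow. */
  @tailrec
  final def tryEnter(): Boolean = {
    val cur = inFlight.get()
    if (cur >= maxInFlight) false
    else if (inFlight.compareAndSet(cur, cur + 1)) true
    else tryEnter() // lost a race with another caller; re-read and retry
  }

  /** Output gate: called once for every element that leaves originalFlow. */
  def exit(): Unit = {
    inFlight.decrementAndGet()
    onPermitFreed() // the input gate should retry tryEnter() if it is holding back
  }

  def current: Int = inFlight.get()
}
```

Calling `onPermitFreed` on every exit, rather than only when the counter drops below the limit, avoids a lost wake-up if the input gate starts waiting just after a permit was freed; the input gate can ignore the callback when it is not waiting.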

Any ideas / comments / enlightening questions are appreciated.

Thanks!


He who fights with dragons too long becomes a dragon himself; gaze too long into the abyss, and the abyss will gaze back into you…

1 Answer

Try something along these lines (I'm only compiling it in my head):

import akka.actor.ActorRef
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Source}

def inflightLimit[T, B, M](n: Int, source: Source[T, M])(businessFlow: Flow[T, B, _])(implicit materializer: Materializer): Source[B, M] = {
  require(n > 0)  // alternatively, could just result in a Source.empty...
  val actorSource = Source.actorRef[Unit](
    completionMatcher = PartialFunction.empty,
    failureMatcher = PartialFunction.empty,
    bufferSize = 2 * n,
    overflowStrategy = OverflowStrategy.dropHead  // shouldn't matter, but if the buffer fills, the effective limit will be reduced
  )
  // preMaterialize starts actorSource right away, so the returned Source should only be materialized once
  val (flowControl, unitSource) = actorSource.preMaterialize()

  source.statefulMapConcat { () =>
    var firstElem: Boolean = true
    (a: T) => {
      if (firstElem) {
        (0 until n).foreach(_ => flowControl.tell((), ActorRef.noSender))  // prime the pump when the first element arrives
        firstElem = false
      }
      List(a)
    }
  }
    .zip(unitSource)  // an element from source only passes together with one ()
    .map(_._1)
    .via(businessFlow)
    .map { b =>
      // one element left businessFlow: let another one in. A plain map is used instead of
      // wireTap (Akka Streams 2.6), since wireTap drops elements when its tap sink is not ready
      flowControl.tell((), ActorRef.noSender)
      b
    }
}

Basically:

  • actorSource will emit a Unit element ((), i.e. a meaningless token) for every () message it receives
  • statefulMapConcat will send n messages to actorSource once, when the first element arrives from source (thus allowing n elements from source through)
  • zip will pass on a pair of the input from source and a () only when actorSource and source both have an element available
  • for every element which exits businessFlow, a message will be sent to the actorSource, which will allow another element from the source through
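To see why the limit holds, here is a hypothetical, single-threaded model of the same permit accounting in plain Scala (no Akka; `PermitModel`, `run` and `completesPerTick` are illustrative names only). A `permits` counter stands in for the () elements buffered in actorSource, and a queue stands in for the elements inside businessFlow. The assertion checks the invariant that permits plus in-flight elements always equals n.

```scala
import scala.collection.mutable

// Hypothetical model of the permit protocol, not Akka code.
object PermitModel {
  /** Pushes `total` elements through the modeled pipeline; `completesPerTick` is an
    * assumed stand-in for how many elements businessFlow finishes per step.
    * Returns (elements emitted, maximum number in flight at any time). */
  def run(n: Int, total: Int, completesPerTick: Int): (Int, Int) = {
    require(n > 0 && completesPerTick > 0)
    var permits = n                          // priming: n units sent up front
    val pending = mutable.Queue.empty[Int]   // elements still waiting in `source`
    pending ++= (1 to total)
    val inFlight = mutable.Queue.empty[Int]  // elements inside businessFlow
    var emitted = 0
    var maxInFlight = 0
    while (pending.nonEmpty || inFlight.nonEmpty) {
      // zip: an element is admitted only together with a permit
      while (pending.nonEmpty && permits > 0) {
        inFlight.enqueue(pending.dequeue())
        permits -= 1
      }
      assert(permits + inFlight.size == n, "permits are never created or lost")
      maxInFlight = math.max(maxInFlight, inFlight.size)
      // businessFlow emits some elements; each one sends a permit back
      var done = 0
      while (done < completesPerTick && inFlight.nonEmpty) {
        inFlight.dequeue()
        emitted += 1
        permits += 1
        done += 1
      }
    }
    (emitted, maxInFlight)
  }
}
```

With n = 3 and 10 elements, `PermitModel.run(3, 10, 1)` never has more than 3 elements in flight, and all 10 come out.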

Some things to note:

  • this will not in any way limit buffering within source
  • businessFlow cannot drop elements: after n elements are dropped, the stream will stop processing elements but won't fail. If dropping elements is required, you may be able to inline businessFlow and have each stage that drops an element send a message to flowControl when it does so; there are other ways to address this as well
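The dropping caveat can be illustrated with another hypothetical plain-Scala model (not Akka code; `DropLeakModel`, `admitted` and `dropReturnsPermit` are illustrative names). A dropped element never reaches the final stage, so its permit is lost unless the dropping stage returns it itself, which is what `dropReturnsPermit = true` models. For simplicity, businessFlow is assumed to handle each admitted element immediately.

```scala
// Hypothetical model of the "businessFlow cannot drop elements" caveat, not Akka code.
object DropLeakModel {
  /** Counts how many source elements get admitted when businessFlow drops every
    * element for which `drop` is true. A dropped element only returns its permit
    * if `dropReturnsPermit` is set (i.e. the dropping stage notifies flowControl). */
  def admitted(n: Int, total: Int, drop: Int => Boolean, dropReturnsPermit: Boolean): Int = {
    require(n > 0)
    var permits = n
    var next = 1   // next element waiting in source
    var count = 0
    while (next <= total && permits > 0) {
      permits -= 1
      count += 1
      if (!drop(next) || dropReturnsPermit) permits += 1
      next += 1
    }
    count  // count < total means the stream has stalled without failing
  }
}
```

For example, with n = 2 and every even element dropped, only 4 of 10 elements get admitted before the model stalls; when the dropping stage returns its permit, all 10 get through.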

...