I have a flow in our system which reads some elements from SQS (using alpakka) and does some preporcessing (~ 10 stages, normally < 1 minute in total). Then, the prepared element is sent to the main processing (single stage, taking a few minutes). The whole thing runs on AWS/K8S and we’d like to scale out when the SQS queue grows above a certain threshold. The issue is, the SQS queue takes a long time to blow up, since there are a lot of elements “idling” in-process, having done their preprocessing but waiting for the main thing.
We can’t externalize the preprocessing stuff to a separate queue since their outcome can’t survive a de/serialization roundtrip. Also, this service and the “main” processor are deeply coupled (this service runs as main’s sidecar) and can’t be scaled independently.
The preprocessing stages are technically .mapAsyncUnordered
, but the whole thing is already very slim (stream stages and SQS batches/buffers).
We tried lowering the interstage buffer (akka.stream.materializer.max-input-buffer-size), but that only gives some indirect benefit, no direct control (and is too internal to be mucking with, for my taste anyway).
I tried implementing a “gate” wrapper which would limit the amount of elements allowed inside some arbitrary Flow
, looking something like:
class LimitingGate[T, U](originalFlow: Flow[T, U], maxInFlight: Int) {
private def in: InputGate[T] = ???
private def out: OutputGate[U] = ???
def gatedFlow: Flow[T, U, NotUsed] = Flow[T].via(in).via(originalFlow).via(out)
}
And using callbacks between the in/out gates for throttling.
The implementation partially works (stream termination is giving me a hard time), but it feels like the wrong way to go about achieving the actual goal.
Any ideas / comments / enlightening questions are appreciated
Thanks!
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…