
Conversation

@lalitb (Member) commented Jan 23, 2026

Fan-out Processor Implementation

Implements all four discussed scenarios:

| Scenario | Config | Description |
|---|---|---|
| 1 | `mode: parallel`, `await_ack: primary` | Duplicate to all, wait for primary only |
| 2 | `mode: parallel`, `await_ack: all` | Duplicate to all, wait for all (with per-destination timeout) |
| 3 | `mode: sequential` | Send one-by-one, advance after ack |
| 4 | `fallback_for: <port>` | Failover to backup on nack/timeout |
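
To make the scenarios concrete, here is a minimal sketch of how they could be modeled as configuration types in Rust. All names here (`FanoutMode`, `AwaitAck`, `DestinationConfig`, `FanoutConfig`) are hypothetical illustrations, not the actual config structs in this PR.

```rust
// Hypothetical config model for the four fanout scenarios (illustration only).
#[derive(Debug, Clone, PartialEq)]
pub enum FanoutMode {
    /// Scenarios 1 and 2: duplicate the request to every destination at once.
    Parallel,
    /// Scenario 3: send to one destination at a time, advancing on ack.
    Sequential,
}

#[derive(Debug, Clone, PartialEq)]
pub enum AwaitAck {
    /// Fire-and-forget: ack upstream immediately after sending.
    None,
    /// Scenario 1: wait only for the primary destination's ack.
    Primary,
    /// Scenario 2: wait for every destination (with per-destination timeout).
    All,
}

#[derive(Debug, Clone)]
pub struct DestinationConfig {
    pub port: String,
    /// Scenario 4: if set, this destination is a backup used only when the
    /// named port nacks or times out.
    pub fallback_for: Option<String>,
    pub timeout: Option<std::time::Duration>,
}

#[derive(Debug, Clone)]
pub struct FanoutConfig {
    pub mode: FanoutMode,
    pub await_ack: AwaitAck,
    pub destinations: Vec<DestinationConfig>,
}
```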

Why Stateful (not Stateless like Go collector)

The Go Collector's fanout is stateless because it uses synchronous, blocking calls:

```go
err := consumer.ConsumeLogs(ctx, ld)  // blocks until complete, error returns directly
```

Our OTAP engine uses async message passing with explicit ack/nack routing:

```rust
effect_handler.send_message_to(port, pdata).await?;  // returns immediately
// ack arrives later as a separate NodeControlMsg::Ack
```
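
For illustration, a minimal sketch of the correlation this forces on the processor, assuming the ack control message carries a request id; the `PendingRequests` type and the `request_id` key are hypothetical, not the engine's actual API.

```rust
// Sketch only: correlating a deferred ack with the original request.
use std::collections::HashMap;

struct PendingRequests<P> {
    // request_id -> original pdata kept for upstream routing.
    pending: HashMap<u64, P>,
}

impl<P> PendingRequests<P> {
    fn on_send(&mut self, request_id: u64, original: P) {
        // Remember the pre-subscription pdata before sending downstream.
        self.pending.insert(request_id, original);
    }

    fn on_ack(&mut self, request_id: u64) -> Option<P> {
        // The ack arrives later as a separate control message; look up the
        // original request so the upstream ack can be routed correctly.
        self.pending.remove(&request_id)
    }
}
```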

I explored making scenarios 1 and 3 stateless but hit three blockers:

  1. subscribe_to() mutates context - Fanout must subscribe to receive acks, which pushes a frame onto the context stack. For correct upstream routing, we need the original pdata (pre-subscription). We cannot use ack.accepted from downstream.

  2. Downstream may mutate/drop payloads - into_parts(), transformers, and filters mean we can't rely on getting intact pdata back in ack/nack messages.

  3. Sequential/fallback/timeout require coordination - Need to know which destination is active, when to advance to the next, and when to trigger fallbacks or finish.

Even if downstream guaranteed returning intact payloads, we'd still need state for await_all completion tracking, fallback chains, and sequential advancement. The only gain would be a minor memory optimization (not storing original_pdata), not true statelessness.

Adopting Go's synchronous model would require fundamental engine architecture changes, not just fanout changes.

Memory Optimizations

While full statelessness isn't possible, I have implemented fast paths to minimize allocations for common configurations:

| Configuration | Fast Path | State Per Request |
|---|---|---|
| `await_ack: none` | Fire-and-forget | None (zero inflight tracking) |
| parallel + primary + no fallback + no timeout | Slim primary | Minimal (request_id → original_pdata) |
| All other configs | Full | Complete endpoint tracking |

Fast Path Details

  • Fire-and-forget (await_ack: none)
    Bypasses all inflight state. Clone, send, and ACK upstream immediately.
    Zero allocations per request.

  • Slim primary path
    Uses a tiny HashMap<u64, OtapPdata> instead of the full Inflight struct with EndpointVec.
    Ignores non-primary ACKs and NACKs.

  • Full path
    Required for:

    • Sequential mode
    • await_all
    • Any fallback
    • Any timeout

    Tracks all endpoints and request state.
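
As a rough illustration of the dispatch described above, the path selection could look like the following; the `FastPath` enum and function are hypothetical, not the PR's actual code.

```rust
// Illustration only: deciding which of the three paths applies.
enum FastPath {
    FireAndForget,
    SlimPrimary,
    Full,
}

fn select_fast_path(
    parallel: bool,
    await_ack: &str, // "none" | "primary" | "all"
    has_fallback: bool,
    has_timeout: bool,
) -> FastPath {
    match await_ack {
        // await_ack: none bypasses all inflight state.
        "none" => FastPath::FireAndForget,
        // parallel + primary-only with no fallback and no timeout:
        // just request_id -> original_pdata.
        "primary" if parallel && !has_fallback && !has_timeout => FastPath::SlimPrimary,
        // Sequential mode, await_all, any fallback, or any timeout.
        _ => FastPath::Full,
    }
}
```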

Code Structure

Inflight holds per-request state:

  • original_pdata - pre-subscription pdata, used for all upstream acks/nacks
  • endpoints[] - per-destination status (Acked/Nacked/InFlight/PendingSend)
  • next_send_queue - drives sequential mode advancement
  • completed_origins - tracks completion for await_ack: all
  • timeout_at - per-destination deadlines for timeout/fallback triggering

Not all fields are used for every scenario, but the overhead is minimal - empty HashSets don't allocate, SmallVec is inline for ≤4 items, and clone cost is O(1) for bytes::Bytes.
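
For illustration, a simplified sketch of what such a per-request record could look like; the exact field types differ in the PR (e.g. it uses SmallVec for the endpoint list), and the layout below is an assumption based on the description above.

```rust
// Illustrative shape of the per-request state; the real code may differ.
use std::collections::HashSet;
use std::time::Instant;

enum EndpointStatus {
    PendingSend,
    InFlight,
    Acked,
    Nacked,
}

struct Endpoint {
    port: String,
    status: EndpointStatus,
    // Per-destination deadline for timeout/fallback triggering.
    timeout_at: Option<Instant>,
}

struct Inflight<P> {
    /// Pre-subscription pdata, used for all upstream acks/nacks.
    original_pdata: P,
    /// Per-destination status (SmallVec in the PR, inline for <=4 items).
    endpoints: Vec<Endpoint>,
    /// Drives sequential mode: indices of destinations still waiting to be sent.
    next_send_queue: Vec<usize>,
    /// Tracks completion for `await_ack: all`.
    completed_origins: HashSet<String>,
}
```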

Documentation

See crates/otap/src/fanout_processor/README.md for configuration examples and behavior details.

@lalitb lalitb requested a review from a team as a code owner January 23, 2026 20:46
@lalitb lalitb marked this pull request as draft January 23, 2026 20:46
@github-actions bot added the rust label (Pull requests that update Rust code) Jan 23, 2026
codecov bot commented Jan 23, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 81.84%. Comparing base (6ad291b) to head (a2bd621).

Additional details and impacted files
```
@@             Coverage Diff             @@
##             main    #1878       +/-   ##
===========================================
- Coverage   85.18%   81.84%    -3.35%     
===========================================
  Files         508      181      -327     
  Lines      153846    51433   -102413     
===========================================
- Hits       131047    42093    -88954     
+ Misses      22265     8806    -13459     
  Partials      534      534               
```
| Component | Coverage Δ |
|---|---|
| otap-dataflow | ∅ <ø> (∅) |
| query_abstraction | 80.61% <ø> (ø) |
| query_engine | 90.23% <ø> (-0.35%) ⬇️ |
| syslog_cef_receivers | ∅ <ø> (∅) |
| otel-arrow-go | 53.50% <ø> (ø) |
| quiver | ∅ <ø> (∅) |

@lalitb changed the title from "WIP: Fanout-processor" to "Fanout Processor" Jan 25, 2026
@lalitb (Member Author) commented Jan 25, 2026

Fanout now runs stateless in one case (`await_ack: none` fire-and-forget) and with slim state in one case (parallel + primary-only with no fallback/timeout: just request_id -> original_pdata). All other configurations remain fully stateful to preserve ordering, failover, and upstream routing semantics. I've updated the PR description and README.md accordingly. Ready for review with that explicit split.

@lalitb lalitb marked this pull request as ready for review January 25, 2026 08:24
@lquerel (Contributor) commented Jan 26, 2026

Regarding the stateless vs stateful debate, I'm wondering if it's actually more a question of stack vs stackless, since we're based on message passing. I'm not a Go Collector specialist, but from what I understand the state is kept on the stack across the different sequential calls. So in the end, the presence and management of state don't seem that different to me.

In our case, having support for both a parallel mode and a sequential mode should allow users to choose between speed with limited control, or a precisely defined sequence that can be interrupted at any step. That would make this fanout processor something truly powerful and expressive.

@lalitb (Member Author) commented Jan 26, 2026

> Regarding the stateless vs stateful debate, I'm wondering if it's actually more a question of stack vs stackless, since we're based on message passing. I'm not a Go Collector specialist, but from what I understand the state is kept on the stack across the different sequential calls. So in the end, the presence and management of state don't seem that different to me.

You're right - state always exists somewhere. In Go fanout it's implicit on the call stack during the synchronous call, then gone when it returns; in our async pipeline we make it explicit via the message's context stack or a map when coordinating multiple outcomes. The key difference is blocking vs non-blocking - Go's blocking model gives implicit correlation, while our async model requires explicit correlation since acks arrive later as separate messages.

> In our case, having support for both a parallel mode and a sequential mode should allow users to choose between speed with limited control, or a precisely defined sequence that can be interrupted at any step. That would make this fanout processor something truly powerful and expressive.

Yes, parallel vs sequential gives users that flexibility - speed when order doesn't matter, precise control when it does. Currently, the Go Collector's fanout is sequential only.

@lquerel (Contributor) left a review comment:

I think this is a very good first PR on this new processor, which is definitely complex (and will likely become even more so in the future). I have a few comments that I think should improve a couple of things here and there.

Comment on lines 170 to 172
| `sent` | Requests dispatched (per incoming PData) |
| `acked` | Requests successfully acked upstream |
| `nacked` | Requests nacked upstream |
Contributor:

These metrics will be redundant with the existing pdata channel metrics.

@lalitb (Member Author) replied:

These metrics answer different questions:

  • Channel metrics: Per-destination sends/receives/errors (one count per destination)
  • Fanout metrics: Aggregated request outcomes after await_ack/fallback logic

Specifically:

  • `sent`/`acked`/`nacked` - request-level outcomes, not per-destination
  • `timed_out` - destination-level (how many destinations hit their timeout)
  • `rejected_max_inflight` - fanout-only; channels don't track this

For example, with `await_ack: primary` + fallback: if the primary fails but the fallback succeeds, fanout reports 1 ack, while channel metrics would show 1 nack + 1 ack across destinations.

Happy to remove these if we see metrics bloat - some could potentially be aggregated at channel level to get the equivalent.

- **Fallback cycles**: Detected and rejected at config validation
- **Fallback with `await_ack: none`**: Rejected; fire-and-forget ignores fallbacks
- **Timeout with `await_ack: none`**: Rejected; fire-and-forget doesn't track responses
- **Shutdown**: Inflight requests are dropped (not proactively nacked)
Contributor:

Not sure I understand this one.

@lalitb (Member Author) replied:

Agreed the original text was terse. Updated to:

> **Shutdown**: Inflight state is dropped; no nacks sent to upstream. Upstream will not receive notification for in-progress requests.

This clarifies that on shutdown, we don't proactively nack pending requests - upstream simply won't hear back. Downstream may still process them successfully; we just won't see the ack.

Alternative would be to nack all inflight requests on shutdown, so upstream knows they weren't confirmed. But:

  • Downstream may still process them successfully (we just won't see the ack)
  • Proactive nacking could cause duplicate processing if upstream retries
  • Shutdown is typically a terminal event anyway

If you think we should nack inflight requests on shutdown (fail-safe), I can add that in a follow-up PR and file a tracking issue for now.

@jmacd (Contributor) commented Jan 27, 2026

I think we can overcome the blockers and avoid memory allocation for the "slim primary" case, meaning really this case ought to work and we can modify OtapPdata and/or Context to help. Here's how I think this will work:

When the request arrives, use into_parts() to split into context/data; clone the data once for each output. For the primary request, use OtapPdata::new(original_context, data), and for the other requests (for now) we can use Context::new() creating an empty context. Send all the requests in parallel.

You don't need any calls to subscribe_to() in this arrangement: since the primary keeps the context it had, any Ack/Nack it receives will go straight through the fanout processor directly to the original recipient. If we have other useful information in the Context (later), we will want a way to erase the subscription information and keep everything else (I think).
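
For illustration, a sketch of this proposal under the signatures described in the comment (`into_parts()`, `OtapPdata::new(context, data)`, `Context::new()`, `send_message_to()`); the wrapper function and the `EffectHandler`, `PortName`, and `Error` types are placeholders, not verified against the engine's actual API.

```rust
// Sketch of the proposal above (illustration only); placeholder types throughout.
async fn fanout_slim_primary(
    effect_handler: &mut EffectHandler,
    pdata: OtapPdata,
    primary_port: PortName,
    other_ports: &[PortName],
) -> Result<(), Error> {
    // Split the incoming request into its routing context and payload.
    let (original_context, data) = pdata.into_parts();

    // Non-primary copies carry an empty context, so their acks/nacks are not
    // routed back upstream (and no subscribe_to() frame is pushed).
    for port in other_ports {
        let copy = OtapPdata::new(Context::new(), data.clone());
        effect_handler.send_message_to(port.clone(), copy).await?;
    }

    // The primary keeps the original context: its ack/nack passes straight
    // through the fanout processor to the original upstream recipient,
    // leaving no per-request state behind in the fanout.
    let primary = OtapPdata::new(original_context, data);
    effect_handler.send_message_to(primary_port, primary).await
}
```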

@jmacd (Contributor) left a review comment.

@lalitb (Member Author) commented Jan 28, 2026

> I think we can overcome the blockers and avoid memory allocation for the "slim primary" case, meaning really this case ought to work and we can modify OtapPdata and/or Context to help. Here's how I think this will work:
>
> When the request arrives, use into_parts() to split into context/data; clone the data once for each output. For the primary request, use OtapPdata::new(original_context, data), and for the other requests (for now) we can use Context::new() creating an empty context. Send all the requests in parallel.
>
> You don't need any calls to subscribe_to() in this arrangement: since the primary keeps the context it had, any Ack/Nack it receives will go straight through the fanout processor directly to the original recipient. If we have other useful information in the Context (later), we will want a way to erase the subscription information and keep everything else (I think).

Interesting idea - keeping the original context on primary so acks route directly upstream would eliminate the subscription overhead entirely.

Right now the slim path still needs the subscription so fanout can see the primary's ack/nack - it clears slim_inflight, bumps metrics, and enforces max_inflight. Letting acks bypass fanout would leave entries dangling and break backpressure. Non-primary acks would also be unrouted/dropped with empty contexts, which is fine only if the engine treats them as benign.

I'm open to a zero-alloc slim path, but it would require engine support for an "observer" mode where the fanout can see the primary outcome without adding a subscription frame, or a way to strip/restore subscription state in Context while still delivering the control message back to the fanout.

Given the tiny context clone cost today, I kept the existing slim path. Happy to revisit with a concrete engine change or a safe "passthrough observer" design.

@lalitb (Member Author) commented Jan 28, 2026

> I think this is a very good first PR on this new processor, which is definitely complex (and will likely become even more so in the future). I have a few comments that I think should improve a couple of things here and there.

Thanks for the thorough review! Addressed the comments in this PR and created tracking issues for follow-up improvements.
