Struct BybitFeeder

Source
#[repr(align(64))]
pub struct BybitFeeder { /* private fields */ }
Expand description

Bybit market data feeder optimized for HFT applications

This feeder implementation is specifically designed for high-frequency trading (HFT) scenarios with the following performance characteristics:

  • Zero-allocation critical paths using SmallVec and stack allocation
  • Lock-minimization with fine-grained locking for maximum concurrency
  • Cache-line alignment for optimal CPU cache efficiency
  • CPU pinning for dedicated core processing
  • Batch processing for efficient trade and orderbook updates
  • Comprehensive performance statistics tracking
  • Real-time shared orderbooks with minimal contention
  • Time synchronization with nanosecond precision
  • Efficient statistics and monitoring

§Performance Settings

Performance can be tuned via the FeederOptions:

  • channel_buffer_size: Size of message buffers (default: 1024)
  • max_depth_levels: Maximum depth levels for orderbooks (default: 100)
  • batch_size: Batch message processing size (default: 32)
  • cpu_affinity: Pin to specific CPU core (default: -1, no pinning)
  • use_zero_copy: Enable zero-copy processing where possible (default: true)
  • max_backlog: Maximum message backlog before dropping (default: 10000)

§Performance Notes

Software prefetching has been removed as it provided minimal benefit:

  • Sequential access patterns are already optimized by CPU hardware prefetcher
  • Immediate usage after prefetch negates benefits
  • Small data structures (Decimal ~16 bytes) fit multiple elements per cache line

Focus areas for HFT optimization:

  • CPU pinning to dedicated cores
  • Cache-line aligned data structures
  • Minimizing lock contention with SkipMap
  • Batch processing for large updates
  • Zero-copy where possible

§Examples

use rusty_feeder::bybit::feeder::BybitFeeder;
use rusty_feeder::feeder::{Feeder, FeederOptions};
use rusty_model::instruments::InstrumentId;
use rusty_model::venues::Venue;

async fn subscribe_spot_trades() {
    // Create new Bybit feeder
    let feeder = BybitFeeder::new();

    // Configure options for low latency
    let options = FeederOptions {
        channel_buffer_size: 4096,
        cpu_affinity: 5, // Pin to CPU core 5
        batch_size: 64,
        use_zero_copy: true,
        max_depth_levels: 20,
        max_backlog: 50000,
        ..Default::default()
    };

    // Subscribe to the raw trade stream, then start the feed to obtain a receiver of normalized trades
    let instrument = InstrumentId::new("BTCUSDT".to_string(), Venue::Bybit);
    let trade_rx = feeder.subscribe_trades(instrument.clone()).await.unwrap();
    let normalized_trades_rx = feeder.start_feed_trades(
        instrument,
        trade_rx,
        Some(options)
    ).await.unwrap();
}

Implementations§

Source§

impl BybitFeeder

Source

pub fn new() -> Self

Create a new Bybit feeder with default settings

This constructor initializes a new Bybit feeder with a default high-precision clock and empty state containers. The feeder is immediately ready to process market data.

§Example
use rusty_feeder::bybit::feeder::BybitFeeder;

let feeder = BybitFeeder::new();
Source

pub fn with_clock(clock: Clock) -> Self

Create a new Bybit feeder with a custom high-precision clock

This constructor allows providing a pre-configured clock, which can be useful for:

  • Testing with a controlled clock source
  • Synchronizing multiple feeders with the same clock
  • Using a clock with specific adjustments for latency compensation
§Parameters
  • clock - A pre-configured quanta::Clock instance for high-precision timing
§Example
use rusty_feeder::bybit::feeder::BybitFeeder;
use quanta::Clock;

// Create a clock with specific configuration
let clock = Clock::new();

// Create feeder with the custom clock
let feeder = BybitFeeder::with_clock(clock);
Source

pub fn get_all_stats(&self) -> FxHashMap<String, FeedStats>

Get the current feed statistics for all active feeds

Returns a copy of the current statistics map, which can be used for monitoring and debugging the performance of the feeder.

§Returns
  • An FxHashMap mapping feed identifiers (String) to their FeedStats
Source

pub fn memory_usage(&self) -> usize

Get the memory usage of this feeder instance in bytes

This method provides an estimate of the memory used by the feeder’s internal data structures, which can be useful for monitoring resource usage.

§Returns
  • Estimated memory usage in bytes

Trait Implementations§

Source§

impl Debug for BybitFeeder

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Default for BybitFeeder

Source§

fn default() -> Self

Returns the “default value” for a type. Read more
Source§

impl Feeder for BybitFeeder

Source§

fn get_shared_orderbook<'life0, 'life1, 'async_trait>( &'life0 self, instrument_id: &'life1 InstrumentId, ) -> Pin<Box<dyn Future<Output = Result<SharedSimdOrderBook>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get a realtime shared orderbook for a symbol

Source§

fn get_bar_cache<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, instrument_id: &'life1 InstrumentId, bar_type: &'life2 BarType, max_bars: usize, ) -> Pin<Box<dyn Future<Output = Result<Arc<RwLock<BarCache>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Get a bar cache for a symbol and bar type

Source§

fn get_stats<'life0, 'life1, 'async_trait>( &'life0 self, instrument_id: &'life1 InstrumentId, ) -> Pin<Box<dyn Future<Output = Result<FeedStats>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get feed statistics for a specific instrument. Returns performance data for the instrument’s feeds.

Source§

fn reset_stats<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Reset all feed statistics. Clears all collected performance statistics.

Source§

type DepthMessage = OrderbookResponse

Raw message type from the exchange for depth/orderbook
Source§

type TradeMessage = TradeResponse

Raw message type from the exchange for trades
Source§

fn start_feed_depth<'life0, 'async_trait>( &'life0 self, instrument_id: InstrumentId, depth_rx: Receiver<Self::DepthMessage>, options: Option<FeederOptions>, ) -> Pin<Box<dyn Future<Output = Result<Receiver<OrderBookSnapshot>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Start processing and normalizing orderbook depth data. Processes raw depth messages into a standardized orderbook depth format.
Source§

fn stop_feed_depth<'life0, 'life1, 'async_trait>( &'life0 self, instrument_id: &'life1 InstrumentId, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Stop processing orderbook depth data for a specific instrument
Source§

fn start_feed_trades<'life0, 'async_trait>( &'life0 self, instrument_id: InstrumentId, trade_rx: Receiver<Self::TradeMessage>, options: Option<FeederOptions>, ) -> Pin<Box<dyn Future<Output = Result<Receiver<MarketTrade>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Start processing and normalizing trade data. Processes raw trade messages into a standardized trade format.
Source§

fn stop_feed_trades<'life0, 'life1, 'async_trait>( &'life0 self, instrument_id: &'life1 InstrumentId, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Stop processing trade data for a specific instrument
Source§

fn start_feed_bars<'life0, 'async_trait>( &'life0 self, instrument_id: InstrumentId, bar_type: BarType, trade_rx: Receiver<MarketTrade>, options: Option<FeederOptions>, ) -> Pin<Box<dyn Future<Output = Result<Receiver<Bar>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Start generating bars from trade data. Aggregates trade data into OHLCV bars of the specified type.
Source§

fn stop_feed_bars<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, instrument_id: &'life1 InstrumentId, bar_type: &'life2 BarType, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Stop generating bars for a specific instrument and bar type

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
§

impl<T> ErasedDestructor for T
where T: 'static,