diff --git a/Cargo.toml b/Cargo.toml index 361bb02..f0e7cf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,8 @@ members = [ "examples" ] +resolver = "2" + [patch.crates-io] flashfunk-core = { path = "./core" } owned-log = { path = "./log" } diff --git a/core/Cargo.toml b/core/Cargo.toml index 681a543..44d59db 100755 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "flashfunk-core" -version = "0.5.0" +version = "0.6.0" authors = ["somewheve "] edition = "2021" @@ -11,20 +11,18 @@ harness = false [features] default = ["std"] +# feature can be used together with rust standard library. std = ["core_affinity"] # optional feature for small symbol type less than 8 bytes small-symbol = [] +# support for strategy and worker async communcation. async = ["std", "futures-core", "parking"] [dependencies] -cache-padded = "1.1.1" - -# core affinity support core_affinity = { version = "0.8", optional = true } -# async feature support futures-core = { version = "0.3", default-features = false, features = ["alloc"], optional = true } parking = { version = "2", optional = true } [dev-dependencies] -criterion = "0.4" +criterion = "0.5" diff --git a/core/src/api.rs b/core/src/api.rs index 98e7b23..f2f0fac 100644 --- a/core/src/api.rs +++ b/core/src/api.rs @@ -1,10 +1,10 @@ use super::builder::APIBuilder; use super::strategy::Strategy; -use super::util::channel::{GroupReceiver, GroupSender}; +use super::util::channel::{BroadcastSender, GroupReceiver}; pub trait API: Sized { /// message type from server to API and would be sent to strategies. - type SndMessage: Send; + type SndMessage: Send + Sync; /// message type from strategies to API and sent to server. type RecvMessage: Send; @@ -19,7 +19,7 @@ pub trait API: Sized { fn run( self, - sender: GroupSender, + sender: BroadcastSender, receiver: GroupReceiver, ); } diff --git a/core/src/builder.rs b/core/src/builder.rs index 6837a28..4441a1e 100644 --- a/core/src/builder.rs +++ b/core/src/builder.rs @@ -1,11 +1,10 @@ -use crate::util::channel::GroupIndex; use alloc::vec::Vec; use super::{ api::API, strategy::Strategy, util::{ - channel::{channel, GroupReceiver, GroupSender}, + channel::{broadcast, channel, GroupReceiver}, pin_to_core, }, worker::Worker, @@ -66,74 +65,29 @@ where // 收集核心cid let mut cores = pin_to_core::get_core_ids(); - // 单向spsc: + // 单向spmc: // API -> Strategies. - let mut senders = Vec::new(); + let (broadcast_tx, broadcast_rx) = broadcast(self.message_capacity); + // Strategies -> API. let mut receivers = Vec::new(); - // groups为与symbols相对应(vec index)的策略们的发送端vec. - let mut group = { - #[cfg(not(feature = "small-symbol"))] - { - crate::util::fx_hasher::FxHashMap::default() - } - - #[cfg(feature = "small-symbol")] - { - crate::util::no_hasher::NoHashMap::default() - } - }; - - for (st_idx, st) in strategies.into_iter().enumerate() { - st.symbol().iter().for_each(|symbol| { - let g = { - #[cfg(feature = "small-symbol")] - { - let bytes = symbol.as_bytes(); - - assert!(bytes.len() <= 8, "small-symbol feature require a symbol with no more than 8 bytes in length."); - - let mut buf = [0; 8]; - for (idx, char) in bytes.iter().enumerate() { - buf[idx] = *char; - } - - let symbol = u64::from_le_bytes(buf); - - group.entry(symbol).or_insert_with(GroupIndex::default) - } - - #[cfg(not(feature = "small-symbol"))] - { - group.entry(*symbol).or_insert_with(GroupIndex::default) - } - }; - - assert!(!g.contains(&st_idx)); - g.push(st_idx); - }); - - // API -> Strategies - let (s1, r1) = channel(message_capacity); - + for st in strategies.into_iter() { // Strategies -> API. - let (s2, r2) = channel(message_capacity); + let (tx, rx) = channel(message_capacity); - senders.push(s1); - receivers.push(r2); + receivers.push(rx); let id = pin_to_core.then(|| cores.pop()).flatten(); - Worker::new(st, s2, r1).run_in_core(id); + Worker::new(st, tx, broadcast_rx.clone()).run_in_core(id); } - let group_senders = GroupSender::<_, N>::new(senders, group); let group_receivers = GroupReceiver::<_, N>::from_vec(receivers); // 分配最后一个核心给主线程 let id = pin_to_core.then(|| cores.pop()).flatten(); pin_to_core::pin_to_core(id); - api.run(group_senders, group_receivers); + api.run(broadcast_tx, group_receivers); } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 53d045f..8071021 100755 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,4 +1,5 @@ extern crate alloc; +extern crate core; mod worker; @@ -11,16 +12,18 @@ pub mod util; mod test { use super::api::API; use super::strategy::{Context, Strategy}; - use super::util::channel::{channel, GroupReceiver, GroupSender, Sender}; + use super::util::channel::{BroadcastSender, GroupReceiver}; use alloc::vec::Vec; + use std::sync::mpsc; + struct Rem; #[derive(Default)] struct RemContext; - struct APIMessage(Sender); + struct APIMessage(mpsc::Sender); struct StrategyMessage(u32); @@ -30,30 +33,12 @@ mod test { fn run( self, - mut sender: GroupSender, + mut sender: BroadcastSender, mut receiver: GroupReceiver, ) { - #[cfg(feature = "small-symbol")] - { - let mut buf = [0; 8]; - for (idx, char) in "dgr123".as_bytes().iter().enumerate() { - buf[idx] = *char; - } + let (tx, rx) = mpsc::channel(); - assert_eq!( - sender.group().get(&u64::from_le_bytes(buf)).unwrap().len(), - 1 - ); - } - - #[cfg(not(feature = "small-symbol"))] - { - assert_eq!(sender.group().get("dgr123").unwrap().len(), 1); - } - - let (tx, mut rx) = channel(1); - - sender.send_to(APIMessage(tx), 0); + sender.send(APIMessage(tx)); #[cfg(not(feature = "async"))] { @@ -95,12 +80,9 @@ mod test { self.symbols.as_slice() } - fn call(&mut self, msg: ::SndMessage, ctx: &mut Context) { - let mut tx = msg.0; - + fn call(&mut self, msg: &::SndMessage, ctx: &mut Context) { ctx.sender().send(StrategyMessage(251)); - - tx.send(996u32); + msg.0.send(996u32).unwrap(); } } diff --git a/core/src/strategy.rs b/core/src/strategy.rs index 9ecb609..42329e5 100644 --- a/core/src/strategy.rs +++ b/core/src/strategy.rs @@ -13,7 +13,7 @@ where fn on_start(&mut self, ctx: &mut Context) {} /// Method called when a new message is received by strategy. - fn call(&mut self, msg: A::SndMessage, ctx: &mut Context); + fn call(&mut self, msg: &A::SndMessage, ctx: &mut Context); /// Method called when all message are processed by strategy and wait for next /// message to arrive. @@ -37,7 +37,7 @@ where } #[inline] - fn call(&mut self, msg: A::SndMessage, ctx: &mut Context) { + fn call(&mut self, msg: &A::SndMessage, ctx: &mut Context) { (**self).call(msg, ctx) } diff --git a/core/src/util/cache_padded.rs b/core/src/util/cache_padded.rs new file mode 100644 index 0000000..53eb1e1 --- /dev/null +++ b/core/src/util/cache_padded.rs @@ -0,0 +1,94 @@ +use core::ops::{Deref, DerefMut}; + +/// Pads and aligns a value to the length of a cache line. +#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)] +// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache +// lines at a time, so we have to align to 128 bytes rather than 64. +// +// Sources: +// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf +// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 +// +// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. +// +// Sources: +// - https://www.mono-project.com/news/2016/09/12/arm64-icache/ +// +// powerpc64 has 128-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 +#[cfg_attr( + any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + ), + repr(align(128)) +)] +// arm, mips, mips64, and riscv64 have 32-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 +#[cfg_attr( + any( + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + ), + repr(align(32)) +)] +// s390x has 256-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 +#[cfg_attr(target_arch = "s390x", repr(align(256)))] +// x86 and wasm have 64-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 +// +// All others are assumed to have 64-byte cache line size. +#[cfg_attr( + not(any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + target_arch = "s390x", + )), + repr(align(64)) +)] +pub(crate) struct CachePadded { + value: T, +} + +impl CachePadded { + /// Pads and aligns a value to the length of a cache line. + pub(crate) fn new(value: T) -> CachePadded { + CachePadded:: { value } + } +} + +impl Deref for CachePadded { + type Target = T; + + fn deref(&self) -> &T { + &self.value + } +} + +impl DerefMut for CachePadded { + fn deref_mut(&mut self) -> &mut T { + &mut self.value + } +} diff --git a/core/src/util/channel.rs b/core/src/util/channel.rs index ccdf4d4..0ef093e 100644 --- a/core/src/util/channel.rs +++ b/core/src/util/channel.rs @@ -1,7 +1,6 @@ use core::{ fmt::{Debug, Display, Formatter, Result as FmtResult}, ops::{Deref, DerefMut}, - ptr, }; use alloc::vec::Vec; @@ -9,7 +8,7 @@ use alloc::vec::Vec; #[cfg(feature = "async")] use {alloc::sync::Arc, futures_core::task::__internal::AtomicWaker}; -use super::spsc::{new, Consumer, Producer}; +use super::{spmc, spsc}; pub enum ChannelError { RecvError, @@ -51,7 +50,7 @@ impl Display for ChannelError { } pub struct Sender { - tx: Producer, + tx: spsc::Producer, #[cfg(feature = "async")] waker: Arc, } @@ -80,11 +79,56 @@ impl Sender { } pub struct Receiver { - rx: Consumer, + rx: spsc::Consumer, #[cfg(feature = "async")] waker: Arc, } +pub struct BroadcastSender { + tx: spmc::Producer, + #[cfg(feature = "async")] + waker: Arc, +} + +impl BroadcastSender { + // 发送失败会panic + #[inline] + pub fn send(&mut self, m: impl Into) { + self.tx.push(m.into()).unwrap(); + #[cfg(feature = "async")] + self.waker.wake(); + } + + // 发送失败返回消息 + #[inline] + pub fn try_send(&mut self, m: M) -> Result<(), ChannelError> { + match self.tx.push(m) { + Ok(_) => { + #[cfg(feature = "async")] + self.waker.wake(); + Ok(()) + } + Err(e) => Err(ChannelError::TrySendError(e.0)), + } + } +} + +pub struct BroadcastReceiver { + rx: spmc::Consumer, + #[cfg(feature = "async")] + waker: Arc, +} + +impl Clone for BroadcastReceiver { + fn clone(&self) -> Self { + Self { + rx: self.rx.clone(), + #[cfg(feature = "async")] + waker: Arc::new(AtomicWaker::new()), + } + } +} + #[cfg(not(feature = "async"))] pub use r#sync::*; @@ -93,16 +137,28 @@ mod r#sync { use super::*; pub fn channel(cap: usize) -> (Sender, Receiver) { - let (tx, rx) = new(cap); + let (tx, rx) = spsc::new(cap); (Sender { tx }, Receiver { rx }) } + pub fn broadcast(cap: usize) -> (BroadcastSender, BroadcastReceiver) { + let (tx, rx) = spmc::new(cap); + (BroadcastSender { tx }, BroadcastReceiver { rx }) + } + impl Receiver { #[inline] pub fn recv(&mut self) -> Result> { self.rx.pop().map_err(|_| ChannelError::RecvError) } } + + impl BroadcastReceiver { + #[inline] + pub fn recv(&mut self) -> Result<&M, ChannelError> { + self.rx.pop().ok_or(ChannelError::RecvError) + } + } } #[cfg(feature = "async")] @@ -119,7 +175,7 @@ mod r#async { }; pub fn channel(cap: usize) -> (Sender, Receiver) { - let (tx, rx) = new(cap); + let (tx, rx) = spsc::new(cap); let waker = Arc::new(AtomicWaker::new()); ( Sender { @@ -130,6 +186,18 @@ mod r#async { ) } + pub fn broadcast(cap: usize) -> (BroadcastSender, BroadcastReceiver) { + let (tx, rx) = spmc::new(cap); + let waker = Arc::new(AtomicWaker::new()); + ( + BroadcastSender { + tx, + waker: waker.clone(), + }, + BroadcastReceiver { rx, waker }, + ) + } + impl Receiver { #[inline] pub async fn recv(&mut self) -> Result> { @@ -154,6 +222,13 @@ mod r#async { } } + impl BroadcastReceiver { + #[inline] + pub async fn recv(&mut self) -> Result<&M, ChannelError> { + todo!() + } + } + impl GroupReceiver { pub async fn recv(&mut self) -> Result> { struct GroupReceiveFut<'a, M, const N: usize> { @@ -182,109 +257,6 @@ mod r#async { } } -#[cfg(feature = "small-symbol")] -pub(crate) type HashMap = super::no_hasher::NoHashMap>; - -#[cfg(feature = "small-symbol")] -type KeyRef<'a> = &'a u64; - -#[cfg(not(feature = "small-symbol"))] -pub(crate) type HashMap = super::fx_hasher::FxHashMap<&'static str, GroupIndex>; - -#[cfg(not(feature = "small-symbol"))] -type KeyRef<'a> = &'a str; - -pub struct GroupSender { - senders: [Sender; N], - group: HashMap, -} - -impl GroupSender { - pub fn new(sender: Vec>, group: HashMap) -> Self { - let this = Self { - senders: sender.try_into().ok().unwrap(), - group, - }; - // IMPORTANT: - // - // Don't remove. See GroupSender::try_send_group method for reason. - this.bound_check(); - this - } - - #[inline] - pub fn group(&self) -> &HashMap { - &self.group - } - - #[inline] - pub fn senders(&self) -> &[Sender] { - &self.senders - } - - // 发送至所有sender - #[inline] - pub fn send_all(&mut self, mm: MM) - where - MM: Into + Clone, - { - self.senders - .iter_mut() - .for_each(|s| s.send(mm.clone().into())) - } - - // 发送至指定index的sender. 失败会panic - #[inline] - pub fn send_to(&mut self, m: impl Into, sender_index: usize) { - self.senders[sender_index].send(m.into()); - } - - // 发送至指定index的sender. 失败会返回消息 - #[inline] - pub fn try_send_to(&mut self, m: MM, sender_index: usize) -> Result<(), ChannelError> - where - MM: Into, - { - match self.senders.get_mut(sender_index) { - Some(s) => { - s.send(m.into()); - Ok(()) - } - None => Err(ChannelError::SenderOverFlow(m)), - } - } - - // 发送至指定group. group查找失败失败会返回消息.(group内的sender发送失败会panic) - #[inline] - pub fn try_send_group(&mut self, mm: MM, symbol: KeyRef<'_>) -> Result<(), ChannelError> - where - MM: Into + Clone, - { - match self.group.get(symbol) { - Some(g) => { - g.iter().for_each(|i| { - // SAFETY: - // - // Self::bound_check guarantee i is in range of Sender's stack array. - unsafe { - self.senders.get_unchecked_mut(*i).send(mm.clone().into()); - } - }); - Ok(()) - } - None => Err(ChannelError::SenderGroupNotFound(mm)), - } - } - - #[cold] - #[inline(never)] - fn bound_check(&self) { - self.group - .iter() - .for_each(|(_, g)| g.iter().for_each(|i| assert!(*i < self.senders.len()))); - } -} - pub struct GroupReceiver { receivers: [Receiver; N], } @@ -310,101 +282,3 @@ impl DerefMut for GroupReceiver { &mut self.receivers } } - -/// a collection of Index of [GroupSender]'s `[Sender; N]`. -pub struct GroupIndex { - idx: [usize; N], - len: usize, -} - -impl Default for GroupIndex { - fn default() -> Self { - Self { - idx: [0; N], - len: 0, - } - } -} - -impl GroupIndex { - #[cold] - #[inline(never)] - pub(crate) fn push(&mut self, i: usize) { - assert_ne!(self.len, N, "GroupIndex is full"); - self.idx[self.len] = i; - self.len += 1; - } - - #[cold] - #[inline(never)] - pub(crate) fn contains(&self, idx: &usize) -> bool { - self.iter().any(|i| i == idx) - } - - fn iter(&self) -> impl Iterator { - // SAFETY: - // - // This is safe as self.len is bound checked against N with every GroupIndex::push call. - unsafe { &*ptr::slice_from_raw_parts(self.idx.as_ptr(), self.len) }.iter() - } - - #[allow(clippy::len_without_is_empty)] - #[inline] - pub fn len(&self) -> usize { - self.len - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - #[should_panic] - fn overflow() { - let mut group = GroupIndex::<1>::default(); - group.push(1); - group.push(2); - } - - #[test] - fn iter() { - let mut group = GroupIndex::<4>::default(); - group.push(1); - group.push(2); - group.push(4); - - { - let mut iter = group.iter(); - - assert_eq!(iter.next(), Some(&1)); - assert_eq!(iter.next(), Some(&2)); - assert_eq!(iter.next(), Some(&4)); - assert_eq!(iter.next(), None); - } - - group.push(8); - - let mut iter = group.iter(); - - assert_eq!(iter.next(), Some(&1)); - assert_eq!(iter.next(), Some(&2)); - assert_eq!(iter.next(), Some(&4)); - assert_eq!(iter.next(), Some(&8)); - assert_eq!(iter.next(), None); - } - - #[test] - fn len() { - let mut group = GroupIndex::<4>::default(); - - group.push(1); - assert_eq!(group.len(), 1); - - group.push(2); - assert_eq!(group.len(), 2); - - group.push(4); - assert_eq!(group.len(), 3); - } -} diff --git a/core/src/util/fx_hasher.rs b/core/src/util/fx_hasher.rs index 2d488a8..8a38fc8 100644 --- a/core/src/util/fx_hasher.rs +++ b/core/src/util/fx_hasher.rs @@ -33,6 +33,11 @@ impl FxHasher { } impl Hasher for FxHasher { + #[inline] + fn finish(&self) -> u64 { + self.hash as u64 + } + #[inline] fn write(&mut self, mut bytes: &[u8]) { #[cfg(target_pointer_width = "32")] @@ -92,9 +97,4 @@ impl Hasher for FxHasher { fn write_usize(&mut self, i: usize) { self.add_to_hash(i); } - - #[inline] - fn finish(&self) -> u64 { - self.hash as u64 - } } diff --git a/core/src/util/mod.rs b/core/src/util/mod.rs index f9011ab..b740b75 100644 --- a/core/src/util/mod.rs +++ b/core/src/util/mod.rs @@ -1,3 +1,5 @@ +mod cache_padded; +mod spmc; mod spsc; #[cfg(feature = "async")] diff --git a/core/src/util/spmc.rs b/core/src/util/spmc.rs new file mode 100644 index 0000000..08a82e3 --- /dev/null +++ b/core/src/util/spmc.rs @@ -0,0 +1,180 @@ +use core::{ + fmt, mem, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use alloc::{sync::Arc, vec::Vec}; + +use super::cache_padded::CachePadded; + +struct Inner { + tail: CachePadded, + cap: usize, + buffer: *mut T, +} + +/// construct append only single producer and multi consumer channel. +/// the channel can only hold given size of items and item can not be removed from channel. +pub fn new(cap: usize) -> (Producer, Consumer) { + assert!(cap > 0, "capacity must be non-zero"); + + let buffer = { + let mut v = Vec::::with_capacity(cap); + let ptr = v.as_mut_ptr(); + mem::forget(v); + ptr + }; + + let inner = Arc::new(Inner { + tail: CachePadded::new(AtomicUsize::new(0)), + cap, + buffer, + }); + + let p = Producer { + tail: 0, + inner: inner.clone(), + }; + + let c = Consumer { + inner, + next: 0, + tail: 0, + }; + + (p, c) +} + +impl Drop for Inner { + fn drop(&mut self) { + let tail = self.tail.load(Ordering::Acquire); + // SAFETY: + // pointer and capacity is constructed through Vec and upholds the safety. + // tail is incremented every time a new T added to the vec and upholds the safety. + let _vec = unsafe { Vec::::from_raw_parts(self.buffer, tail, self.cap) }; + } +} + +pub struct Producer { + tail: usize, + inner: Arc>, +} + +unsafe impl Send for Producer {} + +impl Producer { + pub fn push(&mut self, val: T) -> Result<(), PushError> { + if self.tail == self.inner.cap { + return Err(PushError(val)); + } + + // SAFETY: + // just checked for out of bound. + unsafe { self.inner.buffer.add(self.tail).write(val) }; + + self.tail += 1; + self.inner.tail.fetch_add(1, Ordering::Release); + + Ok(()) + } +} + +pub struct Consumer { + inner: Arc>, + next: usize, + tail: usize, +} + +impl Clone for Consumer { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + next: 0, + tail: 0, + } + } +} + +unsafe impl Send for Consumer {} + +impl Consumer { + pub fn pop(&mut self) -> Option<&T> { + if self.next == self.tail { + self.tail = self.inner.tail.load(Ordering::Relaxed); + } + + if self.next == self.tail { + return None; + } + + if self.next == self.inner.cap { + return None; + } + + // SAFETY: + // just checked for out of bound. + let next = unsafe { &*self.inner.buffer.add(self.next) }; + self.next += 1; + + Some(next) + } +} + +/// Error which occurs when pushing into a full queue. +#[derive(Clone, Copy, Eq, PartialEq)] +pub struct PushError(pub T); + +impl fmt::Debug for PushError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("PushError(..)") + } +} + +impl fmt::Display for PushError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("pushing into a full channel") + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn push_pop() { + let (mut p, c) = new::(2); + + p.push(996).unwrap(); + p.push(251).unwrap(); + p.push(666).unwrap_err(); + + let mut c1 = c.clone(); + let mut c2 = c.clone(); + + assert_eq!(c1.pop().unwrap(), &996); + assert_eq!(c2.pop().unwrap(), &996); + + assert_eq!(c1.pop().unwrap(), &251); + assert_eq!(c2.pop().unwrap(), &251); + + assert!(c1.pop().is_none()); + assert!(c2.pop().is_none()); + } + + #[test] + fn drop_test() { + let (mut p, c) = new::>(2); + + let item = Arc::new(()); + + p.push(item.clone()).unwrap(); + + assert_eq!(Arc::strong_count(&item), 2); + + drop(p); + assert_eq!(Arc::strong_count(&item), 2); + + drop(c); + assert_eq!(Arc::strong_count(&item), 1); + } +} diff --git a/core/src/util/spsc.rs b/core/src/util/spsc.rs index 0c419b6..9cfed3b 100644 --- a/core/src/util/spsc.rs +++ b/core/src/util/spsc.rs @@ -11,7 +11,7 @@ use core::{ use alloc::{sync::Arc, vec::Vec}; -use cache_padded::CachePadded; +use super::cache_padded::CachePadded; /// The inner representation of a single-producer single-consumer queue. struct Inner { diff --git a/core/src/worker.rs b/core/src/worker.rs index 07c1d1f..ff1c293 100644 --- a/core/src/worker.rs +++ b/core/src/worker.rs @@ -1,7 +1,7 @@ use super::api::API; use super::strategy::{Context, Strategy}; use super::util::{ - channel::{Receiver, Sender}, + channel::{BroadcastReceiver, Sender}, pin_to_core::{self, CoreId}, }; @@ -12,7 +12,7 @@ where { strategy: S, sender: Sender, - receiver: Receiver, + receiver: BroadcastReceiver, } impl Worker @@ -23,7 +23,7 @@ where pub(super) fn new( strategy: S, sender: Sender, - receiver: Receiver, + receiver: BroadcastReceiver, ) -> Self { Self { strategy, diff --git a/examples/Cargo.toml b/examples/Cargo.toml index c4e0282..389ec27 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -13,7 +13,7 @@ name = "log" path = "./src/log.rs" [dependencies] -flashfunk-core = { version = "0.5" } +flashfunk-core = { version = "0.6" } owned-log = { version = "0.1" } crossbeam-queue = "0.3.5" diff --git a/examples/src/websocket.rs b/examples/src/websocket.rs index 7112522..9fc7370 100644 --- a/examples/src/websocket.rs +++ b/examples/src/websocket.rs @@ -5,7 +5,7 @@ use std::io; use flashfunk_core::{ api::API, strategy::{Context, Strategy}, - util::channel::{GroupReceiver, GroupSender}, + util::channel::{BroadcastSender, GroupReceiver}, }; use futures_util::{SinkExt, StreamExt}; use xitca_client::{bytes::Bytes, error::Error, http::Version, ws::Message, Client}; @@ -20,7 +20,7 @@ impl API for WsAPI { fn run( self, - mut sender: GroupSender, + mut sender: BroadcastSender, mut receiver: GroupReceiver, ) { let res = tokio::runtime::Builder::new_current_thread() @@ -46,7 +46,7 @@ impl API for WsAPI { res = ws.next() => { let msg = res.ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; match msg { - Message::Text(bytes) | Message::Binary(bytes) => sender.send_all(bytes), + Message::Text(bytes) | Message::Binary(bytes) => sender.send(bytes), Message::Ping(bytes) => ws.send(Message::Pong(bytes)).await?, Message::Close(reason) => { ws.send(Message::Close(reason)).await?; @@ -78,7 +78,7 @@ impl Strategy for WsStrategy { self.symbols.as_slice() } - fn call(&mut self, msg: Bytes, ctx: &mut Context) { + fn call(&mut self, msg: &Bytes, ctx: &mut Context) { println!("Message from WsAPI: {}\r\n", String::from_utf8_lossy(&msg)); ctx.sender().send(StrategyMessage( self.symbol()