Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ members = [
"examples"
]

resolver = "2"

[patch.crates-io]
flashfunk-core = { path = "./core" }
owned-log = { path = "./log" }
Expand Down
10 changes: 4 additions & 6 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "flashfunk-core"
version = "0.5.0"
version = "0.6.0"
authors = ["somewheve <somewheve@gmail.com>"]
edition = "2021"

Expand All @@ -11,20 +11,18 @@ harness = false

[features]
default = ["std"]
# feature can be used together with rust standard library.
std = ["core_affinity"]
# optional feature for small symbol type less than 8 bytes
small-symbol = []
# support for strategy and worker async communcation.
async = ["std", "futures-core", "parking"]

[dependencies]
cache-padded = "1.1.1"

# core affinity support
core_affinity = { version = "0.8", optional = true }

# async feature support
futures-core = { version = "0.3", default-features = false, features = ["alloc"], optional = true }
parking = { version = "2", optional = true }

[dev-dependencies]
criterion = "0.4"
criterion = "0.5"
6 changes: 3 additions & 3 deletions core/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::builder::APIBuilder;
use super::strategy::Strategy;
use super::util::channel::{GroupReceiver, GroupSender};
use super::util::channel::{BroadcastSender, GroupReceiver};

pub trait API: Sized {
/// message type from server to API and would be sent to strategies.
type SndMessage: Send;
type SndMessage: Send + Sync;

/// message type from strategies to API and sent to server.
type RecvMessage: Send;
Expand All @@ -19,7 +19,7 @@ pub trait API: Sized {

fn run<const N: usize>(
self,
sender: GroupSender<Self::SndMessage, N>,
sender: BroadcastSender<Self::SndMessage>,
receiver: GroupReceiver<Self::RecvMessage, N>,
);
}
64 changes: 9 additions & 55 deletions core/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::util::channel::GroupIndex;
use alloc::vec::Vec;

use super::{
api::API,
strategy::Strategy,
util::{
channel::{channel, GroupReceiver, GroupSender},
channel::{broadcast, channel, GroupReceiver},
pin_to_core,
},
worker::Worker,
Expand Down Expand Up @@ -66,74 +65,29 @@ where
// 收集核心cid
let mut cores = pin_to_core::get_core_ids();

// 单向spsc:
// 单向spmc:
// API -> Strategies.
let mut senders = Vec::new();
let (broadcast_tx, broadcast_rx) = broadcast(self.message_capacity);

// Strategies -> API.
let mut receivers = Vec::new();

// groups为与symbols相对应(vec index)的策略们的发送端vec.
let mut group = {
#[cfg(not(feature = "small-symbol"))]
{
crate::util::fx_hasher::FxHashMap::default()
}

#[cfg(feature = "small-symbol")]
{
crate::util::no_hasher::NoHashMap::default()
}
};

for (st_idx, st) in strategies.into_iter().enumerate() {
st.symbol().iter().for_each(|symbol| {
let g = {
#[cfg(feature = "small-symbol")]
{
let bytes = symbol.as_bytes();

assert!(bytes.len() <= 8, "small-symbol feature require a symbol with no more than 8 bytes in length.");

let mut buf = [0; 8];
for (idx, char) in bytes.iter().enumerate() {
buf[idx] = *char;
}

let symbol = u64::from_le_bytes(buf);

group.entry(symbol).or_insert_with(GroupIndex::default)
}

#[cfg(not(feature = "small-symbol"))]
{
group.entry(*symbol).or_insert_with(GroupIndex::default)
}
};

assert!(!g.contains(&st_idx));
g.push(st_idx);
});

// API -> Strategies
let (s1, r1) = channel(message_capacity);

for st in strategies.into_iter() {
// Strategies -> API.
let (s2, r2) = channel(message_capacity);
let (tx, rx) = channel(message_capacity);

senders.push(s1);
receivers.push(r2);
receivers.push(rx);

let id = pin_to_core.then(|| cores.pop()).flatten();
Worker::new(st, s2, r1).run_in_core(id);
Worker::new(st, tx, broadcast_rx.clone()).run_in_core(id);
}

let group_senders = GroupSender::<_, N>::new(senders, group);
let group_receivers = GroupReceiver::<_, N>::from_vec(receivers);

// 分配最后一个核心给主线程
let id = pin_to_core.then(|| cores.pop()).flatten();
pin_to_core::pin_to_core(id);

api.run(group_senders, group_receivers);
api.run(broadcast_tx, group_receivers);
}
}
38 changes: 10 additions & 28 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
extern crate alloc;
extern crate core;

mod worker;

Expand All @@ -11,16 +12,18 @@ pub mod util;
mod test {
use super::api::API;
use super::strategy::{Context, Strategy};
use super::util::channel::{channel, GroupReceiver, GroupSender, Sender};
use super::util::channel::{BroadcastSender, GroupReceiver};

use alloc::vec::Vec;

use std::sync::mpsc;

struct Rem;

#[derive(Default)]
struct RemContext;

struct APIMessage(Sender<u32>);
struct APIMessage(mpsc::Sender<u32>);

struct StrategyMessage(u32);

Expand All @@ -30,30 +33,12 @@ mod test {

fn run<const N: usize>(
self,
mut sender: GroupSender<Self::SndMessage, N>,
mut sender: BroadcastSender<Self::SndMessage>,
mut receiver: GroupReceiver<Self::RecvMessage, N>,
) {
#[cfg(feature = "small-symbol")]
{
let mut buf = [0; 8];
for (idx, char) in "dgr123".as_bytes().iter().enumerate() {
buf[idx] = *char;
}
let (tx, rx) = mpsc::channel();

assert_eq!(
sender.group().get(&u64::from_le_bytes(buf)).unwrap().len(),
1
);
}

#[cfg(not(feature = "small-symbol"))]
{
assert_eq!(sender.group().get("dgr123").unwrap().len(), 1);
}

let (tx, mut rx) = channel(1);

sender.send_to(APIMessage(tx), 0);
sender.send(APIMessage(tx));

#[cfg(not(feature = "async"))]
{
Expand Down Expand Up @@ -95,12 +80,9 @@ mod test {
self.symbols.as_slice()
}

fn call(&mut self, msg: <Rem as API>::SndMessage, ctx: &mut Context<Rem>) {
let mut tx = msg.0;

fn call(&mut self, msg: &<Rem as API>::SndMessage, ctx: &mut Context<Rem>) {
ctx.sender().send(StrategyMessage(251));

tx.send(996u32);
msg.0.send(996u32).unwrap();
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ where
fn on_start(&mut self, ctx: &mut Context<A>) {}

/// Method called when a new message is received by strategy.
fn call(&mut self, msg: A::SndMessage, ctx: &mut Context<A>);
fn call(&mut self, msg: &A::SndMessage, ctx: &mut Context<A>);

/// Method called when all message are processed by strategy and wait for next
/// message to arrive.
Expand All @@ -37,7 +37,7 @@ where
}

#[inline]
fn call(&mut self, msg: A::SndMessage, ctx: &mut Context<A>) {
fn call(&mut self, msg: &A::SndMessage, ctx: &mut Context<A>) {
(**self).call(msg, ctx)
}

Expand Down
94 changes: 94 additions & 0 deletions core/src/util/cache_padded.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use core::ops::{Deref, DerefMut};

/// Pads and aligns a value to the length of a cache line.
#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)]
// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
),
repr(align(128))
)]
// arm, mips, mips64, and riscv64 have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
#[cfg_attr(
any(
target_arch = "arm",
target_arch = "mips",
target_arch = "mips64",
target_arch = "riscv64",
),
repr(align(32))
)]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86 and wasm have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
not(any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
target_arch = "arm",
target_arch = "mips",
target_arch = "mips64",
target_arch = "riscv64",
target_arch = "s390x",
)),
repr(align(64))
)]
pub(crate) struct CachePadded<T> {
value: T,
}

impl<T> CachePadded<T> {
/// Pads and aligns a value to the length of a cache line.
pub(crate) fn new(value: T) -> CachePadded<T> {
CachePadded::<T> { value }
}
}

impl<T> Deref for CachePadded<T> {
type Target = T;

fn deref(&self) -> &T {
&self.value
}
}

impl<T> DerefMut for CachePadded<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.value
}
}
Loading