AGENTS.md

Taskflow is a header-only C++20 library for parallel and heterogeneous task programming. You express work as a directed acyclic graph (DAG) of tasks, and Taskflow's work-stealing executor runs them in parallel.

Detailed guides (read when you need them):

Field notes (specialized; skip unless relevant):

Profiling: for diagnostic profiling of Taskflow apps, prefer tf::DigestObserver (~2 KB structured digest, O(1) memory, all thresholds calibrated per-run). It's a drop-in custom observer — see docs/taskflow-performance.md Section 7 and INTEGRATION.md for integration details.
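Custom observers plug in through tf::ObserverInterface and executor.make_observer(). A minimal sketch of that mechanism (TimingObserver and its bodies are illustrative placeholders, not the DigestObserver implementation):

struct TimingObserver : public tf::ObserverInterface {
  void set_up(size_t num_workers) override {
    // called once when the observer is attached to the executor
  }
  void on_entry(tf::WorkerView w, tf::TaskView tv) override {
    // called by worker w just before it runs task tv (e.g., record tv.name() and a timestamp)
  }
  void on_exit(tf::WorkerView w, tf::TaskView tv) override {
    // called just after tv finishes
  }
};

auto obs = executor.make_observer<TimingObserver>();   // attach before executor.run()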


Installation

g++ -std=c++20 -O2 -pthread my_program.cpp -I /path/to/taskflow
#include <taskflow/taskflow.hpp>                    // core (always needed)
#include <taskflow/algorithm/for_each.hpp>          // algorithm headers are separate
#include <taskflow/algorithm/reduce.hpp>            // include only what you use

CMake options: find_package(Taskflow) (installed) or FetchContent (vendored). With FetchContent, the target is Taskflow (no namespace), not Taskflow::Taskflow.

| Compiler | Min Version | Flag        |
|----------|-------------|-------------|
| GCC      | 11          | -std=c++20  |
| Clang    | 12          | -std=c++20  |
| MSVC     | 19.29       | /std:c++20  |

Other requirements: CMake >= 3.18; CUDA (if used): nvcc >= 12.0.

Core Concepts

| Object | Role |
|--------|------|
| tf::Executor | Thread pool that runs taskflows — heavyweight, spawns OS threads on construction |
| tf::Taskflow | DAG of tasks (the computation) — lightweight, can be rebuilt cheaply |
| tf::Task | Handle to a node in the DAG (just a Node*, 8 bytes) |
| tf::TaskGroup | Cooperative task group for fork-join within a running executor |

tf::Executor executor;            // thread pool (default: hardware_concurrency)
tf::Taskflow taskflow;            // task graph

auto A = taskflow.emplace([](){ /* work */ }).name("A");
auto B = taskflow.emplace([](){ /* work */ }).name("B");
A.precede(B);                     // A runs before B

executor.run(taskflow).wait();    // run() is non-blocking — .wait() is required

Executor lifetime: tf::Executor construction spawns N OS threads and initializes work-stealing queues — this is expensive (~100µs+ per thread). When a function will be called repeatedly (benchmarks, servers, iterative solvers), construct the executor once and reuse it:

void compute(tf::Executor& executor) {     // receives pre-built executor
  tf::Taskflow taskflow;
  // ... build graph ...
  executor.run(taskflow).wait();
}

void measure(tf::Executor& executor) {
  compute(executor);
}

Choose the executor's worker count at the call site, construct it once for that configuration, and reuse it across calls.

Building a new tf::Taskflow each call is fine — it's lightweight. The executor is what's expensive. If your workload only runs once, a local executor is perfectly acceptable.


Choosing the Right Pattern

| Sequential Pattern | Taskflow Replacement | Header |
|--------------------|----------------------|--------|
| for loop over range/indices | for_each / for_each_index | algorithm/for_each.hpp |
| Chunked parallel loop (large range, cheap body) | for_each_by_index | algorithm/for_each.hpp |
| Accumulate / fold (iterators) | reduce / transform_reduce | algorithm/reduce.hpp |
| Accumulate / fold (index range) | reduce_by_index | algorithm/reduce.hpp |
| std::sort | sort | algorithm/sort.hpp |
| Prefix sum | inclusive_scan / exclusive_scan | algorithm/scan.hpp |
| std::find_if / std::transform | find_if / transform | algorithm/find.hpp / algorithm/transform.hpp |
| Multi-stage streaming (fixed stages) | tf::Pipeline | algorithm/pipeline.hpp |
| Multi-stage streaming (typed data flow) | tf::DataPipeline | algorithm/data_pipeline.hpp |
| Multi-stage streaming (dynamic stages) | tf::ScalablePipeline | algorithm/pipeline.hpp |
| Independent tasks with deps | emplace() + precede() | taskflow.hpp |
| Recursive fork-join | tf::TaskGroup + corun() | taskflow.hpp |
| Dynamic sub-DAG (need task handles) | tf::Subflow | taskflow.hpp |
| Conditional branching/loops | Condition task (returns int) | taskflow.hpp |
| Repeated parallel computation | Condition task cycle (graph-internal loop), or run_n() / run_until() for simpler repeated execution | taskflow.hpp |
| Fire-and-forget async (no future needed) | executor.silent_async() | taskflow.hpp |
| Async with std::future<T> return value | executor.async() | taskflow.hpp |
| Async with ordering | dependent_async() | taskflow.hpp |
| Reusable graph modules | composed_of() | taskflow.hpp |
| Limit concurrency | tf::Semaphore | taskflow.hpp |
| Sequential task chain | taskflow.linearize(tasks) | taskflow.hpp |

Decision Guide

Data-parallel loop?

  • Per-element work is substantial → for_each_index (Taskflow partitions the range internally; good default for non-trivial per-element work)
  • Large range with cheap per-element work, or when you want explicit inner-loop control → for_each_by_index (lambda receives a subrange to iterate)
  • Need to reduce/accumulate a result → reduce_by_index (thread-local partial sums, merged at the end — avoids shared-state contention)
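A minimal sketch of the per-index and reduction options above (N, data, and the task bodies are illustrative):

#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/for_each.hpp>
#include <taskflow/algorithm/reduce.hpp>
#include <cmath>
#include <functional>
#include <optional>
#include <vector>

int main() {
  size_t N = 1'000'000;
  tf::Executor executor;
  tf::Taskflow taskflow;
  std::vector<double> data(N);

  // for_each_index: the lambda receives one index; Taskflow partitions the range
  auto fill = taskflow.for_each_index(size_t(0), N, size_t(1), [&](size_t i){
    data[i] = std::sqrt(double(i));      // stand-in for substantial per-element work
  });

  // reduce_by_index: each worker folds a subrange into a local partial;
  // partials are merged by the binary op at the end (no shared-state contention)
  double sum = 0.0;
  auto fold = taskflow.reduce_by_index(
    tf::IndexRange<size_t>(0, N, 1),
    sum,
    [&](tf::IndexRange<size_t> sub, std::optional<double> running){
      double partial = running ? *running : 0.0;
      for(size_t i = sub.begin(); i < sub.end(); i += sub.step_size()) {
        partial += data[i];
      }
      return partial;
    },
    std::plus<double>{}
  );

  fill.precede(fold);                    // reduce only after the fill completes
  executor.run(taskflow).wait();
}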

Recursive / fork-join?

  • Structure discovered during execution (fibonacci, merge sort, recursive tree traversal) → tf::TaskGroup + corun(). Workers cooperatively steal tasks while waiting.
  • Structure known at construction time (enumerable in a loop — e.g., BFS levels, layer-by-layer graph) → emplace() + precede(). Build the full DAG statically, no runtime overhead.
  • Need a dynamic sub-DAG with explicit task handles and edges → tf::Subflow. More powerful but heavier — each subflow materializes a full task graph.

TaskGroup + corun() recipe (recursive fork-join):

// Recursive function — runs inside a worker, so TaskGroup is available
size_t fib(tf::Executor& executor, size_t n) {
  if (n < 2) return n;
  size_t x, y;
  auto tg = executor.task_group();
  tg.silent_async([&]{ x = fib(executor, n - 1); });
  y = fib(executor, n - 2);   // run one child inline to save a task
  tg.corun();   // worker steals other tasks while waiting — no thread starvation
  return x + y;
}

// Entry point — enter worker context first
size_t fib_entry(tf::Executor& executor, size_t n) {
  return executor.async([&executor, n]() {
    return fib(executor, n);
  }).get();
}

TaskGroup requires a worker context — call executor.task_group() only from code running inside the executor (e.g., inside executor.async(...) or a task lambda). For binary recursion, fork one child and run the other inline to reduce task creation overhead.

Pipeline / staged processing? → Use the Pipeline family rather than manually wiring stage-to-stage DAGs with barrier tasks. Pipeline provides optimized token-based scheduling without materializing the full DAG.

  • Stages known at compile time → tf::Pipeline (simplest)
  • Typed data flowing between stages → tf::DataPipeline (compile-time type safety)
  • Number of stages determined at runtime → tf::ScalablePipeline (dynamic configuration)

See docs/taskflow-patterns.md Section 4e for details on each variant.
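A minimal tf::Pipeline sketch (the token count, buffer, and stage bodies are illustrative):

#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>
#include <array>

int main() {
  tf::Executor executor;
  tf::Taskflow taskflow;

  constexpr size_t num_lines = 4;        // tokens in flight concurrently
  std::array<int, num_lines> buffer;     // one slot per line, indexed by pf.line()

  tf::Pipeline pl(num_lines,
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf){
      if(pf.token() == 100) { pf.stop(); return; }     // first stage must be serial; it generates tokens
      buffer[pf.line()] = int(pf.token());
    }},
    tf::Pipe{tf::PipeType::PARALLEL, [&](tf::Pipeflow& pf){
      buffer[pf.line()] *= 2;                          // stages hand data over via the per-line buffer
    }}
  );

  taskflow.composed_of(pl);              // the pipeline runs as a composed module
  executor.run(taskflow).wait();
}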

Repeated computation in a loop? → Prefer a condition task cycle when loop state belongs inside the DAG or you need graph-internal control flow. Use run_n() / run_until() when you simply need to rerun the same graph with a simpler API.
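When no graph-internal state is needed, the rerun APIs are one-liners (N is illustrative):

executor.run_n(taskflow, N).wait();      // run the same graph N times, one wait at the end

size_t i = 0;
executor.run_until(taskflow, [&](){ return ++i == N; }).wait();   // rerun until the predicate returns true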

None of the above? → Manual graph with emplace() + precede().

Before writing manual implementations, check if Taskflow already has a built-in algorithm (e.g., sort, reduce, inclusive_scan, for_each_index). Built-in algorithms are pre-tuned and always faster than hand-rolled recursive equivalents.

For large-scale explicit DAGs (100K+ tasks), see docs/taskflow-patterns.md Section 6f.

Partitioners

Algorithms like for_each_index, for_each_by_index, reduce_by_index, and sort accept an optional partitioner that controls how work is distributed across threads.

| Partitioner | Best for | Chunk size |
|-------------|----------|------------|
| tf::GuidedPartitioner{} (default) | General-purpose — starts with large chunks, shrinks adaptively | Auto |
| tf::DynamicPartitioner{C} | Irregular per-element cost (image processing, sparse graphs) | C elements per steal |
| tf::StaticPartitioner{} | Uniform per-element cost (matrix ops, memset-like loops) | N/threads, no stealing |

Chunk size guidance:

  • DynamicPartitioner{1} — finest load balance, highest steal overhead
  • DynamicPartitioner{64} or larger — amortizes steal overhead for cheap per-element work
  • StaticPartitioner{} — zero steal overhead, best when every element costs the same

When to switch from default: If profiling shows load imbalance (some threads finish much earlier than others), try DynamicPartitioner{1}. If profiling shows steal overhead dominates (very cheap per-element work, uniform cost), try StaticPartitioner{}.
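Partitioners are passed as a trailing argument to the algorithm (a sketch; N, a, b, c, process, and pixels are illustrative):

taskflow.for_each_index(0, N, 1, [&](int i){ a[i] = b[i] + c[i]; },
                        tf::StaticPartitioner{});      // uniform cost: static split, zero steals

taskflow.for_each_index(0, N, 1, [&](int i){ process(pixels[i]); },
                        tf::DynamicPartitioner{64});   // irregular cost: steal 64 elements at a time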


Per-Task Cost Model (Large DAGs)

Each taskflow.emplace(lambda) allocates a heap Node (~160 bytes) and type-erases the lambda via std::function. The SBO (small-buffer optimization) threshold for std::function is only 16 bytes (libstdc++) — most realistic captures exceed this, triggering a second heap allocation. Total cost: ~150-250ns per task.

Cost hierarchy — where to focus optimization effort:

  1. Macro-level strategy — monolithic vs batched DAG. Each executor.run().wait() is a full synchronization barrier. Monolithic (1 barrier) is generally faster than batched (N barriers). Consider batching only for memory constraints (~0.5 GB per 1M tasks).
  2. Lambda capture size — since most captures exceed the 16-byte SBO, the second heap allocation is unavoidable. But capture size still matters: copying 240 bytes per task is 5x slower than 48 bytes, and fat captures pollute L1/L2 cache during construction. Target ≤48 bytes. Use pointer+index into pre-allocated arrays rather than embedding data in the capture struct (see docs/taskflow-patterns.md Section 6f).
  3. Extra heap allocations per task — copying structs with vectors, shared_ptr captures, std::vector captured by value. Each adds ~50-150ns on top of the mandatory node alloc.
  4. Algorithmic complexity of lookups — O(log n) containers (std::map) for dependency tracking cause cache-miss pointer chasing during single-threaded construction. Prefer flat vectors for O(1) indexed access.
  5. Scratch buffer strategy — pre-allocate per-worker scratch arrays and index by executor.this_worker_id() or thread_local. Either mechanism works, but all init and allocation must happen outside the lambda. Never call prepare_scratch(), resize, or check init flags inside the task body — this adds per-task overhead at millions of tasks.
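A sketch combining items 2 and 5 (Item, items, num_tasks, and K are illustrative):

struct Item { double a, b; };
std::vector<Item> items(num_tasks);

// All allocation and initialization happen outside the lambdas
std::vector<std::vector<double>> scratch(executor.num_workers());
for(auto& s : scratch) s.resize(K);                    // pre-sized per-worker buffers

for(size_t i = 0; i < num_tasks; ++i) {
  // Capture is pointer + index + two references (~32 bytes), under the 48-byte target
  taskflow.emplace([data = items.data(), i, &executor, &scratch]{
    auto& buf = scratch[executor.this_worker_id()];    // one buffer per worker, no locking
    buf[0] = data[i].a + data[i].b;                    // no allocation inside the task body
  });
}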

Implication: Simpler code with fewer allocations outperforms complex "optimized" code that inadvertently adds overhead. See docs/taskflow-performance.md for the full cost model.


Common Mistakes

  1. Forgetting .wait() — run() is non-blocking; without .wait() results are undefined.

  2. Recreating the executor in hot paths — tf::Executor spawns OS threads on construction. Creating and destroying it on every call pays thread spawn/join overhead each time.

    Anti-pattern (executor created per call):

    auto measure_time_taskflow(unsigned num_threads) {
      tf::Executor executor(num_threads);   // BAD: spawns threads every call
      tf::Taskflow taskflow;
      // ...
      executor.run(taskflow).wait();
    }

    Correct pattern — pass executor in:

    auto measure_time_taskflow(tf::Executor& executor) {
      tf::Taskflow taskflow;                // Taskflow is cheap to rebuild
      // ...
      executor.run(taskflow).wait();
    }

    If you cannot change the function signature, a static local executor is only safe when the worker count is fixed for the lifetime of the process:

    auto measure_time_taskflow() {
      static tf::Executor executor(fixed_num_threads);  // only when worker count never changes
      tf::Taskflow taskflow;
      // ...
      executor.run(taskflow).wait();
    }

    If num_threads varies across calls (for example, benchmark sweeps), construct the executor at the caller and pass it in instead.

  3. Atomic contention in reductions — using for_each_index with std::atomic::fetch_add per element causes all threads to contend on one cache line. At high core counts, this effectively serializes the computation. Prefer reduce_by_index which accumulates into thread-local partials and merges only at the end. See docs/taskflow-patterns.md Section 4b.

  4. Subflow + join() for recursive fork-join — sf.join() blocks the calling worker thread, removing it from the work-stealing pool. With deep recursion this cascades into thread starvation. TaskGroup + corun() lets the waiting worker steal other tasks cooperatively. Prefer TaskGroup + corun() for all recursive fork-join. Reserve Subflow for cases where you need explicit task handles and edges within the sub-DAG. See docs/taskflow-patterns.md Section 6a for guidance on when each pattern fits.

  5. Rebuilding the graph every iteration — when a parallel computation repeats N times, avoid clearing and rebuilding the graph each iteration.

    Anti-pattern (rebuild per iteration):

    for (int i = 0; i < N; i++) {
      tf::Taskflow taskflow;
      // ... build graph ...
      executor.run(taskflow).wait();   // BAD: graph construction + barrier per iteration
    }

    Correct pattern (condition task cycle with StaticPartitioner):

    tf::Taskflow taskflow;
    int iter = 0;
    auto work = taskflow.for_each_index(0, M, 1, [&](int j){ /* ... */ },
                                         tf::StaticPartitioner{});
    auto cond = taskflow.emplace([&]() -> int {
      return (++iter < N) ? 0 : -1;   // 0 = loop back, -1 = stop
    });
    work.precede(cond);
    cond.precede(work);               // successor 0 = work (cycles back)
    executor.run(taskflow).wait();     // single barrier for all N iterations

    This avoids per-iteration graph construction and executor re-launch overhead. See docs/taskflow-patterns.md Section 6b.

  6. Data races — tasks run concurrently; protect shared mutable state.

  7. Dangling references — captured references must outlive executor.run().wait().

  8. std::function wrappers — unnecessary; emplace() accepts callables directly.

  9. Missing algorithm headers — for_each, reduce, etc. need their own #include.

  10. Too-fine granularity — tasks should be ≥10-100 microseconds of work. If per-element work is cheap, prefer chunked APIs (for_each_by_index, reduce_by_index) that let each task process a range of elements.
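    A chunked sketch (N and data are illustrative):

    taskflow.for_each_by_index(tf::IndexRange<int>(0, N, 1), [&](tf::IndexRange<int> sub){
      for(int i = sub.begin(); i < sub.end(); i += sub.step_size()) {
        data[i] += 1;                    // cheap per-element work amortized over the subrange
      }
    });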

  11. Concurrent runs of same Taskflow — undefined behavior; wait for completion first.

  12. Modifying graph while running — undefined behavior; build before run().

  13. Condition task successor ordering — return value selects by precede() call order. Returning -1 (or any out-of-bounds value) selects no successor, effectively stopping.

  14. Compiling below C++20 — Taskflow requires -std=c++20.

  15. Missing WAR deps with reused storage — circular buffers need WAR edges, not just RAW/WAW. See docs/taskflow-patterns.md Section 6f.

  16. Heap allocs in task loops — capturing vector by value, copying large structs → per-task malloc. Use fixed-size captures, pointers, references instead.

  17. const captures breaking pointer APIs — mark the lambda mutable if a called function needs non-const pointers into your captured struct.

  18. Skipping parallel init — when porting from OMP/TBB, parallelize all loops that were parallel in the source, including initialization. Sequential init of large data structures can be a meaningful fraction of total runtime.
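    A sketch of parallel initialization (data and init are illustrative):

    taskflow.for_each_index(size_t(0), data.size(), size_t(1), [&](size_t i){
      data[i] = init(i);                 // init in parallel, just like the compute loops
    });
    executor.run(taskflow).wait();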


When to Parallelize (and When Not To)

Good candidates: Independent loop iterations (>10µs each), serialized independent stages, recursive divide-and-conquer, pipeline/streaming, CPU-bound work saturating one core.

Leave alone: Trivially cheap loops (nanoseconds), memory-bandwidth-bound code, tightly sequential deps (except reductions/scans), working OpenMP/TBB code unless the goal is to standardize on Taskflow or remove the existing framework dependency, I/O-bound code, N < 10K with moderate per-element work, SIMD-vectorizable inner loops.

Priority order: 1) Serialized independent work → task graph. 2) Large parallel loops → for_each/for_each_by_index. 3) Reductions → reduce/reduce_by_index. 4) Pipelines → Pipeline. 5) Recursive fork-join → TaskGroup + corun().


for_each_index vs Explicit DAG

| Approach | Best when |
|----------|-----------|
| Explicit DAG (emplace + precede) | Width < threads, or cross-timestep parallelism matters |
| Per-timestep for_each_index | Width >> threads (width ≥ workers × 8) |
| Hybrid | Mixed narrow/wide phases |