Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Builds the site under docs/ with npm and publishes the result to GitHub Pages.
name: Deploy Docs to GitHub Pages

# Triggered by pushes to the listed branches that touch docs/**, or manually (workflow_dispatch).
on:
push:
branches:
- main
- application-experiments
- application-experiments-b
paths:
- 'docs/**'
workflow_dispatch:

# Token permissions: read-only repository contents, write access to Pages, and an OIDC id-token.
permissions:
contents: read
pages: write
id-token: write

# All runs share the "pages" concurrency group; a run already in progress is not cancelled by a newer one.
concurrency:
group: "pages"
cancel-in-progress: false

jobs:
# build: installs dependencies, builds the site and uploads it as the Pages artifact.
build:
runs-on: ubuntu-latest
# `run:` steps execute inside docs/. `uses:` steps are not affected by this default,
# which is why the paths given to setup-node and upload-pages-artifact are repo-root relative.
defaults:
run:
working-directory: docs
steps:
- uses: actions/checkout@v4

# Node 20 with the npm cache keyed on docs/package-lock.json.
- uses: actions/setup-node@v4
with:
node-version: 20
cache: npm
cache-dependency-path: docs/package-lock.json

- name: Install dependencies
run: npm ci

- name: Build website
run: npm run build

# Uploads docs/build as the artifact consumed by the deploy job.
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
path: docs/build

# deploy: runs after build and publishes the uploaded artifact to the github-pages environment.
deploy:
environment:
name: github-pages
# The environment URL is taken from the output of the step with id "deployment" below.
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
needs: build
steps:
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@ out
hs_err_pid*.log
.bsp
metals.sbt
.scala-build
**/output
.scala-build
smoke
julia


32 changes: 30 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ lazy val vulkanNatives =
lazy val commonSettings = Seq(
scalacOptions ++= Seq("-feature", "-deprecation", "-unchecked", "-language:implicitConversions"),
resolvers += "maven snapshots" at "https://central.sonatype.com/repository/maven-snapshots/",
resolvers += "OSGeo Release Repository" at "https://repo.osgeo.org/repository/release/",
resolvers += "OSGeo Snapshot Repository" at "https://repo.osgeo.org/repository/snapshot/",
libraryDependencies ++= Seq(
"dev.zio" % "izumi-reflect_3" % "3.0.5",
"com.lihaoyi" % "pprint_3" % "0.9.0",
Expand All @@ -60,6 +62,24 @@ lazy val runnerSettings = Seq(libraryDependencies += "org.apache.logging.log4j"

// fs2 streaming libraries (core and io), both pinned to the same version.
lazy val fs2Settings = {
  val fs2Version = "3.12.0"
  Seq(
    libraryDependencies ++= Seq(
      "co.fs2" %% "fs2-core" % fs2Version,
      "co.fs2" %% "fs2-io" % fs2Version,
    ),
  )
}

// Versions of the HTTP/JSON stack pulled in by tapirSettings.
lazy val tapirVersion = "1.11.10"
lazy val http4sVersion = "0.23.30"
lazy val circeVersion = "0.14.10"

// Tapir (http4s server interpreter, circe JSON, Swagger UI), http4s ember server/client,
// circe, and munit-cats-effect for tests. Dependency order matches the declaration order below.
lazy val tapirSettings = {
  val tapirModules = Seq("tapir-http4s-server", "tapir-json-circe", "tapir-swagger-ui-bundle")
    .map(artifact => "com.softwaremill.sttp.tapir" %% artifact % tapirVersion)
  val http4sModules = Seq("http4s-ember-server", "http4s-ember-client", "http4s-circe")
    .map(artifact => "org.http4s" %% artifact % http4sVersion)
  val circeModules = Seq("circe-generic", "circe-parser")
    .map(artifact => "io.circe" %% artifact % circeVersion)
  val testModules = Seq("org.typelevel" %% "munit-cats-effect" % "2.0.0" % Test)

  Seq(libraryDependencies ++= tapirModules ++ http4sModules ++ circeModules ++ testModules)
}

// The cyfra-utility module, built with the common settings only.
lazy val utility = project
  .in(file("cyfra-utility"))
  .settings(commonSettings)

Expand Down Expand Up @@ -91,6 +111,14 @@ lazy val foton = (project in file("cyfra-foton"))
.settings(commonSettings)
.dependsOn(compiler, dsl, runtime, utility)

// The cyfra-fluids module: common + runner settings, built on foton, runtime, dsl and utility.
lazy val fluids = project
  .in(file("cyfra-fluids"))
  .settings(commonSettings)
  .settings(runnerSettings)
  .dependsOn(foton, runtime, dsl, utility)

// The cyfra-analytics module: adds the fs2 and Tapir/http4s/circe dependencies on top of the
// common + runner settings, and depends on fs2interop in addition to the core modules.
lazy val analytics = project
  .in(file("cyfra-analytics"))
  .settings(commonSettings)
  .settings(runnerSettings)
  .settings(fs2Settings)
  .settings(tapirSettings)
  .dependsOn(foton, runtime, dsl, utility, fs2interop)

lazy val examples = (project in file("cyfra-examples"))
.settings(commonSettings, runnerSettings)
.settings(libraryDependencies += "org.scala-lang.modules" % "scala-parallel-collections_3" % "1.2.0")
Expand All @@ -106,11 +134,11 @@ lazy val fs2interop = (project in file("cyfra-fs2"))

// End-to-end test module. Depends on foton in addition to runtime and fs2interop;
// the dependency list is declared once (the superseded two-module list is dropped).
lazy val e2eTest = (project in file("cyfra-e2e-test"))
  .settings(commonSettings, runnerSettings)
  .dependsOn(runtime, fs2interop, foton)

// Root project: aggregates the listed modules (now including fluids and analytics) so that
// tasks run on root are also run on them. The aggregate list is declared once.
lazy val root = (project in file("."))
  .settings(name := "Cyfra")
  .aggregate(compiler, dsl, foton, core, runtime, vulkan, examples, fs2interop, fluids, analytics)

// JVM system properties for the e2eTest test configuration: LWJGL stack size and unique
// native library names. NOTE(review): javaOptions presumably only take effect when tests are forked — confirm.
e2eTest / Test / javaOptions ++= Seq(
  "-Dorg.lwjgl.system.stackSize=1024",
  "-DuniqueLibraryNames=true",
)

Expand Down
13 changes: 13 additions & 0 deletions cyfra-analytics/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 configuration: a single console appender attached to the root logger at level "info". -->
<!-- status="WARN" sets the level of Log4j's own internal status messages, not of application logging. -->
<Configuration status="WARN">
<Appenders>
<!-- Writes to standard output. Line format: time, thread, padded level, logger name, message. -->
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<!-- No per-logger overrides: every logger inherits the root level and appender. -->
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package io.computenode.cyfra.analytics.endpoints

import cats.effect.IO
import sttp.tapir.*
import sttp.tapir.generic.auto.*
import sttp.tapir.json.circe.*
import sttp.tapir.server.ServerEndpoint
import sttp.capabilities.fs2.Fs2Streams
import io.circe.generic.auto.*
import io.computenode.cyfra.analytics.model.*
import io.computenode.cyfra.analytics.service.SegmentationService
import io.computenode.cyfra.analytics.repository.CentroidsRepository
import io.computenode.cyfra.fs2interop.GCluster.FCMCentroids

/** Tapir endpoint descriptions and server logic for the customer segmentation HTTP API.
  *
  * Endpoint values describe the wire contract only; [[serverEndpoints]] binds each of them to
  * `service` or `centroidsRepo`.
  *
  * @param service
  *   handles transactions, customer lookups, segment listing and health
  * @param centroidsRepo
  *   stores the current segment centroids
  */
class SegmentationEndpoints(service: SegmentationService, centroidsRepo: CentroidsRepository):

  /** Shared base: path prefix `api/v1` and a JSON [[ErrorResponse]] error body. */
  private val baseEndpoint = endpoint
    .in("api" / "v1")
    .errorOut(jsonBody[ErrorResponse])

  /** `POST api/v1/transactions` — accepts one transaction and replies with status `Accepted`. */
  val submitTransaction: PublicEndpoint[Transaction, ErrorResponse, Unit, Any] =
    baseEndpoint.post
      .in("transactions")
      .in(jsonBody[Transaction])
      .out(statusCode(sttp.model.StatusCode.Accepted))
      .description("Submit a transaction for processing")

  /** `POST api/v1/transactions/batch` — accepts a JSON list of transactions. */
  val submitBatchTransactions: PublicEndpoint[List[Transaction], ErrorResponse, BatchResponse, Any] =
    baseEndpoint.post
      .in("transactions" / "batch")
      .in(jsonBody[List[Transaction]])
      .out(jsonBody[BatchResponse])
      .description("Submit a batch of transactions for bulk processing")

  /** `GET api/v1/customers/{customerId}` — segment and profile of one customer. */
  val getCustomer: PublicEndpoint[Long, ErrorResponse, CustomerResponse, Any] =
    baseEndpoint.get
      .in("customers" / path[Long]("customerId"))
      .out(jsonBody[CustomerResponse])
      .description("Get customer segment and profile")

  /** `GET api/v1/segments` — all customer segments. */
  val getSegments: PublicEndpoint[Unit, ErrorResponse, SegmentsResponse, Any] =
    baseEndpoint.get
      .in("segments")
      .out(jsonBody[SegmentsResponse])
      .description("List all customer segments")

  /** `GET api/v1/centroids` — labels and dimensions of the current centroids. */
  val getCentroids: PublicEndpoint[Unit, ErrorResponse, CentroidsResponse, Any] =
    baseEndpoint.get
      .in("centroids")
      .out(jsonBody[CentroidsResponse])
      .description("Get current segment centroids")

  /** `PUT api/v1/centroids` — replaces the current centroids. */
  val updateCentroids: PublicEndpoint[CentroidsUpdate, ErrorResponse, CentroidsResponse, Any] =
    baseEndpoint.put
      .in("centroids")
      .in(jsonBody[CentroidsUpdate])
      .out(jsonBody[CentroidsResponse])
      .description("Update segment centroids")

  /** `GET health` — health check; unlike the other endpoints it is not under `api/v1`. */
  val getHealth: PublicEndpoint[Unit, ErrorResponse, HealthStatus, Any] =
    endpoint.get
      .in("health")
      .out(jsonBody[HealthStatus])
      .errorOut(jsonBody[ErrorResponse])
      .description("Service health check")

  /** Summarises a centroid set for the API.
    *
    * `numFeatures` is `values.length / labels.size`; an empty label list yields 0 features instead of
    * failing with a division by zero.
    */
  private def toCentroidsResponse(centroids: FCMCentroids): CentroidsResponse =
    val numClusters = centroids.labels.size
    CentroidsResponse(
      labels = centroids.labels,
      numClusters = numClusters,
      numFeatures = if numClusters == 0 then 0 else centroids.values.length / numClusters,
    )

  /** Checks a client-supplied update before it is stored.
    *
    * Rejects an empty label list and a value count that is not a whole multiple of the label count,
    * since neither describes a rectangular clusters-by-features layout.
    */
  private def parseCentroids(update: CentroidsUpdate): Either[ErrorResponse, FCMCentroids] =
    val numClusters = update.labels.size
    val numValues = update.values.length
    if numClusters == 0 then Left(ErrorResponse("Centroids update must contain at least one label"))
    else if numValues % numClusters != 0 then
      Left(ErrorResponse(s"Number of centroid values ($numValues) must be a multiple of the number of labels ($numClusters)"))
    else Right(FCMCentroids(update.labels, update.values))

  /** Validates the update, stores it on success, and reports the stored centroids' dimensions. */
  private def storeCentroids(update: CentroidsUpdate): IO[Either[ErrorResponse, CentroidsResponse]] =
    parseCentroids(update) match
      case Left(error)      => IO.pure(Left(error))
      case Right(centroids) => centroidsRepo.update(centroids).map(_ => Right(toCentroidsResponse(centroids)))

  /** Every endpoint bound to its server logic, in the same order as [[all]].
    *
    * NOTE(review): no status code is mapped for the error outputs, so "customer not found" and
    * validation failures presumably use Tapir's default error status rather than 404/422 — confirm.
    */
  def serverEndpoints: List[ServerEndpoint[Fs2Streams[IO], IO]] = List(
    submitTransaction.serverLogic(tx => service.submitTransaction(tx).map(_ => Right(()))),
    submitBatchTransactions.serverLogic { transactions =>
      service.submitBatch(transactions).map { _ =>
        Right(BatchResponse(accepted = transactions.size, message = s"Accepted ${transactions.size} transactions for processing"))
      }
    },
    getCustomer.serverLogic(customerId =>
      service.getCustomerInfo(customerId).map {
        case Some(response) => Right(response)
        case None           => Left(ErrorResponse(s"Customer $customerId not found"))
      },
    ),
    getSegments.serverLogic(_ => service.getSegments.map(Right(_))),
    getCentroids.serverLogic(_ => centroidsRepo.get.map(centroids => Right(toCentroidsResponse(centroids)))),
    updateCentroids.serverLogic(update => storeCentroids(update)),
    getHealth.serverLogic(_ => service.getHealth.map(Right(_))),
  )

  /** The bare endpoint descriptions without server logic (presumably for documentation generation — confirm). */
  val all = List(submitTransaction, submitBatchTransactions, getCustomer, getSegments, getCentroids, updateCentroids, getHealth)
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.computenode.cyfra.analytics.gpu

import fs2.{Pipe, Stream}
import io.computenode.cyfra.core.{CyfraRuntime, GCodec, GProgram}
import io.computenode.cyfra.core.layout.Layout
import io.computenode.cyfra.dsl.{*, given}
import io.computenode.cyfra.dsl.binding.{GBuffer, GUniform}
import io.computenode.cyfra.dsl.gio.GIO
import io.computenode.cyfra.dsl.struct.GStruct
import io.computenode.cyfra.dsl.library.Functions.pow
import io.computenode.cyfra.fs2interop.GCluster.{FCMCentroids, Membership}
import io.computenode.cyfra.fs2interop.GPipe
import io.computenode.cyfra.analytics.model.CustomerProfile

/** GPU-accelerated customer segmentation pipeline.
  *
  * Exposes [[segment]], an fs2 `Pipe` that batches customer profiles, computes a membership weight for every
  * (customer, cluster) pair with a GPU kernel, and emits one [[SegmentResult]] per customer.
  */
object GpuAnalyticsPipeline:

// numFeatures: values per customer/centroid; numClusters: number of centroids;
// fuzziness: used in the kernel exponent 2 / (fuzziness - 1); batchSize: customers per GPU batch.
case class Config(numFeatures: Int, numClusters: Int, fuzziness: Float = 2.0f, batchSize: Int = 100_000)

// One customer together with its membership weights and the index of its dominant cluster.
case class SegmentResult(customer: CustomerProfile, membership: Membership, dominantSegment: Int):
// Delegates to Membership.dominantWeight.
def dominantWeight: Float = membership.dominantWeight

// Groups the input into chunks of config.batchSize, runs membershipKernel on each chunk through
// GPipe.batch, and pairs each customer with its membership row (zip keeps input order).
def segment[F[_]](config: Config, centroids: FCMCentroids)(using CyfraRuntime): Pipe[F, CustomerProfile, SegmentResult] =
_.chunkN(config.batchSize).flatMap { chunk =>
val customers = chunk.toArray

val memberships = GPipe.batch(membershipKernel(config), config.batchSize, config.numFeatures, config.numClusters)(
// `flat` and `n` are supplied by GPipe.batch; `n` is forwarded to the kernel as numPoints.
// presumably `flat` is the flattened feature array and `n` the number of points in this batch — verify against GPipe.batch.
buildLayout = (flat, n) =>
FCMLayout(
points = GBuffer[Float, Float32](flat),
centroids = GBuffer[Float, Float32](centroids.values),
// Output buffer is always sized for a full batch, independent of n.
memberships = GBuffer[Float32](config.batchSize * config.numClusters),
params = GUniform[(Int, Int, Int, Float), FCMParams]((n, config.numFeatures, config.numClusters, config.fuzziness)),
),
outputBuffer = _.memberships,
)(customers.map(_.features))

Stream.emits(customers.zip(memberships).map { case (customer, m) =>
val membership = Membership(m)
SegmentResult(customer, membership, membership.dominantCluster)
})
}

// GPU Kernel

// Uniform parameters read by the kernel; field order matches the tuple encoded by the GCodec below.
private case class FCMParams(numPoints: Int32, numFeatures: Int32, numClusters: Int32, fuzziness: Float32) extends GStruct[FCMParams]

// Host-side codec for FCMParams: each tuple is three ints followed by one float.
private given GCodec[FCMParams, (Int, Int, Int, Float)] with
// Clears the buffer, switches it to native byte order, writes every tuple, then flips it for reading.
def toByteBuffer(buf: java.nio.ByteBuffer, chunk: Array[(Int, Int, Int, Float)]): java.nio.ByteBuffer =
buf.clear().order(java.nio.ByteOrder.nativeOrder())
chunk.foreach { case (a, b, c, d) => buf.putInt(a); buf.putInt(b); buf.putInt(c); buf.putFloat(d) }
buf.flip(); buf
// Fills `arr` in place from the buffer's current position, then rewinds the buffer.
// NOTE(review): unlike toByteBuffer, the byte order is not set here — assumes the caller passes a native-order buffer; confirm.
def fromByteBuffer(buf: java.nio.ByteBuffer, arr: Array[(Int, Int, Int, Float)]): Array[(Int, Int, Int, Float)] =
arr.indices.foreach(i => arr(i) = (buf.getInt(), buf.getInt(), buf.getInt(), buf.getFloat()))
buf.rewind(); arr

// Buffers bound to the kernel: input points, centroids, output memberships, and the uniform parameters.
private case class FCMLayout(points: GBuffer[Float32], centroids: GBuffer[Float32], memberships: GBuffer[Float32], params: GUniform[FCMParams])
derives Layout

// Builds the membership program. Buffer sizes, the inner-loop limits and the exponent come from
// `config` on the host; numPoints, numFeatures (for indexing) and numClusters are read from the uniform.
private def membershipKernel(config: Config): GProgram[Int, FCMLayout] =
GProgram.static[Int, FCMLayout](
layout = _ =>
FCMLayout(
points = GBuffer[Float32](config.batchSize * config.numFeatures),
centroids = GBuffer[Float32](config.numClusters * config.numFeatures),
memberships = GBuffer[Float32](config.batchSize * config.numClusters),
params = GUniform[FCMParams](),
),
// The program's Int argument is used unchanged as the dispatch size.
dispatchSize = identity,
): layout =>
val pointId = GIO.invocationId
val params = layout.params.read
// NOTE(review): config.fuzziness == 1 makes this a division by zero — confirm fuzziness > 1 is enforced upstream.
val exponent = 2.0f / (config.fuzziness - 1.0f)

// Invocations with an id at or beyond numPoints do nothing.
GIO.when(pointId < params.numPoints):
val pointBase = pointId * params.numFeatures

// One membership value per cluster j for this point.
GIO.repeat(params.numClusters) { j =>
val centroidBaseJ = j * params.numFeatures

// Sum of squared feature differences between the point and centroid j (no square root is taken).
val distJ = GSeq
.gen[Int32](0, _ + 1)
.limit(config.numFeatures)
.fold(
0.0f,
(sum: Float32, f: Int32) => {
val diff = GIO.read(layout.points, pointBase + f) - GIO.read(layout.centroids, centroidBaseJ + f)
sum + diff * diff
},
)

// A point (almost) on centroid j gets membership 1; otherwise 1 / sum over k of (distJ / distK)^exponent.
val membership = when(distJ < 0.000001f)(1.0f).otherwise {
val sumRatios = GSeq
.gen[Int32](0, _ + 1)
.limit(config.numClusters)
.fold(
0.0f,
(acc: Float32, k: Int32) => {
val centroidBaseK = k * params.numFeatures
// Same squared-difference sum as distJ, recomputed for centroid k.
val distK = GSeq
.gen[Int32](0, _ + 1)
.limit(config.numFeatures)
.fold(
0.0f,
(s: Float32, f: Int32) => {
val d = GIO.read(layout.points, pointBase + f) - GIO.read(layout.centroids, centroidBaseK + f)
s + d * d
},
)
// A centroid k that (almost) coincides with the point contributes 0, which avoids dividing by ~0.
// NOTE(review): distJ and distK are squared distances, yet the exponent is 2 / (fuzziness - 1); the textbook
// fuzzy c-means formula applies that exponent to a ratio of unsquared distances — confirm this is intended.
val ratio = when(distK < 0.000001f)(0.0f).otherwise(pow(distJ / distK, exponent))
acc + ratio
},
)
// Falls back to a uniform 1 / numClusters when the ratio sum is not positive.
when(sumRatios > 0.0f)(1.0f / sumRatios).otherwise(1.0f / params.numClusters.asFloat)
}

// Row-major output: memberships[pointId * numClusters + j].
GIO.write(layout.memberships, pointId * params.numClusters + j, membership)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.computenode.cyfra.analytics.model

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.*

/** Request body for replacing the segment centroids.
  *
  * @param labels
  *   segment labels
  * @param values
  *   centroid values as a single flat array
  */
case class CentroidsUpdate(labels: Vector[String], values: Array[Float])

/** JSON codecs for [[CentroidsUpdate]], derived semi-automatically. */
object CentroidsUpdate:
  given Encoder[CentroidsUpdate] = deriveEncoder[CentroidsUpdate]
  given Decoder[CentroidsUpdate] = deriveDecoder[CentroidsUpdate]

/** Response describing the current centroids: their labels plus the cluster and feature counts. */
case class CentroidsResponse(labels: Vector[String], numClusters: Int, numFeatures: Int)

/** JSON codecs for [[CentroidsResponse]], derived semi-automatically. */
object CentroidsResponse:
  given Decoder[CentroidsResponse] = deriveDecoder[CentroidsResponse]
  given Encoder[CentroidsResponse] = deriveEncoder[CentroidsResponse]
Loading