Skip to content

aptpod/iscp-kt

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

4 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Module iscp

iSCP 2.0 Client Library for Kotlin

iSCP Client for Kotlin は、iSCP version 2を用いたリアルタイムAPIにアクセスするためのクライアントライブラリです。

Dependencies

Installation

// build.gradle
...
dependencies {
    ...
    // Install iSCP
    implementation 'com.aptpod.github:iscp:1.1.0'
}
// settings.gradle
...
dependencyResolutionManagement {
    repositoriesMode.set(RepositoriesMode.FAIL_ON_PROJECT_REPOS)
    repositories {
        google()
        mavenCentral()
        // Library references for iSCP
        maven { url "https://aptpod.github.io/iscp-kt" }
    }
}

Implementation

Connect to intdash API

このサンプルではiscp-ktを使ってintdash APIに接続します。

import android.app.Activity
import com.aptpod.iscp.connection.Connection
import com.aptpod.iscp.connection.ConnectionCallbacks
import com.aptpod.iscp.transport.IConnector
import com.aptpod.iscp.transport.websocket.WebSocketConnector

class ExampleActivity : Activity() {
    /**
     * 接続するintdashサーバー
     */
    var targetServer: String = "https://example.com"
    /**
     * ノードUUID(ここで指定されたノードとして送受信を行います。
     *
     * intdash APIでノードを生成した際に発行されたノードUUIDを指定します。)
     */
    var nodeId = "00000000-0000-0000-0000-000000000000"
    /**
     * アクセストークン
     *
     * intdash APIで取得したアクセストークンを指定して下さい。
     */
    var accessToken = ""
    /**
     *  コネクション
     */
    var connection: Connection? = null

    fun connect() {
        // 接続情報のセットアップをします。
        var urls = targetServer.split("://")
        var address: String
        var enableTls: Boolean = false
        if (urls.size == 1)
        {
            address = urls[0]
        }
        else
        {
            enableTls = urls[0] == "https"
            address = urls[1]
        }
        // WebSocketを使って接続するように指定します。
        var connector: IConnector = WebSocketConnector(enableTls = enableTls)
        Connection.connectAsync(
            address = address,
            connector = connector,
            tokenSource = {
                // アクセス用のトークンを指定します。接続時に発生するイベントにより使用されます。
                // ここでは固定のトークンを返していますが随時トークンの更新を行う実装にするとトークンの期限切れを考える必要がなくなります。
                accessToken
            },
            nodeId = nodeId,
            completion = { con, ex ->
                if (con == null) {
                    // 接続失敗
                    return@connectAsync
                }
                // 接続成功.
                connection = con
                con.callbacks = this.connectionCallbacks // ConnectionCallbacks
                // 以降、startUpstreamやstartDownstreamなどが実行可能になります。
            }
        )
    }

    val connectionCallbacks : ConnectionCallbacks
        get() = object : ConnectionCallbacks {
            override fun onReconnect(connection: Connection) {
                // Connectionが再オープンされた際にコールされます。
            }

            override fun onDisconnect(connection: Connection) {
                // Connectionがクローズされた際にコールされます。
            }

            override fun onFailWithError(connection: Connection, exception: Exception) {
                // Connection内部で何らかのエラーが発生した際にコールされます。
            }
        }
}

Start Upstream

アップストリームの送信サンプルです。

このサンプルでは、基準時刻のメタデータと、文字列型のデータポイントをiSCPサーバーへ送信しています。

import com.aptpod.iscp.model.BaseTime
import com.aptpod.iscp.model.DataId
import com.aptpod.iscp.model.DataPoint
import com.aptpod.iscp.model.UpstreamChunk
import com.aptpod.iscp.model.UpstreamChunkAck
import com.aptpod.iscp.stream.Upstream
import com.aptpod.iscp.stream.UpstreamCallbacks
import java.time.ZonedDateTime
import java.util.UUID

/**
 * 送信するデータを永続化するかどうか
 */
var upstreamPersist: Boolean = false
/**
 * オープンしたストリーム一覧
 */
var upstreams: MutableList<Upstream> = mutableListOf()

fun ExampleActivity.startUpstream() {
    // セッションIDを払い出します。
    var sessionId = UUID.randomUUID().toString().lowercase()

    // Upstreamをオープンします。
    connection?.openUpstreamAsync(
        sessionId = sessionId,
        persist = upstreamPersist,
        completion = { upstream, ex ->
            if (upstream == null) {
                // オープン失敗。
                return@openUpstreamAsync
            }
            // オープン成功
            upstreams.add(upstream)

            // 送信するデータポイントを保存したい場合や、アップストリームのエラーをハンドリングしたい場合はコールバックを設定します。
            upstream.callbacks = this.upstreamCallbacks // UpstreamCallbacks

            var date = ZonedDateTime.now() // ※マイクロ秒以下をサポートするには修正が必要です。
            var baseTime = (date.toEpochSecond() * 1000_000_000) + date.nano // 基準時刻です。

            // 基準時刻をiSCPサーバーへ送信します。
            connection?.sendBaseTimeAsync(
                baseTime = BaseTime(
                    sessionId = sessionId,
                    name = "manual",
                    priority = 60,
                    elapsedTime = 0,
                    baseTime = baseTime),
                persist = upstreamPersist,
                completion = { sendBaseTimeEx ->
                    if (sendBaseTimeEx != null) {
                        // 基準時刻の送信に失敗。
                        return@sendBaseTimeAsync
                    }
                    // 基準時刻の送信に成功。

                    // 文字列型のデータポイントをiSCPサーバーへ送信します。
                    var now = ZonedDateTime.now() // ※マイクロ秒以下をサポートするには修正が必要です。
                    upstream.writeDataPoint(
                        dataId = DataId(
                            name = "greeting",
                            type = "string"),
                        dataPoint = DataPoint(
                            elapsedTime = ((now.toEpochSecond() * 1000_000_000) + now.nano) - baseTime, // 基準時刻からの経過時間をデータポイントの経過時間として打刻します。
                            payload = "hello".toByteArray()
                        )
                    )
                }
            )
        }
    )
}

val ExampleActivity.upstreamCallbacks : UpstreamCallbacks
    get() = object : UpstreamCallbacks {
        override fun onGenerateChunk(upstream: Upstream, message: UpstreamChunk) {
            // バッファへ書き込んだデータポイントが実際に送信される直前にコールされます。
        }

        override fun onReceiveAck(upstream: Upstream, message: UpstreamChunkAck) {
            // データポイントの送信後に返却されるACKを受信できた場合にコールされます。
        }

        override fun onFailWithError(upstream: Upstream, error: Exception) {
            // 内部でエラーが発生した場合にコールされます。
        }

        override fun onCloseWithError(upstream: Upstream, error: Exception) {
            // 何らかの理由でストリームがクローズした場合にコールされます。
            // 再度アップストリームをオープンしたい場合は、 `Connection.reopenUpstream()` を使用することにより、ストリームの設定を引き継いで別のストリームを開くことが可能です。
        }

        override fun onResume(upstream: Upstream) {
            // 自動再接続機能が働き、再接続が行われた場合にコールされます。
        }
    }

Start Downstream

アップストリームで送信されたデータをダウンストリームで受信するサンプルです。

このサンプルでは、アップストリーム開始のメタデータ、基準時刻のメタデータ、文字列型のデータポイントを受信しています。

import android.util.Log
import com.aptpod.iscp.model.DataFilter
import com.aptpod.iscp.model.DownstreamChunk
import com.aptpod.iscp.model.DownstreamFilter
import com.aptpod.iscp.model.DownstreamMetadata
import com.aptpod.iscp.stream.Downstream
import com.aptpod.iscp.stream.DownstreamCallbacks
import java.time.Instant
import java.time.ZoneId
import java.time.ZonedDateTime

/**
 * 受信したいデータを送信している送信元ノードのUUID
 *
 * (アップストリームを行っている送信元でConnection.Configで設定したnodeIdを指定してください。)
 */
var targetDownstreamNodeID = "00000000-0000-0000-0000-000000000000"
/**
 * オープンしたダウンストリーム一覧
 */
var downstreams: MutableList<Downstream> = mutableListOf()

fun ExampleActivity.startDownstream() {
    // ダウンストリームをオープンします。
    connection?.openDownstreamAsync(
        downstreamFilters = listOf(
            DownstreamFilter(
                sourceNodeId = targetDownstreamNodeID, // // 送信元ノードのIDを指定します。
                dataFilters = listOf(
                    DataFilter(
                        name = "#", type = "#") // 受信したいデータを名称と型で指定します。この例では、ワイルドカード `#` を使用して全てのデータを取得します。
                )
            )
        ),
        completion = { downstream, ex ->
            if (downstream == null) {
                // オープン失敗。
                return@openDownstreamAsync
            }
            // オープン成功。
            downstreams.add(downstream)
            // 受信データを取り扱うためにデリゲートを設定します。
            downstream.callbacks = this.downstreamCallbacks // DownstreamCallbacks
        }
    )
}

val ExampleActivity.downstreamCallbacks: DownstreamCallbacks
    get() = object : DownstreamCallbacks {
        override fun onReceiveChunk(downstream: Downstream, message: DownstreamChunk) {
            // データポイントを読み込むことができた際にコールされます。
            Log.d(this.javaClass.name, "Received dataPoints sequenceNumber[${message.sequenceNumber}], sessionId[${message.upstreamInfo.sessionId}]")
            for (g in message.dataPointGroups) {
                for (dp in g.dataPoints) {
                    Log.d(this.javaClass.name, "Received a dataPoint dataName[${g.dataId.name}], dataType[${g.dataId.type}], payload[${String(dp.payload)}]")
                }
            }
        }

        override fun onReceiveMetadata(downstream: Downstream, message: DownstreamMetadata) {
            // メタデータを受信した際にコールされます。
            Log.d(this.javaClass.name, "Received a metadata sourceNodeId[${message.sourceNodeId}], metadataType:${message.type}")
            when (message.type) {
                DownstreamMetadata.MetadataType.BASE_TIME -> {
                    var baseTime = message.baseTime!!
                    var date = ZonedDateTime.ofInstant(Instant.ofEpochSecond(baseTime.baseTime / 1000_000_000, baseTime.baseTime % 1000_000_000), ZoneId.systemDefault())
                    Log.d(this.javaClass.name, "Received baseTime[$date], priority[${baseTime.priority}], name[${baseTime.name}]")
                }
                else -> {}
            }
        }

        override fun onFailWithError(downstream: Downstream, error: Exception) {
            // 内部でエラーが発生した場合にコールされます。
        }

        override fun onCloseWithError(downstream: Downstream, error: Exception) {
            // 何らかの理由でストリームがクローズした場合にコールされます。
            // 再度ダウンストリームをオープンしたい場合は、 `Connection.reopenDownstream()` を使用することにより、ストリームの設定を引き継いで別のストリームを開くことが可能です。
        }

        override fun onResume(downstream: Downstream) {
            // 自動再接続機能が働き、再接続が行われた場合にコールされます。
        }
    }

E2E Call

E2E(エンドツーエンド)コールのサンプルです。

コントローラノードが対象ノードに対して指示を出し、対象ノードは受信完了のリプライを行う簡単なサンプルです。

class E2ECallExampleActivity  : Activity() {
    /**
     * 接続するintdashサーバー
     */
    var targetServer: String = "https://example.com"

    /**
     * コントローラーノードのUUID
     */
    var controllerNodeID: String = "00000000-0000-0000-0000-000000000000"
    /**
     * 対象ノードのUUID
     */
    var targetNodeID: String = "11111111-1111-1111-1111-111111111111"

    /**
     * コントローラーノード用のアクセストークン
     *
     * intdash APIで取得したアクセストークンを指定して下さい。
     */
    var accessTokenForController : String = ""
    /**
     * 対象ノード用のアクセストークン
     *
     * intdash APIで取得したアクセストークンを指定して下さい。
     */
    var accessTokenForTarget: String = ""

    /**
     * コントローラーノード用のコネクション
     */
    var connectionForController: Connection? = null
    /**
     * 対象ノード用のコネクション
     */
    var connectionForTarget: Connection? = null
}

//region コントローラーノードからメッセージを送信するサンプルです。このサンプルでは文字列メッセージを対象ノードに対して送信し、対象ノードからのリプライを待ちます。

fun E2ECallExampleActivity.connectForController() {
    // 接続情報のセットアップをします。
    var urls = targetServer.split("://")
    var address: String
    var enableTls: Boolean = false
    if (urls.size == 1)
    {
        address = urls[0]
    }
    else
    {
        enableTls = urls[0] == "https"
        address = urls[1]
    }
    // WebSocketを使って接続するように指定します。
    var connector: IConnector = WebSocketConnector(enableTls = enableTls)
    Connection.connectAsync(
        address = address,
        connector = connector,
        tokenSource = {
            // アクセス用のトークンを指定します。接続時に発生するイベントにより使用されます。
            // ここでは固定のトークンを返していますが随時トークンの更新を行う実装にするとトークンの期限切れを考える必要がなくなります。
            accessTokenForController
        },
        nodeId = controllerNodeID,
        completion = { con, ex ->
            if (con == null) {
                // 接続失敗
                return@connectAsync
            }
            // 接続成功.
            connectionForController = con
        }
    )
}

fun E2ECallExampleActivity.sendCall() {
    // コールを送信し、リプライコールを受信するとコールバックが発生します。
    connectionForController?.sendCallAndWaitReplyCallAsync(
        upstreamCall = UpstreamCall(
            destinationNodeId = targetNodeID,
            name = "greeting",
            type = "string",
            payload = "hello".toByteArray()
        ),
        completion = { downstreamReplyCall, ex ->
            if (ex != null) {
                // コールの送信もしくはリプライの受信に失敗。
                return@sendCallAndWaitReplyCallAsync
            }
            // コールの送信及びリプライの受信に成功。
        }
    )
}

//endregion

//region コントローラーノードからのコールを受け付け、すぐにリプライするサンプルです。

fun E2ECallExampleActivity.connectForTarget() {
    // 接続情報のセットアップをします。
    var urls = targetServer.split("://")
    var address: String
    var enableTls: Boolean = false
    if (urls.size == 1)
    {
        address = urls[0]
    }
    else
    {
        enableTls = urls[0] == "https"
        address = urls[1]
    }
    // WebSocketを使って接続するように指定します。
    var connector: IConnector = WebSocketConnector(enableTls = enableTls)
    Connection.connectAsync(
        address = address,
        connector = connector,
        tokenSource = {
            // アクセス用のトークンを指定します。接続時に発生するイベントにより使用されます。
            // ここでは固定のトークンを返していますが随時トークンの更新を行う実装にするとトークンの期限切れを考える必要がなくなります。
            accessTokenForTarget
        },
        nodeId = targetNodeID,
        completion = { con, ex ->
            if (con == null) {
                // 接続失敗
                return@connectAsync
            }
            // 接続成功.
            this.connectionForTarget = con
            // DownstreamCallの受信を監視するためにコールバックを設定します。
            con.e2ECallCallbacks = this.e2ECallCallbacks // ConnectionE2ECallCallbacks
        }
    )
}

//endregion

val E2ECallExampleActivity.e2ECallCallbacks: ConnectionE2ECallCallbacks
    get() = object : ConnectionE2ECallCallbacks {
        override fun onReceiveCall(connection: Connection, downstreamCall: DownstreamCall) {
            // DownstreamCallを受信した際にコールされます。
            // このサンプルではDownstreamCallを受信したらすぐにリプライコールを送信します。
            connection.sendReplyCallAsync(
                upstreamReplyCall = UpstreamReplyCall(
                    requestCallId = downstreamCall.callId,
                    destinationNodeId = downstreamCall.sourceNodeId,
                    name = "reply_greeting",
                    type = "string",
                    payload = "world".toByteArray()
                ),
                completion = { ex ->
                    if (ex != null) {
                        // リプライコールの送信に失敗。
                        return@sendReplyCallAsync
                    }
                    // リプライコールの送信に成功。
                }
            )
        }

        override fun onReceiveReplyCall(connection: Connection, downstreamReplyCall: DownstreamReplyCall) {
            // DownstreamReplyCallを受信した際にコールされます。
        }
    }

References

About

iSCP client library for Java and Kotlin. iSCP (intdash Stream Control Protocol) is an L7 network protocol authored by aptpod, Inc.

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors