Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/code-quality.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,14 @@ jobs:

- name: Build and test
run: xcodebuild test -scheme OllamaKit -destination 'platform=macOS,arch=x86_64'

test-linux:
runs-on: ubuntu-latest
steps:
- name: Swift version
run: swift --version

- uses: actions/checkout@v4

- name: Build and test
run: swift build
57 changes: 55 additions & 2 deletions Sources/OllamaKit/Utils/OKHTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
// Created by Kevin Hermawan on 08/06/24.
//

import Combine
import Foundation
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

internal struct OKHTTPClient {
private let decoder: JSONDecoder = .default
Expand All @@ -26,7 +28,53 @@ internal extension OKHTTPClient {

return try decoder.decode(T.self, from: data)
}


#if canImport(FoundationNetworking)
func stream<T: Decodable>(request: URLRequest, with responseType: T.Type) -> AsyncThrowingStream<T, Error> {
return AsyncThrowingStream { continuation in
Task {
let delegate = StreamingDelegate(
urlResponseCallback: { response in
do {
try validate(response: response)
} catch {
continuation.finish(throwing: error)
}
},
dataCallback: { buffer in //
while let chunk = self.extractNextJSON(from: &buffer) {
do {
let decodedObject = try self.decoder.decode(T.self, from: chunk)
continuation.yield(decodedObject)
} catch {
continuation.finish(throwing: error)
return
}
}
},
completionCallback: { error in
if let error {
continuation.finish(throwing: error)
}
continuation.finish()
}
)

let session = URLSession(configuration: .default, delegate: delegate, delegateQueue: .main)
let task = session.dataTask(with: request)

continuation.onTermination = { terminationState in
// Cancellation of our task should cancel the URLSessionDataTask
if case .cancelled = terminationState {
task.cancel()
}
}

task.resume()
}
}
}
#else
func stream<T: Decodable>(request: URLRequest, with responseType: T.Type) -> AsyncThrowingStream<T, Error> {
return AsyncThrowingStream { continuation in
Task {
Expand Down Expand Up @@ -64,8 +112,12 @@ internal extension OKHTTPClient {
}
}
}
#endif
}

#if canImport(Combine)
import Combine

internal extension OKHTTPClient {
func send<T: Decodable>(request: URLRequest, with responseType: T.Type) -> AnyPublisher<T, Error> {
return URLSession.shared.dataTaskPublisher(for: request)
Expand Down Expand Up @@ -119,6 +171,7 @@ internal extension OKHTTPClient {
.eraseToAnyPublisher()
}
}
#endif

private extension OKHTTPClient {
func validate(response: URLResponse) throws {
Expand Down
3 changes: 3 additions & 0 deletions Sources/OllamaKit/Utils/OKRouter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
//

import Foundation
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

internal enum OKRouter {
case root
Expand Down
52 changes: 51 additions & 1 deletion Sources/OllamaKit/Utils/StreamingDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
// Created by Kevin Hermawan on 09/06/24.
//

@preconcurrency import Combine
import Foundation
#if canImport(Combine)
@preconcurrency import Combine

internal class StreamingDelegate: NSObject, URLSessionDataDelegate, @unchecked Sendable {
private let subject = PassthroughSubject<Data, URLError>()
Expand All @@ -27,3 +28,52 @@ internal class StreamingDelegate: NSObject, URLSessionDataDelegate, @unchecked S
subject.eraseToAnyPublisher()
}
}
#endif

#if canImport(FoundationNetworking)
import FoundationNetworking
internal final class StreamingDelegate: NSObject, URLSessionDataDelegate, @unchecked Sendable {

let urlResponseCallback: (@Sendable (URLResponse) -> Void)?
let dataCallback: (@Sendable (inout Data) -> Void)?
let completionCallback: (@Sendable (Error?) -> Void)?

var buffer = Data()

init(urlResponseCallback: (@Sendable (URLResponse) -> Void)?,
dataCallback: (@Sendable (inout Data) -> Void)?,
completionCallback: (@Sendable (Error?) -> Void)?
) {
self.urlResponseCallback = urlResponseCallback
self.dataCallback = dataCallback
self.completionCallback = completionCallback
}

func urlSession(
_ session: URLSession,
dataTask: URLSessionDataTask,
didReceive response: URLResponse,
completionHandler: @escaping (URLSession.ResponseDisposition) -> Void
) {
// Handle the URLResponse here
urlResponseCallback?(response)
completionHandler(.allow)
}

func urlSession(_ session: URLSession,
dataTask: URLSessionDataTask,
didReceive data: Data) {
buffer.append(data)

// Handle the incoming data chunk here
dataCallback?(&buffer)
}

func urlSession(_ session: URLSession,
task: URLSessionTask,
didCompleteWithError error: Error?) {
// Handle completion or errors
completionCallback?(error)
}
}
#endif