How to simplify getting gRPC streaming responses in Swift

Issue #360

Given a streaming service

1
2
3
// Service exposing a single server-streaming RPC.
service Server {
// Takes one GetUsersRequest and returns a stream of GetUsersResponse messages.
rpc GetUsers(GetUsersRequest) returns (stream GetUsersResponse);
}

To get the list of responses in Swift, we need to observe the stream, which is a subclass of ClientCallServerStreaming

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/// Fetches every user of a room by draining the server-streaming `GetUsers` call.
///
/// The blocking reads run on a global dispatch queue; `completion` is always
/// delivered on the main queue.
///
/// - Parameters:
///   - roomId: Identifier copied into the request's `roomId` field.
///   - completion: Receives all collected users, or the first error thrown while
///     opening the stream or receiving from it.
func getUsers(roomId: String, completion: @escaping (Result<[User], Error>) -> Void) {
    let request = withValue(Server_GetUsersRequest()) {
        $0.roomId = roomId
    }

    DispatchQueue.global().async {
        var users = [User]()

        do {
            // The final-status callback is intentionally a no-op: the end of the
            // stream is detected from `receive()` below. Flipping a shared flag
            // from this callback would race with the loop, which runs on a
            // different thread.
            // NOTE(review): SwiftGRPC's blocking `receive()` is expected to throw
            // when the call fails, so errors still reach the `catch` — confirm
            // against the SwiftGRPC version in use.
            let stream = try self.client.getUsers(request, completion: { _ in })

            // `receive()` blocks until the next message arrives and yields nil
            // once the server has closed the stream, so every message is
            // collected and the loop never spins.
            while let response = try stream.receive() {
                users.append(response.user)
            }

            DispatchQueue.main.async {
                completion(.success(users))
            }
        } catch {
            DispatchQueue.main.async {
                completion(.failure(error))
            }
        }
    }
}

This can get repetitive very fast. To avoid the duplication, we can make a generic function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import SwiftGRPC

/// Collects every message of a server-streaming call into an array.
///
/// The stream is opened and drained on a global dispatch queue; `completion`
/// is always delivered on the main queue.
///
/// - Parameters:
///   - makeStream: Opens the stream. It is handed a "call finished" callback
///     that it may forward to the underlying client call. The callback is kept
///     for source compatibility and does nothing: the end of the stream is
///     detected from `receive`, which avoids sharing a mutable flag between
///     the callback's thread and the draining loop.
///   - receive: Blocks for the next message of the stream and returns `nil`
///     once the stream has ended.
///   - completion: Receives all collected messages in arrival order, or the
///     first error thrown by `makeStream` or `receive`.
func getStream<Streaming, Response>(
    makeStream: @escaping (@escaping () -> Void) throws -> Streaming,
    receive: @escaping (Streaming) throws -> Response?,
    completion: @escaping (Result<[Response], Error>) -> Void) {

    DispatchQueue.global().async {
        var responses = [Response]()

        do {
            let stream = try makeStream({})

            // Drain until `receive` reports end-of-stream with nil. This cannot
            // drop messages that are still buffered when the call's finished
            // callback fires, and it never busy-waits.
            while let response = try receive(stream) {
                responses.append(response)
            }

            DispatchQueue.main.async {
                completion(.success(responses))
            }
        } catch {
            DispatchQueue.main.async {
                completion(.failure(error))
            }
        }
    }
}

Since swift-grpc generates very concrete structs, we need to use generics. The only things that differ between calls are the Streaming class and the Response struct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Loads the users of a room through the generic `getStream` helper.
///
/// - Parameters:
///   - roomId: Identifier copied into the request's `roomId` field.
///   - completion: Receives the `user` of every streamed response, or the
///     error reported by `getStream`.
func getUsers(roomId: String, completion: @escaping (Result<[User], Error>) -> Void) {
    let request = withValue(Server_GetUsersRequest()) {
        $0.roomId = roomId
    }

    getStream(
        makeStream: { onFinished in
            // Forward the call's final-status callback to the helper's signal.
            try self.client.getUsers(request, completion: { _ in onFinished() })
        },
        receive: { call in
            try call.receive()
        },
        completion: { outcome in
            // Unwrap each response into its `user`, leaving failures untouched.
            let users = outcome.map { responses in
                responses.map { $0.user }
            }
            completion(users)
        })
}

Handle CallResult

1
2
3
4
5
6
7
8
9
10
11
import SwiftGRPC
import SwiftProtobuf

extension CallResult {
    /// Converts this call result into an `NSError` in the "com.myApp" domain.
    ///
    /// The error code is the raw value of `statusCode`. `userInfo` carries the
    /// status code under "status_code" and the status message (or an empty
    /// string when there is none) under "status_message".
    func toError() -> NSError {
        let details: [String: Any] = [
            "status_code": statusCode,
            "status_message": statusMessage ?? ""
        ]
        return NSError(domain: "com.myApp", code: statusCode.rawValue, userInfo: details)
    }
}

Comments