mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2025-06-28 20:29:53 +00:00
ios: moving webrtc commands processing to another mechanism (#3480)
* ios: moving webrtc commands processing to another mechanism * async * decide * handle errors * error alert * await --------- Co-authored-by: Avently <avently@local> Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
parent
6a21d5c7f1
commit
05a64c99a2
6 changed files with 124 additions and 88 deletions
|
@ -83,7 +83,7 @@ final class ChatModel: ObservableObject {
|
|||
// current WebRTC call
|
||||
@Published var callInvitations: Dictionary<ChatId, RcvCallInvitation> = [:]
|
||||
@Published var activeCall: Call?
|
||||
@Published var callCommand: WCallCommand?
|
||||
let callCommand: WebRTCCommandProcessor = WebRTCCommandProcessor()
|
||||
@Published var showCallView = false
|
||||
// remote desktop
|
||||
@Published var remoteCtrlSession: RemoteCtrlSession?
|
||||
|
|
|
@ -1666,36 +1666,40 @@ func processReceivedMsg(_ res: ChatResponse) async {
|
|||
activateCall(invitation)
|
||||
case let .callOffer(_, contact, callType, offer, sharedKey, _):
|
||||
await withCall(contact) { call in
|
||||
call.callState = .offerReceived
|
||||
call.peerMedia = callType.media
|
||||
call.sharedKey = sharedKey
|
||||
await MainActor.run {
|
||||
call.callState = .offerReceived
|
||||
call.peerMedia = callType.media
|
||||
call.sharedKey = sharedKey
|
||||
}
|
||||
let useRelay = UserDefaults.standard.bool(forKey: DEFAULT_WEBRTC_POLICY_RELAY)
|
||||
let iceServers = getIceServers()
|
||||
logger.debug(".callOffer useRelay \(useRelay)")
|
||||
logger.debug(".callOffer iceServers \(String(describing: iceServers))")
|
||||
m.callCommand = .offer(
|
||||
await m.callCommand.processCommand(.offer(
|
||||
offer: offer.rtcSession,
|
||||
iceCandidates: offer.rtcIceCandidates,
|
||||
media: callType.media, aesKey: sharedKey,
|
||||
iceServers: iceServers,
|
||||
relay: useRelay
|
||||
)
|
||||
))
|
||||
}
|
||||
case let .callAnswer(_, contact, answer):
|
||||
await withCall(contact) { call in
|
||||
call.callState = .answerReceived
|
||||
m.callCommand = .answer(answer: answer.rtcSession, iceCandidates: answer.rtcIceCandidates)
|
||||
await MainActor.run {
|
||||
call.callState = .answerReceived
|
||||
}
|
||||
await m.callCommand.processCommand(.answer(answer: answer.rtcSession, iceCandidates: answer.rtcIceCandidates))
|
||||
}
|
||||
case let .callExtraInfo(_, contact, extraInfo):
|
||||
await withCall(contact) { _ in
|
||||
m.callCommand = .ice(iceCandidates: extraInfo.rtcIceCandidates)
|
||||
await m.callCommand.processCommand(.ice(iceCandidates: extraInfo.rtcIceCandidates))
|
||||
}
|
||||
case let .callEnded(_, contact):
|
||||
if let invitation = await MainActor.run(body: { m.callInvitations.removeValue(forKey: contact.id) }) {
|
||||
CallController.shared.reportCallRemoteEnded(invitation: invitation)
|
||||
}
|
||||
await withCall(contact) { call in
|
||||
m.callCommand = .end
|
||||
await m.callCommand.processCommand(.end)
|
||||
CallController.shared.reportCallRemoteEnded(call: call)
|
||||
}
|
||||
case .chatSuspended:
|
||||
|
@ -1753,9 +1757,9 @@ func processReceivedMsg(_ res: ChatResponse) async {
|
|||
logger.debug("unsupported event: \(res.responseType)")
|
||||
}
|
||||
|
||||
func withCall(_ contact: Contact, _ perform: (Call) -> Void) async {
|
||||
func withCall(_ contact: Contact, _ perform: (Call) async -> Void) async {
|
||||
if let call = m.activeCall, call.contact.apiId == contact.apiId {
|
||||
await MainActor.run { perform(call) }
|
||||
await perform(call)
|
||||
} else {
|
||||
logger.debug("processReceivedMsg: ignoring \(res.responseType), not in call with the contact \(contact.id)")
|
||||
}
|
||||
|
|
|
@ -49,15 +49,10 @@ struct ActiveCallView: View {
|
|||
}
|
||||
.onDisappear {
|
||||
logger.debug("ActiveCallView: disappear")
|
||||
Task { await m.callCommand.setClient(nil) }
|
||||
AppDelegate.keepScreenOn(false)
|
||||
client?.endCall()
|
||||
}
|
||||
.onChange(of: m.callCommand) { cmd in
|
||||
if let cmd = cmd {
|
||||
m.callCommand = nil
|
||||
sendCommandToClient(cmd)
|
||||
}
|
||||
}
|
||||
.background(.black)
|
||||
.preferredColorScheme(.dark)
|
||||
}
|
||||
|
@ -65,20 +60,8 @@ struct ActiveCallView: View {
|
|||
private func createWebRTCClient() {
|
||||
if client == nil && canConnectCall {
|
||||
client = WebRTCClient($activeCall, { msg in await MainActor.run { processRtcMessage(msg: msg) } }, $localRendererAspectRatio)
|
||||
if let cmd = m.callCommand {
|
||||
m.callCommand = nil
|
||||
sendCommandToClient(cmd)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func sendCommandToClient(_ cmd: WCallCommand) {
|
||||
if call == m.activeCall,
|
||||
m.activeCall != nil,
|
||||
let client = client {
|
||||
logger.debug("sendCallCommand: \(cmd.cmdType)")
|
||||
Task {
|
||||
await client.sendCallCommand(command: cmd)
|
||||
await m.callCommand.setClient(client)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -174,8 +157,10 @@ struct ActiveCallView: View {
|
|||
}
|
||||
case let .error(message):
|
||||
logger.debug("ActiveCallView: command error: \(message)")
|
||||
AlertManager.shared.showAlert(Alert(title: Text("Error"), message: Text(message)))
|
||||
case let .invalid(type):
|
||||
logger.debug("ActiveCallView: invalid response: \(type)")
|
||||
AlertManager.shared.showAlert(Alert(title: Text("Invalid response"), message: Text(type)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ class CallManager {
|
|||
let m = ChatModel.shared
|
||||
if let call = m.activeCall, call.callkitUUID == callUUID {
|
||||
m.showCallView = true
|
||||
m.callCommand = .capabilities(media: call.localMedia)
|
||||
Task { await m.callCommand.processCommand(.capabilities(media: call.localMedia)) }
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
@ -57,19 +57,21 @@ class CallManager {
|
|||
m.activeCall = call
|
||||
m.showCallView = true
|
||||
|
||||
m.callCommand = .start(
|
||||
Task {
|
||||
await m.callCommand.processCommand(.start(
|
||||
media: invitation.callType.media,
|
||||
aesKey: invitation.sharedKey,
|
||||
iceServers: iceServers,
|
||||
relay: useRelay
|
||||
)
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func enableMedia(media: CallMediaType, enable: Bool, callUUID: UUID) -> Bool {
|
||||
if let call = ChatModel.shared.activeCall, call.callkitUUID == callUUID {
|
||||
let m = ChatModel.shared
|
||||
m.callCommand = .media(media: media, enable: enable)
|
||||
Task { await m.callCommand.processCommand(.media(media: media, enable: enable)) }
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
@ -94,13 +96,13 @@ class CallManager {
|
|||
completed()
|
||||
} else {
|
||||
logger.debug("CallManager.endCall: ending call...")
|
||||
// TODO this command won't be executed because activeCall is assigned nil,
|
||||
// and there is a condition in sendCommandToClient that would prevent its execution.
|
||||
m.callCommand = .end
|
||||
m.activeCall = nil
|
||||
m.showCallView = false
|
||||
completed()
|
||||
Task {
|
||||
await m.callCommand.processCommand(.end)
|
||||
await MainActor.run {
|
||||
m.activeCall = nil
|
||||
m.showCallView = false
|
||||
completed()
|
||||
}
|
||||
do {
|
||||
try await apiEndCall(call.contact)
|
||||
} catch {
|
||||
|
|
|
@ -335,6 +335,50 @@ extension WCallResponse: Encodable {
|
|||
}
|
||||
}
|
||||
|
||||
actor WebRTCCommandProcessor {
|
||||
private var client: WebRTCClient? = nil
|
||||
private var commands: [WCallCommand] = []
|
||||
private var running: Bool = false
|
||||
|
||||
func setClient(_ client: WebRTCClient?) async {
|
||||
logger.debug("WebRTC: setClient, commands count \(self.commands.count)")
|
||||
self.client = client
|
||||
if client != nil {
|
||||
await processAllCommands()
|
||||
} else {
|
||||
commands.removeAll()
|
||||
}
|
||||
}
|
||||
|
||||
func processCommand(_ c: WCallCommand) async {
|
||||
// logger.debug("WebRTC: process command \(c.cmdType)")
|
||||
commands.append(c)
|
||||
if !running && client != nil {
|
||||
await processAllCommands()
|
||||
}
|
||||
}
|
||||
|
||||
func processAllCommands() async {
|
||||
logger.debug("WebRTC: process all commands, commands count \(self.commands.count), client == nil \(self.client == nil)")
|
||||
if let client = client {
|
||||
running = true
|
||||
while let c = commands.first, shouldRunCommand(client, c) {
|
||||
commands.remove(at: 0)
|
||||
await client.sendCallCommand(command: c)
|
||||
logger.debug("WebRTC: processed cmd \(c.cmdType)")
|
||||
}
|
||||
running = false
|
||||
}
|
||||
}
|
||||
|
||||
func shouldRunCommand(_ client: WebRTCClient, _ c: WCallCommand) -> Bool {
|
||||
switch c {
|
||||
case .capabilities, .start, .offer, .end: true
|
||||
default: client.activeCall.wrappedValue != nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ConnectionState: Codable, Equatable {
|
||||
var connectionState: String
|
||||
var iceConnectionState: String
|
||||
|
|
|
@ -50,7 +50,7 @@ final class WebRTCClient: NSObject, RTCVideoViewDelegate, RTCFrameEncryptorDeleg
|
|||
private let rtcAudioSession = RTCAudioSession.sharedInstance()
|
||||
private let audioQueue = DispatchQueue(label: "audio")
|
||||
private var sendCallResponse: (WVAPIMessage) async -> Void
|
||||
private var activeCall: Binding<Call?>
|
||||
var activeCall: Binding<Call?>
|
||||
private var localRendererAspectRatio: Binding<CGFloat?>
|
||||
|
||||
@available(*, unavailable)
|
||||
|
@ -160,19 +160,16 @@ final class WebRTCClient: NSObject, RTCVideoViewDelegate, RTCFrameEncryptorDeleg
|
|||
let encryption = WebRTCClient.enableEncryption
|
||||
let call = initializeCall(iceServers?.toWebRTCIceServers(), media, encryption ? aesKey : nil, relay)
|
||||
activeCall.wrappedValue = call
|
||||
call.connection.offer { answer in
|
||||
Task {
|
||||
await self.sendCallResponse(.init(
|
||||
corrId: nil,
|
||||
resp: .offer(
|
||||
offer: compressToBase64(input: encodeJSON(CustomRTCSessionDescription(type: answer.type.toSdpType(), sdp: answer.sdp))),
|
||||
iceCandidates: compressToBase64(input: encodeJSON(await self.getInitialIceCandidates())),
|
||||
capabilities: CallCapabilities(encryption: encryption)
|
||||
),
|
||||
command: command)
|
||||
)
|
||||
await self.waitForMoreIceCandidates()
|
||||
}
|
||||
let (offer, error) = await call.connection.offer()
|
||||
if let offer = offer {
|
||||
resp = .offer(
|
||||
offer: compressToBase64(input: encodeJSON(CustomRTCSessionDescription(type: offer.type.toSdpType(), sdp: offer.sdp))),
|
||||
iceCandidates: compressToBase64(input: encodeJSON(await self.getInitialIceCandidates())),
|
||||
capabilities: CallCapabilities(encryption: encryption)
|
||||
)
|
||||
self.waitForMoreIceCandidates()
|
||||
} else {
|
||||
resp = .error(message: "offer error: \(error?.localizedDescription ?? "unknown error")")
|
||||
}
|
||||
case let .offer(offer, iceCandidates, media, aesKey, iceServers, relay):
|
||||
if activeCall.wrappedValue != nil {
|
||||
|
@ -186,22 +183,16 @@ final class WebRTCClient: NSObject, RTCVideoViewDelegate, RTCFrameEncryptorDeleg
|
|||
let pc = call.connection
|
||||
if let type = offer.type, let sdp = offer.sdp {
|
||||
if (try? await pc.setRemoteDescription(RTCSessionDescription(type: type.toWebRTCSdpType(), sdp: sdp))) != nil {
|
||||
pc.answer { answer in
|
||||
let (answer, error) = await pc.answer()
|
||||
if let answer = answer {
|
||||
self.addIceCandidates(pc, remoteIceCandidates)
|
||||
// Task {
|
||||
// try? await Task.sleep(nanoseconds: 32_000 * 1000000)
|
||||
Task {
|
||||
await self.sendCallResponse(.init(
|
||||
corrId: nil,
|
||||
resp: .answer(
|
||||
answer: compressToBase64(input: encodeJSON(CustomRTCSessionDescription(type: answer.type.toSdpType(), sdp: answer.sdp))),
|
||||
iceCandidates: compressToBase64(input: encodeJSON(await self.getInitialIceCandidates()))
|
||||
),
|
||||
command: command)
|
||||
)
|
||||
await self.waitForMoreIceCandidates()
|
||||
}
|
||||
// }
|
||||
resp = .answer(
|
||||
answer: compressToBase64(input: encodeJSON(CustomRTCSessionDescription(type: answer.type.toSdpType(), sdp: answer.sdp))),
|
||||
iceCandidates: compressToBase64(input: encodeJSON(await self.getInitialIceCandidates()))
|
||||
)
|
||||
self.waitForMoreIceCandidates()
|
||||
} else {
|
||||
resp = .error(message: "answer error: \(error?.localizedDescription ?? "unknown error")")
|
||||
}
|
||||
} else {
|
||||
resp = .error(message: "accept: remote description is not set")
|
||||
|
@ -260,12 +251,14 @@ final class WebRTCClient: NSObject, RTCVideoViewDelegate, RTCFrameEncryptorDeleg
|
|||
return candidates
|
||||
}
|
||||
|
||||
func waitForMoreIceCandidates() async {
|
||||
await untilIceComplete(timeoutMs: 12000, stepMs: 1500) {
|
||||
let candidates = await self.activeCall.wrappedValue?.iceCandidates.getAndClear() ?? []
|
||||
if candidates.count > 0 {
|
||||
logger.debug("WebRTCClient: sending more ice candidates: \(candidates.count)")
|
||||
await self.sendIceCandidates(candidates)
|
||||
func waitForMoreIceCandidates() {
|
||||
Task {
|
||||
await untilIceComplete(timeoutMs: 12000, stepMs: 1500) {
|
||||
let candidates = await self.activeCall.wrappedValue?.iceCandidates.getAndClear() ?? []
|
||||
if candidates.count > 0 {
|
||||
logger.debug("WebRTCClient: sending more ice candidates: \(candidates.count)")
|
||||
await self.sendIceCandidates(candidates)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -442,25 +435,33 @@ extension WebRTC.RTCPeerConnection {
|
|||
optionalConstraints: nil)
|
||||
}
|
||||
|
||||
func offer(_ completion: @escaping (_ sdp: RTCSessionDescription) -> Void) {
|
||||
offer(for: mediaConstraints()) { (sdp, error) in
|
||||
guard let sdp = sdp else {
|
||||
return
|
||||
func offer() async -> (RTCSessionDescription?, Error?) {
|
||||
await withCheckedContinuation { cont in
|
||||
offer(for: mediaConstraints()) { (sdp, error) in
|
||||
self.processSDP(cont, sdp, error)
|
||||
}
|
||||
self.setLocalDescription(sdp, completionHandler: { (error) in
|
||||
completion(sdp)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func answer(_ completion: @escaping (_ sdp: RTCSessionDescription) -> Void) {
|
||||
answer(for: mediaConstraints()) { (sdp, error) in
|
||||
guard let sdp = sdp else {
|
||||
return
|
||||
func answer() async -> (RTCSessionDescription?, Error?) {
|
||||
await withCheckedContinuation { cont in
|
||||
answer(for: mediaConstraints()) { (sdp, error) in
|
||||
self.processSDP(cont, sdp, error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func processSDP(_ cont: CheckedContinuation<(RTCSessionDescription?, Error?), Never>, _ sdp: RTCSessionDescription?, _ error: Error?) {
|
||||
if let sdp = sdp {
|
||||
self.setLocalDescription(sdp, completionHandler: { (error) in
|
||||
completion(sdp)
|
||||
if let error = error {
|
||||
cont.resume(returning: (nil, error))
|
||||
} else {
|
||||
cont.resume(returning: (sdp, nil))
|
||||
}
|
||||
})
|
||||
} else {
|
||||
cont.resume(returning: (nil, error))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue