SimpleX-Chat/apps/ios/SimpleX NSE/NotificationService.swift
Evgeny a0d1cca389
core: split response to two types, to improve iOS parsing memory usage (#5867)
* core: split response to two types, to improve iOS parsing memory usage

* ios: split core events to separate types

* comment

* limit more events to CLI

* fix parser

* simplemq
2025-05-04 22:14:36 +01:00

1021 lines
44 KiB
Swift

//
// NotificationService.swift
// SimpleX NSE
//
// Created by Evgeny on 26/04/2022.
// Copyright © 2022 SimpleX Chat. All rights reserved.
//
import UserNotifications
import OSLog
import StoreKit
import CallKit
import SimpleXChat
let logger = Logger()
let appSuspendingDelay: UInt64 = 2_500_000_000
typealias SuspendSchedule = (delay: TimeInterval, timeout: Int)
let nseSuspendSchedule: SuspendSchedule = (2, 4)
let fastNSESuspendSchedule: SuspendSchedule = (1, 1)
enum NSENotification {
case nse(UNMutableNotificationContent)
case callkit(RcvCallInvitation)
case empty
}
public enum NSENotificationData {
case connectionEvent(_ user: User, _ connEntity: ConnectionEntity)
case contactConnected(_ user: any UserLike, _ contact: Contact)
case contactRequest(_ user: any UserLike, _ contactRequest: UserContactRequest)
case messageReceived(_ user: any UserLike, _ cInfo: ChatInfo, _ cItem: ChatItem)
case callInvitation(_ invitation: RcvCallInvitation)
case msgInfo(NtfMsgAckInfo)
case noNtf
var callInvitation: RcvCallInvitation? {
switch self {
case let .callInvitation(invitation): invitation
default: nil
}
}
func notificationContent(_ badgeCount: Int) -> UNMutableNotificationContent {
return switch self {
case let .connectionEvent(user, connEntity): createConnectionEventNtf(user, connEntity, badgeCount)
case let .contactConnected(user, contact): createContactConnectedNtf(user, contact, badgeCount)
case let .contactRequest(user, contactRequest): createContactRequestNtf(user, contactRequest, badgeCount)
case let .messageReceived(user, cInfo, cItem): createMessageReceivedNtf(user, cInfo, cItem, badgeCount)
case let .callInvitation(invitation): createCallInvitationNtf(invitation, badgeCount)
case .msgInfo: UNMutableNotificationContent()
case .noNtf: UNMutableNotificationContent()
}
}
var notificationEvent: NSENotificationData? {
return switch self {
case .connectionEvent: self
case .contactConnected: self
case .contactRequest: self
case .messageReceived: self
case .callInvitation: self
case .msgInfo: nil
case .noNtf: nil
}
}
var newMsgData: (any UserLike, ChatInfo)? {
return switch self {
case let .messageReceived(user, cInfo, _): (user, cInfo)
default: nil
}
}
}
// Once the last thread in the process completes processing chat controller is suspended, and the database is closed, to avoid
// background crashes and contention for database with the application (both UI and background fetch triggered either on schedule
// or when background notification is received.
class NSEThreads {
static let shared = NSEThreads()
static let queue = DispatchQueue(label: "chat.simplex.app.SimpleX-NSE.notification-threads.lock")
private var allThreads: Set<UUID> = []
var activeThreads: [(UUID, NotificationService)] = []
var droppedNotifications: [(ChatId, NSENotificationData)] = []
func newThread() -> UUID {
NSEThreads.queue.sync {
let (_, t) = allThreads.insert(UUID())
return t
}
}
func startThread(_ t: UUID, _ service: NotificationService) {
NSEThreads.queue.sync {
if allThreads.contains(t) {
activeThreads.append((t, service))
} else {
logger.warning("NotificationService startThread: thread \(t) was removed before it started")
}
}
}
func processNotification(_ id: ChatId, _ ntf: NSENotificationData) async -> Void {
if let (_, nse) = rcvEntityThread(id),
nse.expectedMessages[id]?.shouldProcessNtf ?? false {
nse.processReceivedNtf(id, ntf, signalReady: true)
}
}
private func rcvEntityThread(_ id: ChatId) -> (UUID, NotificationService)? {
NSEThreads.queue.sync {
// this selects the earliest thread that:
// 1) has this connection in nse.expectedMessages
// 2) has not completed processing messages for this connection (not ready)
activeThreads.first(where: { (_, nse) in nse.expectedMessages[id]?.ready == false })
}
}
func endThread(_ t: UUID) -> Bool {
NSEThreads.queue.sync {
let tActive: UUID? = if let index = activeThreads.firstIndex(where: { $0.0 == t }) {
activeThreads.remove(at: index).0
} else {
nil
}
let t = allThreads.remove(t)
if tActive != nil && activeThreads.isEmpty {
return true
}
if t != nil && allThreads.isEmpty {
NSEChatState.shared.set(.suspended)
}
return false
}
}
var noThreads: Bool {
allThreads.isEmpty
}
}
struct ExpectedMessage {
var ntfConn: NtfConn
var expectedMsgId: String?
var allowedGetNextAttempts: Int
var msgBestAttemptNtf: NSENotificationData?
var ready: Bool
var shouldProcessNtf: Bool
var startedProcessingNewMsgs: Bool
var semaphore: DispatchSemaphore
var connMsgReq: ConnMsgReq? {
if let expectedMsg_ = ntfConn.expectedMsg_ {
ConnMsgReq(msgConnId: ntfConn.agentConnId, msgDbQueueId: ntfConn.agentDbQueueId, msgTs: expectedMsg_.msgTs)
} else {
nil
}
}
}
// Notification service extension creates a new instance of the class and calls didReceive for each notification.
// Each didReceive is called in its own thread, but multiple calls can be made in one process, and, empirically, there is never
// more than one process of notification service extension exists at a time.
// Soon after notification service delivers the last notification it is either suspended or terminated.
class NotificationService: UNNotificationServiceExtension {
var contentHandler: ((UNNotificationContent) -> Void)?
// served as notification if no message attempts (msgBestAttemptNtf) could be produced
var serviceBestAttemptNtf: NSENotification?
var badgeCount: Int = 0
// thread is added to allThreads here - if thread did not start chat,
// chat does not need to be suspended but NSE state still needs to be set to "suspended".
var threadId: UUID? = NSEThreads.shared.newThread()
var expectedMessages: Dictionary<String, ExpectedMessage> = [:] // key is receiveEntityId
var appSubscriber: AppSubscriber?
var returnedSuspension = false
override func didReceive(_ request: UNNotificationRequest, withContentHandler contentHandler: @escaping (UNNotificationContent) -> Void) {
logger.debug("DEBUGGING: NotificationService.didReceive")
let receivedNtf = if let ntf_ = request.content.mutableCopy() as? UNMutableNotificationContent { ntf_ } else { UNMutableNotificationContent() }
setServiceBestAttemptNtf(receivedNtf)
self.contentHandler = contentHandler
registerGroupDefaults()
let appState = appStateGroupDefault.get()
logger.debug("NotificationService: app is \(appState.rawValue)")
switch appState {
case .stopped:
// Use this block to debug notificaitons delivery in CLI, with "ejected" database and stopped chat
// if let nrData = ntfRequestData(request) {
// logger.debug("NotificationService get notification connections: /_ntf conns \(nrData.nonce) \(nrData.encNtfInfo)")
// contentHandler(receivedNtf)
// return;
// }
setBadgeCount()
contentHandler(createAppStoppedNtf(badgeCount))
case .suspended:
setExpirationTimer()
receiveNtfMessages(request)
case .suspending:
setExpirationTimer()
Task {
let state: AppState = await withCheckedContinuation { cont in
appSubscriber = appStateSubscriber { s in
if s == .suspended { appSuspension(s) }
}
DispatchQueue.global().asyncAfter(deadline: .now() + Double(appSuspendTimeout) + 1) {
logger.debug("NotificationService: appSuspension timeout")
appSuspension(appStateGroupDefault.get())
}
@Sendable
func appSuspension(_ s: AppState) {
if !self.returnedSuspension {
self.returnedSuspension = true
self.appSubscriber = nil // this disposes of appStateSubscriber
cont.resume(returning: s)
}
}
}
logger.debug("NotificationService: app state is now \(state.rawValue)")
if state.inactive && self.contentHandler != nil {
receiveNtfMessages(request)
} else {
contentHandler(receivedNtf)
}
}
case .active: contentHandler(receivedNtf)
case .activating: contentHandler(receivedNtf)
case .bgRefresh: contentHandler(receivedNtf)
}
}
private func setExpirationTimer() -> Void {
DispatchQueue.main.asyncAfter(deadline: .now() + 30) {
self.deliverBestAttemptNtf(urgent: true)
}
}
private func ntfRequestData(_ request: UNNotificationRequest) -> (nonce: String, encNtfInfo: String)? {
if let ntfData = request.content.userInfo["notificationData"] as? [AnyHashable : Any],
let nonce = ntfData["nonce"] as? String,
let encNtfInfo = ntfData["message"] as? String {
(nonce, encNtfInfo)
} else {
nil
}
}
func receiveNtfMessages(_ request: UNNotificationRequest) {
logger.debug("NotificationService: receiveNtfMessages")
if case .documents = dbContainerGroupDefault.get() {
deliverBestAttemptNtf()
return
}
if let nrData = ntfRequestData(request),
// check it here again
appStateGroupDefault.get().inactive {
// thread is added to activeThreads tracking set here - if thread started chat it needs to be suspended
if let t = threadId { NSEThreads.shared.startThread(t, self) }
let dbStatus = startChat()
if case .ok = dbStatus,
let ntfConns = apiGetNtfConns(nonce: nrData.nonce, encNtfInfo: nrData.encNtfInfo) {
logger.debug("NotificationService: receiveNtfMessages: apiGetNtfConns ntfConns count = \(ntfConns.count)")
// logger.debug("NotificationService: receiveNtfMessages: apiGetNtfConns ntfConns \(String(describing: ntfConns.map { $0.connEntity.id }))")
for ntfConn in ntfConns {
addExpectedMessage(ntfConn: ntfConn)
}
let connMsgReqs = expectedMessages.compactMap { (id, _) in
let started = NSEThreads.queue.sync {
let canStart = checkCanStart(id)
if let t = threadId { logger.debug("NotificationService thread \(t, privacy: .private): receiveNtfMessages: can start: \(canStart)") }
if canStart {
processDroppedNotifications(id)
expectedMessages[id]?.startedProcessingNewMsgs = true
expectedMessages[id]?.shouldProcessNtf = true
}
return canStart
}
if started {
return expectedMessages[id]?.connMsgReq
} else {
if let t = threadId { logger.debug("NotificationService thread \(t, privacy: .private): receiveNtfMessages: entity \(id, privacy: .private) waiting on semaphore") }
expectedMessages[id]?.semaphore.wait()
if let t = threadId { logger.debug("NotificationService thread \(t, privacy: .private): receiveNtfMessages: entity \(id, privacy: .private) proceeding after semaphore") }
Task {
NSEThreads.queue.sync {
processDroppedNotifications(id)
expectedMessages[id]?.startedProcessingNewMsgs = true
expectedMessages[id]?.shouldProcessNtf = true
}
if let connMsgReq = expectedMessages[id]?.connMsgReq {
let _ = getConnNtfMessage(connMsgReq: connMsgReq)
}
}
return nil
}
}
if !connMsgReqs.isEmpty {
if let r = apiGetConnNtfMessages(connMsgReqs: connMsgReqs) {
logger.debug("NotificationService: receiveNtfMessages: apiGetConnNtfMessages count = \(r.count), expecting messages \(r.count { $0 != nil })")
}
return
}
} else if let dbStatus = dbStatus {
setServiceBestAttemptNtf(createErrorNtf(dbStatus, badgeCount))
}
}
deliverBestAttemptNtf()
}
func addExpectedMessage(ntfConn: NtfConn) {
let expectedMsgId = ntfConn.expectedMsg_?.msgId
if let receiveEntityId = ntfConn.connEntity.id {
logger.debug("NotificationService: addExpectedMessage: expectedMsgId = \(expectedMsgId ?? "nil", privacy: .private)")
expectedMessages[receiveEntityId] = ExpectedMessage(
ntfConn: ntfConn,
expectedMsgId: expectedMsgId,
allowedGetNextAttempts: 3,
msgBestAttemptNtf: defaultBestAttemptNtf(ntfConn),
ready: ntfConn.expectedMsg_ == nil, // show defaultBestAttemptNtf(ntfConn) if there is no expected message
shouldProcessNtf: false,
startedProcessingNewMsgs: false,
semaphore: DispatchSemaphore(value: 0)
)
}
}
func checkCanStart(_ entityId: String) -> Bool {
return !NSEThreads.shared.activeThreads.contains(where: {
(tId, nse) in tId != threadId && nse.expectedMessages.contains(where: { $0.key == entityId })
})
}
func processDroppedNotifications(_ entityId: String) {
if !NSEThreads.shared.droppedNotifications.isEmpty {
let messagesToProcess = NSEThreads.shared.droppedNotifications.filter { (eId, _) in eId == entityId }
NSEThreads.shared.droppedNotifications.removeAll(where: { (eId, _) in eId == entityId })
for (index, (_, ntf)) in messagesToProcess.enumerated() {
if let t = threadId { logger.debug("NotificationService thread \(t, privacy: .private): entity \(entityId, privacy: .private): processing dropped notification \(index, privacy: .private)") }
processReceivedNtf(entityId, ntf, signalReady: false)
}
}
}
override func serviceExtensionTimeWillExpire() {
logger.debug("DEBUGGING: NotificationService.serviceExtensionTimeWillExpire")
deliverBestAttemptNtf(urgent: true)
}
var expectingMoreMessages: Bool {
!expectedMessages.allSatisfy { $0.value.ready }
}
func processReceivedNtf(_ id: ChatId, _ ntf: NSENotificationData, signalReady: Bool) {
guard let expectedMessage = expectedMessages[id] else {
return
}
guard let expectedMsgTs = expectedMessage.ntfConn.expectedMsg_?.msgTs else {
NSEThreads.shared.droppedNotifications.append((id, ntf))
if signalReady { entityReady(id) }
return
}
if case let .msgInfo(info) = ntf {
if info.msgId == expectedMessage.expectedMsgId {
logger.debug("NotificationService processNtf: msgInfo msgId = \(info.msgId, privacy: .private): expected")
expectedMessages[id]?.expectedMsgId = nil
if signalReady { entityReady(id) }
self.deliverBestAttemptNtf()
} else if let msgTs = info.msgTs_, msgTs > expectedMsgTs {
logger.debug("NotificationService processNtf: msgInfo msgId = \(info.msgId, privacy: .private): unexpected msgInfo, let other instance to process it, stopping this one")
NSEThreads.shared.droppedNotifications.append((id, ntf))
if signalReady { entityReady(id) }
self.deliverBestAttemptNtf()
} else if (expectedMessages[id]?.allowedGetNextAttempts ?? 0) > 0, let connMsgReq = expectedMessages[id]?.connMsgReq {
logger.debug("NotificationService processNtf: msgInfo msgId = \(info.msgId, privacy: .private): unexpected msgInfo, get next message")
expectedMessages[id]?.allowedGetNextAttempts -= 1
if let receivedMsg = getConnNtfMessage(connMsgReq: connMsgReq) {
logger.debug("NotificationService processNtf, on getConnNtfMessage: msgInfo msgId = \(info.msgId, privacy: .private), receivedMsg msgId = \(receivedMsg.msgId, privacy: .private)")
} else {
logger.debug("NotificationService processNtf, on getConnNtfMessage: msgInfo msgId = \(info.msgId, privacy: .private): no next message, deliver best attempt")
NSEThreads.shared.droppedNotifications.append((id, ntf))
if signalReady { entityReady(id) }
self.deliverBestAttemptNtf()
}
} else {
logger.debug("NotificationService processNtf: msgInfo msgId = \(info.msgId, privacy: .private): unknown message, let other instance to process it")
NSEThreads.shared.droppedNotifications.append((id, ntf))
if signalReady { entityReady(id) }
self.deliverBestAttemptNtf()
}
} else if expectedMessage.ntfConn.user.showNotifications {
logger.debug("NotificationService processNtf: setting best attempt")
if ntf.notificationEvent != nil {
setBadgeCount()
}
let prevBestAttempt = expectedMessages[id]?.msgBestAttemptNtf
if prevBestAttempt?.callInvitation == nil || ntf.callInvitation != nil {
expectedMessages[id]?.msgBestAttemptNtf = ntf
} // otherwise keep call as best attempt
} else {
NSEThreads.shared.droppedNotifications.append((id, ntf))
if signalReady { entityReady(id) }
}
}
func entityReady(_ entityId: ChatId) {
if let t = threadId { logger.debug("NotificationService thread \(t, privacy: .private): entityReady: entity \(entityId, privacy: .private)") }
expectedMessages[entityId]?.ready = true
if let (tNext, nse) = NSEThreads.shared.activeThreads.first(where: { (_, nse) in nse.expectedMessages[entityId]?.startedProcessingNewMsgs == false }) {
if let t = threadId { logger.debug("NotificationService thread \(t, privacy: .private): entityReady: signal next thread \(tNext, privacy: .private) for entity \(entityId, privacy: .private)") }
nse.expectedMessages[entityId]?.semaphore.signal()
}
}
func setBadgeCount() {
badgeCount = ntfBadgeCountGroupDefault.get() + 1
ntfBadgeCountGroupDefault.set(badgeCount)
}
func setServiceBestAttemptNtf(_ ntf: UNMutableNotificationContent) {
logger.debug("NotificationService.setServiceBestAttemptNtf")
serviceBestAttemptNtf = .nse(ntf)
}
private func deliverBestAttemptNtf(urgent: Bool = false) {
logger.debug("NotificationService.deliverBestAttemptNtf urgent: \(urgent) expectingMoreMessages: \(self.expectingMoreMessages)")
if let handler = contentHandler, urgent || !expectingMoreMessages {
if urgent {
contentHandler = nil
}
logger.debug("NotificationService.deliverBestAttemptNtf")
// stop processing other messages
for (key, _) in expectedMessages {
expectedMessages[key]?.shouldProcessNtf = false
}
let suspend: Bool
if let t = threadId {
threadId = nil
suspend = NSEThreads.shared.endThread(t) && NSEThreads.shared.noThreads
} else {
suspend = false
}
deliverCallkitOrNotification(urgent: urgent, suspend: suspend, handler: handler)
}
}
private func deliverCallkitOrNotification(urgent: Bool, suspend: Bool = false, handler: @escaping (UNNotificationContent) -> Void) {
if useCallKit() && expectedMessages.contains(where: { $0.value.msgBestAttemptNtf?.callInvitation != nil }) {
logger.debug("NotificationService.deliverCallkitOrNotification: will suspend, callkit")
if urgent {
// suspending NSE even though there may be other notifications
// to allow the app to process callkit call
suspendChat(0)
deliverNotification(handler: handler)
} else {
// suspending NSE with delay and delivering after the suspension
// because pushkit notification must be processed without delay
// to avoid app termination
DispatchQueue.global().asyncAfter(deadline: .now() + fastNSESuspendSchedule.delay) {
suspendChat(fastNSESuspendSchedule.timeout)
DispatchQueue.global().asyncAfter(deadline: .now() + Double(fastNSESuspendSchedule.timeout)) {
self.deliverNotification(handler: handler)
}
}
}
} else {
if suspend {
logger.debug("NotificationService.deliverCallkitOrNotification: will suspend")
if urgent {
suspendChat(0)
} else {
// suspension is delayed to allow chat core finalise any processing
// (e.g., send delivery receipts)
DispatchQueue.global().asyncAfter(deadline: .now() + nseSuspendSchedule.delay) {
if NSEThreads.shared.noThreads {
suspendChat(nseSuspendSchedule.timeout)
}
}
}
}
deliverNotification(handler: handler)
}
}
private func deliverNotification(handler: @escaping (UNNotificationContent) -> Void) {
if serviceBestAttemptNtf != nil, let ntf = prepareNotification() {
contentHandler = nil
serviceBestAttemptNtf = nil
switch ntf {
case let .nse(content):
content.badge = badgeCount as NSNumber
handler(content)
case let .callkit(invitation):
logger.debug("NotificationService reportNewIncomingVoIPPushPayload for \(invitation.contact.id)")
CXProvider.reportNewIncomingVoIPPushPayload([
"displayName": invitation.contact.displayName,
"contactId": invitation.contact.id,
"callUUID": invitation.callUUID ?? "",
"media": invitation.callType.media.rawValue,
"callTs": invitation.callTs.timeIntervalSince1970
]) { error in
logger.debug("reportNewIncomingVoIPPushPayload result: \(error)")
handler(error == nil ? UNMutableNotificationContent() : createCallInvitationNtf(invitation, self.badgeCount))
}
case .empty:
handler(UNMutableNotificationContent()) // used to mute notifications that did not unsubscribe yet
}
}
}
private func prepareNotification() -> NSENotification? {
if expectedMessages.isEmpty {
return serviceBestAttemptNtf
} else if let callNtfKV = expectedMessages.first(where: { $0.value.msgBestAttemptNtf?.callInvitation != nil }),
let callInv = callNtfKV.value.msgBestAttemptNtf?.callInvitation,
let callNtf = callNtfKV.value.msgBestAttemptNtf {
return useCallKit() ? .callkit(callInv) : .nse(callNtf.notificationContent(badgeCount))
} else {
logger.debug("NotificationService prepareNotification \(String(describing: self.expectedMessages.map { $0.key }))")
let ntfEvents = expectedMessages.compactMap { $0.value.msgBestAttemptNtf?.notificationEvent }
logger.debug("NotificationService prepareNotification \(ntfEvents.count)")
if ntfEvents.isEmpty {
return .empty
} else if let ntfEvent = ntfEvents.count == 1 ? ntfEvents.first : nil {
return .nse(ntfEvent.notificationContent(badgeCount))
} else {
return .nse(createJointNtf(ntfEvents))
}
}
}
private func createJointNtf(_ ntfEvents: [NSENotificationData]) -> UNMutableNotificationContent {
let previewMode = ntfPreviewModeGroupDefault.get()
let newMsgsData: [(any UserLike, ChatInfo)] = ntfEvents.compactMap { $0.newMsgData }
if !newMsgsData.isEmpty, let userId = newMsgsData.first?.0.userId {
let newMsgsChats: [ChatInfo] = newMsgsData.map { $0.1 }
let uniqueChatsNames = uniqueNewMsgsChatsNames(newMsgsChats)
var body: String
if previewMode == .hidden {
body = String.localizedStringWithFormat(NSLocalizedString("New messages in %d chats", comment: "notification body"), uniqueChatsNames.count)
} else {
body = String.localizedStringWithFormat(NSLocalizedString("From: %@", comment: "notification body"), newMsgsChatsNamesStr(uniqueChatsNames))
}
return createNotification(
categoryIdentifier: ntfCategoryManyEvents,
title: NSLocalizedString("New messages", comment: "notification"),
body: body,
userInfo: ["userId": userId],
badgeCount: badgeCount
)
} else {
return createNotification(
categoryIdentifier: ntfCategoryManyEvents,
title: NSLocalizedString("New events", comment: "notification"),
body: String.localizedStringWithFormat(NSLocalizedString("%d new events", comment: "notification body"), ntfEvents.count),
badgeCount: badgeCount
)
}
}
private func uniqueNewMsgsChatsNames(_ newMsgsChats: [ChatInfo]) -> [String] {
var seenChatIds = Set<ChatId>()
var uniqueChatsNames: [String] = []
for chat in newMsgsChats {
if !seenChatIds.contains(chat.id) {
seenChatIds.insert(chat.id)
uniqueChatsNames.append(chat.chatViewName)
}
}
return uniqueChatsNames
}
private func newMsgsChatsNamesStr(_ names: [String]) -> String {
return switch names.count {
case 1: names[0]
case 2: "\(names[0]) and \(names[1])"
case 3: "\(names[0] + ", " + names[1]) and \(names[2])"
default:
names.count > 3
? "\(names[0]), \(names[1]) and \(names.count - 2) other chats"
: ""
}
}
}
// nseStateGroupDefault must not be used in NSE directly, only via this singleton
class NSEChatState {
static let shared = NSEChatState()
private var value_ = NSEState.created
var value: NSEState {
value_
}
func set(_ state: NSEState) {
nseStateGroupDefault.set(state)
sendNSEState(state)
value_ = state
}
init() {
// This is always set to .created state, as in case previous start of NSE crashed in .active state, it is stored correctly.
// Otherwise the app will be activating slower
set(.created)
}
}
var appSubscriber: AppSubscriber = appStateSubscriber { state in
logger.debug("NotificationService: appSubscriber")
if state.running && NSEChatState.shared.value.canSuspend {
logger.debug("NotificationService: appSubscriber app state \(state.rawValue), suspending")
suspendChat(fastNSESuspendSchedule.timeout)
}
}
func appStateSubscriber(onState: @escaping (AppState) -> Void) -> AppSubscriber {
appMessageSubscriber { msg in
if case let .state(state) = msg {
logger.debug("NotificationService: appStateSubscriber \(state.rawValue)")
onState(state)
}
}
}
let seSubscriber = seMessageSubscriber {
switch $0 {
case let .state(state):
if state == .sendingMessage && NSEChatState.shared.value.canSuspend {
logger.debug("NotificationService: seSubscriber app state \(state.rawValue), suspending")
suspendChat(fastNSESuspendSchedule.timeout)
}
}
}
var receiverStarted = false
let startLock = DispatchSemaphore(value: 1)
let suspendLock = DispatchSemaphore(value: 1)
var networkConfig: NetCfg = getNetCfg()
// startChat uses semaphore startLock to ensure that only one didReceive thread can start chat controller
// Subsequent calls to didReceive will be waiting on semaphore and won't start chat again, as it will be .active
func startChat() -> DBMigrationResult? {
logger.debug("NotificationService: startChat")
// only skip creating if there is chat controller
if case .active = NSEChatState.shared.value, hasChatCtrl() { return .ok }
startLock.wait()
defer { startLock.signal() }
if hasChatCtrl() {
return switch NSEChatState.shared.value {
case .created: doStartChat()
case .starting: .ok // it should never get to this branch, as it would be waiting for start on startLock
case .active: .ok
case .suspending: activateChat()
case .suspended: activateChat()
}
} else {
// Ignore state in preference if there is no chat controller.
// State in preference may have failed to update e.g. because of a crash.
NSEChatState.shared.set(.created)
return doStartChat()
}
}
func doStartChat() -> DBMigrationResult? {
logger.debug("NotificationService: doStartChat")
haskell_init_nse()
let (_, dbStatus) = chatMigrateInit(confirmMigrations: defaultMigrationConfirmation(), backgroundMode: true)
logger.debug("NotificationService: doStartChat \(String(describing: dbStatus))")
if dbStatus != .ok {
resetChatCtrl()
NSEChatState.shared.set(.created)
return dbStatus
}
let state = NSEChatState.shared.value
NSEChatState.shared.set(.starting)
if let user = apiGetActiveUser() {
logger.debug("NotificationService active user \(user.displayName)")
do {
try setNetworkConfig(networkConfig)
try apiSetAppFilePaths(filesFolder: getAppFilesDirectory().path, tempFolder: getTempFilesDirectory().path, assetsFolder: getWallpaperDirectory().deletingLastPathComponent().path)
try apiSetEncryptLocalFiles(privacyEncryptLocalFilesGroupDefault.get())
// prevent suspension while starting chat
suspendLock.wait()
defer { suspendLock.signal() }
if NSEChatState.shared.value == .starting {
updateNetCfg()
let justStarted = try apiStartChat()
NSEChatState.shared.set(.active)
if justStarted {
chatLastStartGroupDefault.set(Date.now)
Task {
if !receiverStarted {
receiverStarted = true
await receiveMessages()
}
}
}
return .ok
}
} catch {
logger.error("NotificationService startChat error: \(responseError(error))")
}
} else {
logger.debug("NotificationService: no active user")
}
if NSEChatState.shared.value == .starting { NSEChatState.shared.set(state) }
return nil
}
func activateChat() -> DBMigrationResult? {
logger.debug("NotificationService: activateChat")
let state = NSEChatState.shared.value
NSEChatState.shared.set(.active)
if apiActivateChat() {
logger.debug("NotificationService: activateChat: after apiActivateChat")
return .ok
} else {
NSEChatState.shared.set(state)
return nil
}
}
// suspendChat uses semaphore suspendLock to ensure that only one suspension can happen.
func suspendChat(_ timeout: Int) {
logger.debug("NotificationService: suspendChat")
let state = NSEChatState.shared.value
if !state.canSuspend {
logger.error("NotificationService suspendChat called, current state: \(state.rawValue)")
} else if hasChatCtrl() {
// only suspend if we have chat controller to avoid crashes when suspension is
// attempted when chat controller was not created
suspendLock.wait()
defer { suspendLock.signal() }
NSEChatState.shared.set(.suspending)
if apiSuspendChat(timeoutMicroseconds: timeout * 1000000) {
logger.debug("NotificationService: suspendChat: after apiSuspendChat")
DispatchQueue.global().asyncAfter(deadline: .now() + Double(timeout) + 1, execute: chatSuspended)
} else {
NSEChatState.shared.set(state)
}
}
}
func chatSuspended() {
logger.debug("NotificationService chatSuspended")
if case .suspending = NSEChatState.shared.value {
NSEChatState.shared.set(.suspended)
chatCloseStore()
logger.debug("NotificationService chatSuspended: suspended")
}
}
// A single loop is used per Notification service extension process to receive and process all messages depending on the NSE state
// If the extension is not active yet, or suspended/suspending, or the app is running, the notifications will not be received.
func receiveMessages() async {
logger.debug("NotificationService receiveMessages")
while true {
switch NSEChatState.shared.value {
// it should never get to "created" and "starting" branches, as NSE state is set to .active before the loop start
case .created: await delayWhenInactive()
case .starting: await delayWhenInactive()
case .active: await receiveMsg()
case .suspending: await receiveMsg()
case .suspended: await delayWhenInactive()
}
}
func receiveMsg() async {
if let msg = await chatRecvMsg() {
logger.debug("NotificationService receiveMsg: message")
if let (id, ntf) = await receivedMsgNtf(msg) {
logger.debug("NotificationService receiveMsg: notification")
await NSEThreads.shared.processNotification(id, ntf)
}
}
}
func delayWhenInactive() async {
logger.debug("NotificationService delayWhenInactive")
_ = try? await Task.sleep(nanoseconds: 1000_000000)
}
}
func chatRecvMsg() async -> NSEChatEvent? {
await withCheckedContinuation { cont in
let resp: NSEChatEvent? = recvSimpleXMsg()
cont.resume(returning: resp)
}
}
private let isInChina = SKStorefront().countryCode == "CHN"
private func useCallKit() -> Bool { !isInChina && callKitEnabledGroupDefault.get() }
func receivedMsgNtf(_ res: NSEChatEvent) async -> (String, NSENotificationData)? {
logger.debug("NotificationService receivedMsgNtf: \(res.eventType)")
switch res {
case let .contactConnected(user, contact, _):
return (contact.id, .contactConnected(user, contact))
// case let .contactConnecting(contact):
// TODO profile update
case let .receivedContactRequest(user, contactRequest):
return (UserContact(contactRequest: contactRequest).id, .contactRequest(user, contactRequest))
case let .newChatItems(user, chatItems):
// Received items are created one at a time
if let chatItem = chatItems.first {
let cInfo = chatItem.chatInfo
var cItem = chatItem.chatItem
if let file = cItem.autoReceiveFile() {
cItem = autoReceiveFile(file) ?? cItem
}
let ntf: NSENotificationData = (cInfo.ntfsEnabled(chatItem: cItem) && cItem.showNotification) ? .messageReceived(user, cInfo, cItem) : .noNtf
let chatIdOrMemberId = if case let .groupRcv(groupMember) = chatItem.chatItem.chatDir {
groupMember.id
} else {
chatItem.chatInfo.id
}
return (chatIdOrMemberId, ntf)
} else {
return nil
}
case let .rcvFileSndCancelled(_, aChatItem, _):
cleanupFile(aChatItem)
return nil
case let .sndFileComplete(_, aChatItem, _):
cleanupDirectFile(aChatItem)
return nil
case let .sndFileRcvCancelled(_, aChatItem, _):
if let aChatItem = aChatItem {
cleanupDirectFile(aChatItem)
}
return nil
case let .callInvitation(invitation):
// Do not post it without CallKit support, iOS will stop launching the app without showing CallKit
return (invitation.contact.id, .callInvitation(invitation))
case let .ntfMessage(_, connEntity, ntfMessage):
return if let id = connEntity.id { (id, .msgInfo(ntfMessage)) } else { nil }
case .chatSuspended:
chatSuspended()
return nil
case let .chatError(_, err):
logger.error("NotificationService receivedMsgNtf error: \(String(describing: err))")
return nil
default:
logger.debug("NotificationService receivedMsgNtf ignored event: \(res.eventType)")
return nil
}
}
func updateNetCfg() {
let newNetConfig = getNetCfg()
if newNetConfig != networkConfig {
logger.debug("NotificationService applying changed network config")
do {
try setNetworkConfig(networkConfig)
networkConfig = newNetConfig
} catch {
logger.error("NotificationService apply changed network config error: \(responseError(error))")
}
}
}
func apiGetActiveUser() -> User? {
let r: NSEChatResponse = sendSimpleXCmd(NSEChatCommand.showActiveUser)
logger.debug("apiGetActiveUser sendSimpleXCmd response: \(r.responseType)")
switch r {
case let .activeUser(user): return user
case .chatCmdError(_, .error(.noActiveUser)):
logger.debug("apiGetActiveUser sendSimpleXCmd no active user")
return nil
case let .chatCmdError(_, err):
logger.debug("apiGetActiveUser sendSimpleXCmd error: \(String(describing: err))")
return nil
default:
logger.error("NotificationService apiGetActiveUser unexpected response: \(String(describing: r))")
return nil
}
}
func apiStartChat() throws -> Bool {
let r: NSEChatResponse = sendSimpleXCmd(NSEChatCommand.startChat(mainApp: false, enableSndFiles: false))
switch r {
case .chatStarted: return true
case .chatRunning: return false
default: throw r
}
}
func apiActivateChat() -> Bool {
chatReopenStore()
let r: NSEChatResponse = sendSimpleXCmd(NSEChatCommand.apiActivateChat(restoreChat: false))
if case .cmdOk = r { return true }
logger.error("NotificationService apiActivateChat error: \(String(describing: r))")
return false
}
func apiSuspendChat(timeoutMicroseconds: Int) -> Bool {
let r: NSEChatResponse = sendSimpleXCmd(NSEChatCommand.apiSuspendChat(timeoutMicroseconds: timeoutMicroseconds))
if case .cmdOk = r { return true }
logger.error("NotificationService apiSuspendChat error: \(String(describing: r))")
return false
}
func apiSetAppFilePaths(filesFolder: String, tempFolder: String, assetsFolder: String) throws {
let r: NSEChatResponse = sendSimpleXCmd(NSEChatCommand.apiSetAppFilePaths(filesFolder: filesFolder, tempFolder: tempFolder, assetsFolder: assetsFolder))
if case .cmdOk = r { return }
throw r
}
func apiSetEncryptLocalFiles(_ enable: Bool) throws {
let r: NSEChatResponse = sendSimpleXCmd(NSEChatCommand.apiSetEncryptLocalFiles(enable: enable))
if case .cmdOk = r { return }
throw r
}
func apiGetNtfConns(nonce: String, encNtfInfo: String) -> [NtfConn]? {
guard apiGetActiveUser() != nil else {
logger.debug("no active user")
return nil
}
let r: NSEChatResponse = sendSimpleXCmd(NSEChatCommand.apiGetNtfConns(nonce: nonce, encNtfInfo: encNtfInfo))
if case let .ntfConns(ntfConns) = r {
logger.debug("apiGetNtfConns response ntfConns: \(ntfConns.count)")
return ntfConns
} else if case let .chatCmdError(_, error) = r {
logger.debug("apiGetNtfMessage error response: \(String.init(describing: error))")
} else {
logger.debug("apiGetNtfMessage ignored response: \(r.responseType) \(String.init(describing: r))")
}
return nil
}
func apiGetConnNtfMessages(connMsgReqs: [ConnMsgReq]) -> [NtfMsgInfo?]? {
guard apiGetActiveUser() != nil else {
logger.debug("no active user")
return nil
}
logger.debug("apiGetConnNtfMessages command: \(NSEChatCommand.apiGetConnNtfMessages(connMsgReqs: connMsgReqs).cmdString)")
let r: NSEChatResponse = sendSimpleXCmd(NSEChatCommand.apiGetConnNtfMessages(connMsgReqs: connMsgReqs))
if case let .connNtfMessages(receivedMsgs) = r {
logger.debug("apiGetConnNtfMessages response receivedMsgs: total \(receivedMsgs.count), expecting messages \(receivedMsgs.count { $0 != nil })")
return receivedMsgs
}
logger.debug("apiGetConnNtfMessages error: \(responseError(r))")
return nil
}
func getConnNtfMessage(connMsgReq: ConnMsgReq) -> NtfMsgInfo? {
let r_ = apiGetConnNtfMessages(connMsgReqs: [connMsgReq])
if let r = r_, let receivedMsg = r.count == 1 ? r.first : nil {
return receivedMsg
}
return nil
}
func apiReceiveFile(fileId: Int64, encrypted: Bool, inline: Bool? = nil) -> AChatItem? {
let userApprovedRelays = !privacyAskToApproveRelaysGroupDefault.get()
let r: NSEChatResponse = sendSimpleXCmd(NSEChatCommand.receiveFile(fileId: fileId, userApprovedRelays: userApprovedRelays, encrypted: encrypted, inline: inline))
if case let .rcvFileAccepted(_, chatItem) = r { return chatItem }
logger.error("receiveFile error: \(responseError(r))")
return nil
}
func apiSetFileToReceive(fileId: Int64, encrypted: Bool) {
let userApprovedRelays = !privacyAskToApproveRelaysGroupDefault.get()
let r: NSEChatResponse = sendSimpleXCmd(NSEChatCommand.setFileToReceive(fileId: fileId, userApprovedRelays: userApprovedRelays, encrypted: encrypted))
if case .cmdOk = r { return }
logger.error("setFileToReceive error: \(responseError(r))")
}
func autoReceiveFile(_ file: CIFile) -> ChatItem? {
let encrypted = privacyEncryptLocalFilesGroupDefault.get()
switch file.fileProtocol {
case .smp:
return apiReceiveFile(fileId: file.fileId, encrypted: encrypted)?.chatItem
case .xftp:
apiSetFileToReceive(fileId: file.fileId, encrypted: encrypted)
return nil
case .local:
return nil
}
}
func setNetworkConfig(_ cfg: NetCfg) throws {
let r: NSEChatResponse = sendSimpleXCmd(NSEChatCommand.apiSetNetworkConfig(networkConfig: cfg))
if case .cmdOk = r { return }
throw r
}
func defaultBestAttemptNtf(_ ntfConn: NtfConn) -> NSENotificationData {
let user = ntfConn.user
let connEntity = ntfConn.connEntity
return if !user.showNotifications {
.noNtf
} else {
switch ntfConn.connEntity {
case let .rcvDirectMsgConnection(_, contact):
contact?.chatSettings.enableNtfs == .all
? .connectionEvent(user, connEntity)
: .noNtf
case let .rcvGroupMsgConnection(_, groupInfo, _):
groupInfo.chatSettings.enableNtfs == .all
? .connectionEvent(user, connEntity)
: .noNtf
case .sndFileConnection: .noNtf
case .rcvFileConnection: .noNtf
case let .userContactConnection(_, userContact):
userContact.groupId == nil
? .connectionEvent(user, connEntity)
: .noNtf
}
}
}