core: use async agent commands when establishing connections w/t user action (#977)

* wip

* wip

* wip

* schema

* schema

* wip

* wip

* rework

* revert

* update simplexmq

* async commands

* corr id wip

* wip

* update simplexmq

* corr id

* wip

* rename variable

* wip

* refactor

* ACK continuation

* wip

* fix queries

* fix queries

* clean up schema

* update simplexmq, do not lock on stopping chat

* clean up

* refactor

* refactor

Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
JRoberts 2022-09-14 19:45:21 +04:00 committed by GitHub
parent 5089dfdada
commit c07d4a5e4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 549 additions and 116 deletions

View file

@ -5,7 +5,7 @@ constraints: zip +disable-bzip2 +disable-zstd
source-repository-package
type: git
location: https://github.com/simplex-chat/simplexmq.git
tag: f2c1455a2755e1275983dc154321fc0a5c0d7b17
tag: e328ae5d060645a8ef090b1b3d88bc20a5902e45
source-repository-package
type: git

View file

@ -0,0 +1,57 @@
# Group connections recovery after asynchronous commands
## Problem
Similar to direct chat connections, group connections can fail to be established on failing IO, for example on bad network. When this happens agent throws error. For direct connections it is viable to propagate the error to UI to indicate failure, so the user can retry. For group connections this becomes unviable as multiple connections are being established automatically on each new joining member (2 for each existing member except host), and the user can't retry.
## Proposal 2
- Have separate command processing queue in agent
- Chat creates correlation id for command
- Chat has data structure to represent possible continuations
- Chat saves correlation id and continuation to database
- Agent asynchronously responds after command completion with correlation id
- Chat restores continuation by correlation id and processes it
## Proposal
- Add special versions of commands `createConnection`, `joinConnection` (`allowConnection` as well?), to which agent responds synchronously with connection id and status;
- Add new connection type `NewConnection` / `NoQueueConnection`;
- Add agent responses signalling these commands' asynchronous completion, e.g. `CREATE_SUCCESS`, `JOIN_SUCCESS` (?);
- Return connection status on subscriptions;
In chat:
- Event-driven processing - for group connections use commands with synchronous responses, save intermediate connection state, process success responses;
- On subscription check whether connection's status has changed, if yes, run respective continuation - same as on success event.
### Commands use
`joinConnection` in group connections is used:
- #779 : when joining group - `APIJoinGroup`;
- #2147 : on receiving `XGrpMemFwd` message for both group and direct connections; -- from inviting member to existing members
`allowConnection`:
- #1419 : on receiving `XGrpMemInfo` in `CONF` inside direct connection; -- from existing member to invitee after XGrpMemFwd
- #1486 : on receiving `XGrpAcpt` in `CONF` inside group connection; -- from invitee to host
- #1495 : on receiving `XGrpMemInfo` in `CONF` inside group connection; -- from existing member to invitee after XGrpMemFwd
`createConnection`:
- #756 : when inviting new member - `APIAddMember`, probably doesn't have to be recovered - error can be signalled;
- #2112 : on receiving `XGrpMemIntro` message for both group and direct connections for each introduced member; -- from host to invitee
## Misc
Chat.hs #1419 - why is XOk sent instead of XGrpMemInfo?

View file

@ -1,5 +1,5 @@
{
"https://github.com/simplex-chat/simplexmq.git"."f2c1455a2755e1275983dc154321fc0a5c0d7b17" = "10l74d751jmgsr0ifyprglsvqdpcir86qs1vkwc4dn4n4q503p5q";
"https://github.com/simplex-chat/simplexmq.git"."e328ae5d060645a8ef090b1b3d88bc20a5902e45" = "0y9k1v2ss7f68md01azhh55b7xbk654hcpmyjkxkazy261nn9rxg";
"https://github.com/simplex-chat/aeson.git"."3eb66f9a68f103b5f1489382aad89f5712a64db7" = "0kilkx59fl6c3qy3kjczqvm8c3f4n3p0bdk9biyflf51ljnzp4yp";
"https://github.com/simplex-chat/haskell-terminal.git"."f708b00009b54890172068f168bf98508ffcd495" = "0zmq7lmfsk8m340g47g5963yba7i88n4afa6z93sg9px5jv1mijj";
"https://github.com/zw3rk/android-support.git"."3c3a5ab0b8b137a072c98d3d0937cbdc96918ddb" = "1r6jyxbim3dsvrmakqfyxbd6ms6miaghpbwyl0sr6dzwpgaprz97";

View file

@ -48,6 +48,7 @@ library
Simplex.Chat.Migrations.M20220822_groups_host_conn_custom_user_profile_id
Simplex.Chat.Migrations.M20220823_delete_broken_group_event_chat_items
Simplex.Chat.Migrations.M20220824_profiles_local_alias
Simplex.Chat.Migrations.M20220909_commands
Simplex.Chat.Mobile
Simplex.Chat.Options
Simplex.Chat.ProfileGenerator

View file

@ -261,6 +261,7 @@ processChatCommand = \case
setupSndFileTransfer :: Contact -> m (Maybe (FileInvitation, CIFile 'MDSnd))
setupSndFileTransfer ct = forM file_ $ \file -> do
(fileSize, chSize) <- checkSndFile file
-- [async agent commands] keep command synchronous, but process error
(agentConnId, fileConnReq) <- withAgent $ \a -> createConnection a True SCMInvitation
let fileName = takeFileName file
fileInvitation = FileInvitation {fileName, fileSize, fileConnReq = Just fileConnReq}
@ -429,7 +430,7 @@ processChatCommand = \case
withFilesFolder $ \filesFolder -> deleteFile filesFolder fileInfo
withAgent $ \a -> forM_ conns $ \conn ->
deleteConnection a (aConnId conn) `catchError` \(_ :: AgentErrorType) -> pure ()
-- two functions below are called in separate transactions to prevent crashes on android
-- functions below are called in separate transactions to prevent crashes on android
-- (possibly, race condition on integrity check?)
withStore' $ \db -> deleteContactConnectionsAndFiles db userId ct
withStore' $ \db -> deleteContact db userId ct
@ -449,7 +450,7 @@ processChatCommand = \case
withChatLock . procCmd $ do
when (memberActive membership) . void $ sendGroupMessage gInfo members XGrpDel
mapM_ deleteMemberConnection members
-- two functions below are called in separate transactions to prevent crashes on android
-- functions below are called in separate transactions to prevent crashes on android
-- (possibly, race condition on integrity check?)
withStore' $ \db -> deleteGroupConnectionsAndFiles db user gInfo members
withStore' $ \db -> deleteGroupItemsAndMembers db user gInfo
@ -754,6 +755,7 @@ processChatCommand = \case
case contactMember contact members of
Nothing -> do
gVar <- asks idsDrg
-- [async agent commands] keep command synchronous, but process error
(agentConnId, cReq) <- withAgent $ \a -> createConnection a True SCMInvitation
member <- withStore $ \db -> createNewContactMember db gVar user groupId contact memRole agentConnId cReq
sendInvitation member cReq
@ -766,6 +768,7 @@ processChatCommand = \case
APIJoinGroup groupId -> withUser $ \user@User {userId} -> do
ReceivedGroupInvitation {fromMember, connRequest, groupInfo = g@GroupInfo {membership}} <- withStore $ \db -> getGroupInvitation db user groupId
withChatLock . procCmd $ do
-- [async agent commands] keep command synchronous, but process error
agentConnId <- withAgent $ \a -> joinConnection a True connRequest . directMessage $ XGrpAcpt (memberId (membership :: GroupMember))
withStore' $ \db -> do
createMemberConnection db userId fromMember agentConnId
@ -1114,6 +1117,7 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, fileInvitation = F
case fileConnReq of
-- direct file protocol
Just connReq ->
-- [async agent commands] keep command synchronous, but process error
tryError (withAgent $ \a -> joinConnection a True connReq . directMessage $ XFileAcpt fName) >>= \case
Right agentConnId -> do
filePath <- getRcvFilePath filePath_ fName
@ -1128,6 +1132,7 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, fileInvitation = F
case activeConn of
Just conn -> do
sharedMsgId <- withStore $ \db -> getSharedMsgIdByFileId db userId fileId
-- [async agent commands] keep command synchronous, but process error
(agentConnId, fileInvConnReq) <- withAgent $ \a -> createConnection a True SCMInvitation
filePath <- getRcvFilePath filePath_ fName
ci <- withStore $ \db -> acceptRcvFileTransfer db user fileId agentConnId ConnNew filePath
@ -1187,10 +1192,10 @@ agentSubscriber = do
q <- asks $ subQ . smpAgent
l <- asks chatLock
forever $ do
(_, connId, msg) <- atomically $ readTBQueue q
(corrId, connId, msg) <- atomically $ readTBQueue q
u <- readTVarIO =<< asks currentUser
withLock l . void . runExceptT $
processAgentMessage u connId msg `catchError` (toView . CRChatError)
processAgentMessage u corrId connId msg `catchError` (toView . CRChatError)
type AgentBatchSubscribe m = AgentClient -> [ConnId] -> ExceptT AgentErrorType m (Map ConnId (Either AgentErrorType ()))
@ -1307,9 +1312,9 @@ subscribeUserConnections agentBatchSubscribe user = do
Just _ -> Nothing
_ -> Just . ChatError . CEAgentNoSubResult $ AgentConnId connId
processAgentMessage :: forall m. ChatMonad m => Maybe User -> ConnId -> ACommand 'Agent -> m ()
processAgentMessage Nothing _ _ = throwChatError CENoActiveUser
processAgentMessage (Just User {userId}) "" agentMessage = case agentMessage of
processAgentMessage :: forall m. ChatMonad m => Maybe User -> ConnId -> ACorrId -> ACommand 'Agent -> m ()
processAgentMessage Nothing _ _ _ = throwChatError CENoActiveUser
processAgentMessage (Just User {userId}) _ "" agentMessage = case agentMessage of
CONNECT p h -> hostEvent $ CRHostConnected p h
DISCONNECT p h -> hostEvent $ CRHostDisconnected p h
DOWN srv conns -> serverEvent srv conns CRContactsDisconnected "disconnected"
@ -1322,7 +1327,7 @@ processAgentMessage (Just User {userId}) "" agentMessage = case agentMessage of
cs <- withStore' $ \db -> getConnectionsContacts db userId conns
toView $ event srv cs
showToast ("server " <> str) (safeDecodeUtf8 $ strEncode host)
processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage =
processAgentMessage (Just user@User {userId, profile}) corrId agentConnId agentMessage =
(withStore (\db -> getConnectionEntity db user $ AgentConnId agentConnId) >>= updateConnStatus) >>= \case
RcvDirectMsgConnection conn contact_ ->
processDirectMessage agentMessage conn contact_
@ -1362,24 +1367,41 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
incognitoProfile <- forM customUserProfileId $ \profileId -> withStore (\db -> getProfileById db userId profileId)
let profileToSend = fromLocalProfile $ fromMaybe profile incognitoProfile
saveConnInfo conn connInfo
allowAgentConnection conn confId $ XInfo profileToSend
-- [async agent commands] no continuation needed, but command should be asynchronous for stability
allowAgentConnectionAsync user conn confId $ XInfo profileToSend
INFO connInfo ->
saveConnInfo conn connInfo
MSG meta _msgFlags msgBody -> do
_ <- saveRcvMSG conn (ConnectionId connId) meta msgBody
withAckMessage agentConnId meta $ pure ()
ackMsgDeliveryEvent conn meta
cmdId <- createAckCmd conn
_ <- saveRcvMSG conn (ConnectionId connId) meta msgBody cmdId
withAckMessage agentConnId cmdId meta $ pure ()
SENT msgId ->
-- ? updateDirectChatItemStatus
sentMsgDeliveryEvent conn msgId
OK ->
-- [async agent commands] continuation on receiving OK
withCompletedCommand conn agentMsg $ \CommandData {cmdFunction, cmdId} ->
when (cmdFunction == CFAckMessage) $ ackMsgDeliveryEvent conn cmdId
MERR _ err -> toView . CRChatError $ ChatErrorAgent err -- ? updateDirectChatItemStatus
ERR err -> toView . CRChatError $ ChatErrorAgent err
-- TODO add debugging output
_ -> pure ()
Just ct@Contact {localDisplayName = c, contactId} -> case agentMsg of
INV (ACR _ cReq) ->
-- [async agent commands] XGrpMemIntro continuation on receiving INV
withCompletedCommand conn agentMsg $ \_ ->
case cReq of
directConnReq@(CRInvitationUri _ _) -> do
contData <- withStore' $ \db -> do
setConnConnReqInv db user connId cReq
getXGrpMemIntroContDirect db user ct
forM_ contData $ \(hostConnId, xGrpMemIntroCont) ->
sendXGrpMemIntro hostConnId directConnReq xGrpMemIntroCont
CRContactUri _ -> throwChatError $ CECommandError "unexpected ConnectionRequestUri type"
MSG msgMeta _msgFlags msgBody -> do
msg@RcvMessage {chatMsgEvent} <- saveRcvMSG conn (ConnectionId connId) msgMeta msgBody
withAckMessage agentConnId msgMeta $
cmdId <- createAckCmd conn
msg@RcvMessage {chatMsgEvent} <- saveRcvMSG conn (ConnectionId connId) msgMeta msgBody cmdId
withAckMessage agentConnId cmdId msgMeta $
case chatMsgEvent of
XMsgNew mc -> newContentMessage ct mc msg msgMeta
XMsgUpdate sharedMsgId mContent -> messageUpdate ct sharedMsgId mContent msg msgMeta
@ -1398,7 +1420,6 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
XCallExtra callId extraInfo -> xCallExtra ct callId extraInfo msg msgMeta
XCallEnd callId -> xCallEnd ct callId msg msgMeta
_ -> pure ()
ackMsgDeliveryEvent conn msgMeta
CONF confId _ connInfo -> do
-- confirming direct connection with a member
ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage connInfo
@ -1406,7 +1427,8 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
XGrpMemInfo _memId _memProfile -> do
-- TODO check member ID
-- TODO update member profile
allowAgentConnection conn confId XOk
-- [async agent commands] no continuation needed, but command should be asynchronous for stability
allowAgentConnectionAsync user conn confId XOk
_ -> messageError "CONF from member must have x.grp.mem.info"
INFO connInfo -> do
ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage connInfo
@ -1447,6 +1469,10 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
chatItem <- withStore $ \db -> updateDirectChatItemStatus db userId contactId (chatItemId' ci) CISSndSent
toView $ CRChatItemStatusUpdated (AChatItem SCTDirect SMDSnd (DirectChat ct) chatItem)
_ -> pure ()
OK ->
-- [async agent commands] continuation on receiving OK
withCompletedCommand conn agentMsg $ \CommandData {cmdFunction, cmdId} ->
when (cmdFunction == CFAckMessage) $ ackMsgDeliveryEvent conn cmdId
END -> do
toView $ CRContactAnotherClient ct
showToast (c <> "> ") "connected to another client"
@ -1462,7 +1488,19 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
_ -> pure ()
processGroupMessage :: ACommand 'Agent -> Connection -> GroupInfo -> GroupMember -> m ()
processGroupMessage agentMsg conn gInfo@GroupInfo {groupId, localDisplayName = gName, membership, chatSettings} m = case agentMsg of
processGroupMessage agentMsg conn@Connection {connId} gInfo@GroupInfo {groupId, localDisplayName = gName, membership, chatSettings} m = case agentMsg of
INV (ACR _ cReq) ->
-- [async agent commands] XGrpMemIntro continuation on receiving INV
withCompletedCommand conn agentMsg $ \_ ->
case cReq of
groupConnReq@(CRInvitationUri _ _) -> do
contData <- withStore' $ \db -> do
setConnConnReqInv db user connId cReq
getXGrpMemIntroContGroup db user m
forM_ contData $ \(hostConnId, directConnReq) -> do
let GroupMember {groupMemberId, memberId} = m
sendXGrpMemIntro hostConnId directConnReq XGrpMemIntroCont {groupId, groupMemberId, memberId, groupConnReq}
CRContactUri _ -> throwChatError $ CECommandError "unexpected ConnectionRequestUri type"
CONF confId _ connInfo -> do
ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage connInfo
case memberCategory m of
@ -1471,7 +1509,8 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
XGrpAcpt memId
| sameMemberId memId m -> do
withStore $ \db -> liftIO $ updateGroupMemberStatus db userId m GSMemAccepted
allowAgentConnection conn confId XOk
-- [async agent commands] no continuation needed, but command should be asynchronous for stability
allowAgentConnectionAsync user conn confId XOk
| otherwise -> messageError "x.grp.acpt: memberId is different from expected"
_ -> messageError "CONF from invited member must have x.grp.acpt"
_ ->
@ -1479,7 +1518,8 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
XGrpMemInfo memId _memProfile
| sameMemberId memId m -> do
-- TODO update member profile
allowAgentConnection conn confId $ XGrpMemInfo (memberId (membership :: GroupMember)) (fromLocalProfile $ memberProfile membership)
-- [async agent commands] no continuation needed, but command should be asynchronous for stability
allowAgentConnectionAsync user conn confId $ XGrpMemInfo (memberId (membership :: GroupMember)) (fromLocalProfile $ memberProfile membership)
| otherwise -> messageError "x.grp.mem.info: memberId is different from expected"
_ -> messageError "CONF from member must have x.grp.mem.info"
INFO connInfo -> do
@ -1530,8 +1570,9 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
let connectedIncognito = contactConnIncognito ct || memberIncognito membership
when (memberCategory m == GCPreMember) $ probeMatchingContacts ct connectedIncognito
MSG msgMeta _msgFlags msgBody -> do
msg@RcvMessage {chatMsgEvent} <- saveRcvMSG conn (GroupId groupId) msgMeta msgBody
withAckMessage agentConnId msgMeta $
cmdId <- createAckCmd conn
msg@RcvMessage {chatMsgEvent} <- saveRcvMSG conn (GroupId groupId) msgMeta msgBody cmdId
withAckMessage agentConnId cmdId msgMeta $
case chatMsgEvent of
XMsgNew mc -> newGroupContentMessage gInfo m mc msg msgMeta
XMsgUpdate sharedMsgId mContent -> groupMessageUpdate gInfo m sharedMsgId mContent msg
@ -1541,7 +1582,7 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
XFileCancel sharedMsgId -> xFileCancelGroup gInfo m sharedMsgId msgMeta
XFileAcptInv sharedMsgId fileConnReq fName -> xFileAcptInvGroup gInfo m sharedMsgId fileConnReq fName msgMeta
XGrpMemNew memInfo -> xGrpMemNew gInfo m memInfo msg msgMeta
XGrpMemIntro memInfo -> xGrpMemIntro conn gInfo m memInfo
XGrpMemIntro memInfo -> xGrpMemIntro gInfo m memInfo
XGrpMemInv memId introInv -> xGrpMemInv gInfo m memId introInv
XGrpMemFwd memInfo introInv -> xGrpMemFwd gInfo m memInfo introInv
XGrpMemDel memId -> xGrpMemDel gInfo m memId msg msgMeta
@ -1549,9 +1590,12 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
XGrpDel -> xGrpDel gInfo m msg msgMeta
XGrpInfo p' -> xGrpInfo gInfo m p' msg msgMeta
_ -> messageError $ "unsupported message: " <> T.pack (show chatMsgEvent)
ackMsgDeliveryEvent conn msgMeta
SENT msgId ->
sentMsgDeliveryEvent conn msgId
OK ->
-- [async agent commands] continuation on receiving OK
withCompletedCommand conn agentMsg $ \CommandData {cmdFunction, cmdId} ->
when (cmdFunction == CFAckMessage) $ ackMsgDeliveryEvent conn cmdId
MERR _ err -> toView . CRChatError $ ChatErrorAgent err
ERR err -> toView . CRChatError $ ChatErrorAgent err
-- TODO add debugging output
@ -1569,7 +1613,8 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
XFileAcpt name
| name == fileName -> do
withStore' $ \db -> updateSndFileStatus db ft FSAccepted
allowAgentConnection conn confId XOk
-- [async agent commands] no continuation needed, but command should be asynchronous for stability
allowAgentConnectionAsync user conn confId XOk
| otherwise -> messageError "x.file.acpt: fileName is different from expected"
_ -> messageError "CONF from file connection must have x.file.acpt"
CON -> do
@ -1588,8 +1633,12 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
ci <- withStore $ \db -> getChatItemByFileId db user fileId
toView $ CRSndFileRcvCancelled ci ft
_ -> throwChatError $ CEFileSend fileId err
MSG meta _ _ ->
withAckMessage agentConnId meta $ pure ()
MSG meta _ _ -> do
cmdId <- createAckCmd conn
withAckMessage agentConnId cmdId meta $ pure ()
OK ->
-- [async agent commands] continuation on receiving OK
withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
ERR err -> toView . CRChatError $ ChatErrorAgent err
-- TODO add debugging output
_ -> pure ()
@ -1603,7 +1652,7 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
CONF confId _ connInfo -> do
ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage connInfo
case chatMsgEvent of
XOk -> allowAgentConnection conn confId XOk
XOk -> allowAgentConnectionAsync user conn confId XOk -- [async agent commands] no continuation needed, but command should be asynchronous for stability
_ -> pure ()
CON -> do
ci <- withStore $ \db -> do
@ -1611,39 +1660,44 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
liftIO $ updateCIFileStatus db user fileId CIFSRcvTransfer
getChatItemByFileId db user fileId
toView $ CRRcvFileStart ci
MSG meta@MsgMeta {recipient = (msgId, _), integrity} _ msgBody -> withAckMessage agentConnId meta $ do
parseFileChunk msgBody >>= \case
FileChunkCancel ->
unless cancelled $ do
cancelRcvFileTransfer user ft
toView (CRRcvFileSndCancelled ft)
FileChunk {chunkNo, chunkBytes = chunk} -> do
case integrity of
MsgOk -> pure ()
MsgError MsgDuplicate -> pure () -- TODO remove once agent removes duplicates
MsgError e ->
badRcvFileChunk ft $ "invalid file chunk number " <> show chunkNo <> ": " <> show e
withStore' (\db -> createRcvFileChunk db ft chunkNo msgId) >>= \case
RcvChunkOk ->
if B.length chunk /= fromInteger chunkSize
then badRcvFileChunk ft "incorrect chunk size"
else appendFileChunk ft chunkNo chunk
RcvChunkFinal ->
if B.length chunk > fromInteger chunkSize
then badRcvFileChunk ft "incorrect chunk size"
else do
appendFileChunk ft chunkNo chunk
ci <- withStore $ \db -> do
liftIO $ do
updateRcvFileStatus db ft FSComplete
updateCIFileStatus db user fileId CIFSRcvComplete
deleteRcvFileChunks db ft
getChatItemByFileId db user fileId
toView $ CRRcvFileComplete ci
closeFileHandle fileId rcvFiles
withAgent (`deleteConnection` agentConnId)
RcvChunkDuplicate -> pure ()
RcvChunkError -> badRcvFileChunk ft $ "incorrect chunk number " <> show chunkNo
MSG meta@MsgMeta {recipient = (msgId, _), integrity} _ msgBody -> do
cmdId <- createAckCmd conn
withAckMessage agentConnId cmdId meta $ do
parseFileChunk msgBody >>= \case
FileChunkCancel ->
unless cancelled $ do
cancelRcvFileTransfer user ft
toView (CRRcvFileSndCancelled ft)
FileChunk {chunkNo, chunkBytes = chunk} -> do
case integrity of
MsgOk -> pure ()
MsgError MsgDuplicate -> pure () -- TODO remove once agent removes duplicates
MsgError e ->
badRcvFileChunk ft $ "invalid file chunk number " <> show chunkNo <> ": " <> show e
withStore' (\db -> createRcvFileChunk db ft chunkNo msgId) >>= \case
RcvChunkOk ->
if B.length chunk /= fromInteger chunkSize
then badRcvFileChunk ft "incorrect chunk size"
else appendFileChunk ft chunkNo chunk
RcvChunkFinal ->
if B.length chunk > fromInteger chunkSize
then badRcvFileChunk ft "incorrect chunk size"
else do
appendFileChunk ft chunkNo chunk
ci <- withStore $ \db -> do
liftIO $ do
updateRcvFileStatus db ft FSComplete
updateCIFileStatus db user fileId CIFSRcvComplete
deleteRcvFileChunks db ft
getChatItemByFileId db user fileId
toView $ CRRcvFileComplete ci
closeFileHandle fileId rcvFiles
withAgent (`deleteConnection` agentConnId)
RcvChunkDuplicate -> pure ()
RcvChunkError -> badRcvFileChunk ft $ "incorrect chunk number " <> show chunkNo
OK ->
-- [async agent commands] continuation on receiving OK
withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
MERR _ err -> toView . CRChatError $ ChatErrorAgent err
ERR err -> toView . CRChatError $ ChatErrorAgent err
-- TODO add debugging output
@ -1675,13 +1729,30 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
toView $ CRReceivedContactRequest cReq
showToast (localDisplayName <> "> ") "wants to connect to you"
withAckMessage :: ConnId -> MsgMeta -> m () -> m ()
withAckMessage cId MsgMeta {recipient = (msgId, _)} action =
action `E.finally` withAgent (\a -> ackMessage a cId msgId `catchError` \_ -> pure ())
withCompletedCommand :: Connection -> ACommand 'Agent -> (CommandData -> m ()) -> m ()
withCompletedCommand Connection {connId} agentMsg action = do
let agentMsgTag = aCommandTag agentMsg
cmdData_ <- withStore' $ \db -> getCommandDataByCorrId db user corrId
case cmdData_ of
Just cmdData@CommandData {cmdId, cmdConnId = Just cmdConnId', cmdFunction}
| connId == cmdConnId' && agentMsgTag == commandExpectedResponse cmdFunction -> do
withStore' $ \db -> updateCommandStatus db user cmdId CSCompleted
action cmdData
| otherwise -> throwChatError . CEAgentCommandError $ "not matching connection id or unexpected response, details - connId = " <> show connId <> ", agentMsgTag = " <> show agentMsgTag <> ", cmdData " <> show cmdData
_ -> throwChatError . CEAgentCommandError $ "no connection or connection id, details - connId = " <> show connId <> ", agentMsgTag = " <> show agentMsgTag <> ", corrId = " <> commandId corrId
ackMsgDeliveryEvent :: Connection -> MsgMeta -> m ()
ackMsgDeliveryEvent Connection {connId} MsgMeta {recipient = (msgId, _)} =
withStore $ \db -> createRcvMsgDeliveryEvent db connId msgId MDSRcvAcknowledged
createAckCmd :: Connection -> m CommandId
createAckCmd Connection {connId} = do
withStore' $ \db -> createCommand db user (Just connId) CFAckMessage
withAckMessage :: ConnId -> CommandId -> MsgMeta -> m () -> m ()
withAckMessage cId cmdId MsgMeta {recipient = (msgId, _)} action =
-- [async agent commands] command should be asynchronous, continuation is ackMsgDeliveryEvent
action `E.finally` withAgent (\a -> ackMessageAsync a (aCorrId cmdId) cId msgId `catchError` \_ -> pure ())
ackMsgDeliveryEvent :: Connection -> CommandId -> m ()
ackMsgDeliveryEvent Connection {connId} ackCmdId =
withStore' $ \db -> createRcvMsgDeliveryEvent db connId ackCmdId MDSRcvAcknowledged
sentMsgDeliveryEvent :: Connection -> AgentMsgId -> m ()
sentMsgDeliveryEvent Connection {connId} msgId =
@ -1887,10 +1958,13 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
unless cancelled $
if fName == fileName
then
tryError (withAgent $ \a -> joinConnection a True fileConnReq . directMessage $ XOk) >>= \case
Right acId ->
withStore' $ \db -> createSndGroupFileTransferConnection db userId fileId acId m
Left e -> throwError e
tryError
-- [async agent commands] no continuation needed, but command should be asynchronous for stability
(joinAgentConnectionAsync user True fileConnReq . directMessage $ XOk)
>>= \case
Right connIds ->
withStore' $ \db -> createSndGroupFileTransferConnection db user fileId connIds m
Left e -> throwError e
else messageError "x.file.acpt.inv: fileName is different from expected"
groupMsgToView :: GroupInfo -> GroupMember -> ChatItem 'CTGroup 'MDRcv -> MsgMeta -> m ()
@ -2082,24 +2156,29 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
groupMsgToView gInfo m ci msgMeta
toView $ CRJoinedGroupMemberConnecting gInfo m newMember
xGrpMemIntro :: Connection -> GroupInfo -> GroupMember -> MemberInfo -> m ()
xGrpMemIntro conn gInfo@GroupInfo {groupId, membership} m memInfo@(MemberInfo memId _ _) = do
xGrpMemIntro :: GroupInfo -> GroupMember -> MemberInfo -> m ()
xGrpMemIntro gInfo@GroupInfo {membership} m memInfo@(MemberInfo memId _ _) = do
case memberCategory m of
GCHostMember -> do
members <- withStore' $ \db -> getGroupMembers db user gInfo
if isMember memId gInfo members
then messageWarning "x.grp.mem.intro ignored: member already exists"
else do
(groupConnId, groupConnReq) <- withAgent $ \a -> createConnection a True SCMInvitation
(directConnId, directConnReq) <- withAgent $ \a -> createConnection a True SCMInvitation
-- [async agent commands] commands should be asynchronous, continuation is to send XGrpMemInv - have to remember one has completed and process on second
groupConnIds <- createAgentConnectionAsync user True SCMInvitation
directConnIds <- createAgentConnectionAsync user True SCMInvitation
-- [incognito] direct connection with member has to be established using the same incognito profile [that was known to host and used for group membership]
let customUserProfileId = if memberIncognito membership then Just (localProfileId $ memberProfile membership) else Nothing
newMember <- withStore $ \db -> createIntroReMember db user gInfo m memInfo groupConnId directConnId customUserProfileId
let msg = XGrpMemInv memId IntroInvitation {groupConnReq, directConnReq}
void $ sendDirectMessage conn msg (GroupId groupId)
withStore' $ \db -> updateGroupMemberStatus db userId newMember GSMemIntroInvited
void $ withStore $ \db -> createIntroReMember db user gInfo m memInfo groupConnIds directConnIds customUserProfileId
_ -> messageError "x.grp.mem.intro can be only sent by host member"
sendXGrpMemIntro :: Int64 -> ConnReqInvitation -> XGrpMemIntroCont -> m ()
sendXGrpMemIntro hostConnId directConnReq XGrpMemIntroCont {groupId, groupMemberId, memberId, groupConnReq} = do
hostConn <- withStore $ \db -> getConnectionById db user hostConnId
let msg = XGrpMemInv memberId IntroInvitation {groupConnReq, directConnReq}
void $ sendDirectMessage hostConn msg (GroupId groupId)
withStore' $ \db -> updateGroupMemberStatusById db userId groupMemberId GSMemIntroInvited
xGrpMemInv :: GroupInfo -> GroupMember -> MemberId -> IntroInvitation -> m ()
xGrpMemInv gInfo m memId introInv = do
case memberCategory m of
@ -2125,10 +2204,11 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage
withStore' $ \db -> saveMemberInvitation db toMember introInv
-- [incognito] send membership incognito profile, create direct connection as incognito
let msg = XGrpMemInfo (memberId (membership :: GroupMember)) (fromLocalProfile $ memberProfile membership)
groupConnId <- withAgent $ \a -> joinConnection a True groupConnReq $ directMessage msg
directConnId <- withAgent $ \a -> joinConnection a True directConnReq $ directMessage msg
-- [async agent commands] no continuation needed, but commands should be asynchronous for stability
groupConnIds <- joinAgentConnectionAsync user True groupConnReq $ directMessage msg
directConnIds <- joinAgentConnectionAsync user True directConnReq $ directMessage msg
let customUserProfileId = if memberIncognito membership then Just (localProfileId $ memberProfile membership) else Nothing
withStore' $ \db -> createIntroToMemberContact db userId m toMember groupConnId directConnId customUserProfileId
withStore' $ \db -> createIntroToMemberContact db user m toMember groupConnIds directConnIds customUserProfileId
xGrpMemDel :: GroupInfo -> GroupMember -> MemberId -> RcvMessage -> MsgMeta -> m ()
xGrpMemDel gInfo@GroupInfo {membership} m memId msg msgMeta = do
@ -2382,12 +2462,12 @@ sendPendingGroupMessages GroupMember {groupMemberId, localDisplayName} conn = do
Nothing -> throwChatError $ CEGroupMemberIntroNotFound localDisplayName
Just introId -> withStore' $ \db -> updateIntroStatus db introId GMIntroInvForwarded
saveRcvMSG :: ChatMonad m => Connection -> ConnOrGroupId -> MsgMeta -> MsgBody -> m RcvMessage
saveRcvMSG Connection {connId} connOrGroupId agentMsgMeta msgBody = do
saveRcvMSG :: ChatMonad m => Connection -> ConnOrGroupId -> MsgMeta -> MsgBody -> CommandId -> m RcvMessage
saveRcvMSG Connection {connId} connOrGroupId agentMsgMeta msgBody agentAckCmdId = do
ChatMessage {msgId = sharedMsgId_, chatMsgEvent} <- liftEither $ parseChatMessage msgBody
let agentMsgId = fst $ recipient agentMsgMeta
newMsg = NewMessage {chatMsgEvent, msgBody}
rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta}
rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta, agentAckCmdId}
withStore' $ \db -> createNewMessageAndRcvMsgDelivery db connOrGroupId newMsg sharedMsgId_ rcvMsgDelivery
saveSndChatItem :: ChatMonad m => User -> ChatDirection c 'MDSnd -> SndMessage -> CIContent 'MDSnd -> Maybe (CIFile 'MDSnd) -> Maybe (CIQuote c) -> m (ChatItem c 'MDSnd)
@ -2414,9 +2494,22 @@ mkChatItem cd ciId content file quotedItem sharedMsgId itemTs currentTs = do
meta = mkCIMeta ciId content itemText ciStatusNew sharedMsgId False False tz currentTs itemTs currentTs currentTs
pure ChatItem {chatDir = toCIDirection cd, meta, content, formattedText = parseMaybeMarkdownList itemText, quotedItem, file}
allowAgentConnection :: ChatMonad m => Connection -> ConfirmationId -> ChatMsgEvent -> m ()
allowAgentConnection conn confId msg = do
withAgent $ \a -> allowConnection a (aConnId conn) confId $ directMessage msg
createAgentConnectionAsync :: forall m c. (ChatMonad m, ConnectionModeI c) => User -> Bool -> SConnectionMode c -> m (CommandId, ConnId)
createAgentConnectionAsync user enableNtfs cMode = do
cmdId <- withStore' $ \db -> createCommand db user Nothing CFCreateConn
connId <- withAgent $ \a -> createConnectionAsync a (aCorrId cmdId) enableNtfs cMode
pure (cmdId, connId)
joinAgentConnectionAsync :: ChatMonad m => User -> Bool -> ConnectionRequestUri c -> ConnInfo -> m (CommandId, ConnId)
joinAgentConnectionAsync user enableNtfs cReqUri cInfo = do
cmdId <- withStore' $ \db -> createCommand db user Nothing CFJoinConn
connId <- withAgent $ \a -> joinConnectionAsync a (aCorrId cmdId) enableNtfs cReqUri cInfo
pure (cmdId, connId)
allowAgentConnectionAsync :: ChatMonad m => User -> Connection -> ConfirmationId -> ChatMsgEvent -> m ()
allowAgentConnectionAsync user conn@Connection {connId} confId msg = do
cmdId <- withStore' $ \db -> createCommand db user (Just connId) CFAllowConn
withAgent $ \a -> allowConnectionAsync a (aCorrId cmdId) (aConnId conn) confId $ directMessage msg
withStore' $ \db -> updateConnectionStatus db conn ConnAccepted
getCreateActiveUser :: SQLiteStore -> IO User

View file

@ -423,6 +423,7 @@ data ChatErrorType
| CEAgentVersion
| CEAgentNoSubResult {agentConnId :: AgentConnId}
| CECommandError {message :: String}
| CEAgentCommandError {message :: String}
deriving (Show, Exception, Generic)
instance ToJSON ChatErrorType where

View file

@ -913,7 +913,8 @@ data SndMsgDelivery = SndMsgDelivery
data RcvMsgDelivery = RcvMsgDelivery
{ connId :: Int64,
agentMsgId :: AgentMsgId,
agentMsgMeta :: MsgMeta
agentMsgMeta :: MsgMeta,
agentAckCmdId :: CommandId
}
data MsgMetaJSON = MsgMetaJSON

View file

@ -0,0 +1,24 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Chat.Migrations.M20220909_commands where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
m20220909_commands :: Query
m20220909_commands =
[sql|
CREATE TABLE commands (
command_id INTEGER PRIMARY KEY, -- used as ACorrId
connection_id INTEGER REFERENCES connections ON DELETE CASCADE,
command_function TEXT NOT NULL,
command_status TEXT NOT NULL,
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
ALTER TABLE msg_deliveries ADD COLUMN agent_ack_cmd_id INTEGER; -- correlation id
ALTER TABLE connections ADD COLUMN conn_req_inv BLOB;
|]

View file

@ -243,6 +243,7 @@ CREATE TABLE connections(
via_user_contact_link INTEGER DEFAULT NULL
REFERENCES user_contact_links(user_contact_link_id) ON DELETE SET NULL,
custom_user_profile_id INTEGER REFERENCES contact_profiles ON DELETE SET NULL,
conn_req_inv BLOB,
FOREIGN KEY(snd_file_id, connection_id)
REFERENCES snd_files(file_id, connection_id)
ON DELETE CASCADE
@ -299,7 +300,8 @@ CREATE TABLE msg_deliveries(
agent_msg_meta TEXT, -- JSON with timestamps etc. sent in MSG, NULL for sent
chat_ts TEXT NOT NULL DEFAULT(datetime('now')),
created_at TEXT CHECK(created_at NOT NULL),
updated_at TEXT CHECK(updated_at NOT NULL), -- broker_ts for received, created_at for sent
updated_at TEXT CHECK(updated_at NOT NULL),
agent_ack_cmd_id INTEGER, -- broker_ts for received, created_at for sent
UNIQUE(connection_id, agent_msg_id)
);
CREATE TABLE msg_delivery_events(
@ -400,3 +402,12 @@ CREATE INDEX idx_chat_items_contacts ON chat_items(
contact_id,
chat_item_id
);
CREATE TABLE commands(
command_id INTEGER PRIMARY KEY, -- used as ACorrId
connection_id INTEGER REFERENCES connections ON DELETE CASCADE,
command_function TEXT NOT NULL,
command_status TEXT NOT NULL,
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);

View file

@ -58,6 +58,7 @@ module Simplex.Chat.Store
getPendingContactConnections,
getContactConnections,
getConnectionEntity,
getConnectionById,
getConnectionsContacts,
getGroupAndMember,
updateConnectionStatus,
@ -82,6 +83,7 @@ module Simplex.Chat.Store
getMemberInvitation,
createMemberConnection,
updateGroupMemberStatus,
updateGroupMemberStatusById,
createNewGroupMember,
deleteGroupMember,
deleteGroupMemberConnection,
@ -174,6 +176,13 @@ module Simplex.Chat.Store
createCall,
deleteCalls,
getCalls,
createCommand,
setCommandConnId,
updateCommandStatus,
getCommandDataByCorrId,
setConnConnReqInv,
getXGrpMemIntroContDirect,
getXGrpMemIntroContGroup,
getPendingContactConnection,
deletePendingContactConnection,
updateContactSettings,
@ -233,9 +242,10 @@ import Simplex.Chat.Migrations.M20220818_chat_notifications
import Simplex.Chat.Migrations.M20220822_groups_host_conn_custom_user_profile_id
import Simplex.Chat.Migrations.M20220823_delete_broken_group_event_chat_items
import Simplex.Chat.Migrations.M20220824_profiles_local_alias
import Simplex.Chat.Migrations.M20220909_commands
import Simplex.Chat.Protocol
import Simplex.Chat.Types
import Simplex.Messaging.Agent.Protocol (AgentMsgId, ConnId, InvitationId, MsgMeta (..))
import Simplex.Messaging.Agent.Protocol (ACorrId, AgentMsgId, ConnId, InvitationId, MsgMeta (..))
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), createSQLiteStore, firstRow, firstRow', maybeFirstRow, withTransaction)
import Simplex.Messaging.Agent.Store.SQLite.Migrations (Migration (..))
import qualified Simplex.Messaging.Crypto as C
@ -267,7 +277,8 @@ schemaMigrations =
("20220818_chat_notifications", m20220818_chat_notifications),
("20220822_groups_host_conn_custom_user_profile_id", m20220822_groups_host_conn_custom_user_profile_id),
("20220823_delete_broken_group_event_chat_items", m20220823_delete_broken_group_event_chat_items),
("20220824_profiles_local_alias", m20220824_profiles_local_alias)
("20220824_profiles_local_alias", m20220824_profiles_local_alias),
("20220909_commands", m20220909_commands)
]
-- | The list of migrations in ascending order by date
@ -1264,6 +1275,19 @@ getConnectionEntity db user@User {userId, userContactId} agentConnId = do
userContact_ [Only cReq] = Right UserContact {userContactLinkId, connReqContact = cReq}
userContact_ _ = Left SEUserContactLinkNotFound
getConnectionById :: DB.Connection -> User -> Int64 -> ExceptT StoreError IO Connection
getConnectionById db User {userId} connId = ExceptT $ do
firstRow toConnection (SEConnectionNotFoundById connId) $
DB.query
db
[sql|
SELECT connection_id, agent_conn_id, conn_level, via_contact, via_user_contact_link, custom_user_profile_id,
conn_status, conn_type, contact_id, group_member_id, snd_file_id, rcv_file_id, user_contact_link_id, created_at
FROM connections
WHERE user_id = ? AND connection_id = ?
|]
(userId, connId)
getConnectionsContacts :: DB.Connection -> UserId -> [ConnId] -> IO [ContactRef]
getConnectionsContacts db userId agentConnIds = do
DB.execute_ db "DROP TABLE IF EXISTS temp.conn_ids"
@ -1654,7 +1678,10 @@ createMemberConnection db userId GroupMember {groupMemberId} agentConnId = do
void $ createMemberConnection_ db userId groupMemberId agentConnId Nothing 0 currentTs
updateGroupMemberStatus :: DB.Connection -> UserId -> GroupMember -> GroupMemberStatus -> IO ()
updateGroupMemberStatus db userId GroupMember {groupMemberId} memStatus = do
updateGroupMemberStatus db userId GroupMember {groupMemberId} = updateGroupMemberStatusById db userId groupMemberId
updateGroupMemberStatusById :: DB.Connection -> UserId -> GroupMemberId -> GroupMemberStatus -> IO ()
updateGroupMemberStatusById db userId groupMemberId memStatus = do
currentTs <- getCurrentTime
DB.execute
db
@ -1775,7 +1802,7 @@ saveIntroInvitation db reMember toMember introInv = do
WHERE group_member_intro_id = :intro_id
|]
[ ":intro_status" := GMIntroInvReceived,
":group_queue_info" := groupConnReq introInv,
":group_queue_info" := groupConnReq (introInv :: IntroInvitation),
":direct_queue_info" := directConnReq introInv,
":updated_at" := currentTs,
":intro_id" := introId intro
@ -1820,11 +1847,12 @@ getIntroduction_ db reMember toMember = ExceptT $ do
in Right GroupMemberIntro {introId, reMember, toMember, introStatus, introInvitation}
toIntro _ = Left SEIntroNotFound
createIntroReMember :: DB.Connection -> User -> GroupInfo -> GroupMember -> MemberInfo -> ConnId -> ConnId -> Maybe ProfileId -> ExceptT StoreError IO GroupMember
createIntroReMember db user@User {userId} gInfo@GroupInfo {groupId} _host@GroupMember {memberContactId, activeConn} memInfo@(MemberInfo _ _ memberProfile) groupAgentConnId directAgentConnId customUserProfileId = do
createIntroReMember :: DB.Connection -> User -> GroupInfo -> GroupMember -> MemberInfo -> (CommandId, ConnId) -> (CommandId, ConnId) -> Maybe ProfileId -> ExceptT StoreError IO GroupMember
createIntroReMember db user@User {userId} gInfo@GroupInfo {groupId} _host@GroupMember {memberContactId, activeConn} memInfo@(MemberInfo _ _ memberProfile) (groupCmdId, groupAgentConnId) (directCmdId, directAgentConnId) customUserProfileId = do
let cLevel = 1 + maybe 0 (connLevel :: Connection -> Int) activeConn
currentTs <- liftIO getCurrentTime
Connection {connId = directConnId} <- liftIO $ createConnection_ db userId ConnContact Nothing directAgentConnId memberContactId Nothing customUserProfileId cLevel currentTs
liftIO $ setCommandConnId db user directCmdId directConnId
(localDisplayName, contactId, memProfileId) <- createContact_ db userId directConnId memberProfile (Just groupId) currentTs
liftIO $ do
let newMember =
@ -1838,15 +1866,18 @@ createIntroReMember db user@User {userId} gInfo@GroupInfo {groupId} _host@GroupM
memProfileId
}
member <- createNewMember_ db user gInfo newMember currentTs
conn <- createMemberConnection_ db userId (groupMemberId' member) groupAgentConnId memberContactId cLevel currentTs
conn@Connection {connId = groupConnId} <- createMemberConnection_ db userId (groupMemberId' member) groupAgentConnId memberContactId cLevel currentTs
liftIO $ setCommandConnId db user groupCmdId groupConnId
pure (member :: GroupMember) {activeConn = Just conn}
createIntroToMemberContact :: DB.Connection -> UserId -> GroupMember -> GroupMember -> ConnId -> ConnId -> Maybe ProfileId -> IO ()
createIntroToMemberContact db userId GroupMember {memberContactId = viaContactId, activeConn} _to@GroupMember {groupMemberId, localDisplayName} groupAgentConnId directAgentConnId customUserProfileId = do
createIntroToMemberContact :: DB.Connection -> User -> GroupMember -> GroupMember -> (CommandId, ConnId) -> (CommandId, ConnId) -> Maybe ProfileId -> IO ()
createIntroToMemberContact db user@User {userId} GroupMember {memberContactId = viaContactId, activeConn} _to@GroupMember {groupMemberId, localDisplayName} (groupCmdId, groupAgentConnId) (directCmdId, directAgentConnId) customUserProfileId = do
let cLevel = 1 + maybe 0 (connLevel :: Connection -> Int) activeConn
currentTs <- getCurrentTime
void $ createMemberConnection_ db userId groupMemberId groupAgentConnId viaContactId cLevel currentTs
Connection {connId = groupConnId} <- createMemberConnection_ db userId groupMemberId groupAgentConnId viaContactId cLevel currentTs
setCommandConnId db user groupCmdId groupConnId
Connection {connId = directConnId} <- createConnection_ db userId ConnContact Nothing directAgentConnId viaContactId Nothing customUserProfileId cLevel currentTs
setCommandConnId db user directCmdId directConnId
contactId <- createMemberContact_ directConnId currentTs
updateMember_ contactId currentTs
where
@ -1974,10 +2005,11 @@ createSndGroupFileTransfer db userId GroupInfo {groupId} filePath FileInvitation
(userId, groupId, fileName, filePath, fileSize, chunkSize, CIFSSndStored, currentTs, currentTs)
insertedRowId db
createSndGroupFileTransferConnection :: DB.Connection -> UserId -> Int64 -> ConnId -> GroupMember -> IO ()
createSndGroupFileTransferConnection db userId fileId acId GroupMember {groupMemberId} = do
createSndGroupFileTransferConnection :: DB.Connection -> User -> Int64 -> (CommandId, ConnId) -> GroupMember -> IO ()
createSndGroupFileTransferConnection db user@User {userId} fileId (cmdId, acId) GroupMember {groupMemberId} = do
currentTs <- getCurrentTime
Connection {connId} <- createSndFileConnection_ db userId fileId acId
setCommandConnId db user cmdId connId
DB.execute
db
"INSERT INTO snd_files (file_id, file_status, connection_id, group_member_id, created_at, updated_at) VALUES (?,?,?,?,?,?)"
@ -2427,7 +2459,7 @@ createSndMsgDelivery db sndMsgDelivery messageId = do
createMsgDeliveryEvent_ db msgDeliveryId MDSSndAgent currentTs
createNewMessageAndRcvMsgDelivery :: DB.Connection -> ConnOrGroupId -> NewMessage -> Maybe SharedMsgId -> RcvMsgDelivery -> IO RcvMessage
createNewMessageAndRcvMsgDelivery db connOrGroupId NewMessage {chatMsgEvent, msgBody} sharedMsgId_ RcvMsgDelivery {connId, agentMsgId, agentMsgMeta} = do
createNewMessageAndRcvMsgDelivery db connOrGroupId NewMessage {chatMsgEvent, msgBody} sharedMsgId_ RcvMsgDelivery {connId, agentMsgId, agentMsgMeta, agentAckCmdId} = do
currentTs <- getCurrentTime
DB.execute
db
@ -2436,8 +2468,8 @@ createNewMessageAndRcvMsgDelivery db connOrGroupId NewMessage {chatMsgEvent, msg
msgId <- insertedRowId db
DB.execute
db
"INSERT INTO msg_deliveries (message_id, connection_id, agent_msg_id, agent_msg_meta, chat_ts, created_at, updated_at) VALUES (?,?,?,?,?,?,?)"
(msgId, connId, agentMsgId, msgMetaJson agentMsgMeta, snd $ broker agentMsgMeta, currentTs, currentTs)
"INSERT INTO msg_deliveries (message_id, connection_id, agent_msg_id, agent_msg_meta, agent_ack_cmd_id, chat_ts, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?)"
(msgId, connId, agentMsgId, msgMetaJson agentMsgMeta, agentAckCmdId, snd $ broker agentMsgMeta, currentTs, currentTs)
msgDeliveryId <- insertedRowId db
createMsgDeliveryEvent_ db msgDeliveryId MDSRcvAgent currentTs
pure RcvMessage {msgId, chatMsgEvent, sharedMsgId_, msgBody}
@ -2453,12 +2485,12 @@ createSndMsgDeliveryEvent db connId agentMsgId sndMsgDeliveryStatus = do
currentTs <- getCurrentTime
createMsgDeliveryEvent_ db msgDeliveryId sndMsgDeliveryStatus currentTs
createRcvMsgDeliveryEvent :: DB.Connection -> Int64 -> AgentMsgId -> MsgDeliveryStatus 'MDRcv -> ExceptT StoreError IO ()
createRcvMsgDeliveryEvent db connId agentMsgId rcvMsgDeliveryStatus = do
msgDeliveryId <- getMsgDeliveryId_ db connId agentMsgId
liftIO $ do
createRcvMsgDeliveryEvent :: DB.Connection -> Int64 -> CommandId -> MsgDeliveryStatus 'MDRcv -> IO ()
createRcvMsgDeliveryEvent db connId cmdId rcvMsgDeliveryStatus = do
msgDeliveryId <- getMsgDeliveryIdByCmdId_ db connId cmdId
forM_ msgDeliveryId $ \mdId -> do
currentTs <- getCurrentTime
createMsgDeliveryEvent_ db msgDeliveryId rcvMsgDeliveryStatus currentTs
createMsgDeliveryEvent_ db mdId rcvMsgDeliveryStatus currentTs
createSndMsgDelivery_ :: DB.Connection -> SndMsgDelivery -> MessageId -> UTCTime -> IO Int64
createSndMsgDelivery_ db SndMsgDelivery {connId, agentMsgId} messageId createdAt = do
@ -2496,6 +2528,19 @@ getMsgDeliveryId_ db connId agentMsgId =
|]
(connId, agentMsgId)
getMsgDeliveryIdByCmdId_ :: DB.Connection -> Int64 -> CommandId -> IO (Maybe AgentMsgId)
getMsgDeliveryIdByCmdId_ db connId cmdId =
maybeFirstRow fromOnly $
DB.query
db
[sql|
SELECT msg_delivery_id
FROM msg_deliveries
WHERE connection_id = ? AND agent_ack_cmd_id = ?
LIMIT 1
|]
(connId, cmdId)
createPendingGroupMessage :: DB.Connection -> Int64 -> MessageId -> Maybe Int64 -> IO ()
createPendingGroupMessage db groupMemberId messageId introId_ = do
currentTs <- getCurrentTime
@ -3846,6 +3891,129 @@ getCalls db User {userId} = do
toCall :: (ContactId, CallId, ChatItemId, CallState, UTCTime) -> Call
toCall (contactId, callId, chatItemId, callState, callTs) = Call {contactId, callId, chatItemId, callState, callTs}
createCommand :: DB.Connection -> User -> Maybe Int64 -> CommandFunction -> IO CommandId
createCommand db User {userId} connId commandFunction = do
currentTs <- getCurrentTime
DB.execute
db
[sql|
INSERT INTO commands (connection_id, command_function, command_status, user_id, created_at, updated_at)
VALUES (?,?,?,?,?,?)
|]
(connId, commandFunction, CSCreated, userId, currentTs, currentTs)
insertedRowId db
setCommandConnId :: DB.Connection -> User -> CommandId -> Int64 -> IO ()
setCommandConnId db User {userId} cmdId connId = do
updatedAt <- getCurrentTime
DB.execute
db
[sql|
UPDATE commands
SET connection_id = ?, updated_at = ?
WHERE user_id = ? AND command_id = ?
|]
(connId, updatedAt, userId, cmdId)
updateCommandStatus :: DB.Connection -> User -> CommandId -> CommandStatus -> IO ()
updateCommandStatus db User {userId} cmdId status = do
updatedAt <- getCurrentTime
DB.execute
db
[sql|
UPDATE commands
SET command_status = ?, updated_at = ?
WHERE user_id = ? AND command_id = ?
|]
(status, updatedAt, userId, cmdId)
getCommandDataByCorrId :: DB.Connection -> User -> ACorrId -> IO (Maybe CommandData)
getCommandDataByCorrId db User {userId} corrId =
maybeFirstRow toCommandData $
DB.query
db
[sql|
SELECT command_id, connection_id, command_function, command_status
FROM commands
WHERE user_id = ? AND command_id = ?
|]
(userId, commandId corrId)
where
toCommandData :: (CommandId, Maybe Int64, CommandFunction, CommandStatus) -> CommandData
toCommandData (cmdId, cmdConnId, cmdFunction, cmdStatus) = CommandData {cmdId, cmdConnId, cmdFunction, cmdStatus}
setConnConnReqInv :: DB.Connection -> User -> Int64 -> ConnReqInvitation -> IO ()
setConnConnReqInv db User {userId} connId connReq = do
updatedAt <- getCurrentTime
DB.execute
db
[sql|
UPDATE connections
SET conn_req_inv = ?, updated_at = ?
WHERE user_id = ? AND connection_id = ?
|]
(connReq, updatedAt, userId, connId)
getXGrpMemIntroContDirect :: DB.Connection -> User -> Contact -> IO (Maybe (Int64, XGrpMemIntroCont))
getXGrpMemIntroContDirect db User {userId} Contact {contactId} = do
fmap join . maybeFirstRow toCont $
DB.query
db
[sql|
SELECT ch.connection_id, g.group_id, m.group_member_id, m.member_id, c.conn_req_inv
FROM contacts ct
JOIN group_members m ON m.contact_id = ct.contact_id
LEFT JOIN connections c ON c.connection_id = (
SELECT MAX(cc.connection_id)
FROM connections cc
WHERE cc.group_member_id = m.group_member_id
)
JOIN groups g ON g.group_id = m.group_id AND g.group_id = ct.via_group
JOIN group_members mh ON mh.group_id = g.group_id
LEFT JOIN connections ch ON ch.connection_id = (
SELECT max(cc.connection_id)
FROM connections cc
where cc.group_member_id = mh.group_member_id
)
WHERE ct.user_id = ? AND ct.contact_id = ? AND mh.member_category = ?
|]
(userId, contactId, GCHostMember)
where
toCont :: (Int64, GroupId, GroupMemberId, MemberId, Maybe ConnReqInvitation) -> Maybe (Int64, XGrpMemIntroCont)
toCont (hostConnId, groupId, groupMemberId, memberId, connReq_) = case connReq_ of
Just groupConnReq -> Just (hostConnId, XGrpMemIntroCont {groupId, groupMemberId, memberId, groupConnReq})
_ -> Nothing
getXGrpMemIntroContGroup :: DB.Connection -> User -> GroupMember -> IO (Maybe (Int64, ConnReqInvitation))
getXGrpMemIntroContGroup db User {userId} GroupMember {groupMemberId} = do
fmap join . maybeFirstRow toCont $
DB.query
db
[sql|
SELECT ch.connection_id, c.conn_req_inv
FROM group_members m
JOIN contacts ct ON ct.contact_id = m.contact_id
LEFT JOIN connections c ON c.connection_id = (
SELECT MAX(cc.connection_id)
FROM connections cc
WHERE cc.contact_id = ct.contact_id
)
JOIN groups g ON g.group_id = m.group_id AND g.group_id = ct.via_group
JOIN group_members mh ON mh.group_id = g.group_id
LEFT JOIN connections ch ON ch.connection_id = (
SELECT max(cc.connection_id)
FROM connections cc
where cc.group_member_id = mh.group_member_id
)
WHERE m.user_id = ? AND m.group_member_id = ? AND mh.member_category = ?
|]
(userId, groupMemberId, GCHostMember)
where
toCont :: (Int64, Maybe ConnReqInvitation) -> Maybe (Int64, ConnReqInvitation)
toCont (hostConnId, connReq_) = case connReq_ of
Just connReq -> Just (hostConnId, connReq)
_ -> Nothing
-- | Saves unique local display name based on passed displayName, suffixed with _N if required.
-- This function should be called inside transaction.
withLocalDisplayName :: forall a. DB.Connection -> UserId -> Text -> (Text -> IO (Either StoreError a)) -> IO (Either StoreError a)
@ -3932,6 +4100,7 @@ data StoreError
| SESharedMsgIdNotFoundByFileId {fileId :: FileTransferId}
| SEFileIdNotFoundBySharedMsgId {sharedMsgId :: SharedMsgId}
| SEConnectionNotFound {agentConnId :: AgentConnId}
| SEConnectionNotFoundById {connId :: Int64}
| SEPendingConnectionNotFound {connId :: Int64}
| SEIntroNotFound
| SEUniqueID

View file

@ -20,7 +20,7 @@ import qualified Data.Aeson as J
import qualified Data.Aeson.Encoding as JE
import qualified Data.Aeson.Types as JT
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import Data.ByteString.Char8 (ByteString, pack, unpack)
import qualified Data.ByteString.Char8 as B
import Data.Int (Int64)
import Data.Maybe (isJust)
@ -34,7 +34,7 @@ import Database.SQLite.Simple.Internal (Field (..))
import Database.SQLite.Simple.Ok (Ok (Ok))
import Database.SQLite.Simple.ToField (ToField (..))
import GHC.Generics (Generic)
import Simplex.Messaging.Agent.Protocol (ConnId, ConnectionMode (..), ConnectionRequestUri, InvitationId)
import Simplex.Messaging.Agent.Protocol (ACommandTag (..), ACorrId, AParty (..), ConnId, ConnectionMode (..), ConnectionRequestUri, InvitationId)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (dropPrefix, fromTextField_, sumTypeJSON)
import Simplex.Messaging.Util ((<$?>))
@ -915,3 +915,77 @@ type JSONString = String
textParseJSON :: TextEncoding a => String -> J.Value -> JT.Parser a
textParseJSON name = J.withText name $ maybe (fail $ "bad " <> name) pure . textDecode
type CommandId = Int64
aCorrId :: CommandId -> ACorrId
aCorrId = pack . show
commandId :: ACorrId -> String
commandId = unpack
data CommandStatus
= CSCreated
| CSCompleted
deriving (Show, Generic)
instance FromField CommandStatus where fromField = fromTextField_ textDecode
instance ToField CommandStatus where toField = toField . textEncode
instance TextEncoding CommandStatus where
textDecode = \case
"created" -> Just CSCreated
"completed" -> Just CSCompleted
_ -> Nothing
textEncode = \case
CSCreated -> "created"
CSCompleted -> "completed"
data CommandFunction
= CFCreateConn
| CFJoinConn
| CFAllowConn
| CFAckMessage
deriving (Eq, Show, Generic)
instance FromField CommandFunction where fromField = fromTextField_ textDecode
instance ToField CommandFunction where toField = toField . textEncode
instance TextEncoding CommandFunction where
textDecode = \case
"create_conn" -> Just CFCreateConn
"join_conn" -> Just CFJoinConn
"allow_conn" -> Just CFAllowConn
"ack_message" -> Just CFAckMessage
_ -> Nothing
textEncode = \case
CFCreateConn -> "create_conn"
CFJoinConn -> "join_conn"
CFAllowConn -> "allow_conn"
CFAckMessage -> "ack_message"
commandExpectedResponse :: CommandFunction -> ACommandTag 'Agent
commandExpectedResponse = \case
CFCreateConn -> INV_
CFJoinConn -> OK_
CFAllowConn -> OK_
CFAckMessage -> OK_
data CommandData = CommandData
{ cmdId :: CommandId,
cmdConnId :: Maybe Int64,
cmdFunction :: CommandFunction,
cmdStatus :: CommandStatus
}
deriving (Show)
-- ad-hoc type for data required for XGrpMemIntro continuation
data XGrpMemIntroCont = XGrpMemIntroCont
{ groupId :: GroupId,
groupMemberId :: GroupMemberId,
memberId :: MemberId,
groupConnReq :: ConnReqInvitation
}
deriving (Show)

View file

@ -926,6 +926,7 @@ viewChatError = \case
CEAgentVersion -> ["unsupported agent version"]
CEAgentNoSubResult connId -> ["no subscription result for connection: " <> sShow connId]
CECommandError e -> ["bad chat command: " <> plain e]
CEAgentCommandError e -> ["agent command error: " <> plain e]
-- e -> ["chat error: " <> sShow e]
ChatErrorStore err -> case err of
SEDuplicateName -> ["this display name is already used by user, contact or group"]

View file

@ -49,7 +49,7 @@ extra-deps:
# - simplexmq-1.0.0@sha256:34b2004728ae396e3ae449cd090ba7410781e2b3cefc59259915f4ca5daa9ea8,8561
# - ../simplexmq
- github: simplex-chat/simplexmq
commit: f2c1455a2755e1275983dc154321fc0a5c0d7b17
commit: e328ae5d060645a8ef090b1b3d88bc20a5902e45
# - terminal-0.2.0.0@sha256:de6770ecaae3197c66ac1f0db5a80cf5a5b1d3b64a66a05b50f442de5ad39570,2977
- github: simplex-chat/aeson
commit: 3eb66f9a68f103b5f1489382aad89f5712a64db7

View file

@ -8,7 +8,7 @@
module ChatClient where
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread)
import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask, killThread, threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (bracket, bracket_)
@ -127,9 +127,10 @@ startTestChat_ st cfg opts dbFilePrefix user = do
stopTestChat :: TestCC -> IO ()
stopTestChat TestCC {chatController = cc, chatAsync, termAsync} = do
stopChatController cc
void . forkIO $ stopChatController cc
uninterruptibleCancel termAsync
uninterruptibleCancel chatAsync
threadDelay 100000
withNewTestChat :: String -> Profile -> (TestCC -> IO a) -> IO a
withNewTestChat = withNewTestChatCfgOpts testCfg testOpts