core: adapt simplexmq api for shared msg body (via MsgReq markers) (#5626)

* core: shared msg body 2

* WIP

* compiles

* refactor

* refactor

* refactor

* format

* simplexmq

* refactor

* refactor ChatMsgReq

* agent query plans

* simpler

* test

* test

* fix test

* agent plans

* simplexmq

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
spaced4ndy 2025-02-15 00:12:32 +04:00 committed by GitHub
parent 8dbebbe3d6
commit a90f255df5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 209 additions and 59 deletions

View file

@ -12,7 +12,7 @@ constraints: zip +disable-bzip2 +disable-zstd
source-repository-package source-repository-package
type: git type: git
location: https://github.com/simplex-chat/simplexmq.git location: https://github.com/simplex-chat/simplexmq.git
tag: bd97cb04495b90412c1300fd1a4862f488db85cb tag: 7ac80bffcb51e2461ff8d0f54094943c56f1c4e6
source-repository-package source-repository-package
type: git type: git

View file

@ -1,5 +1,5 @@
{ {
"https://github.com/simplex-chat/simplexmq.git"."bd97cb04495b90412c1300fd1a4862f488db85cb" = "19i0r2b4kfkq2zlbmq134a0hk0vszhm6wdlfyp58d35zqrc0xadf"; "https://github.com/simplex-chat/simplexmq.git"."7ac80bffcb51e2461ff8d0f54094943c56f1c4e6" = "1qzv3vs1f0h5df5i0fi2hqiagkkwqghpzjgh9bnjrgmhkhvkl2iq";
"https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38"; "https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38";
"https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d"; "https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d";
"https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl"; "https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl";

View file

@ -80,7 +80,7 @@ import Simplex.Chat.Store.Shared
import Simplex.Chat.Types import Simplex.Chat.Types
import Simplex.Chat.Types.Preferences import Simplex.Chat.Types.Preferences
import Simplex.Chat.Types.Shared import Simplex.Chat.Types.Shared
import Simplex.Chat.Util (liftIOEither) import Simplex.Chat.Util (liftIOEither, zipWith3')
import qualified Simplex.Chat.Util as U import qualified Simplex.Chat.Util as U
import Simplex.FileTransfer.Description (FileDescriptionURI (..), maxFileSize, maxFileSizeHard) import Simplex.FileTransfer.Description (FileDescriptionURI (..), maxFileSize, maxFileSizeHard)
import Simplex.Messaging.Agent as Agent import Simplex.Messaging.Agent as Agent
@ -1893,6 +1893,12 @@ processChatCommand' vr = \case
pure CRBroadcastSent {user, msgContent = mc, successes = 0, failures = 0, timestamp} pure CRBroadcastSent {user, msgContent = mc, successes = 0, failures = 0, timestamp}
Just (ctConns :: NonEmpty (Contact, Connection)) -> do Just (ctConns :: NonEmpty (Contact, Connection)) -> do
let idsEvts = L.map ctSndEvent ctConns let idsEvts = L.map ctSndEvent ctConns
-- TODO Broadcast rework
-- In createNewSndMessage and encodeChatMessage we could use Nothing for sharedMsgId,
-- then we could reuse message body across broadcast.
-- Encoding different sharedMsgId and reusing body is meaningless as referencing will not work anyway.
-- As an improvement, single message record with its sharedMsgId could be created for new "broadcast" entity.
-- Then all recipients could refer to broadcast message using same sharedMsgId.
sndMsgs <- lift $ createSndMessages idsEvts sndMsgs <- lift $ createSndMessages idsEvts
let msgReqs_ :: NonEmpty (Either ChatError ChatMsgReq) = L.zipWith (fmap . ctMsgReq) ctConns sndMsgs let msgReqs_ :: NonEmpty (Either ChatError ChatMsgReq) = L.zipWith (fmap . ctMsgReq) ctConns sndMsgs
(errs, ctSndMsgs :: [(Contact, SndMessage)]) <- (errs, ctSndMsgs :: [(Contact, SndMessage)]) <-
@ -1909,9 +1915,7 @@ processChatCommand' vr = \case
ctSndEvent :: (Contact, Connection) -> (ConnOrGroupId, ChatMsgEvent 'Json) ctSndEvent :: (Contact, Connection) -> (ConnOrGroupId, ChatMsgEvent 'Json)
ctSndEvent (_, Connection {connId}) = (ConnectionId connId, XMsgNew $ MCSimple (extMsgContent mc Nothing)) ctSndEvent (_, Connection {connId}) = (ConnectionId connId, XMsgNew $ MCSimple (extMsgContent mc Nothing))
ctMsgReq :: (Contact, Connection) -> SndMessage -> ChatMsgReq ctMsgReq :: (Contact, Connection) -> SndMessage -> ChatMsgReq
ctMsgReq (_, conn) SndMessage {msgId, msgBody} = (conn, MsgFlags {notification = hasNotification XMsgNew_}, msgBody, [msgId]) ctMsgReq (_, conn) SndMessage {msgId, msgBody} = (conn, MsgFlags {notification = hasNotification XMsgNew_}, (vrValue msgBody, [msgId]))
zipWith3' :: (a -> b -> c -> d) -> NonEmpty a -> NonEmpty b -> NonEmpty c -> NonEmpty d
zipWith3' f ~(x :| xs) ~(y :| ys) ~(z :| zs) = f x y z :| zipWith3 f xs ys zs
combineResults :: (Contact, Connection) -> Either ChatError SndMessage -> Either ChatError ([Int64], PQEncryption) -> Either ChatError (Contact, SndMessage) combineResults :: (Contact, Connection) -> Either ChatError SndMessage -> Either ChatError ([Int64], PQEncryption) -> Either ChatError (Contact, SndMessage)
combineResults (ct, _) (Right msg') (Right _) = Right (ct, msg') combineResults (ct, _) (Right msg') (Right _) = Right (ct, msg')
combineResults _ (Left e) _ = Left e combineResults _ (Left e) _ = Left e
@ -2662,7 +2666,7 @@ processChatCommand' vr = \case
ctMsgReq :: ChangedProfileContact -> Either ChatError SndMessage -> Either ChatError ChatMsgReq ctMsgReq :: ChangedProfileContact -> Either ChatError SndMessage -> Either ChatError ChatMsgReq
ctMsgReq ChangedProfileContact {conn} = ctMsgReq ChangedProfileContact {conn} =
fmap $ \SndMessage {msgId, msgBody} -> fmap $ \SndMessage {msgId, msgBody} ->
(conn, MsgFlags {notification = hasNotification XInfo_}, msgBody, [msgId]) (conn, MsgFlags {notification = hasNotification XInfo_}, (vrValue msgBody, [msgId]))
updateContactPrefs :: User -> Contact -> Preferences -> CM ChatResponse updateContactPrefs :: User -> Contact -> Preferences -> CM ChatResponse
updateContactPrefs _ ct@Contact {activeConn = Nothing} _ = throwChatError $ CEContactNotActive ct updateContactPrefs _ ct@Contact {activeConn = Nothing} _ = throwChatError $ CEContactNotActive ct
updateContactPrefs user@User {userId} ct@Contact {activeConn = Just Connection {customUserProfileId}, userPreferences = contactUserPrefs} contactUserPrefs' updateContactPrefs user@User {userId} ct@Contact {activeConn = Just Connection {customUserProfileId}, userPreferences = contactUserPrefs} contactUserPrefs'
@ -2713,7 +2717,7 @@ processChatCommand' vr = \case
when (memberStatus membership == GSMemInvited) $ throwChatError (CEGroupNotJoined g) when (memberStatus membership == GSMemInvited) $ throwChatError (CEGroupNotJoined g)
when (memberRemoved membership) $ throwChatError CEGroupMemberUserRemoved when (memberRemoved membership) $ throwChatError CEGroupMemberUserRemoved
unless (memberActive membership) $ throwChatError CEGroupMemberNotActive unless (memberActive membership) $ throwChatError CEGroupMemberNotActive
delGroupChatItemsForMembers :: User -> GroupInfo -> [GroupMember] -> [CChatItem CTGroup] -> CM ChatResponse delGroupChatItemsForMembers :: User -> GroupInfo -> [GroupMember] -> [CChatItem 'CTGroup] -> CM ChatResponse
delGroupChatItemsForMembers user gInfo ms items = do delGroupChatItemsForMembers user gInfo ms items = do
assertDeletable gInfo items assertDeletable gInfo items
assertUserGroupRole gInfo GRAdmin -- TODO GRModerator when most users migrate assertUserGroupRole gInfo GRAdmin -- TODO GRModerator when most users migrate
@ -2723,8 +2727,8 @@ processChatCommand' vr = \case
delGroupChatItems user gInfo items True delGroupChatItems user gInfo items True
where where
assertDeletable :: GroupInfo -> [CChatItem 'CTGroup] -> CM () assertDeletable :: GroupInfo -> [CChatItem 'CTGroup] -> CM ()
assertDeletable GroupInfo {membership = GroupMember {memberRole = membershipMemRole}} items = assertDeletable GroupInfo {membership = GroupMember {memberRole = membershipMemRole}} items' =
unless (all itemDeletable items) $ throwChatError CEInvalidChatItemDelete unless (all itemDeletable items') $ throwChatError CEInvalidChatItemDelete
where where
itemDeletable :: CChatItem 'CTGroup -> Bool itemDeletable :: CChatItem 'CTGroup -> Bool
itemDeletable (CChatItem _ ChatItem {chatDir, meta = CIMeta {itemSharedMsgId}}) = itemDeletable (CChatItem _ ChatItem {chatDir, meta = CIMeta {itemSharedMsgId}}) =

View file

@ -1312,7 +1312,7 @@ batchSendConnMessagesB _user conn msgFlags msgs_ = do
let batched_ = batchSndMessagesJSON msgs_ let batched_ = batchSndMessagesJSON msgs_
case L.nonEmpty batched_ of case L.nonEmpty batched_ of
Just batched' -> do Just batched' -> do
let msgReqs = L.map (fmap (msgBatchReq conn msgFlags)) batched' let msgReqs = L.map (fmap msgBatchReq_) batched'
delivered <- deliverMessagesB msgReqs delivered <- deliverMessagesB msgReqs
let msgs' = concat $ L.zipWith flattenMsgs batched' delivered let msgs' = concat $ L.zipWith flattenMsgs batched' delivered
pqEnc = findLastPQEnc delivered pqEnc = findLastPQEnc delivered
@ -1320,6 +1320,9 @@ batchSendConnMessagesB _user conn msgFlags msgs_ = do
pure (msgs', pqEnc) pure (msgs', pqEnc)
Nothing -> pure ([], Nothing) Nothing -> pure ([], Nothing)
where where
msgBatchReq_ :: MsgBatch -> ChatMsgReq
msgBatchReq_ (MsgBatch batchBody sndMsgs) =
(conn, msgFlags, (vrValue batchBody, map (\SndMessage {msgId} -> msgId) sndMsgs))
flattenMsgs :: Either ChatError MsgBatch -> Either ChatError ([Int64], PQEncryption) -> [Either ChatError SndMessage] flattenMsgs :: Either ChatError MsgBatch -> Either ChatError ([Int64], PQEncryption) -> [Either ChatError SndMessage]
flattenMsgs (Right (MsgBatch _ sndMsgs)) (Right _) = map Right sndMsgs flattenMsgs (Right (MsgBatch _ sndMsgs)) (Right _) = map Right sndMsgs
flattenMsgs (Right (MsgBatch _ sndMsgs)) (Left ce) = replicate (length sndMsgs) (Left ce) flattenMsgs (Right (MsgBatch _ sndMsgs)) (Left ce) = replicate (length sndMsgs) (Left ce)
@ -1330,9 +1333,6 @@ batchSendConnMessagesB _user conn msgFlags msgs_ = do
batchSndMessagesJSON :: NonEmpty (Either ChatError SndMessage) -> [Either ChatError MsgBatch] batchSndMessagesJSON :: NonEmpty (Either ChatError SndMessage) -> [Either ChatError MsgBatch]
batchSndMessagesJSON = batchMessages maxEncodedMsgLength . L.toList batchSndMessagesJSON = batchMessages maxEncodedMsgLength . L.toList
msgBatchReq :: Connection -> MsgFlags -> MsgBatch -> ChatMsgReq
msgBatchReq conn msgFlags (MsgBatch batchBody sndMsgs) = (conn, msgFlags, batchBody, map (\SndMessage {msgId} -> msgId) sndMsgs)
encodeConnInfo :: MsgEncodingI e => ChatMsgEvent e -> CM ByteString encodeConnInfo :: MsgEncodingI e => ChatMsgEvent e -> CM ByteString
encodeConnInfo chatMsgEvent = do encodeConnInfo chatMsgEvent = do
vr <- chatVersionRange vr <- chatVersionRange
@ -1358,7 +1358,7 @@ deliverMessage conn cmEventTag msgBody msgId = do
deliverMessage' :: Connection -> MsgFlags -> MsgBody -> MessageId -> CM (Int64, PQEncryption) deliverMessage' :: Connection -> MsgFlags -> MsgBody -> MessageId -> CM (Int64, PQEncryption)
deliverMessage' conn msgFlags msgBody msgId = deliverMessage' conn msgFlags msgBody msgId =
deliverMessages ((conn, msgFlags, msgBody, [msgId]) :| []) >>= \case deliverMessages ((conn, msgFlags, (vrValue msgBody, [msgId])) :| []) >>= \case
r :| [] -> case r of r :| [] -> case r of
Right ([deliveryId], pqEnc) -> pure (deliveryId, pqEnc) Right ([deliveryId], pqEnc) -> pure (deliveryId, pqEnc)
Right (deliveryIds, _) -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 delivery id, got " <> show (length deliveryIds) Right (deliveryIds, _) -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 delivery id, got " <> show (length deliveryIds)
@ -1366,45 +1366,45 @@ deliverMessage' conn msgFlags msgBody msgId =
rs -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 result, got " <> show (length rs) rs -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 result, got " <> show (length rs)
-- [MessageId] - SndMessage ids inside MsgBatch, or single message id -- [MessageId] - SndMessage ids inside MsgBatch, or single message id
type ChatMsgReq = (Connection, MsgFlags, MsgBody, [MessageId]) type ChatMsgReq = (Connection, MsgFlags, (ValueOrRef MsgBody, [MessageId]))
deliverMessages :: NonEmpty ChatMsgReq -> CM (NonEmpty (Either ChatError ([Int64], PQEncryption))) deliverMessages :: NonEmpty ChatMsgReq -> CM (NonEmpty (Either ChatError ([Int64], PQEncryption)))
deliverMessages msgs = deliverMessagesB $ L.map Right msgs deliverMessages msgs = deliverMessagesB $ L.map Right msgs
deliverMessagesB :: NonEmpty (Either ChatError ChatMsgReq) -> CM (NonEmpty (Either ChatError ([Int64], PQEncryption))) deliverMessagesB :: NonEmpty (Either ChatError ChatMsgReq) -> CM (NonEmpty (Either ChatError ([Int64], PQEncryption)))
deliverMessagesB msgReqs = do deliverMessagesB msgReqs = do
msgReqs' <- liftIO compressBodies msgReqs' <- if any connSupportsPQ msgReqs then liftIO compressBodies else pure msgReqs
sent <- L.zipWith prepareBatch msgReqs' <$> withAgent (`sendMessagesB` snd (mapAccumL toAgent Nothing msgReqs')) sent <- L.zipWith prepareBatch msgReqs' <$> withAgent (`sendMessagesB` snd (mapAccumL toAgent Nothing msgReqs'))
lift . void $ withStoreBatch' $ \db -> map (updatePQSndEnabled db) (rights . L.toList $ sent) lift . void $ withStoreBatch' $ \db -> map (updatePQSndEnabled db) (rights . L.toList $ sent)
lift . withStoreBatch $ \db -> L.map (bindRight $ createDelivery db) sent lift . withStoreBatch $ \db -> L.map (bindRight $ createDelivery db) sent
where where
connSupportsPQ = \case
Right (Connection {pqSupport = PQSupportOn, connChatVersion = v}, _, _) -> v >= pqEncryptionCompressionVersion
_ -> False
compressBodies = compressBodies =
forME msgReqs $ \mr@(conn@Connection {pqSupport, connChatVersion = v}, msgFlags, msgBody, msgIds) -> forME msgReqs $ \(conn, msgFlags, (mbr, msgIds)) -> runExceptT $ do
runExceptT $ case pqSupport of mbr' <- case mbr of
-- we only compress messages when: VRValue i msgBody | B.length msgBody > maxCompressedMsgLength -> do
-- 1) PQ support is enabled
-- 2) version is compatible with compression
-- 3) message is longer than max compressed size (as this function is not used for batched messages anyway)
PQSupportOn | v >= pqEncryptionCompressionVersion && B.length msgBody > maxCompressedMsgLength -> do
let msgBody' = compressedBatchMsgBody_ msgBody let msgBody' = compressedBatchMsgBody_ msgBody
when (B.length msgBody' > maxCompressedMsgLength) $ throwError $ ChatError $ CEException "large compressed message" when (B.length msgBody' > maxCompressedMsgLength) $ throwError $ ChatError $ CEException "large compressed message"
pure (conn, msgFlags, msgBody', msgIds) pure $ VRValue i msgBody'
_ -> pure mr v -> pure v
pure (conn, msgFlags, (mbr', msgIds))
toAgent prev = \case toAgent prev = \case
Right (conn@Connection {connId, pqEncryption}, msgFlags, msgBody, _msgIds) -> Right (conn@Connection {connId, pqEncryption}, msgFlags, (mbr, _msgIds)) ->
let cId = case prev of let cId = case prev of
Just prevId | prevId == connId -> "" Just prevId | prevId == connId -> ""
_ -> aConnId conn _ -> aConnId conn
in (Just connId, Right (cId, pqEncryption, msgFlags, msgBody)) in (Just connId, Right (cId, pqEncryption, msgFlags, mbr))
Left _ce -> (prev, Left (AP.INTERNAL "ChatError, skip")) -- as long as it is Left, the agent batchers should just step over it Left _ce -> (prev, Left (AP.INTERNAL "ChatError, skip")) -- as long as it is Left, the agent batchers should just step over it
prepareBatch (Right req) (Right ar) = Right (req, ar) prepareBatch (Right req) (Right ar) = Right (req, ar)
prepareBatch (Left ce) _ = Left ce -- restore original ChatError prepareBatch (Left ce) _ = Left ce -- restore original ChatError
prepareBatch _ (Left ae) = Left $ ChatErrorAgent ae Nothing prepareBatch _ (Left ae) = Left $ ChatErrorAgent ae Nothing
createDelivery :: DB.Connection -> (ChatMsgReq, (AgentMsgId, PQEncryption)) -> IO (Either ChatError ([Int64], PQEncryption)) createDelivery :: DB.Connection -> (ChatMsgReq, (AgentMsgId, PQEncryption)) -> IO (Either ChatError ([Int64], PQEncryption))
createDelivery db ((Connection {connId}, _, _, msgIds), (agentMsgId, pqEnc')) = do createDelivery db ((Connection {connId}, _, (_, msgIds)), (agentMsgId, pqEnc')) = do
Right . (,pqEnc') <$> mapM (createSndMsgDelivery db (SndMsgDelivery {connId, agentMsgId})) msgIds Right . (,pqEnc') <$> mapM (createSndMsgDelivery db (SndMsgDelivery {connId, agentMsgId})) msgIds
updatePQSndEnabled :: DB.Connection -> (ChatMsgReq, (AgentMsgId, PQEncryption)) -> IO () updatePQSndEnabled :: DB.Connection -> (ChatMsgReq, (AgentMsgId, PQEncryption)) -> IO ()
updatePQSndEnabled db ((Connection {connId, pqSndEnabled}, _, _, _), (_, pqSndEnabled')) = updatePQSndEnabled db ((Connection {connId, pqSndEnabled}, _, _), (_, pqSndEnabled')) =
case (pqSndEnabled, pqSndEnabled') of case (pqSndEnabled, pqSndEnabled') of
(Just b, b') | b' /= b -> updatePQ (Just b, b') | b' /= b -> updatePQ
(Nothing, PQEncOn) -> updatePQ (Nothing, PQEncOn) -> updatePQ
@ -1471,7 +1471,7 @@ sendGroupMessages_ _user gInfo@GroupInfo {groupId} members events = do
stored <- lift $ withStoreBatch (\db -> map (bindRight $ createPendingMsg db) pendingReqs) stored <- lift $ withStoreBatch (\db -> map (bindRight $ createPendingMsg db) pendingReqs)
when (length stored /= length pendingMemIds) $ logError "sendGroupMessages_: pendingMemIds and stored length mismatch" when (length stored /= length pendingMemIds) $ logError "sendGroupMessages_: pendingMemIds and stored length mismatch"
-- Zip for easier access to results -- Zip for easier access to results
let sentTo = zipWith3 (\mId mReq r -> (mId, fmap (\(_, _, _, msgIds) -> msgIds) mReq, r)) sendToMemIds msgReqs delivered let sentTo = zipWith3 (\mId mReq r -> (mId, fmap (\(_, _, (_, msgIds)) -> msgIds) mReq, r)) sendToMemIds msgReqs delivered
pending = zipWith3 (\mId pReq r -> (mId, fmap snd pReq, r)) pendingMemIds pendingReqs stored pending = zipWith3 (\mId pReq r -> (mId, fmap snd pReq, r)) pendingMemIds pendingReqs stored
pure (sndMsgs_, GroupSndResult {sentTo, pending, forwarded}) pure (sndMsgs_, GroupSndResult {sentTo, pending, forwarded})
where where
@ -1495,24 +1495,36 @@ sendGroupMessages_ _user gInfo@GroupInfo {groupId} members events = do
mId = groupMemberId' m mId = groupMemberId' m
mIds' = S.insert mId mIds mIds' = S.insert mId mIds
prepareMsgReqs :: MsgFlags -> NonEmpty (Either ChatError SndMessage) -> [(GroupMember, Connection)] -> [(GroupMember, Connection)] -> ([GroupMemberId], [Either ChatError ChatMsgReq]) prepareMsgReqs :: MsgFlags -> NonEmpty (Either ChatError SndMessage) -> [(GroupMember, Connection)] -> [(GroupMember, Connection)] -> ([GroupMemberId], [Either ChatError ChatMsgReq])
prepareMsgReqs msgFlags msgs_ toSendSeparate toSendBatched = do prepareMsgReqs msgFlags msgs toSendSeparate toSendBatched = do
let batched_ = batchSndMessagesJSON msgs_ let batched_ = batchSndMessagesJSON msgs
case L.nonEmpty batched_ of case L.nonEmpty batched_ of
Just batched' -> do Just batched' -> do
let (memsSep, mreqsSep) = foldr' foldMsgs ([], []) toSendSeparate let lenMsgs = length msgs
(memsBtch, mreqsBtch) = foldr' (foldBatches batched') ([], []) toSendBatched (memsSep, mreqsSep) = foldMembers lenMsgs sndMessageMBR msgs toSendSeparate
(memsBtch, mreqsBtch) = foldMembers (length batched' + lenMsgs) msgBatchMBR batched' toSendBatched
(memsSep <> memsBtch, mreqsSep <> mreqsBtch) (memsSep <> memsBtch, mreqsSep <> mreqsBtch)
Nothing -> ([], []) Nothing -> ([], [])
where where
foldMsgs :: (GroupMember, Connection) -> ([GroupMemberId], [Either ChatError ChatMsgReq]) -> ([GroupMemberId], [Either ChatError ChatMsgReq]) foldMembers :: forall a. Int -> (Maybe Int -> Int -> a -> (ValueOrRef MsgBody, [MessageId])) -> NonEmpty (Either ChatError a) -> [(GroupMember, Connection)] -> ([GroupMemberId], [Either ChatError ChatMsgReq])
foldMsgs (GroupMember {groupMemberId}, conn) memIdsReqs = foldMembers lastRef mkMb mbs mems = snd $ foldr' foldMsgBodies (lastMemIdx_, ([], [])) mems
foldr' (\msg_ (memIds, reqs) -> (groupMemberId : memIds, fmap sndMessageReq msg_ : reqs)) memIdsReqs msgs_
where where
sndMessageReq :: SndMessage -> ChatMsgReq lastMemIdx_ = let len = length mems in if len > 1 then Just len else Nothing
sndMessageReq SndMessage {msgId, msgBody} = (conn, msgFlags, msgBody, [msgId]) foldMsgBodies :: (GroupMember, Connection) -> (Maybe Int, ([GroupMemberId], [Either ChatError ChatMsgReq])) -> (Maybe Int, ([GroupMemberId], [Either ChatError ChatMsgReq]))
foldBatches :: NonEmpty (Either ChatError MsgBatch) -> (GroupMember, Connection) -> ([GroupMemberId], [Either ChatError ChatMsgReq]) -> ([GroupMemberId], [Either ChatError ChatMsgReq]) foldMsgBodies (GroupMember {groupMemberId}, conn) (memIdx_, memIdsReqs) =
foldBatches batched' (GroupMember {groupMemberId}, conn) memIdsReqs = (subtract 1 <$> memIdx_,) $ snd $ foldr' addBody (lastRef, memIdsReqs) mbs
foldr' (\batch_ (memIds, reqs) -> (groupMemberId : memIds, fmap (msgBatchReq conn msgFlags) batch_ : reqs)) memIdsReqs batched' where
addBody :: Either ChatError a -> (Int, ([GroupMemberId], [Either ChatError ChatMsgReq])) -> (Int, ([GroupMemberId], [Either ChatError ChatMsgReq]))
addBody mb (i, (memIds, reqs)) =
let req = (conn,msgFlags,) . mkMb memIdx_ i <$> mb
in (i - 1, (groupMemberId : memIds, req : reqs))
sndMessageMBR :: Maybe Int -> Int -> SndMessage -> (ValueOrRef MsgBody, [MessageId])
sndMessageMBR memIdx_ i SndMessage {msgId, msgBody} = (vrValue_ memIdx_ i msgBody, [msgId])
msgBatchMBR :: Maybe Int -> Int -> MsgBatch -> (ValueOrRef MsgBody, [MessageId])
msgBatchMBR memIdx_ i (MsgBatch batchBody sndMsgs) = (vrValue_ memIdx_ i batchBody, map (\SndMessage {msgId} -> msgId) sndMsgs)
vrValue_ memIdx_ i v = case memIdx_ of
Nothing -> VRValue Nothing v -- sending to one member, do not reference bodies
Just 1 -> VRValue (Just i) v
Just _ -> VRRef i
preparePending :: NonEmpty (Either ChatError SndMessage) -> [GroupMember] -> ([GroupMemberId], [Either ChatError (GroupMemberId, MessageId)]) preparePending :: NonEmpty (Either ChatError SndMessage) -> [GroupMember] -> ([GroupMemberId], [Either ChatError (GroupMemberId, MessageId)])
preparePending msgs_ = preparePending msgs_ =
foldr' foldMsgs ([], []) foldr' foldMsgs ([], [])

View file

@ -274,7 +274,7 @@ processAgentMsgSndFile _corrId aFileId msg = do
map (\fileDescr -> (conn, (connOrGroupId, XMsgFileDescr {msgId = sharedMsgId, fileDescr}))) (L.toList $ splitFileDescr partSize rfdText) map (\fileDescr -> (conn, (connOrGroupId, XMsgFileDescr {msgId = sharedMsgId, fileDescr}))) (L.toList $ splitFileDescr partSize rfdText)
toMsgReq :: (Connection, (ConnOrGroupId, ChatMsgEvent 'Json)) -> SndMessage -> ChatMsgReq toMsgReq :: (Connection, (ConnOrGroupId, ChatMsgEvent 'Json)) -> SndMessage -> ChatMsgReq
toMsgReq (conn, _) SndMessage {msgId, msgBody} = toMsgReq (conn, _) SndMessage {msgId, msgBody} =
(conn, MsgFlags {notification = hasNotification XMsgFileDescr_}, msgBody, [msgId]) (conn, MsgFlags {notification = hasNotification XMsgFileDescr_}, (vrValue msgBody, [msgId]))
sendFileError :: FileError -> Text -> VersionRangeChat -> FileTransferMeta -> CM () sendFileError :: FileError -> Text -> VersionRangeChat -> FileTransferMeta -> CM ()
sendFileError ferr err vr ft = do sendFileError ferr err vr ft = do
logError $ "Sent file error: " <> err logError $ "Sent file error: " <> err

View file

@ -15,6 +15,20 @@ SEARCH s USING INTEGER PRIMARY KEY (rowid=?)
SEARCH c USING INTEGER PRIMARY KEY (rowid=?) SEARCH c USING INTEGER PRIMARY KEY (rowid=?)
SEARCH f USING INTEGER PRIMARY KEY (rowid=?) SEARCH f USING INTEGER PRIMARY KEY (rowid=?)
Query:
SELECT
m.msg_type, m.msg_flags, m.msg_body, m.pq_encryption, m.internal_ts, m.internal_snd_id, s.previous_msg_hash,
s.retry_int_slow, s.retry_int_fast, s.msg_encrypt_key, s.padded_msg_len, sb.agent_msg
FROM messages m
JOIN snd_messages s ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id
LEFT JOIN snd_message_bodies sb ON sb.snd_message_body_id = s.snd_message_body_id
WHERE m.conn_id = ? AND m.internal_id = ?
Plan:
SEARCH m USING PRIMARY KEY (conn_id=? AND internal_id=?)
SEARCH s USING PRIMARY KEY (conn_id=?)
SEARCH sb USING INTEGER PRIMARY KEY (rowid=?) LEFT-JOIN
Query: Query:
SELECT SELECT
r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries, r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries,
@ -45,16 +59,6 @@ Query:
Plan: Plan:
SEARCH commands USING INDEX idx_commands_server_commands (host=? AND port=?) SEARCH commands USING INDEX idx_commands_server_commands (host=? AND port=?)
Query:
SELECT m.msg_type, m.msg_flags, m.msg_body, m.pq_encryption, m.internal_ts, s.retry_int_slow, s.retry_int_fast
FROM messages m
JOIN snd_messages s ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id
WHERE m.conn_id = ? AND m.internal_id = ?
Plan:
SEARCH m USING PRIMARY KEY (conn_id=? AND internal_id=?)
SEARCH s USING PRIMARY KEY (conn_id=?)
Query: Query:
SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path
FROM rcv_file_chunks FROM rcv_file_chunks
@ -512,9 +516,9 @@ Plan:
Query: Query:
INSERT INTO snd_messages INSERT INTO snd_messages
( conn_id, internal_snd_id, internal_id, internal_hash, previous_msg_hash) ( conn_id, internal_snd_id, internal_id, internal_hash, previous_msg_hash, msg_encrypt_key, padded_msg_len, snd_message_body_id)
VALUES VALUES
(?,?,?,?,?) (?,?,?,?,?,?,?,?)
Plan: Plan:
SEARCH messages USING COVERING INDEX idx_messages_conn_id_internal_snd_id (conn_id=? AND internal_snd_id=?) SEARCH messages USING COVERING INDEX idx_messages_conn_id_internal_snd_id (conn_id=? AND internal_snd_id=?)
@ -805,6 +809,11 @@ Plan:
SEARCH snd_files USING INTEGER PRIMARY KEY (rowid=?) SEARCH snd_files USING INTEGER PRIMARY KEY (rowid=?)
SEARCH snd_file_chunks USING COVERING INDEX idx_snd_file_chunks_snd_file_id (snd_file_id=?) SEARCH snd_file_chunks USING COVERING INDEX idx_snd_file_chunks_snd_file_id (snd_file_id=?)
Query: DELETE FROM snd_message_bodies WHERE snd_message_body_id = ?
Plan:
SEARCH snd_message_bodies USING INTEGER PRIMARY KEY (rowid=?)
SEARCH snd_messages USING COVERING INDEX idx_snd_messages_snd_message_body_id (snd_message_body_id=?)
Query: DELETE FROM snd_message_deliveries WHERE conn_id = ? AND snd_queue_id = ? Query: DELETE FROM snd_message_deliveries WHERE conn_id = ? AND snd_queue_id = ?
Plan: Plan:
SEARCH snd_message_deliveries USING COVERING INDEX idx_snd_message_deliveries (conn_id=? AND snd_queue_id=?) SEARCH snd_message_deliveries USING COVERING INDEX idx_snd_message_deliveries (conn_id=? AND snd_queue_id=?)
@ -861,6 +870,10 @@ Plan:
Query: INSERT INTO snd_files (snd_file_entity_id, user_id, path, src_file_key, src_file_nonce, num_recipients, prefix_path, key, nonce, status, redirect_size, redirect_digest) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) Query: INSERT INTO snd_files (snd_file_entity_id, user_id, path, src_file_key, src_file_nonce, num_recipients, prefix_path, key, nonce, status, redirect_size, redirect_digest) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
Plan: Plan:
Query: INSERT INTO snd_message_bodies (agent_msg) VALUES (?) RETURNING snd_message_body_id
Plan:
SEARCH snd_messages USING COVERING INDEX idx_snd_messages_snd_message_body_id (snd_message_body_id=?)
Query: INSERT INTO snd_message_deliveries (conn_id, snd_queue_id, internal_id) VALUES (?, ?, ?) Query: INSERT INTO snd_message_deliveries (conn_id, snd_queue_id, internal_id) VALUES (?, ?, ?)
Plan: Plan:
@ -897,6 +910,10 @@ Query: SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND interna
Plan: Plan:
SEARCH snd_message_deliveries USING COVERING INDEX idx_snd_message_deliveries_expired (conn_id=?) SEARCH snd_message_deliveries USING COVERING INDEX idx_snd_message_deliveries_expired (conn_id=?)
Query: SELECT count(1) FROM snd_message_bodies
Plan:
SCAN snd_message_bodies
Query: SELECT deleted FROM snd_files WHERE snd_file_id = ? Query: SELECT deleted FROM snd_files WHERE snd_file_id = ?
Plan: Plan:
SEARCH snd_files USING INTEGER PRIMARY KEY (rowid=?) SEARCH snd_files USING INTEGER PRIMARY KEY (rowid=?)
@ -921,7 +938,7 @@ Query: SELECT ratchet_state, x3dh_pub_key_1, x3dh_pub_key_2, pq_pub_kem FROM rat
Plan: Plan:
SEARCH ratchets USING PRIMARY KEY (conn_id=?) SEARCH ratchets USING PRIMARY KEY (conn_id=?)
Query: SELECT rcpt_internal_id, rcpt_status FROM snd_messages WHERE conn_id = ? AND internal_id = ? Query: SELECT rcpt_internal_id, rcpt_status, snd_message_body_id FROM snd_messages WHERE conn_id = ? AND internal_id = ?
Plan: Plan:
SEARCH snd_messages USING PRIMARY KEY (conn_id=?) SEARCH snd_messages USING PRIMARY KEY (conn_id=?)

View file

@ -4,7 +4,7 @@
{-# LANGUAGE TupleSections #-} {-# LANGUAGE TupleSections #-}
{-# OPTIONS_GHC -Wno-orphans #-} {-# OPTIONS_GHC -Wno-orphans #-}
module Simplex.Chat.Util (week, encryptFile, chunkSize, liftIOEither, shuffle) where module Simplex.Chat.Util (week, encryptFile, chunkSize, liftIOEither, shuffle, zipWith3') where
import Control.Exception (Exception) import Control.Exception (Exception)
import Control.Monad import Control.Monad
@ -15,6 +15,7 @@ import Control.Monad.Reader
import Data.Bifunctor (first) import Data.Bifunctor (first)
import qualified Data.ByteString.Lazy as LB import qualified Data.ByteString.Lazy as LB
import Data.List (sortBy) import Data.List (sortBy)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Ord (comparing) import Data.Ord (comparing)
import Data.Time (NominalDiffTime) import Data.Time (NominalDiffTime)
import Data.Word (Word16) import Data.Word (Word16)
@ -52,6 +53,9 @@ shuffle xs = map snd . sortBy (comparing fst) <$> mapM (\x -> (,x) <$> random) x
random :: IO Word16 random :: IO Word16
random = randomRIO (0, 65535) random = randomRIO (0, 65535)
zipWith3' :: (a -> b -> c -> d) -> NonEmpty a -> NonEmpty b -> NonEmpty c -> NonEmpty d
zipWith3' f ~(x :| xs) ~(y :| ys) ~(z :| zs) = f x y z :| zipWith3 f xs ys zs
liftIOEither :: (MonadIO m, MonadError e m) => IO (Either e a) -> m a liftIOEither :: (MonadIO m, MonadError e m) => IO (Either e a) -> m a
liftIOEither a = liftIO a >>= liftEither liftIOEither a = liftIO a >>= liftEither
{-# INLINE liftIOEither #-} {-# INLINE liftIOEither #-}

View file

@ -83,6 +83,8 @@ chatGroupTests = do
it "send multiple messages api" testSendMulti it "send multiple messages api" testSendMulti
it "send multiple timed messages" testSendMultiTimed it "send multiple timed messages" testSendMultiTimed
it "send multiple messages (many chat batches)" testSendMultiManyBatches it "send multiple messages (many chat batches)" testSendMultiManyBatches
xit'' "shared message body is reused" testSharedMessageBody
xit'' "shared batch body is reused" testSharedBatchBody
describe "async group connections" $ do describe "async group connections" $ do
xit "create and join group when clients go offline" testGroupAsync xit "create and join group when clients go offline" testGroupAsync
describe "group links" $ do describe "group links" $ do
@ -1883,6 +1885,112 @@ testSendMultiManyBatches =
DB.query db "SELECT count(1) FROM chat_items WHERE chat_item_id > ?" (Only msgIdCath) :: IO [[Int]] DB.query db "SELECT count(1) FROM chat_items WHERE chat_item_id > ?" (Only msgIdCath) :: IO [[Int]]
cathItemsCount `shouldBe` [[300]] cathItemsCount `shouldBe` [[300]]
testSharedMessageBody :: HasCallStack => TestParams -> IO ()
testSharedMessageBody ps =
withNewTestChatOpts ps opts' "alice" aliceProfile $ \alice -> do
withSmpServer' serverCfg' $
withNewTestChatOpts ps opts' "bob" bobProfile $ \bob ->
withNewTestChatOpts ps opts' "cath" cathProfile $ \cath -> do
createGroup3 "team" alice bob cath
alice <##. "server disconnected localhost"
alice #> "#team hello"
bodiesCount1 <- withCCAgentTransaction alice $ \db ->
DB.query_ db "SELECT count(1) FROM snd_message_bodies" :: IO [[Int]]
bodiesCount1 `shouldBe` [[1]]
withSmpServer' serverCfg' $
withTestChatOpts ps opts' "bob" $ \bob ->
withTestChatOpts ps opts' "cath" $ \cath -> do
concurrentlyN_
[ alice <##. "server connected localhost",
do
bob <## "1 contacts connected (use /cs for the list)"
bob <## "#team: connected to server(s)",
do
cath <## "1 contacts connected (use /cs for the list)"
cath <## "#team: connected to server(s)"
]
bob <# "#team alice> hello"
cath <# "#team alice> hello"
bodiesCount2 <- withCCAgentTransaction alice $ \db ->
DB.query_ db "SELECT count(1) FROM snd_message_bodies" :: IO [[Int]]
bodiesCount2 `shouldBe` [[0]]
alice <##. "server disconnected localhost"
where
tmp = tmpPath ps
serverCfg' =
smpServerCfg
{ transports = [("7003", transport @TLS, False)],
storeLogFile = Just $ tmp <> "/smp-server-store.log",
storeMsgsFile = Just $ tmp <> "/smp-server-messages.log"
}
opts' =
testOpts
{ coreOptions =
testCoreOpts
{ smpServers = ["smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=:server_password@localhost:7003"]
}
}
testSharedBatchBody :: HasCallStack => TestParams -> IO ()
testSharedBatchBody ps =
withNewTestChatOpts ps opts' "alice" aliceProfile $ \alice -> do
withSmpServer' serverCfg' $
withNewTestChatOpts ps opts' "bob" bobProfile $ \bob ->
withNewTestChatOpts ps opts' "cath" cathProfile $ \cath -> do
createGroup3 "team" alice bob cath
alice <##. "server disconnected localhost"
let cm i = "{\"msgContent\": {\"type\": \"text\", \"text\": \"message " <> show i <> "\"}}"
cms = intercalate ", " (map cm [1 .. 300 :: Int])
alice `send` ("/_send #1 json [" <> cms <> "]")
_ <- getTermLine alice
alice <## "300 messages sent"
bodiesCount1 <- withCCAgentTransaction alice $ \db ->
DB.query_ db "SELECT count(1) FROM snd_message_bodies" :: IO [[Int]]
bodiesCount1 `shouldBe` [[3]]
withSmpServer' serverCfg' $
withTestChatOpts ps opts' "bob" $ \bob ->
withTestChatOpts ps opts' "cath" $ \cath -> do
concurrentlyN_
[ alice <##. "server connected localhost",
do
bob <## "1 contacts connected (use /cs for the list)"
bob <## "#team: connected to server(s)",
do
cath <## "1 contacts connected (use /cs for the list)"
cath <## "#team: connected to server(s)"
]
forM_ [(1 :: Int) .. 300] $ \i -> do
concurrently_
(bob <# ("#team alice> message " <> show i))
(cath <# ("#team alice> message " <> show i))
bodiesCount2 <- withCCAgentTransaction alice $ \db ->
DB.query_ db "SELECT count(1) FROM snd_message_bodies" :: IO [[Int]]
bodiesCount2 `shouldBe` [[0]]
alice <##. "server disconnected localhost"
where
tmp = tmpPath ps
serverCfg' =
smpServerCfg
{ transports = [("7003", transport @TLS, False)],
storeLogFile = Just $ tmp <> "/smp-server-store.log",
storeMsgsFile = Just $ tmp <> "/smp-server-messages.log"
}
opts' =
testOpts
{ coreOptions =
testCoreOpts
{ smpServers = ["smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=:server_password@localhost:7003"]
}
}
testGroupAsync :: HasCallStack => TestParams -> IO () testGroupAsync :: HasCallStack => TestParams -> IO ()
testGroupAsync ps = do testGroupAsync ps = do
withNewTestChat ps "alice" aliceProfile $ \alice -> do withNewTestChat ps "alice" aliceProfile $ \alice -> do

View file

@ -33,6 +33,7 @@ import Simplex.Chat.Types
import Simplex.Chat.Types.Preferences import Simplex.Chat.Types.Preferences
import Simplex.Chat.Types.Shared import Simplex.Chat.Types.Shared
import Simplex.FileTransfer.Client.Main (xftpClientCLI) import Simplex.FileTransfer.Client.Main (xftpClientCLI)
import Simplex.Messaging.Agent.Client (agentClientStore)
import Simplex.Messaging.Agent.Store.AgentStore (maybeFirstRow, withTransaction) import Simplex.Messaging.Agent.Store.AgentStore (maybeFirstRow, withTransaction)
import qualified Simplex.Messaging.Agent.Store.DB as DB import qualified Simplex.Messaging.Agent.Store.DB as DB
import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Crypto as C
@ -556,6 +557,10 @@ withCCTransaction :: TestCC -> (DB.Connection -> IO a) -> IO a
withCCTransaction cc action = withCCTransaction cc action =
withTransaction (chatStore $ chatController cc) $ \db -> action db withTransaction (chatStore $ chatController cc) $ \db -> action db
withCCAgentTransaction :: TestCC -> (DB.Connection -> IO a) -> IO a
withCCAgentTransaction TestCC {chatController = ChatController {smpAgent}} action =
withTransaction (agentClientStore smpAgent) $ \db -> action db
createCCNoteFolder :: TestCC -> IO () createCCNoteFolder :: TestCC -> IO ()
createCCNoteFolder cc = createCCNoteFolder cc =
withCCTransaction cc $ \db -> withCCTransaction cc $ \db ->