diff --git a/cabal.project b/cabal.project index dcd465c658..70f66604de 100644 --- a/cabal.project +++ b/cabal.project @@ -12,7 +12,7 @@ constraints: zip +disable-bzip2 +disable-zstd source-repository-package type: git location: https://github.com/simplex-chat/simplexmq.git - tag: bd97cb04495b90412c1300fd1a4862f488db85cb + tag: 7ac80bffcb51e2461ff8d0f54094943c56f1c4e6 source-repository-package type: git diff --git a/scripts/nix/sha256map.nix b/scripts/nix/sha256map.nix index d440afb61f..0975e772b9 100644 --- a/scripts/nix/sha256map.nix +++ b/scripts/nix/sha256map.nix @@ -1,5 +1,5 @@ { - "https://github.com/simplex-chat/simplexmq.git"."bd97cb04495b90412c1300fd1a4862f488db85cb" = "19i0r2b4kfkq2zlbmq134a0hk0vszhm6wdlfyp58d35zqrc0xadf"; + "https://github.com/simplex-chat/simplexmq.git"."7ac80bffcb51e2461ff8d0f54094943c56f1c4e6" = "1qzv3vs1f0h5df5i0fi2hqiagkkwqghpzjgh9bnjrgmhkhvkl2iq"; "https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38"; "https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d"; "https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl"; diff --git a/src/Simplex/Chat/Library/Commands.hs b/src/Simplex/Chat/Library/Commands.hs index 4c5beb9cee..ebc63e131b 100644 --- a/src/Simplex/Chat/Library/Commands.hs +++ b/src/Simplex/Chat/Library/Commands.hs @@ -80,7 +80,7 @@ import Simplex.Chat.Store.Shared import Simplex.Chat.Types import Simplex.Chat.Types.Preferences import Simplex.Chat.Types.Shared -import Simplex.Chat.Util (liftIOEither) +import Simplex.Chat.Util (liftIOEither, zipWith3') import qualified Simplex.Chat.Util as U import Simplex.FileTransfer.Description (FileDescriptionURI (..), maxFileSize, maxFileSizeHard) import Simplex.Messaging.Agent as Agent @@ -1893,6 +1893,12 @@ processChatCommand' vr = \case pure CRBroadcastSent {user, msgContent = mc, successes = 0, failures = 0, timestamp} Just (ctConns :: NonEmpty (Contact, Connection)) -> do let idsEvts = L.map ctSndEvent ctConns + -- TODO Broadcast rework + -- In createNewSndMessage and encodeChatMessage we could use Nothing for sharedMsgId, + -- then we could reuse message body across broadcast. + -- Encoding different sharedMsgId and reusing body is meaningless as referencing will not work anyway. + -- As an improvement, single message record with its sharedMsgId could be created for new "broadcast" entity. + -- Then all recipients could refer to broadcast message using same sharedMsgId. sndMsgs <- lift $ createSndMessages idsEvts let msgReqs_ :: NonEmpty (Either ChatError ChatMsgReq) = L.zipWith (fmap . ctMsgReq) ctConns sndMsgs (errs, ctSndMsgs :: [(Contact, SndMessage)]) <- @@ -1909,9 +1915,7 @@ processChatCommand' vr = \case ctSndEvent :: (Contact, Connection) -> (ConnOrGroupId, ChatMsgEvent 'Json) ctSndEvent (_, Connection {connId}) = (ConnectionId connId, XMsgNew $ MCSimple (extMsgContent mc Nothing)) ctMsgReq :: (Contact, Connection) -> SndMessage -> ChatMsgReq - ctMsgReq (_, conn) SndMessage {msgId, msgBody} = (conn, MsgFlags {notification = hasNotification XMsgNew_}, msgBody, [msgId]) - zipWith3' :: (a -> b -> c -> d) -> NonEmpty a -> NonEmpty b -> NonEmpty c -> NonEmpty d - zipWith3' f ~(x :| xs) ~(y :| ys) ~(z :| zs) = f x y z :| zipWith3 f xs ys zs + ctMsgReq (_, conn) SndMessage {msgId, msgBody} = (conn, MsgFlags {notification = hasNotification XMsgNew_}, (vrValue msgBody, [msgId])) combineResults :: (Contact, Connection) -> Either ChatError SndMessage -> Either ChatError ([Int64], PQEncryption) -> Either ChatError (Contact, SndMessage) combineResults (ct, _) (Right msg') (Right _) = Right (ct, msg') combineResults _ (Left e) _ = Left e @@ -2662,7 +2666,7 @@ processChatCommand' vr = \case ctMsgReq :: ChangedProfileContact -> Either ChatError SndMessage -> Either ChatError ChatMsgReq ctMsgReq ChangedProfileContact {conn} = fmap $ \SndMessage {msgId, msgBody} -> - (conn, MsgFlags {notification = hasNotification XInfo_}, msgBody, [msgId]) + (conn, MsgFlags {notification = hasNotification XInfo_}, (vrValue msgBody, [msgId])) updateContactPrefs :: User -> Contact -> Preferences -> CM ChatResponse updateContactPrefs _ ct@Contact {activeConn = Nothing} _ = throwChatError $ CEContactNotActive ct updateContactPrefs user@User {userId} ct@Contact {activeConn = Just Connection {customUserProfileId}, userPreferences = contactUserPrefs} contactUserPrefs' @@ -2713,7 +2717,7 @@ processChatCommand' vr = \case when (memberStatus membership == GSMemInvited) $ throwChatError (CEGroupNotJoined g) when (memberRemoved membership) $ throwChatError CEGroupMemberUserRemoved unless (memberActive membership) $ throwChatError CEGroupMemberNotActive - delGroupChatItemsForMembers :: User -> GroupInfo -> [GroupMember] -> [CChatItem CTGroup] -> CM ChatResponse + delGroupChatItemsForMembers :: User -> GroupInfo -> [GroupMember] -> [CChatItem 'CTGroup] -> CM ChatResponse delGroupChatItemsForMembers user gInfo ms items = do assertDeletable gInfo items assertUserGroupRole gInfo GRAdmin -- TODO GRModerator when most users migrate @@ -2723,8 +2727,8 @@ processChatCommand' vr = \case delGroupChatItems user gInfo items True where assertDeletable :: GroupInfo -> [CChatItem 'CTGroup] -> CM () - assertDeletable GroupInfo {membership = GroupMember {memberRole = membershipMemRole}} items = - unless (all itemDeletable items) $ throwChatError CEInvalidChatItemDelete + assertDeletable GroupInfo {membership = GroupMember {memberRole = membershipMemRole}} items' = + unless (all itemDeletable items') $ throwChatError CEInvalidChatItemDelete where itemDeletable :: CChatItem 'CTGroup -> Bool itemDeletable (CChatItem _ ChatItem {chatDir, meta = CIMeta {itemSharedMsgId}}) = diff --git a/src/Simplex/Chat/Library/Internal.hs b/src/Simplex/Chat/Library/Internal.hs index ea838206fe..3e96e1e496 100644 --- a/src/Simplex/Chat/Library/Internal.hs +++ b/src/Simplex/Chat/Library/Internal.hs @@ -274,7 +274,7 @@ uniqueMsgMentions maxMentions mentions = go M.empty S.empty 0 go acc seen n (name : rest) | n >= maxMentions = acc | otherwise = case M.lookup name mentions of - Just mm@MsgMention {memberId} | S.notMember memberId seen -> + Just mm@MsgMention {memberId} | S.notMember memberId seen -> go (M.insert name mm acc) (S.insert memberId seen) (n + 1) rest _ -> go acc seen n rest @@ -1312,7 +1312,7 @@ batchSendConnMessagesB _user conn msgFlags msgs_ = do let batched_ = batchSndMessagesJSON msgs_ case L.nonEmpty batched_ of Just batched' -> do - let msgReqs = L.map (fmap (msgBatchReq conn msgFlags)) batched' + let msgReqs = L.map (fmap msgBatchReq_) batched' delivered <- deliverMessagesB msgReqs let msgs' = concat $ L.zipWith flattenMsgs batched' delivered pqEnc = findLastPQEnc delivered @@ -1320,6 +1320,9 @@ batchSendConnMessagesB _user conn msgFlags msgs_ = do pure (msgs', pqEnc) Nothing -> pure ([], Nothing) where + msgBatchReq_ :: MsgBatch -> ChatMsgReq + msgBatchReq_ (MsgBatch batchBody sndMsgs) = + (conn, msgFlags, (vrValue batchBody, map (\SndMessage {msgId} -> msgId) sndMsgs)) flattenMsgs :: Either ChatError MsgBatch -> Either ChatError ([Int64], PQEncryption) -> [Either ChatError SndMessage] flattenMsgs (Right (MsgBatch _ sndMsgs)) (Right _) = map Right sndMsgs flattenMsgs (Right (MsgBatch _ sndMsgs)) (Left ce) = replicate (length sndMsgs) (Left ce) @@ -1330,9 +1333,6 @@ batchSendConnMessagesB _user conn msgFlags msgs_ = do batchSndMessagesJSON :: NonEmpty (Either ChatError SndMessage) -> [Either ChatError MsgBatch] batchSndMessagesJSON = batchMessages maxEncodedMsgLength . L.toList -msgBatchReq :: Connection -> MsgFlags -> MsgBatch -> ChatMsgReq -msgBatchReq conn msgFlags (MsgBatch batchBody sndMsgs) = (conn, msgFlags, batchBody, map (\SndMessage {msgId} -> msgId) sndMsgs) - encodeConnInfo :: MsgEncodingI e => ChatMsgEvent e -> CM ByteString encodeConnInfo chatMsgEvent = do vr <- chatVersionRange @@ -1358,7 +1358,7 @@ deliverMessage conn cmEventTag msgBody msgId = do deliverMessage' :: Connection -> MsgFlags -> MsgBody -> MessageId -> CM (Int64, PQEncryption) deliverMessage' conn msgFlags msgBody msgId = - deliverMessages ((conn, msgFlags, msgBody, [msgId]) :| []) >>= \case + deliverMessages ((conn, msgFlags, (vrValue msgBody, [msgId])) :| []) >>= \case r :| [] -> case r of Right ([deliveryId], pqEnc) -> pure (deliveryId, pqEnc) Right (deliveryIds, _) -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 delivery id, got " <> show (length deliveryIds) @@ -1366,45 +1366,45 @@ deliverMessage' conn msgFlags msgBody msgId = rs -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 result, got " <> show (length rs) -- [MessageId] - SndMessage ids inside MsgBatch, or single message id -type ChatMsgReq = (Connection, MsgFlags, MsgBody, [MessageId]) +type ChatMsgReq = (Connection, MsgFlags, (ValueOrRef MsgBody, [MessageId])) deliverMessages :: NonEmpty ChatMsgReq -> CM (NonEmpty (Either ChatError ([Int64], PQEncryption))) deliverMessages msgs = deliverMessagesB $ L.map Right msgs deliverMessagesB :: NonEmpty (Either ChatError ChatMsgReq) -> CM (NonEmpty (Either ChatError ([Int64], PQEncryption))) deliverMessagesB msgReqs = do - msgReqs' <- liftIO compressBodies + msgReqs' <- if any connSupportsPQ msgReqs then liftIO compressBodies else pure msgReqs sent <- L.zipWith prepareBatch msgReqs' <$> withAgent (`sendMessagesB` snd (mapAccumL toAgent Nothing msgReqs')) lift . void $ withStoreBatch' $ \db -> map (updatePQSndEnabled db) (rights . L.toList $ sent) lift . withStoreBatch $ \db -> L.map (bindRight $ createDelivery db) sent where + connSupportsPQ = \case + Right (Connection {pqSupport = PQSupportOn, connChatVersion = v}, _, _) -> v >= pqEncryptionCompressionVersion + _ -> False compressBodies = - forME msgReqs $ \mr@(conn@Connection {pqSupport, connChatVersion = v}, msgFlags, msgBody, msgIds) -> - runExceptT $ case pqSupport of - -- we only compress messages when: - -- 1) PQ support is enabled - -- 2) version is compatible with compression - -- 3) message is longer than max compressed size (as this function is not used for batched messages anyway) - PQSupportOn | v >= pqEncryptionCompressionVersion && B.length msgBody > maxCompressedMsgLength -> do + forME msgReqs $ \(conn, msgFlags, (mbr, msgIds)) -> runExceptT $ do + mbr' <- case mbr of + VRValue i msgBody | B.length msgBody > maxCompressedMsgLength -> do let msgBody' = compressedBatchMsgBody_ msgBody when (B.length msgBody' > maxCompressedMsgLength) $ throwError $ ChatError $ CEException "large compressed message" - pure (conn, msgFlags, msgBody', msgIds) - _ -> pure mr + pure $ VRValue i msgBody' + v -> pure v + pure (conn, msgFlags, (mbr', msgIds)) toAgent prev = \case - Right (conn@Connection {connId, pqEncryption}, msgFlags, msgBody, _msgIds) -> + Right (conn@Connection {connId, pqEncryption}, msgFlags, (mbr, _msgIds)) -> let cId = case prev of Just prevId | prevId == connId -> "" _ -> aConnId conn - in (Just connId, Right (cId, pqEncryption, msgFlags, msgBody)) + in (Just connId, Right (cId, pqEncryption, msgFlags, mbr)) Left _ce -> (prev, Left (AP.INTERNAL "ChatError, skip")) -- as long as it is Left, the agent batchers should just step over it prepareBatch (Right req) (Right ar) = Right (req, ar) prepareBatch (Left ce) _ = Left ce -- restore original ChatError prepareBatch _ (Left ae) = Left $ ChatErrorAgent ae Nothing createDelivery :: DB.Connection -> (ChatMsgReq, (AgentMsgId, PQEncryption)) -> IO (Either ChatError ([Int64], PQEncryption)) - createDelivery db ((Connection {connId}, _, _, msgIds), (agentMsgId, pqEnc')) = do + createDelivery db ((Connection {connId}, _, (_, msgIds)), (agentMsgId, pqEnc')) = do Right . (,pqEnc') <$> mapM (createSndMsgDelivery db (SndMsgDelivery {connId, agentMsgId})) msgIds updatePQSndEnabled :: DB.Connection -> (ChatMsgReq, (AgentMsgId, PQEncryption)) -> IO () - updatePQSndEnabled db ((Connection {connId, pqSndEnabled}, _, _, _), (_, pqSndEnabled')) = + updatePQSndEnabled db ((Connection {connId, pqSndEnabled}, _, _), (_, pqSndEnabled')) = case (pqSndEnabled, pqSndEnabled') of (Just b, b') | b' /= b -> updatePQ (Nothing, PQEncOn) -> updatePQ @@ -1471,7 +1471,7 @@ sendGroupMessages_ _user gInfo@GroupInfo {groupId} members events = do stored <- lift $ withStoreBatch (\db -> map (bindRight $ createPendingMsg db) pendingReqs) when (length stored /= length pendingMemIds) $ logError "sendGroupMessages_: pendingMemIds and stored length mismatch" -- Zip for easier access to results - let sentTo = zipWith3 (\mId mReq r -> (mId, fmap (\(_, _, _, msgIds) -> msgIds) mReq, r)) sendToMemIds msgReqs delivered + let sentTo = zipWith3 (\mId mReq r -> (mId, fmap (\(_, _, (_, msgIds)) -> msgIds) mReq, r)) sendToMemIds msgReqs delivered pending = zipWith3 (\mId pReq r -> (mId, fmap snd pReq, r)) pendingMemIds pendingReqs stored pure (sndMsgs_, GroupSndResult {sentTo, pending, forwarded}) where @@ -1495,24 +1495,36 @@ sendGroupMessages_ _user gInfo@GroupInfo {groupId} members events = do mId = groupMemberId' m mIds' = S.insert mId mIds prepareMsgReqs :: MsgFlags -> NonEmpty (Either ChatError SndMessage) -> [(GroupMember, Connection)] -> [(GroupMember, Connection)] -> ([GroupMemberId], [Either ChatError ChatMsgReq]) - prepareMsgReqs msgFlags msgs_ toSendSeparate toSendBatched = do - let batched_ = batchSndMessagesJSON msgs_ + prepareMsgReqs msgFlags msgs toSendSeparate toSendBatched = do + let batched_ = batchSndMessagesJSON msgs case L.nonEmpty batched_ of Just batched' -> do - let (memsSep, mreqsSep) = foldr' foldMsgs ([], []) toSendSeparate - (memsBtch, mreqsBtch) = foldr' (foldBatches batched') ([], []) toSendBatched + let lenMsgs = length msgs + (memsSep, mreqsSep) = foldMembers lenMsgs sndMessageMBR msgs toSendSeparate + (memsBtch, mreqsBtch) = foldMembers (length batched' + lenMsgs) msgBatchMBR batched' toSendBatched (memsSep <> memsBtch, mreqsSep <> mreqsBtch) Nothing -> ([], []) where - foldMsgs :: (GroupMember, Connection) -> ([GroupMemberId], [Either ChatError ChatMsgReq]) -> ([GroupMemberId], [Either ChatError ChatMsgReq]) - foldMsgs (GroupMember {groupMemberId}, conn) memIdsReqs = - foldr' (\msg_ (memIds, reqs) -> (groupMemberId : memIds, fmap sndMessageReq msg_ : reqs)) memIdsReqs msgs_ + foldMembers :: forall a. Int -> (Maybe Int -> Int -> a -> (ValueOrRef MsgBody, [MessageId])) -> NonEmpty (Either ChatError a) -> [(GroupMember, Connection)] -> ([GroupMemberId], [Either ChatError ChatMsgReq]) + foldMembers lastRef mkMb mbs mems = snd $ foldr' foldMsgBodies (lastMemIdx_, ([], [])) mems where - sndMessageReq :: SndMessage -> ChatMsgReq - sndMessageReq SndMessage {msgId, msgBody} = (conn, msgFlags, msgBody, [msgId]) - foldBatches :: NonEmpty (Either ChatError MsgBatch) -> (GroupMember, Connection) -> ([GroupMemberId], [Either ChatError ChatMsgReq]) -> ([GroupMemberId], [Either ChatError ChatMsgReq]) - foldBatches batched' (GroupMember {groupMemberId}, conn) memIdsReqs = - foldr' (\batch_ (memIds, reqs) -> (groupMemberId : memIds, fmap (msgBatchReq conn msgFlags) batch_ : reqs)) memIdsReqs batched' + lastMemIdx_ = let len = length mems in if len > 1 then Just len else Nothing + foldMsgBodies :: (GroupMember, Connection) -> (Maybe Int, ([GroupMemberId], [Either ChatError ChatMsgReq])) -> (Maybe Int, ([GroupMemberId], [Either ChatError ChatMsgReq])) + foldMsgBodies (GroupMember {groupMemberId}, conn) (memIdx_, memIdsReqs) = + (subtract 1 <$> memIdx_,) $ snd $ foldr' addBody (lastRef, memIdsReqs) mbs + where + addBody :: Either ChatError a -> (Int, ([GroupMemberId], [Either ChatError ChatMsgReq])) -> (Int, ([GroupMemberId], [Either ChatError ChatMsgReq])) + addBody mb (i, (memIds, reqs)) = + let req = (conn,msgFlags,) . mkMb memIdx_ i <$> mb + in (i - 1, (groupMemberId : memIds, req : reqs)) + sndMessageMBR :: Maybe Int -> Int -> SndMessage -> (ValueOrRef MsgBody, [MessageId]) + sndMessageMBR memIdx_ i SndMessage {msgId, msgBody} = (vrValue_ memIdx_ i msgBody, [msgId]) + msgBatchMBR :: Maybe Int -> Int -> MsgBatch -> (ValueOrRef MsgBody, [MessageId]) + msgBatchMBR memIdx_ i (MsgBatch batchBody sndMsgs) = (vrValue_ memIdx_ i batchBody, map (\SndMessage {msgId} -> msgId) sndMsgs) + vrValue_ memIdx_ i v = case memIdx_ of + Nothing -> VRValue Nothing v -- sending to one member, do not reference bodies + Just 1 -> VRValue (Just i) v + Just _ -> VRRef i preparePending :: NonEmpty (Either ChatError SndMessage) -> [GroupMember] -> ([GroupMemberId], [Either ChatError (GroupMemberId, MessageId)]) preparePending msgs_ = foldr' foldMsgs ([], []) diff --git a/src/Simplex/Chat/Library/Subscriber.hs b/src/Simplex/Chat/Library/Subscriber.hs index 902c19ed63..1530f8e5c4 100644 --- a/src/Simplex/Chat/Library/Subscriber.hs +++ b/src/Simplex/Chat/Library/Subscriber.hs @@ -274,7 +274,7 @@ processAgentMsgSndFile _corrId aFileId msg = do map (\fileDescr -> (conn, (connOrGroupId, XMsgFileDescr {msgId = sharedMsgId, fileDescr}))) (L.toList $ splitFileDescr partSize rfdText) toMsgReq :: (Connection, (ConnOrGroupId, ChatMsgEvent 'Json)) -> SndMessage -> ChatMsgReq toMsgReq (conn, _) SndMessage {msgId, msgBody} = - (conn, MsgFlags {notification = hasNotification XMsgFileDescr_}, msgBody, [msgId]) + (conn, MsgFlags {notification = hasNotification XMsgFileDescr_}, (vrValue msgBody, [msgId])) sendFileError :: FileError -> Text -> VersionRangeChat -> FileTransferMeta -> CM () sendFileError ferr err vr ft = do logError $ "Sent file error: " <> err diff --git a/src/Simplex/Chat/Store/SQLite/Migrations/agent_query_plans.txt b/src/Simplex/Chat/Store/SQLite/Migrations/agent_query_plans.txt index c4f5007040..f470121f87 100644 --- a/src/Simplex/Chat/Store/SQLite/Migrations/agent_query_plans.txt +++ b/src/Simplex/Chat/Store/SQLite/Migrations/agent_query_plans.txt @@ -15,6 +15,20 @@ SEARCH s USING INTEGER PRIMARY KEY (rowid=?) SEARCH c USING INTEGER PRIMARY KEY (rowid=?) SEARCH f USING INTEGER PRIMARY KEY (rowid=?) +Query: + SELECT + m.msg_type, m.msg_flags, m.msg_body, m.pq_encryption, m.internal_ts, m.internal_snd_id, s.previous_msg_hash, + s.retry_int_slow, s.retry_int_fast, s.msg_encrypt_key, s.padded_msg_len, sb.agent_msg + FROM messages m + JOIN snd_messages s ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id + LEFT JOIN snd_message_bodies sb ON sb.snd_message_body_id = s.snd_message_body_id + WHERE m.conn_id = ? AND m.internal_id = ? + +Plan: +SEARCH m USING PRIMARY KEY (conn_id=? AND internal_id=?) +SEARCH s USING PRIMARY KEY (conn_id=?) +SEARCH sb USING INTEGER PRIMARY KEY (rowid=?) LEFT-JOIN + Query: SELECT r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries, @@ -45,16 +59,6 @@ Query: Plan: SEARCH commands USING INDEX idx_commands_server_commands (host=? AND port=?) -Query: - SELECT m.msg_type, m.msg_flags, m.msg_body, m.pq_encryption, m.internal_ts, s.retry_int_slow, s.retry_int_fast - FROM messages m - JOIN snd_messages s ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id - WHERE m.conn_id = ? AND m.internal_id = ? - -Plan: -SEARCH m USING PRIMARY KEY (conn_id=? AND internal_id=?) -SEARCH s USING PRIMARY KEY (conn_id=?) - Query: SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path FROM rcv_file_chunks @@ -512,9 +516,9 @@ Plan: Query: INSERT INTO snd_messages - ( conn_id, internal_snd_id, internal_id, internal_hash, previous_msg_hash) + ( conn_id, internal_snd_id, internal_id, internal_hash, previous_msg_hash, msg_encrypt_key, padded_msg_len, snd_message_body_id) VALUES - (?,?,?,?,?) + (?,?,?,?,?,?,?,?) Plan: SEARCH messages USING COVERING INDEX idx_messages_conn_id_internal_snd_id (conn_id=? AND internal_snd_id=?) @@ -805,6 +809,11 @@ Plan: SEARCH snd_files USING INTEGER PRIMARY KEY (rowid=?) SEARCH snd_file_chunks USING COVERING INDEX idx_snd_file_chunks_snd_file_id (snd_file_id=?) +Query: DELETE FROM snd_message_bodies WHERE snd_message_body_id = ? +Plan: +SEARCH snd_message_bodies USING INTEGER PRIMARY KEY (rowid=?) +SEARCH snd_messages USING COVERING INDEX idx_snd_messages_snd_message_body_id (snd_message_body_id=?) + Query: DELETE FROM snd_message_deliveries WHERE conn_id = ? AND snd_queue_id = ? Plan: SEARCH snd_message_deliveries USING COVERING INDEX idx_snd_message_deliveries (conn_id=? AND snd_queue_id=?) @@ -861,6 +870,10 @@ Plan: Query: INSERT INTO snd_files (snd_file_entity_id, user_id, path, src_file_key, src_file_nonce, num_recipients, prefix_path, key, nonce, status, redirect_size, redirect_digest) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) Plan: +Query: INSERT INTO snd_message_bodies (agent_msg) VALUES (?) RETURNING snd_message_body_id +Plan: +SEARCH snd_messages USING COVERING INDEX idx_snd_messages_snd_message_body_id (snd_message_body_id=?) + Query: INSERT INTO snd_message_deliveries (conn_id, snd_queue_id, internal_id) VALUES (?, ?, ?) Plan: @@ -897,6 +910,10 @@ Query: SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND interna Plan: SEARCH snd_message_deliveries USING COVERING INDEX idx_snd_message_deliveries_expired (conn_id=?) +Query: SELECT count(1) FROM snd_message_bodies +Plan: +SCAN snd_message_bodies + Query: SELECT deleted FROM snd_files WHERE snd_file_id = ? Plan: SEARCH snd_files USING INTEGER PRIMARY KEY (rowid=?) @@ -921,7 +938,7 @@ Query: SELECT ratchet_state, x3dh_pub_key_1, x3dh_pub_key_2, pq_pub_kem FROM rat Plan: SEARCH ratchets USING PRIMARY KEY (conn_id=?) -Query: SELECT rcpt_internal_id, rcpt_status FROM snd_messages WHERE conn_id = ? AND internal_id = ? +Query: SELECT rcpt_internal_id, rcpt_status, snd_message_body_id FROM snd_messages WHERE conn_id = ? AND internal_id = ? Plan: SEARCH snd_messages USING PRIMARY KEY (conn_id=?) diff --git a/src/Simplex/Chat/Util.hs b/src/Simplex/Chat/Util.hs index 3f7d19fd6d..796b32778b 100644 --- a/src/Simplex/Chat/Util.hs +++ b/src/Simplex/Chat/Util.hs @@ -4,7 +4,7 @@ {-# LANGUAGE TupleSections #-} {-# OPTIONS_GHC -Wno-orphans #-} -module Simplex.Chat.Util (week, encryptFile, chunkSize, liftIOEither, shuffle) where +module Simplex.Chat.Util (week, encryptFile, chunkSize, liftIOEither, shuffle, zipWith3') where import Control.Exception (Exception) import Control.Monad @@ -15,6 +15,7 @@ import Control.Monad.Reader import Data.Bifunctor (first) import qualified Data.ByteString.Lazy as LB import Data.List (sortBy) +import Data.List.NonEmpty (NonEmpty (..)) import Data.Ord (comparing) import Data.Time (NominalDiffTime) import Data.Word (Word16) @@ -52,6 +53,9 @@ shuffle xs = map snd . sortBy (comparing fst) <$> mapM (\x -> (,x) <$> random) x random :: IO Word16 random = randomRIO (0, 65535) +zipWith3' :: (a -> b -> c -> d) -> NonEmpty a -> NonEmpty b -> NonEmpty c -> NonEmpty d +zipWith3' f ~(x :| xs) ~(y :| ys) ~(z :| zs) = f x y z :| zipWith3 f xs ys zs + liftIOEither :: (MonadIO m, MonadError e m) => IO (Either e a) -> m a liftIOEither a = liftIO a >>= liftEither {-# INLINE liftIOEither #-} diff --git a/tests/ChatTests/Groups.hs b/tests/ChatTests/Groups.hs index ca74e5e1bc..d039fb05ff 100644 --- a/tests/ChatTests/Groups.hs +++ b/tests/ChatTests/Groups.hs @@ -83,6 +83,8 @@ chatGroupTests = do it "send multiple messages api" testSendMulti it "send multiple timed messages" testSendMultiTimed it "send multiple messages (many chat batches)" testSendMultiManyBatches + xit'' "shared message body is reused" testSharedMessageBody + xit'' "shared batch body is reused" testSharedBatchBody describe "async group connections" $ do xit "create and join group when clients go offline" testGroupAsync describe "group links" $ do @@ -1883,6 +1885,112 @@ testSendMultiManyBatches = DB.query db "SELECT count(1) FROM chat_items WHERE chat_item_id > ?" (Only msgIdCath) :: IO [[Int]] cathItemsCount `shouldBe` [[300]] +testSharedMessageBody :: HasCallStack => TestParams -> IO () +testSharedMessageBody ps = + withNewTestChatOpts ps opts' "alice" aliceProfile $ \alice -> do + withSmpServer' serverCfg' $ + withNewTestChatOpts ps opts' "bob" bobProfile $ \bob -> + withNewTestChatOpts ps opts' "cath" cathProfile $ \cath -> do + createGroup3 "team" alice bob cath + + alice <##. "server disconnected localhost" + alice #> "#team hello" + bodiesCount1 <- withCCAgentTransaction alice $ \db -> + DB.query_ db "SELECT count(1) FROM snd_message_bodies" :: IO [[Int]] + bodiesCount1 `shouldBe` [[1]] + + withSmpServer' serverCfg' $ + withTestChatOpts ps opts' "bob" $ \bob -> + withTestChatOpts ps opts' "cath" $ \cath -> do + concurrentlyN_ + [ alice <##. "server connected localhost", + do + bob <## "1 contacts connected (use /cs for the list)" + bob <## "#team: connected to server(s)", + do + cath <## "1 contacts connected (use /cs for the list)" + cath <## "#team: connected to server(s)" + ] + bob <# "#team alice> hello" + cath <# "#team alice> hello" + bodiesCount2 <- withCCAgentTransaction alice $ \db -> + DB.query_ db "SELECT count(1) FROM snd_message_bodies" :: IO [[Int]] + bodiesCount2 `shouldBe` [[0]] + + alice <##. "server disconnected localhost" + where + tmp = tmpPath ps + serverCfg' = + smpServerCfg + { transports = [("7003", transport @TLS, False)], + storeLogFile = Just $ tmp <> "/smp-server-store.log", + storeMsgsFile = Just $ tmp <> "/smp-server-messages.log" + } + opts' = + testOpts + { coreOptions = + testCoreOpts + { smpServers = ["smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=:server_password@localhost:7003"] + } + } + +testSharedBatchBody :: HasCallStack => TestParams -> IO () +testSharedBatchBody ps = + withNewTestChatOpts ps opts' "alice" aliceProfile $ \alice -> do + withSmpServer' serverCfg' $ + withNewTestChatOpts ps opts' "bob" bobProfile $ \bob -> + withNewTestChatOpts ps opts' "cath" cathProfile $ \cath -> do + createGroup3 "team" alice bob cath + + alice <##. "server disconnected localhost" + + let cm i = "{\"msgContent\": {\"type\": \"text\", \"text\": \"message " <> show i <> "\"}}" + cms = intercalate ", " (map cm [1 .. 300 :: Int]) + alice `send` ("/_send #1 json [" <> cms <> "]") + _ <- getTermLine alice + alice <## "300 messages sent" + + bodiesCount1 <- withCCAgentTransaction alice $ \db -> + DB.query_ db "SELECT count(1) FROM snd_message_bodies" :: IO [[Int]] + bodiesCount1 `shouldBe` [[3]] + + withSmpServer' serverCfg' $ + withTestChatOpts ps opts' "bob" $ \bob -> + withTestChatOpts ps opts' "cath" $ \cath -> do + concurrentlyN_ + [ alice <##. "server connected localhost", + do + bob <## "1 contacts connected (use /cs for the list)" + bob <## "#team: connected to server(s)", + do + cath <## "1 contacts connected (use /cs for the list)" + cath <## "#team: connected to server(s)" + ] + forM_ [(1 :: Int) .. 300] $ \i -> do + concurrently_ + (bob <# ("#team alice> message " <> show i)) + (cath <# ("#team alice> message " <> show i)) + bodiesCount2 <- withCCAgentTransaction alice $ \db -> + DB.query_ db "SELECT count(1) FROM snd_message_bodies" :: IO [[Int]] + bodiesCount2 `shouldBe` [[0]] + + alice <##. "server disconnected localhost" + where + tmp = tmpPath ps + serverCfg' = + smpServerCfg + { transports = [("7003", transport @TLS, False)], + storeLogFile = Just $ tmp <> "/smp-server-store.log", + storeMsgsFile = Just $ tmp <> "/smp-server-messages.log" + } + opts' = + testOpts + { coreOptions = + testCoreOpts + { smpServers = ["smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=:server_password@localhost:7003"] + } + } + testGroupAsync :: HasCallStack => TestParams -> IO () testGroupAsync ps = do withNewTestChat ps "alice" aliceProfile $ \alice -> do diff --git a/tests/ChatTests/Utils.hs b/tests/ChatTests/Utils.hs index 7bb25dbe9b..9a3b560b29 100644 --- a/tests/ChatTests/Utils.hs +++ b/tests/ChatTests/Utils.hs @@ -33,6 +33,7 @@ import Simplex.Chat.Types import Simplex.Chat.Types.Preferences import Simplex.Chat.Types.Shared import Simplex.FileTransfer.Client.Main (xftpClientCLI) +import Simplex.Messaging.Agent.Client (agentClientStore) import Simplex.Messaging.Agent.Store.AgentStore (maybeFirstRow, withTransaction) import qualified Simplex.Messaging.Agent.Store.DB as DB import qualified Simplex.Messaging.Crypto as C @@ -556,6 +557,10 @@ withCCTransaction :: TestCC -> (DB.Connection -> IO a) -> IO a withCCTransaction cc action = withTransaction (chatStore $ chatController cc) $ \db -> action db +withCCAgentTransaction :: TestCC -> (DB.Connection -> IO a) -> IO a +withCCAgentTransaction TestCC {chatController = ChatController {smpAgent}} action = + withTransaction (agentClientStore smpAgent) $ \db -> action db + createCCNoteFolder :: TestCC -> IO () createCCNoteFolder cc = withCCTransaction cc $ \db ->