core: batch send file descriptions (#4684)

* core: batch send file descriptions

* fix useMember

* fix result interpretation

* remove comment

* refactor

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
spaced4ndy 2024-08-15 13:43:57 +04:00 committed by GitHub
parent 82c4d77c73
commit 1d0d7bbd01
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -3902,15 +3902,22 @@ processAgentMsgSndFile _corrId aFileId msg = do
case (rfds, sfts, d, cInfo) of case (rfds, sfts, d, cInfo) of
(rfd : extraRFDs, sft : _, SMDSnd, DirectChat ct) -> do (rfd : extraRFDs, sft : _, SMDSnd, DirectChat ct) -> do
withStore' $ \db -> createExtraSndFTDescrs db user fileId (map fileDescrText extraRFDs) withStore' $ \db -> createExtraSndFTDescrs db user fileId (map fileDescrText extraRFDs)
msgDeliveryId <- sendFileDescription sft rfd sharedMsgId $ sendDirectContactMessage user ct conn@Connection {connId} <- liftEither $ contactSendConn_ ct
withStore' $ \db -> updateSndFTDeliveryXFTP db sft msgDeliveryId sendFileDescriptions (ConnectionId connId) ((conn, sft, fileDescrText rfd) :| []) sharedMsgId >>= \case
Just rs -> case L.last rs of
Right ([msgDeliveryId], _) ->
withStore' $ \db -> updateSndFTDeliveryXFTP db sft msgDeliveryId
Right (deliveryIds, _) -> toView $ CRChatError (Just user) $ ChatError $ CEInternalError $ "SFDONE, sendFileDescriptions: expected 1 delivery id, got " <> show (length deliveryIds)
Left e -> toView $ CRChatError (Just user) e
Nothing -> toView $ CRChatError (Just user) $ ChatError $ CEInternalError "SFDONE, sendFileDescriptions: expected at least 1 result"
lift $ withAgent' (`xftpDeleteSndFileInternal` aFileId) lift $ withAgent' (`xftpDeleteSndFileInternal` aFileId)
(_, _, SMDSnd, GroupChat g@GroupInfo {groupId}) -> do (_, _, SMDSnd, GroupChat g@GroupInfo {groupId}) -> do
ms <- withStore' $ \db -> getGroupMembers db vr user g ms <- withStore' $ \db -> getGroupMembers db vr user g
let rfdsMemberFTs = zip rfds $ memberFTs ms let rfdsMemberFTs = zipWith (\rfd (conn, sft) -> (conn, sft, fileDescrText rfd)) rfds (memberFTs ms)
extraRFDs = drop (length rfdsMemberFTs) rfds extraRFDs = drop (length rfdsMemberFTs) rfds
withStore' $ \db -> createExtraSndFTDescrs db user fileId (map fileDescrText extraRFDs) withStore' $ \db -> createExtraSndFTDescrs db user fileId (map fileDescrText extraRFDs)
forM_ rfdsMemberFTs $ \mt -> sendToMember mt `catchChatError` (toView . CRChatError (Just user)) forM_ (L.nonEmpty rfdsMemberFTs) $ \rfdsMemberFTs' ->
sendFileDescriptions (GroupId groupId) rfdsMemberFTs' sharedMsgId
ci' <- withStore $ \db -> do ci' <- withStore $ \db -> do
liftIO $ updateCIFileStatus db user fileId CIFSSndComplete liftIO $ updateCIFileStatus db user fileId CIFSSndComplete
getChatItemByFileId db vr user fileId getChatItemByFileId db vr user fileId
@ -3922,15 +3929,12 @@ processAgentMsgSndFile _corrId aFileId msg = do
where where
mConns' = mapMaybe useMember ms mConns' = mapMaybe useMember ms
sfts' = mapMaybe (\sft@SndFileTransfer {groupMemberId} -> (,sft) <$> groupMemberId) sfts sfts' = mapMaybe (\sft@SndFileTransfer {groupMemberId} -> (,sft) <$> groupMemberId) sfts
-- Should match memberSendAction logic
useMember GroupMember {groupMemberId, activeConn = Just conn@Connection {connStatus}} useMember GroupMember {groupMemberId, activeConn = Just conn@Connection {connStatus}}
| (connStatus == ConnReady || connStatus == ConnSndReady) && not (connDisabled conn) = Just (groupMemberId, conn) | (connStatus == ConnReady || connStatus == ConnSndReady) && not (connDisabled conn) && not (connInactive conn) =
Just (groupMemberId, conn)
| otherwise = Nothing | otherwise = Nothing
useMember _ = Nothing useMember _ = Nothing
sendToMember :: (ValidFileDescription 'FRecipient, (Connection, SndFileTransfer)) -> CM ()
sendToMember (rfd, (conn, sft)) =
void $ sendFileDescription sft rfd sharedMsgId $ \msg' -> do
(sndMsg, msgDeliveryId, _) <- sendDirectMemberMessage conn msg' groupId
pure (sndMsg, msgDeliveryId)
_ -> pure () _ -> pure ()
_ -> pure () -- TODO error? _ -> pure () -- TODO error?
SFWARN e -> do SFWARN e -> do
@ -3945,20 +3949,27 @@ processAgentMsgSndFile _corrId aFileId msg = do
where where
fileDescrText :: FilePartyI p => ValidFileDescription p -> T.Text fileDescrText :: FilePartyI p => ValidFileDescription p -> T.Text
fileDescrText = safeDecodeUtf8 . strEncode fileDescrText = safeDecodeUtf8 . strEncode
sendFileDescription :: SndFileTransfer -> ValidFileDescription 'FRecipient -> SharedMsgId -> (ChatMsgEvent 'Json -> CM (SndMessage, Int64)) -> CM Int64 sendFileDescriptions :: ConnOrGroupId -> NonEmpty (Connection, SndFileTransfer, RcvFileDescrText) -> SharedMsgId -> CM (Maybe (NonEmpty (Either ChatError ([Int64], PQEncryption))))
sendFileDescription sft rfd msgId sendMsg = do sendFileDescriptions connOrGroupId connsTransfersDescrs sharedMsgId = do
let rfdText = fileDescrText rfd lift . void . withStoreBatch' $ \db -> L.map (\(_, sft, rfdText) -> updateSndFTDescrXFTP db user sft rfdText) connsTransfersDescrs
withStore' $ \db -> updateSndFTDescrXFTP db user sft rfdText partSize <- asks $ xftpDescrPartSize . config
parts <- splitFileDescr rfdText let connsIdsEvts = connDescrEvents partSize
loopSend parts sndMsgs_ <- lift $ createSndMessages $ L.map snd connsIdsEvts
let (errs, msgReqs) = partitionEithers . L.toList $ L.zipWith (fmap . toMsgReq) connsIdsEvts sndMsgs_
delivered <- mapM deliverMessages (L.nonEmpty msgReqs)
let errs' = errs <> maybe [] (lefts . L.toList) delivered
unless (null errs') $ toView $ CRChatErrors (Just user) errs'
pure delivered
where where
-- returns msgDeliveryId of the last file description message connDescrEvents :: Int -> NonEmpty (Connection, (ConnOrGroupId, ChatMsgEvent 'Json))
loopSend :: NonEmpty FileDescr -> CM Int64 connDescrEvents partSize = L.fromList $ concatMap splitText (L.toList connsTransfersDescrs)
loopSend (fileDescr :| fds) = do where
(_, msgDeliveryId) <- sendMsg $ XMsgFileDescr {msgId, fileDescr} splitText :: (Connection, SndFileTransfer, RcvFileDescrText) -> [(Connection, (ConnOrGroupId, ChatMsgEvent 'Json))]
case L.nonEmpty fds of splitText (conn, _, rfdText) =
Just fds' -> loopSend fds' map (\fileDescr -> (conn, (connOrGroupId, XMsgFileDescr {msgId = sharedMsgId, fileDescr}))) (L.toList $ splitFileDescr partSize rfdText)
Nothing -> pure msgDeliveryId toMsgReq :: (Connection, (ConnOrGroupId, ChatMsgEvent 'Json)) -> SndMessage -> ChatMsgReq
toMsgReq (conn, _) SndMessage {msgId, msgBody} =
(conn, MsgFlags {notification = hasNotification XMsgFileDescr_}, msgBody, [msgId])
sendFileError :: FileError -> Text -> VersionRangeChat -> FileTransferMeta -> CM () sendFileError :: FileError -> Text -> VersionRangeChat -> FileTransferMeta -> CM ()
sendFileError ferr err vr ft = do sendFileError ferr err vr ft = do
logError $ "Sent file error: " <> err logError $ "Sent file error: " <> err
@ -3980,18 +3991,16 @@ agentFileError = \case
SMP.TRANSPORT TEVersion -> srvErr SrvErrVersion SMP.TRANSPORT TEVersion -> srvErr SrvErrVersion
e -> srvErr . SrvErrOther $ tshow e e -> srvErr . SrvErrOther $ tshow e
splitFileDescr :: RcvFileDescrText -> CM (NonEmpty FileDescr) splitFileDescr :: Int -> RcvFileDescrText -> NonEmpty FileDescr
splitFileDescr rfdText = do splitFileDescr partSize rfdText = splitParts 1 rfdText
partSize <- asks $ xftpDescrPartSize . config
pure $ splitParts 1 partSize rfdText
where where
splitParts partNo partSize remText = splitParts partNo remText =
let (part, rest) = T.splitAt partSize remText let (part, rest) = T.splitAt partSize remText
complete = T.null rest complete = T.null rest
fileDescr = FileDescr {fileDescrText = part, fileDescrPartNo = partNo, fileDescrComplete = complete} fileDescr = FileDescr {fileDescrText = part, fileDescrPartNo = partNo, fileDescrComplete = complete}
in if complete in if complete
then fileDescr :| [] then fileDescr :| []
else fileDescr <| splitParts (partNo + 1) partSize rest else fileDescr <| splitParts (partNo + 1) rest
processAgentMsgRcvFile :: ACorrId -> RcvFileId -> AEvent 'AERcvFile -> CM () processAgentMsgRcvFile :: ACorrId -> RcvFileId -> AEvent 'AERcvFile -> CM ()
processAgentMsgRcvFile _corrId aFileId msg = do processAgentMsgRcvFile _corrId aFileId msg = do
@ -4573,7 +4582,8 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
xMsgNewChatMsg = ChatMessage {chatVRange = senderVRange, msgId = itemSharedMsgId, chatMsgEvent = XMsgNew msgContainer} xMsgNewChatMsg = ChatMessage {chatVRange = senderVRange, msgId = itemSharedMsgId, chatMsgEvent = XMsgNew msgContainer}
fileDescrEvents <- case (snd <$> fInvDescr_, itemSharedMsgId) of fileDescrEvents <- case (snd <$> fInvDescr_, itemSharedMsgId) of
(Just fileDescrText, Just msgId) -> do (Just fileDescrText, Just msgId) -> do
parts <- splitFileDescr fileDescrText partSize <- asks $ xftpDescrPartSize . config
let parts = splitFileDescr partSize fileDescrText
pure . toList $ L.map (XMsgFileDescr msgId) parts pure . toList $ L.map (XMsgFileDescr msgId) parts
_ -> pure [] _ -> pure []
let fileDescrChatMsgs = map (ChatMessage senderVRange Nothing) fileDescrEvents let fileDescrChatMsgs = map (ChatMessage senderVRange Nothing) fileDescrEvents