fix: batch limit exceeded errors [skip ci]

This commit is contained in:
Rafael Saes 2025-04-15 17:59:11 -03:00
parent 2afad62c0c
commit c74227ee36
3 changed files with 137 additions and 74 deletions

View file

@ -1225,20 +1225,12 @@ abstract class ElectrumWalletBase<T extends ElectrumWalletAddresses>
} }
@action @action
Future<void> onScripthashesStatusResponse(Map<String, dynamic>? result) async { Future<void> onScripthashesStatusResponse(ElectrumWorkerScripthashesResponse result) async {
if (result != null) { if (result.status == null) {
for (final entry in result.entries) { return;
final address = entry.key;
final scripthash = walletAddresses.allAddresses
.firstWhereOrNull((element) => element.address == address)
?.scriptHash;
if (scripthash != null) {
scripthashesWithStatus.add(scripthash);
}
}
} }
scripthashesWithStatus.add(result.scripthash);
} }
@action @action
@ -1653,20 +1645,24 @@ abstract class ElectrumWalletBase<T extends ElectrumWalletAddresses>
@action @action
Future<void> subscribeForStatuses([bool? wait]) async { Future<void> subscribeForStatuses([bool? wait]) async {
Map<String, String> scripthashByAddress = {}; Map<String, String> scripthashByAddress = {};
Map<String, String> addressByScripthashes = {};
walletAddresses.allAddresses.forEach((addressRecord) { walletAddresses.allAddresses.forEach((addressRecord) {
scripthashByAddress[addressRecord.address] = addressRecord.scriptHash; scripthashByAddress[addressRecord.address] = addressRecord.scriptHash;
addressByScripthashes[addressRecord.scriptHash] = addressRecord.address;
}); });
if (wait == true) { if (wait == true) {
await waitSendWorker( await waitSendWorker(
ElectrumWorkerScripthashesSubscribeRequest( ElectrumWorkerScripthashesSubscribeRequest(
scripthashByAddress: scripthashByAddress, scripthashByAddress: scripthashByAddress,
addressByScripthashes: addressByScripthashes,
), ),
); );
} else { } else {
workerSendPort!.send( workerSendPort!.send(
ElectrumWorkerScripthashesSubscribeRequest( ElectrumWorkerScripthashesSubscribeRequest(
scripthashByAddress: scripthashByAddress, scripthashByAddress: scripthashByAddress,
addressByScripthashes: addressByScripthashes,
).toJson(), ).toJson(),
); );
} }

View file

@ -212,8 +212,6 @@ class ElectrumWorker {
if (tx.confirmations != newConfirmationsValue) { if (tx.confirmations != newConfirmationsValue) {
tx.confirmations = newConfirmationsValue; tx.confirmations = newConfirmationsValue;
tx.isPending = tx.confirmations == 0; tx.isPending = tx.confirmations == 0;
if (!anyTxWasUpdated) {
}
anyTxWasUpdated = true; anyTxWasUpdated = true;
} }
} }
@ -233,61 +231,93 @@ class ElectrumWorker {
}); });
} }
Future<void> _handleBatchScriphashesSubscribe(
ElectrumWorkerScripthashesSubscribeRequest request, [
int chunkSize = 100,
List<String>? allScriptHashes,
]) async {
final scripthashByAddress = request.scripthashByAddress;
allScriptHashes ??= scripthashByAddress.values.toList();
final chunks = allScriptHashes.sublist(
0,
chunkSize > allScriptHashes.length ? allScriptHashes.length : chunkSize,
);
final req = ElectrumBatchRequestScriptHashSubscribe(scriptHashes: chunks);
final batchStreams = await _electrumClient!.batchSubscribe(req);
if (batchStreams != null) {
int i = 0;
for (final stream in batchStreams) {
stream.subscription.listen((status) async {
final batch = req.onResponse(status, stream.params);
// https://electrumx.readthedocs.io/en/latest/protocol-basics.html#status
// The status of the script hash is the hash of the tx history, or null if the string is empty because there are no transactions
final result = batch.result;
final scriptHash = batch.paramForRequest!.first as String;
final address = request.addressByScripthashes[scriptHash]!;
if (result != null) {
_sendResponse(
ElectrumWorkerScripthashesSubscribeResponse(
result: ElectrumWorkerScripthashesResponse(
address: address,
scripthash: scriptHash,
status: result,
),
id: request.id,
completed: false,
),
);
}
scripthashByAddress.remove(address);
await _electrumClient!.request(
ElectrumRequestScriptHashUnSubscribe(scriptHash: scriptHash),
);
i++;
if (i == chunkSize) {
_handleBatchScriphashesSubscribe(
request,
chunkSize,
scripthashByAddress.values.toList(),
);
}
// Got all batches, complete
if (i == chunks.length && scripthashByAddress.isEmpty) {
_sendResponse(
ElectrumWorkerScripthashesSubscribeResponse(
result: ElectrumWorkerScripthashesResponse(
address: address,
scripthash: scriptHash,
status: null,
),
id: request.id,
completed: true,
),
);
}
}, onError: (Object e) {
// print(e);
});
}
} else {
_serverCapability!.supportsBatching = false;
}
}
Future<void> _handleScriphashesSubscribe( Future<void> _handleScriphashesSubscribe(
ElectrumWorkerScripthashesSubscribeRequest request, ElectrumWorkerScripthashesSubscribeRequest request,
) async { ) async {
if (_serverCapability!.supportsBatching) { if (_serverCapability!.supportsBatching) {
try { _handleBatchScriphashesSubscribe(request);
final req = ElectrumBatchRequestScriptHashSubscribe(
scriptHashes: request.scripthashByAddress.values.toList() as List<String>,
);
final streams = await _electrumClient!.batchSubscribe(req);
if (streams != null) {
int i = 0;
await Future.wait(streams.map((stream) async {
stream.subscription.listen((status) {
final batch = req.onResponse(status, stream.params);
final result = batch.result;
final scriptHash = batch.paramForRequest!.first as String;
final address = request.scripthashByAddress.entries
.firstWhere(
(entry) => entry.value == scriptHash,
)
.key;
if (result != null) {
_sendResponse(
ElectrumWorkerScripthashesSubscribeResponse(
result: {address: result},
id: request.id,
completed: false,
),
);
}
i++;
if (i == request.scripthashByAddress.length) {
_sendResponse(ElectrumWorkerScripthashesSubscribeResponse(
result: {address: null},
id: request.id,
completed: true,
));
}
}, onError: () {
_serverCapability!.supportsBatching = false;
});
}));
} else {
_serverCapability!.supportsBatching = false;
}
} catch (_) {
_serverCapability!.supportsBatching = false;
}
} }
if (_serverCapability!.supportsBatching == false) { if (_serverCapability!.supportsBatching == false) {
@ -310,16 +340,25 @@ class ElectrumWorker {
stream.listen((status) async { stream.listen((status) async {
if (status != null) { if (status != null) {
_sendResponse(ElectrumWorkerScripthashesSubscribeResponse( _sendResponse(ElectrumWorkerScripthashesSubscribeResponse(
result: {address: req.onResponse(status)}, result: ElectrumWorkerScripthashesResponse(
address: address,
scripthash: scripthash,
status: req.onResponse(status),
),
id: request.id, id: request.id,
completed: false, completed: false,
)); ));
} }
i++; i++;
// Got all statuses, complete
if (i == request.scripthashByAddress.length) { if (i == request.scripthashByAddress.length) {
_sendResponse(ElectrumWorkerScripthashesSubscribeResponse( _sendResponse(ElectrumWorkerScripthashesSubscribeResponse(
result: {address: null}, result: ElectrumWorkerScripthashesResponse(
address: address,
scripthash: scripthash,
status: null,
),
id: request.id, id: request.id,
completed: true, completed: true,
)); ));

View file

@ -3,11 +3,13 @@ part of 'methods.dart';
class ElectrumWorkerScripthashesSubscribeRequest implements ElectrumWorkerRequest { class ElectrumWorkerScripthashesSubscribeRequest implements ElectrumWorkerRequest {
ElectrumWorkerScripthashesSubscribeRequest({ ElectrumWorkerScripthashesSubscribeRequest({
required this.scripthashByAddress, required this.scripthashByAddress,
required this.addressByScripthashes,
this.id, this.id,
this.completed = false, this.completed = false,
}); });
final Map<String, dynamic> scripthashByAddress; final Map<String, String> scripthashByAddress;
final Map<String, String> addressByScripthashes;
final int? id; final int? id;
final bool completed; final bool completed;
@ -17,7 +19,8 @@ class ElectrumWorkerScripthashesSubscribeRequest implements ElectrumWorkerReques
@override @override
factory ElectrumWorkerScripthashesSubscribeRequest.fromJson(Map<dynamic, dynamic> json) { factory ElectrumWorkerScripthashesSubscribeRequest.fromJson(Map<dynamic, dynamic> json) {
return ElectrumWorkerScripthashesSubscribeRequest( return ElectrumWorkerScripthashesSubscribeRequest(
scripthashByAddress: json['scripthashes'] as Map<String, String>, scripthashByAddress: json['scripthashByAddress'] as Map<String, String>,
addressByScripthashes: json['addressByScripthashes'] as Map<String, String>,
id: json['id'] as int?, id: json['id'] as int?,
completed: json['completed'] as bool? ?? false, completed: json['completed'] as bool? ?? false,
); );
@ -29,7 +32,8 @@ class ElectrumWorkerScripthashesSubscribeRequest implements ElectrumWorkerReques
'method': method, 'method': method,
'id': id, 'id': id,
'completed': completed, 'completed': completed,
'scripthashes': scripthashByAddress, 'scripthashByAddress': scripthashByAddress,
'addressByScripthashes': addressByScripthashes,
}; };
} }
} }
@ -44,8 +48,32 @@ class ElectrumWorkerScripthashesSubscribeError extends ElectrumWorkerErrorRespon
final String method = ElectrumRequestMethods.scriptHashSubscribe.method; final String method = ElectrumRequestMethods.scriptHashSubscribe.method;
} }
class ElectrumWorkerScripthashesResponse {
ElectrumWorkerScripthashesResponse({
required this.address,
required this.scripthash,
this.status,
});
final String address;
final String scripthash;
final String? status;
Map<String, dynamic> toJson() {
return {'address': address, 'scripthash': scripthash, 'status': status};
}
static ElectrumWorkerScripthashesResponse fromJson(Map<String, dynamic> json) {
return ElectrumWorkerScripthashesResponse(
address: json['address'] as String,
scripthash: json['scripthash'] as String,
status: json['status'] as String?,
);
}
}
class ElectrumWorkerScripthashesSubscribeResponse class ElectrumWorkerScripthashesSubscribeResponse
extends ElectrumWorkerResponse<Map<String, dynamic>?, Map<String, dynamic>?> { extends ElectrumWorkerResponse<ElectrumWorkerScripthashesResponse, Map<String, dynamic>?> {
ElectrumWorkerScripthashesSubscribeResponse({ ElectrumWorkerScripthashesSubscribeResponse({
required super.result, required super.result,
super.error, super.error,
@ -55,13 +83,13 @@ class ElectrumWorkerScripthashesSubscribeResponse
@override @override
Map<String, dynamic>? resultJson(result) { Map<String, dynamic>? resultJson(result) {
return result; return result.toJson();
} }
@override @override
factory ElectrumWorkerScripthashesSubscribeResponse.fromJson(Map<String, dynamic> json) { factory ElectrumWorkerScripthashesSubscribeResponse.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerScripthashesSubscribeResponse( return ElectrumWorkerScripthashesSubscribeResponse(
result: json['result'] as Map<String, dynamic>?, result: ElectrumWorkerScripthashesResponse.fromJson(json['result'] as Map<String, dynamic>),
error: json['error'] as String?, error: json['error'] as String?,
id: json['id'] as int?, id: json['id'] as int?,
completed: json['completed'] as bool? ?? false, completed: json['completed'] as bool? ?? false,