feat: electrum worker types

This commit is contained in:
Rafael Saes 2024-10-31 11:39:02 -03:00
parent f3a0ff7001
commit 02fabf8594
9 changed files with 509 additions and 163 deletions

View file

@ -4,7 +4,9 @@ import 'dart:io';
import 'dart:isolate';
import 'package:bitcoin_base/bitcoin_base.dart';
import 'package:cw_bitcoin/electrum_worker.dart';
import 'package:cw_bitcoin/electrum_worker/electrum_worker.dart';
import 'package:cw_bitcoin/electrum_worker/electrum_worker_methods.dart';
import 'package:cw_bitcoin/electrum_worker/methods/methods.dart';
import 'package:shared_preferences/shared_preferences.dart';
import 'package:blockchain_utils/blockchain_utils.dart';
import 'package:collection/collection.dart';
@ -104,35 +106,55 @@ abstract class ElectrumWalletBase
sharedPrefs.complete(SharedPreferences.getInstance());
}
void _handleWorkerResponse(dynamic response) {
print('Main: worker response: $response');
@action
void _handleWorkerResponse(dynamic message) {
print('Main: received message: $message');
final workerResponse = ElectrumWorkerResponse.fromJson(
jsonDecode(response.toString()) as Map<String, dynamic>,
);
if (workerResponse.error != null) {
// Handle error
print('Worker error: ${workerResponse.error}');
return;
Map<String, dynamic> messageJson;
if (message is String) {
messageJson = jsonDecode(message) as Map<String, dynamic>;
} else {
messageJson = message as Map<String, dynamic>;
}
final workerMethod = messageJson['method'] as String;
switch (workerResponse.method) {
case 'connectionStatus':
final status = workerResponse.data as String;
final connectionStatus = ConnectionStatus.values.firstWhere(
(e) => e.toString() == status,
);
_onConnectionStatusChange(connectionStatus);
// if (workerResponse.error != null) {
// print('Worker error: ${workerResponse.error}');
// switch (workerResponse.method) {
// // case 'connectionStatus':
// // final status = ConnectionStatus.values.firstWhere(
// // (e) => e.toString() == workerResponse.error,
// // );
// // _onConnectionStatusChange(status);
// // break;
// // case 'fetchBalances':
// // // Update the balance state
// // // this.balance[currency] = balance!;
// // break;
// case 'blockchain.headers.subscribe':
// _chainTipListenerOn = false;
// break;
// }
// return;
// }
switch (workerMethod) {
case ElectrumWorkerMethods.connectionMethod:
final response = ElectrumWorkerConnectionResponse.fromJson(messageJson);
_onConnectionStatusChange(response.result);
break;
case 'fetchBalances':
final balance = ElectrumBalance.fromJSON(
jsonDecode(workerResponse.data.toString()).toString(),
);
case ElectrumRequestMethods.headersSubscribeMethod:
final response = ElectrumWorkerHeadersSubscribeResponse.fromJson(messageJson);
onHeadersResponse(response.result);
break;
// case 'fetchBalances':
// final balance = ElectrumBalance.fromJSON(
// jsonDecode(workerResponse.data.toString()).toString(),
// );
// Update the balance state
// this.balance[currency] = balance!;
break;
// Handle other responses...
// break;
}
}
@ -301,19 +323,13 @@ abstract class ElectrumWalletBase
syncStatus = SynchronizingSyncStatus();
// await subscribeForHeaders();
// await subscribeForUpdates();
await subscribeForHeaders();
await subscribeForUpdates();
// await updateTransactions();
// await updateAllUnspents();
// await updateBalance();
// await updateFeeRates();
workerSendPort?.send(
ElectrumWorkerMessage(
method: 'blockchain.scripthash.get_balance',
params: {'scriptHash': scriptHashes.first},
).toJson(),
);
_updateFeeRateTimer ??=
Timer.periodic(const Duration(seconds: 5), (timer) async => await updateFeeRates());
@ -439,10 +455,7 @@ abstract class ElectrumWalletBase
if (message is SendPort) {
workerSendPort = message;
workerSendPort!.send(
ElectrumWorkerMessage(
method: 'connect',
params: {'uri': node.uri.toString()},
).toJson(),
ElectrumWorkerConnectionRequest(uri: node.uri).toJson(),
);
} else {
_handleWorkerResponse(message);
@ -1790,25 +1803,17 @@ abstract class ElectrumWalletBase
(address) => !scripthashesListening.contains(address.scriptHash),
);
await Future.wait(unsubscribedScriptHashes.map((addressRecord) async {
final scripthash = addressRecord.scriptHash;
final listener = await electrumClient2!.subscribe(
ElectrumScriptHashSubscribe(scriptHash: scripthash),
);
if (listener != null) {
scripthashesListening.add(scripthash);
// https://electrumx.readthedocs.io/en/latest/protocol-basics.html#status
// The status of the script hash is the hash of the tx history, or null if the string is empty because there are no transactions
listener((status) async {
print("status: $status");
await _fetchAddressHistory(addressRecord);
await updateUnspentsForAddress(addressRecord);
Map<String, String> scripthashByAddress = {};
List<String> scriptHashesList = [];
walletAddresses.allAddresses.forEach((addressRecord) {
scripthashByAddress[addressRecord.address] = addressRecord.scriptHash;
scriptHashesList.add(addressRecord.scriptHash);
});
}
}));
workerSendPort!.send(
ElectrumWorkerScripthashesSubscribeRequest(scripthashByAddress: scripthashByAddress).toJson(),
);
scripthashesListening.addAll(scriptHashesList);
}
@action
@ -1928,11 +1933,8 @@ abstract class ElectrumWalletBase
Future<void> subscribeForHeaders() async {
if (_chainTipListenerOn) return;
final listener = electrumClient2!.subscribe(ElectrumHeaderSubscribe());
if (listener == null) return;
workerSendPort!.send(ElectrumWorkerHeadersSubscribeRequest().toJson());
_chainTipListenerOn = true;
listener(onHeadersResponse);
}
@action

View file

@ -3,55 +3,9 @@ import 'dart:convert';
import 'dart:isolate';
import 'package:bitcoin_base/bitcoin_base.dart';
import 'package:cw_bitcoin/electrum_balance.dart';
class ElectrumWorkerMessage {
final String method;
final Map<String, dynamic> params;
ElectrumWorkerMessage({
required this.method,
required this.params,
});
Map<String, dynamic> toJson() => {
'method': method,
'params': params,
};
factory ElectrumWorkerMessage.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerMessage(
method: json['method'] as String,
params: json['params'] as Map<String, dynamic>,
);
}
}
class ElectrumWorkerResponse {
final String method;
final dynamic data;
final String? error;
ElectrumWorkerResponse({
required this.method,
required this.data,
this.error,
});
Map<String, dynamic> toJson() => {
'method': method,
'data': data,
'error': error,
};
factory ElectrumWorkerResponse.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerResponse(
method: json['method'] as String,
data: json['data'],
error: json['error'] as String?,
);
}
}
import 'package:cw_bitcoin/electrum_worker/electrum_worker_methods.dart';
// import 'package:cw_bitcoin/electrum_balance.dart';
import 'package:cw_bitcoin/electrum_worker/electrum_worker_params.dart';
class ElectrumWorker {
final SendPort sendPort;
@ -69,33 +23,36 @@ class ElectrumWorker {
receivePort.listen(worker.handleMessage);
}
Future<void> _handleConnect({
required Uri uri,
}) async {
_electrumClient = ElectrumApiProvider(
await ElectrumTCPService.connect(
uri,
onConnectionStatusChange: (status) {
_sendResponse('connectionStatus', status.toString());
},
defaultRequestTimeOut: const Duration(seconds: 5),
connectionTimeOut: const Duration(seconds: 5),
),
);
void _sendResponse(ElectrumWorkerResponse response) {
sendPort.send(jsonEncode(response.toJson()));
}
void _sendError(ElectrumWorkerErrorResponse response) {
sendPort.send(jsonEncode(response.toJson()));
}
void handleMessage(dynamic message) async {
try {
final workerMessage = ElectrumWorkerMessage.fromJson(message as Map<String, dynamic>);
print("Worker: received message: $message");
switch (workerMessage.method) {
case 'connect':
final uri = Uri.parse(workerMessage.params['uri'] as String);
await _handleConnect(uri: uri);
break;
case 'blockchain.scripthash.get_balance':
await _handleGetBalance(workerMessage);
try {
Map<String, dynamic> messageJson;
if (message is String) {
messageJson = jsonDecode(message) as Map<String, dynamic>;
} else {
messageJson = message as Map<String, dynamic>;
}
final workerMethod = messageJson['method'] as String;
switch (workerMethod) {
case ElectrumWorkerMethods.connectionMethod:
await _handleConnect(ElectrumWorkerConnectRequest.fromJson(messageJson));
break;
// case 'blockchain.headers.subscribe':
// await _handleHeadersSubscribe();
// break;
// case 'blockchain.scripthash.get_balance':
// await _handleGetBalance(message);
// break;
case 'blockchain.scripthash.get_history':
// await _handleGetHistory(workerMessage);
break;
@ -103,51 +60,59 @@ class ElectrumWorker {
// await _handleListUnspent(workerMessage);
break;
// Add other method handlers here
default:
_sendError(workerMessage.method, 'Unsupported method: ${workerMessage.method}');
// default:
// _sendError(workerMethod, 'Unsupported method: ${workerMessage.method}');
}
} catch (e, s) {
print(s);
_sendError('unknown', e.toString());
_sendError(ElectrumWorkerErrorResponse(error: e.toString()));
}
}
void _sendResponse(String method, dynamic data) {
final response = ElectrumWorkerResponse(
method: method,
data: data,
Future<void> _handleConnect(ElectrumWorkerConnectRequest request) async {
_electrumClient = ElectrumApiProvider(
await ElectrumTCPService.connect(
request.uri,
onConnectionStatusChange: (status) {
_sendResponse(ElectrumWorkerConnectResponse(status: status.toString()));
},
defaultRequestTimeOut: const Duration(seconds: 5),
connectionTimeOut: const Duration(seconds: 5),
),
);
sendPort.send(jsonEncode(response.toJson()));
}
void _sendError(String method, String error) {
final response = ElectrumWorkerResponse(
method: method,
data: null,
error: error,
);
sendPort.send(jsonEncode(response.toJson()));
}
// Future<void> _handleHeadersSubscribe() async {
// final listener = _electrumClient!.subscribe(ElectrumHeaderSubscribe());
// if (listener == null) {
// _sendError('blockchain.headers.subscribe', 'Failed to subscribe');
// return;
// }
Future<void> _handleGetBalance(ElectrumWorkerMessage message) async {
try {
final scriptHash = message.params['scriptHash'] as String;
final result = await _electrumClient!.request(
ElectrumGetScriptHashBalance(scriptHash: scriptHash),
);
// listener((event) {
// _sendResponse('blockchain.headers.subscribe', event);
// });
// }
final balance = ElectrumBalance(
confirmed: result['confirmed'] as int? ?? 0,
unconfirmed: result['unconfirmed'] as int? ?? 0,
frozen: 0,
);
// Future<void> _handleGetBalance(ElectrumWorkerRequest message) async {
// try {
// final scriptHash = message.params['scriptHash'] as String;
// final result = await _electrumClient!.request(
// ElectrumGetScriptHashBalance(scriptHash: scriptHash),
// );
_sendResponse(message.method, balance.toJSON());
} catch (e, s) {
print(s);
_sendError(message.method, e.toString());
}
}
// final balance = ElectrumBalance(
// confirmed: result['confirmed'] as int? ?? 0,
// unconfirmed: result['unconfirmed'] as int? ?? 0,
// frozen: 0,
// );
// _sendResponse(message.method, balance.toJSON());
// } catch (e, s) {
// print(s);
// _sendError(message.method, e.toString());
// }
// }
// Future<void> _handleGetHistory(ElectrumWorkerMessage message) async {
// try {

View file

@ -0,0 +1,171 @@
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
import 'package:bitcoin_base/bitcoin_base.dart';
import 'package:cw_bitcoin/electrum_worker/electrum_worker_methods.dart';
// import 'package:cw_bitcoin/electrum_balance.dart';
import 'package:cw_bitcoin/electrum_worker/electrum_worker_params.dart';
import 'package:cw_bitcoin/electrum_worker/methods/methods.dart';
class ElectrumWorker {
final SendPort sendPort;
ElectrumApiProvider? _electrumClient;
ElectrumWorker._(this.sendPort, {ElectrumApiProvider? electrumClient})
: _electrumClient = electrumClient;
static void run(SendPort sendPort) {
final worker = ElectrumWorker._(sendPort);
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
receivePort.listen(worker.handleMessage);
}
void _sendResponse<T, U>(ElectrumWorkerResponse<T, U> response) {
sendPort.send(jsonEncode(response.toJson()));
}
void _sendError(ElectrumWorkerErrorResponse response) {
sendPort.send(jsonEncode(response.toJson()));
}
void handleMessage(dynamic message) async {
print("Worker: received message: $message");
try {
Map<String, dynamic> messageJson;
if (message is String) {
messageJson = jsonDecode(message) as Map<String, dynamic>;
} else {
messageJson = message as Map<String, dynamic>;
}
final workerMethod = messageJson['method'] as String;
switch (workerMethod) {
case ElectrumWorkerMethods.connectionMethod:
await _handleConnect(
ElectrumWorkerConnectionRequest.fromJson(messageJson),
);
break;
case ElectrumRequestMethods.headersSubscribeMethod:
await _handleHeadersSubscribe();
break;
case ElectrumRequestMethods.scripthashesSubscribeMethod:
await _handleScriphashesSubscribe(
ElectrumWorkerScripthashesSubscribeRequest.fromJson(messageJson),
);
break;
// case 'blockchain.scripthash.get_balance':
// await _handleGetBalance(message);
// break;
case 'blockchain.scripthash.get_history':
// await _handleGetHistory(workerMessage);
break;
case 'blockchain.scripthash.listunspent':
// await _handleListUnspent(workerMessage);
break;
// Add other method handlers here
// default:
// _sendError(workerMethod, 'Unsupported method: ${workerMessage.method}');
}
} catch (e, s) {
print(s);
_sendError(ElectrumWorkerErrorResponse(error: e.toString()));
}
}
Future<void> _handleConnect(ElectrumWorkerConnectionRequest request) async {
_electrumClient = ElectrumApiProvider(
await ElectrumTCPService.connect(
request.uri,
onConnectionStatusChange: (status) {
_sendResponse(ElectrumWorkerConnectionResponse(status: status));
},
defaultRequestTimeOut: const Duration(seconds: 5),
connectionTimeOut: const Duration(seconds: 5),
),
);
}
Future<void> _handleHeadersSubscribe() async {
final listener = _electrumClient!.subscribe(ElectrumHeaderSubscribe());
if (listener == null) {
_sendError(ElectrumWorkerHeadersSubscribeError(error: 'Failed to subscribe'));
return;
}
listener((event) {
_sendResponse(ElectrumWorkerHeadersSubscribeResponse(result: event));
});
}
Future<void> _handleScriphashesSubscribe(
ElectrumWorkerScripthashesSubscribeRequest request,
) async {
await Future.wait(request.scripthashByAddress.entries.map((entry) async {
final address = entry.key;
final scripthash = entry.value;
final listener = await _electrumClient!.subscribe(
ElectrumScriptHashSubscribe(scriptHash: scripthash),
);
if (listener == null) {
_sendError(ElectrumWorkerScripthashesSubscribeError(error: 'Failed to subscribe'));
return;
}
// https://electrumx.readthedocs.io/en/latest/protocol-basics.html#status
// The status of the script hash is the hash of the tx history, or null if the string is empty because there are no transactions
listener((status) async {
print("status: $status");
_sendResponse(ElectrumWorkerScripthashesSubscribeResponse(
result: {address: status},
));
});
}));
}
// Future<void> _handleGetBalance(ElectrumWorkerRequest message) async {
// try {
// final scriptHash = message.params['scriptHash'] as String;
// final result = await _electrumClient!.request(
// ElectrumGetScriptHashBalance(scriptHash: scriptHash),
// );
// final balance = ElectrumBalance(
// confirmed: result['confirmed'] as int? ?? 0,
// unconfirmed: result['unconfirmed'] as int? ?? 0,
// frozen: 0,
// );
// _sendResponse(message.method, balance.toJSON());
// } catch (e, s) {
// print(s);
// _sendError(message.method, e.toString());
// }
// }
// Future<void> _handleGetHistory(ElectrumWorkerMessage message) async {
// try {
// final scriptHash = message.params['scriptHash'] as String;
// final result = await electrumClient.getHistory(scriptHash);
// _sendResponse(message.method, jsonEncode(result));
// } catch (e) {
// _sendError(message.method, e.toString());
// }
// }
// Future<void> _handleListUnspent(ElectrumWorkerMessage message) async {
// try {
// final scriptHash = message.params['scriptHash'] as String;
// final result = await electrumClient.listUnspent(scriptHash);
// _sendResponse(message.method, jsonEncode(result));
// } catch (e) {
// _sendError(message.method, e.toString());
// }
// }
}

View file

@ -0,0 +1,15 @@
class ElectrumWorkerMethods {
const ElectrumWorkerMethods._(this.method);
final String method;
static const String connectionMethod = "connection";
static const String unknownMethod = "unknown";
static const ElectrumWorkerMethods connect = ElectrumWorkerMethods._(connectionMethod);
static const ElectrumWorkerMethods unknown = ElectrumWorkerMethods._(unknownMethod);
@override
String toString() {
return method;
}
}

View file

@ -0,0 +1,45 @@
// import 'dart:convert';
import 'package:cw_bitcoin/electrum_worker/electrum_worker_methods.dart';
abstract class ElectrumWorkerRequest {
abstract final String method;
Map<String, dynamic> toJson();
ElectrumWorkerRequest.fromJson(Map<String, dynamic> json);
}
class ElectrumWorkerResponse<RESULT, RESPONSE> {
ElectrumWorkerResponse({required this.method, required this.result, this.error});
final String method;
final RESULT result;
final String? error;
RESPONSE resultJson(RESULT result) {
throw UnimplementedError();
}
factory ElectrumWorkerResponse.fromJson(Map<String, dynamic> json) {
throw UnimplementedError();
}
Map<String, dynamic> toJson() {
return {'method': method, 'result': resultJson(result), 'error': error};
}
}
class ElectrumWorkerErrorResponse {
ElectrumWorkerErrorResponse({required this.error});
String get method => ElectrumWorkerMethods.unknown.method;
final String error;
factory ElectrumWorkerErrorResponse.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerErrorResponse(error: json['error'] as String);
}
Map<String, dynamic> toJson() {
return {'method': method, 'error': error};
}
}

View file

@ -0,0 +1,50 @@
part of 'methods.dart';
class ElectrumWorkerConnectionRequest implements ElectrumWorkerRequest {
ElectrumWorkerConnectionRequest({required this.uri});
final Uri uri;
@override
final String method = ElectrumWorkerMethods.connect.method;
@override
factory ElectrumWorkerConnectionRequest.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerConnectionRequest(uri: Uri.parse(json['params'] as String));
}
@override
Map<String, dynamic> toJson() {
return {'method': method, 'params': uri.toString()};
}
}
class ElectrumWorkerConnectionError extends ElectrumWorkerErrorResponse {
ElectrumWorkerConnectionError({required String error}) : super(error: error);
@override
String get method => ElectrumWorkerMethods.connect.method;
}
class ElectrumWorkerConnectionResponse extends ElectrumWorkerResponse<ConnectionStatus, String> {
ElectrumWorkerConnectionResponse({required ConnectionStatus status, super.error})
: super(
result: status,
method: ElectrumWorkerMethods.connect.method,
);
@override
String resultJson(result) {
return result.toString();
}
@override
factory ElectrumWorkerConnectionResponse.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerConnectionResponse(
status: ConnectionStatus.values.firstWhere(
(e) => e.toString() == json['result'] as String,
),
error: json['error'] as String?,
);
}
}

View file

@ -0,0 +1,44 @@
part of 'methods.dart';
class ElectrumWorkerHeadersSubscribeRequest implements ElectrumWorkerRequest {
ElectrumWorkerHeadersSubscribeRequest();
@override
final String method = ElectrumRequestMethods.headersSubscribe.method;
@override
factory ElectrumWorkerHeadersSubscribeRequest.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerHeadersSubscribeRequest();
}
@override
Map<String, dynamic> toJson() {
return {'method': method};
}
}
class ElectrumWorkerHeadersSubscribeError extends ElectrumWorkerErrorResponse {
ElectrumWorkerHeadersSubscribeError({required String error}) : super(error: error);
@override
final String method = ElectrumRequestMethods.headersSubscribe.method;
}
class ElectrumWorkerHeadersSubscribeResponse
extends ElectrumWorkerResponse<ElectrumHeaderResponse, Map<String, dynamic>> {
ElectrumWorkerHeadersSubscribeResponse({required super.result, super.error})
: super(method: ElectrumRequestMethods.headersSubscribe.method);
@override
Map<String, dynamic> resultJson(result) {
return result.toJson();
}
@override
factory ElectrumWorkerHeadersSubscribeResponse.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerHeadersSubscribeResponse(
result: ElectrumHeaderResponse.fromJson(json['result'] as Map<String, dynamic>),
error: json['error'] as String?,
);
}
}

View file

@ -0,0 +1,6 @@
import 'package:bitcoin_base/bitcoin_base.dart';
import 'package:cw_bitcoin/electrum_worker/electrum_worker_methods.dart';
import 'package:cw_bitcoin/electrum_worker/electrum_worker_params.dart';
part 'connection.dart';
part 'headers_subscribe.dart';
part 'scripthashes_subscribe.dart';

View file

@ -0,0 +1,48 @@
part of 'methods.dart';
class ElectrumWorkerScripthashesSubscribeRequest implements ElectrumWorkerRequest {
ElectrumWorkerScripthashesSubscribeRequest({required this.scripthashByAddress});
final Map<String, String> scripthashByAddress;
@override
final String method = ElectrumRequestMethods.scriptHashSubscribe.method;
@override
factory ElectrumWorkerScripthashesSubscribeRequest.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerScripthashesSubscribeRequest(
scripthashByAddress: json['scripthashes'] as Map<String, String>,
);
}
@override
Map<String, dynamic> toJson() {
return {'method': method, 'scripthashes': scripthashByAddress};
}
}
class ElectrumWorkerScripthashesSubscribeError extends ElectrumWorkerErrorResponse {
ElectrumWorkerScripthashesSubscribeError({required String error}) : super(error: error);
@override
final String method = ElectrumRequestMethods.scriptHashSubscribe.method;
}
class ElectrumWorkerScripthashesSubscribeResponse
extends ElectrumWorkerResponse<Map<String, String>?, Map<String, String>?> {
ElectrumWorkerScripthashesSubscribeResponse({required super.result, super.error})
: super(method: ElectrumRequestMethods.scriptHashSubscribe.method);
@override
Map<String, String>? resultJson(result) {
return result;
}
@override
factory ElectrumWorkerScripthashesSubscribeResponse.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerScripthashesSubscribeResponse(
result: json['result'] as Map<String, String>?,
error: json['error'] as String?,
);
}
}