Source code for bitcoinlib.services.electrumx

# -*- coding: utf-8 -*-
#
#    BitcoinLib - Python Cryptocurrency Library
#    Electrumx client
#    © 2025 January - 1200 Web Development <http://1200wd.com/>
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import logging
from datetime import datetime, timezone
import socket
import sys
try:
    import aiorpcx
except ImportError:
    pass
import asyncio
from bitcoinlib.main import MAX_TRANSACTIONS
from bitcoinlib.services.baseclient import BaseClient, ClientError
from bitcoinlib.transactions import Transaction
from bitcoinlib.keys import Address, sha256
from bitcoinlib.scripts import Script

PROVIDERNAME = 'electrumx'
ELECTRUMX_PROTOCOL_VERSION = "1.6"

_logger = logging.getLogger(__name__)


class ElectrumxClient(BaseClient):
    """
    Service provider client for ElectrumX servers.

    Requests are sent as newline-terminated JSON-RPC messages to the host:port given in base_url. The aiorpcx
    library is used when it has been imported successfully, otherwise a plain TCP socket is used.
    """

    def __init__(self, network, base_url, denominator, api_key, *args):
        """
        Create a client and detect whether the optional aiorpcx library is available.

        :param network: Network definition, passed on to BaseClient
        :param base_url: ElectrumX server location in format host:port
        :type base_url: str
        :param denominator: Passed on to BaseClient
        :param api_key: Passed on to BaseClient
        :param args: Extra positional arguments passed on to BaseClient
        """
        # aiorpcx is imported optionally at module level, so check if the import actually succeeded
        self.aiorpcx_installed = 'aiorpcx' in sys.modules
        if not self.aiorpcx_installed:
            _logger.warning('Aiorpcx library not installed, using sockets directly now. Please install aiorpcx library '
                            'when using ElectumX client for faster and more reliable results')
        # Do not use super(self.__class__, self) here: it recurses endlessly as soon as this class is subclassed
        super().__init__(network, PROVIDERNAME, base_url, denominator, api_key, *args)

    def compose_request(self, method, parameters=None):
        """
        Send a single request to the ElectrumX server and return the result.

        :param method: ElectrumX protocol method, for example 'blockchain.transaction.get'
        :type method: str
        :param parameters: Positional parameters for the method
        :type parameters: list

        :return: Value of the 'result' field of the server response

        :raises ClientError: If base_url is not in host:port format, the server is unreachable or the server
            returns an error or an unusable response
        """
        try:
            host, port = self.base_url.split(':')
            port = int(port)
        except ValueError:
            raise ClientError('Please specify ElectrumX uri in format host:port')
        parameters = parameters or []

        if self.aiorpcx_installed:
            return self._request_aiorpcx(host, port, method, parameters)
        return self._request_socket(host, port, method, parameters)

    def _request_aiorpcx(self, host, port, method, parameters):
        """
        Execute a request with the aiorpcx library. Raises ClientError if the port cannot be reached.
        """
        # Probe the port with a short timeout first. The context manager closes the probe socket on all paths.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(0.5)
            if probe.connect_ex((host, port)) != 0:
                raise ClientError('ElectrumX server %s unavailable at port %s' % (host, port))

        async def main():
            async with aiorpcx.connect_rs(host, port, framer=aiorpcx.NewlineFramer(5000000)) as session:
                session.sent_request_timeout = self.timeout
                # Negotiate the protocol version before sending the actual request
                await session.send_request('server.version', ["Bitcoinlib", ELECTRUMX_PROTOCOL_VERSION])
                return await session.send_request(method, parameters)

        return asyncio.run(main())

    def _request_socket(self, host, port, method, parameters):
        """
        Execute a request over a plain TCP socket, used when aiorpcx is not available.

        Raises ClientError if the server returns an error, or an empty or invalid response.
        """
        import json
        from time import sleep

        server_version_msg = {"method": "server.version", "params": ["BitcoinLib", ELECTRUMX_PROTOCOL_VERSION],
                              "id": 0}
        content = {"method": method, "params": parameters, "id": 0}

        # The context manager guarantees the socket is closed, also when connect, send or recv raises
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(10)
            sock.connect((host, port))

            sock.sendall(json.dumps(server_version_msg).encode('utf-8') + b'\n')
            sleep(0.1)
            # Read and discard the version reply. Messages are newline terminated, so stop at the first complete
            # message instead of always waiting for the read timeout.
            handshake_reply = b""
            sock.settimeout(1)
            try:
                while not handshake_reply.endswith(b'\n'):
                    data = sock.recv(4096)
                    if not data:
                        break
                    handshake_reply += data
            except socket.timeout:
                pass

            sock.settimeout(10)
            sock.sendall(json.dumps(content).encode('utf-8') + b'\n')
            sleep(0.5)
            # Half-close the connection and read until the server closes its side
            sock.shutdown(socket.SHUT_WR)
            chunks = []
            while True:
                data = sock.recv(1024)
                if not data:
                    break
                chunks.append(data)

        # Decode once after all data is received: a multi-byte character can be split across two chunks
        raw_response = b"".join(chunks)
        if not raw_response.strip():
            raise ClientError("Electrumx error: empty response from server")
        try:
            parsed_resp = json.loads(raw_response.decode('utf-8'))
        except ValueError as err:
            raise ClientError("Electrumx error: invalid response from server: %s" % err) from err
        if not isinstance(parsed_resp, dict):
            raise ClientError("Electrumx error: unexpected response %s" % parsed_resp)

        # Check for an error first: a response can contain both a null result and an error object
        if parsed_resp.get('error'):
            raise ClientError("Electrumx error: %s" % parsed_resp['error'])
        if 'result' in parsed_resp:
            return parsed_resp['result']
        raise ClientError("Electrumx error: %s" % parsed_resp)

    def _get_scripthash(self, address):
        """
        Convert an address to the script hash used to query the server.

        The script hash is the sha256 of the script built from the address, byte-reversed and hex encoded.

        :param address: Address string
        :type address: str

        :return str: Script hash as hexadecimal string
        """
        address_obj = Address.parse(address)
        script = Script(public_hash=address_obj.hash_bytes, script_types=[address_obj.script_type])
        return sha256(script.as_bytes())[::-1].hex()

    def getbalance(self, addresslist):
        """
        Get the total balance, confirmed plus unconfirmed, for a list of addresses.

        :param addresslist: List of address strings
        :type addresslist: list

        :return: Sum of the 'confirmed' and 'unconfirmed' values reported for all addresses
        """
        balance = 0
        for address in addresslist:
            res = self.compose_request('blockchain.scripthash.get_balance', [self._get_scripthash(address)])
            balance += res['confirmed'] + res['unconfirmed']
        return balance

    def getutxos(self, address, after_txid='', limit=MAX_TRANSACTIONS):
        """
        Get unspent outputs for an address.

        :param address: Address string
        :type address: str
        :param after_txid: Only return UTXOs listed after the UTXO with this transaction ID
        :type after_txid: str
        :param limit: Maximum number of UTXOs to return
        :type limit: int

        :return list: List of UTXO dictionaries
        """
        scripthash = self._get_scripthash(address)
        res = self.compose_request('blockchain.scripthash.listunspent', [scripthash])
        if not self.latest_block:
            self.latest_block = self.blockcount()

        utxos = []
        for u in res:
            if u['tx_hash'] == after_txid:
                # Everything collected so far is listed before after_txid, so start over
                utxos = []
                continue
            height = u['height']
            if height and height > 0:
                block_height = height
                # NOTE(review): _parse_transaction uses latest_block - confirmations + 1 for the block height,
                # which differs by one from this calculation - confirm which convention is intended
                confirmations = self.latest_block - block_height if self.latest_block else None
            else:
                # Not included in a block yet: there is no block height to subtract from the latest block
                block_height = None
                confirmations = 0
            utxos.append({
                'address': address,
                'txid': u['tx_hash'],
                'confirmations': confirmations,
                'output_n': u['tx_pos'],
                'input_n': 0,
                'block_height': block_height,
                'fee': None,
                'size': 0,
                'value': u['value'],
                'script': '',
                'date': None
            })
        return utxos[:limit]

    def _parse_transaction(self, tx, block_height=None, get_input_values=True):
        """
        Convert a verbose transaction dictionary from the server to a Transaction object.

        :param tx: Verbose transaction dictionary, must contain 'txid', 'size' and 'vsize'
        :type tx: dict
        :param block_height: Block height to use when the transaction has no confirmations
        :type block_height: int
        :param get_input_values: Look up missing input values by requesting the previous transactions
        :type get_input_values: bool

        :return Transaction:
        """
        # Use get(): a transaction without confirmations might not have this field at all
        confirmations = tx.get('confirmations', 0)
        status = 'unconfirmed'
        # FIXME: Number of confirmations returned by Electrumx is not always correct, use block database or query
        #  electrumx for correct blockheight?
        if confirmations:
            status = 'confirmed'
            if not self.latest_block:
                self.latest_block = self.blockcount()
            block_height = self.latest_block - confirmations + 1
        tx_date = None if not tx.get('blocktime') else datetime.fromtimestamp(tx['blocktime'], timezone.utc)

        rawtx = self.compose_request('blockchain.transaction.get', [tx['txid'], False])
        t = Transaction.parse_hex(rawtx, strict=self.strict, network=self.network)
        t.confirmations = confirmations
        t.status = status
        t.date = tx_date
        t.block_height = block_height
        t.rawtx = bytes.fromhex(rawtx)
        t.size = tx['size']
        t.vsize = tx['vsize']
        t.network = self.network

        for i in t.inputs:
            if not t.coinbase and not i.value and get_input_values:
                # This does not work with very large transactions, increase MAX_SEND in electrumx config
                try:
                    ptx = self.compose_request('blockchain.transaction.get', [i.prev_txid.hex(), True])
                    i.value = round([x['value'] for x in ptx['vout'] if x['n'] == i.output_n_int][0] /
                                    self.network.denominator)
                except Exception as err:
                    # Best effort: keep the input value as is, but do not swallow KeyboardInterrupt or SystemExit
                    _logger.info("Could not determine value of input %s:%s: %s",
                                 i.prev_txid.hex(), i.output_n_int, err)
            t.input_total += i.value
        for o in t.outputs:
            o.spent = None
        t.update_totals()
        return t

    def gettransaction(self, txid, block_count=None):
        """
        Get a transaction by its transaction ID.

        :param txid: Transaction ID as hexadecimal string
        :type txid: str
        :param block_count: Passed to _parse_transaction as block height
        :type block_count: int

        :return Transaction:
        """
        tx = self.compose_request('blockchain.transaction.get', [txid, True])
        # NOTE(review): block_count is handed over as the block_height argument, so a transaction without
        # confirmations gets block_count as its block height - confirm this is intended
        return self._parse_transaction(tx, block_count)

    def gettransactions(self, address, after_txid='', limit=MAX_TRANSACTIONS):
        """
        Get transactions for an address.

        :param address: Address string
        :type address: str
        :param after_txid: Only return transactions listed after this transaction ID in the address history
        :type after_txid: str
        :param limit: Maximum number of transactions to return
        :type limit: int

        :return list: List of Transaction objects
        """
        scripthash = self._get_scripthash(address)
        history = self.compose_request('blockchain.scripthash.get_history', [scripthash])
        txids_after = []
        for item in history:
            if item['tx_hash'] == after_txid:
                # Drop after_txid itself and everything listed before it
                txids_after = []
            else:
                txids_after.append(item['tx_hash'])
        return [self.gettransaction(txid) for txid in txids_after[:limit]]

    def getrawtransaction(self, txid):
        """
        Get the raw transaction as returned by the server for a non-verbose request.

        :param txid: Transaction ID as hexadecimal string
        :type txid: str

        :return: Server result of 'blockchain.transaction.get'
        """
        return self.compose_request('blockchain.transaction.get', [txid, False])

    def sendrawtransaction(self, rawtx):
        """
        Broadcast a raw transaction.

        :param rawtx: Raw transaction as hexadecimal string
        :type rawtx: str

        :return dict: Dictionary with the broadcast result in both the 'txid' and 'response_dict' fields
        """
        txid = self.compose_request('blockchain.transaction.broadcast', [rawtx])
        return {
            'txid': txid,
            'response_dict': txid
        }

    def estimatefee(self, blocks):
        """
        Get the estimated fee for confirmation within the given number of blocks.

        :param blocks: Number of blocks
        :type blocks: int

        :return int: Server estimate divided by the network denominator, rounded

        :raises ClientError: If the server could not provide an estimate
        """
        fee = self.compose_request('blockchain.estimatefee', [blocks])
        # The server signals a missing estimate with a negative value, which must not be used as a fee
        if fee is None or fee < 0:
            raise ClientError("Electrumx could not estimate fee for %s blocks" % blocks)
        return round(fee / self.network.denominator)

    def blockcount(self):
        """
        Get the height reported by the server for the latest block header.

        :return int:
        """
        return self.compose_request('blockchain.headers.subscribe')['height']

    def mempool(self, txid):
        """
        Check if a transaction has no confirmations yet.

        :param txid: Transaction ID as hexadecimal string
        :type txid: str

        :return list: List with the transaction ID if the transaction is found without confirmations,
            otherwise an empty list
        """
        if txid:
            t = self.gettransaction(txid)
            if t and not t.confirmations:
                return [t.txid]
        return []
    # Only returns headers, not full block
    # def getblock(self, blockid, parse_transactions, page, limit):
    #     braw = self.getrawblock(blockid)
    #     return Block.parse(bytes.fromhex(braw), height=blockid, parse_transactions=parse_transactions, limit=limit)
    #
    # def getrawblock(self, blockid):
    #     res = self.compose_request('blockchain.block.header', [blockid])
    #     return res
    #
    # def isspent(self, txid, output_n):
    #     res = self.compose_request('isspent', txid, str(output_n))
    #     return 1 if res['spent'] else 0
    #
    # def getinfo(self):
    #     res = self.compose_request('')
    #     info = {k: v for k, v in res.items() if k in ['chain', 'blockcount', 'hashrate', 'mempool_size',
    #                                                   'difficulty']}
    #     return info