Source code for bitcoinlib.scripts

# -*- coding: utf-8 -*-
#
#    BitcoinLib - Python Cryptocurrency Library
#    Scripts class - Parse, Serialize and Evaluate scripts
#    © 2021 - 1200 Web Development <http://1200wd.com/>
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

from io import BytesIO
from bitcoinlib.encoding import *
from bitcoinlib.main import *
from bitcoinlib.config.opcodes import *
from bitcoinlib.keys import Signature, Key


_logger = logging.getLogger(__name__)


SCRIPT_TYPES = {
    # <name>: (<type>, <script_commands>, <data-lengths>)
    'p2pkh': ('locking', [op.op_dup, op.op_hash160, 'data', op.op_equalverify, op.op_checksig], [20]),
    'p2pkh_drop': ('locking', ['data', op.op_drop, op.op_dup, op.op_hash160, 'data', op.op_equalverify, op.op_checksig],
                   [32, 20]),
    'p2sh': ('locking', [op.op_hash160, 'data', op.op_equal], [20]),
    'p2wpkh': ('locking', [op.op_0, 'data'], [20]),
    'p2wsh': ('locking', [op.op_0, 'data'], [32]),
    'multisig': ('locking', ['op_n', 'key', 'op_n', op.op_checkmultisig], []),
    'p2pk': ('locking', ['key', op.op_checksig], []),
    'nulldata': ('locking', [op.op_return, 'data'], [0]),
    'nulldata_1': ('locking', [op.op_return, op.op_0], []),
    'nulldata_2': ('locking', [op.op_return], []),
    'sig_pubkey': ('unlocking', ['signature', 'key'], []),
    # 'p2sh_multisig': ('unlocking', [op.op_0, 'signature', 'op_n', 'key', 'op_n', op.op_checkmultisig], []),
    'p2sh_multisig': ('unlocking', [op.op_0, 'signature', 'redeemscript'], []),
    'p2sh_multisig_2?': ('unlocking', [op.op_0, 'signature', op.op_verify, 'redeemscript'], []),
    'p2sh_multisig_3?': ('unlocking', [op.op_0, 'signature', op.op_1add, 'redeemscript'], []),
    'p2sh_p2wpkh': ('unlocking', [op.op_0, op.op_hash160, 'redeemscript', op.op_equal], []),
    'p2sh_p2wsh': ('unlocking', [op.op_0, 'redeemscript'], []),
    'signature': ('unlocking', ['signature'], []),
    'signature_multisig': ('unlocking', [op.op_0, 'signature'], []),
    'locktime_cltv': ('unlocking', ['locktime_cltv', op.op_checklocktimeverify, op.op_drop], []),
    'locktime_csv': ('unlocking', ['locktime_csv', op.op_checksequenceverify, op.op_drop], []),
}


[docs]class ScriptError(Exception): """ Handle Key class Exceptions """ def __init__(self, msg=''): self.msg = msg _logger.error(msg) def __str__(self): return self.msg
def _get_script_types(blueprint): # Convert blueprint to more generic format bp = [] for item in blueprint: if isinstance(item, str) and item[:4] == 'data': bp.append('data') elif isinstance(item, int) and op.op_1 <= item <= op.op_16: bp.append('op_n') elif item == 'key' and len(bp) and bp[-1] == 'key': bp[-1] = 'key' elif item == 'signature' and len(bp) and bp[-1] == 'signature': bp[-1] = 'signature' else: bp.append(item) bp_len = [int(c.split('-')[1]) for c in blueprint if isinstance(c, str) and c[:4] == 'data'] script_types = [] while len(bp): # Find all possible matches with blueprint matches = [(key, len(values[1]), values[2]) for key, values in SCRIPT_TYPES.items() if values[1] == bp[:len(values[1])]] if not matches: script_types.append('unknown') break # Select match with correct data length if more then 1 match is found match_id = 0 for match in matches: data_lens = match[2] for i, data_len in enumerate(data_lens): bl = bp_len[i] if data_len == bl or data_len == 0: match_id = matches.index(match) break # Add script type to list script_type = matches[match_id][0] if script_type == 'multisig' and script_types[-1:] == ['signature_multisig']: script_types.pop() script_type = 'p2sh_multisig' script_types.append(script_type) bp = bp[matches[match_id][1]:] return script_types
[docs]def data_pack(data): """ Add data length prefix to data string to include data in a script :param data: Data to be packed :type data: bytes :return bytes: """ if len(data) <= 75: return len(data).to_bytes(1, 'big') + data elif 75 < len(data) <= 255: return b'L' + len(data).to_bytes(1, 'little') + data else: return b'M' + len(data).to_bytes(2, 'little') + data
[docs]def get_data_type(data): """ Get type of data in script. Recognises signatures, keys, hashes or sequence data. Return 'other' if data is not recognised. :param data: Data part of script :type data: bytes :return str: """ if isinstance(data, Key): return 'key_object' elif isinstance(data, Signature): return 'signature_object' elif data.startswith(b'\x30') and 69 <= len(data) <= 74: return 'signature' elif ((data.startswith(b'\x02') or data.startswith(b'\x03')) and len(data) == 33) or \ (data.startswith(b'\x04') and len(data) == 65): return 'key' elif len(data) == 20 or len(data) == 32 or 1 < len(data) <= 4: return 'data-%d' % len(data) else: return 'other'
[docs]class Script(object): def __init__(self, commands=None, message=None, script_types='', is_locking=True, keys=None, signatures=None, blueprint=None, tx_data=None, public_hash=b'', sigs_required=None, redeemscript=b'', hash_type=SIGHASH_ALL): """ Create a Script object with specified parameters. Use parse() method to create a Script from raw hex >>> s = Script([op.op_2, op.op_4, op.op_add]) >>> s <Script([op.op_2, op.op_4, op.op_add])> >>> s.blueprint [82, 84, 147] >>> s.evaluate() True Stack is empty now, because evaluate pops last item from stack and check if is non-zero >>> s.stack [] :param commands: List of script language commands :type commands: list :param message: Signed message to verify, normally a transaction hash. Used to validate script :type message: bytes :param script_types: List of script_types as defined in SCRIPT_TYPES :type script_types: list of str :param is_locking: Is this a locking script (Output), otherwise unlocking (Input) :type is_locking: bool :param keys: Provide list of keys to create script :type keys: list of Key :param signatures: Provide list of signatures to create script :type signatures: list of Signature :param blueprint: Simplified version of script, normally generated by Script object :type blueprint: list of str :param tx_data: Dictionary with extra information needed to verify script. Such as 'redeemscript' for multisignature scripts and 'blockcount' for time locked scripts :type tx_data: dict :param public_hash: Public hash of key or redeemscript used to create scripts :type public_hash: bytes :param sigs_required: Nubmer of signatures required to create multisig script :type sigs_required: int :param redeemscript: Provide redeemscript to create a new (multisig) script :type redeemscript: bytes :param hash_type: Specific script hash type, default is SIGHASH_ALL :type hash_type: int """ self.commands = commands if commands else [] self._raw = b'' self.stack = [] self.message = message self.script_types = script_types if script_types else [] self.is_locking = is_locking self.keys = keys if keys else [] self.signatures = signatures if signatures else [] self._blueprint = blueprint if blueprint else [] self.tx_data = {} if not tx_data else tx_data self.sigs_required = sigs_required if sigs_required else len(self.keys) if len(self.keys) else 1 self.redeemscript = redeemscript self.public_hash = public_hash self.hash_type = hash_type if not self.commands and self.script_types and (self.keys or self.signatures or self.public_hash): for st in self.script_types: st_values = SCRIPT_TYPES[st] script_template = st_values[1] self.is_locking = True if st_values[0] == 'locking' else False sig_n_and_m = [len(self.keys), self.sigs_required] for tc in script_template: command = [tc] if tc == 'data': command = [self.public_hash] if self.public_hash else [] elif tc == 'signature': command = self.signatures elif tc == 'key': command = self.keys elif tc == 'op_n': command = [sig_n_and_m.pop() + 80] elif tc == 'redeemscript': command = [self.redeemscript] if not command or command == [b'']: raise ScriptError("Cannot create script, please supply %s" % (tc if tc != 'data' else 'public key hash')) self.commands += command if not (self.keys and self.signatures and self.blueprint): self._blueprint = [] for c in self.commands: if isinstance(c, int): self._blueprint.append(c) else: data_type = get_data_type(c) if data_type in ['key', 'signature', 'key_object', 'signature_object']: if data_type == 'key_object': data_type = 'key' elif data_type == 'signature_object': data_type = 'signature' self._blueprint.append(data_type) else: self._blueprint.append('data-%d' % len(c))
[docs] @classmethod def parse(cls, script, message=None, tx_data=None, strict=True, _level=0): """ Parse raw script and return Script object. Extracts script commands, keys, signatures and other data. Wrapper for the :func:`parse_bytesio` method. Convert hexadecimal string or bytes script to BytesIO. >>> Script.parse('76a914af8e14a2cecd715c363b3a72b55b59a31e2acac988ac') <Script([op.op_dup, op.op_hash160, data-20, op.op_equalverify, op.op_checksig])> :param script: Raw script to parse in bytes, BytesIO or hexadecimal string format :type script: BytesIO, bytes, str :param message: Signed message to verify, normally a transaction hash :type message: bytes :param tx_data: Dictionary with extra information needed to verify script. Such as 'redeemscript' for multisignature scripts and 'blockcount' for time locked scripts :type tx_data: dict :param strict: Raise exception when script is malformed, incomplete or not understood. Default is True :type strict: bool :param _level: Internal argument used to avoid recursive depth :type _level: int :return Script: """ if isinstance(script, bytes): script = BytesIO(script) elif isinstance(script, str): script = BytesIO(bytes.fromhex(script)) return cls.parse_bytesio(script, message, tx_data, strict, _level)
[docs] @classmethod def parse_bytesio(cls, script, message=None, tx_data=None, strict=True, _level=0): """ Parse raw script and return Script object. Extracts script commands, keys, signatures and other data. :param script: Raw script to parse in bytes, BytesIO or hexadecimal string format :type script: BytesIO :param message: Signed message to verify, normally a transaction hash :type message: bytes :param tx_data: Dictionary with extra information needed to verify script. Such as 'redeemscript' for multisignature scripts and 'blockcount' for time locked scripts :type tx_data: dict :param strict: Raise exception when script is malformed, incomplete or not understood. Default is True :type strict: bool :param _level: Internal argument used to avoid recursive depth :type _level: int :return Script: """ commands = [] signatures = [] keys = [] blueprint = [] redeemscript = b'' sigs_required = None # hash_type = SIGHASH_ALL # todo: check hash_type = None if not tx_data: tx_data = {} while script: chb = script.read(1) if not chb: break ch = int.from_bytes(chb, 'big') data = None data_length = 0 if 1 <= ch <= 75: # Data` data_length = ch elif ch == op.op_pushdata1: data_length = int.from_bytes(script.read(1), 'little') elif ch == op.op_pushdata2: data_length = int.from_bytes(script.read(2), 'little') if data_length: data = script.read(data_length) if len(data) != data_length: msg = "Malformed script, not enough data found" if strict: raise ScriptError(msg) else: _logger.warning(msg) if data: data_type = get_data_type(data) commands.append(data) if data_type == 'signature': sig = Signature.parse_bytes(data) signatures.append(sig) hash_type = sig.hash_type blueprint.append('signature') elif data_type == 'signature_object': signatures.append(data) hash_type = data.hash_type blueprint.append('signature') elif data_type == 'key': keys.append(Key(data)) blueprint.append('key') elif data_type == 'key_object': keys.append(data) blueprint.append('key') elif data_type[:4] == 'data': # FIXME: This is arbitrary blueprint.append('data-%d' % len(data)) elif len(commands) >= 2 and commands[-2] == op.op_return: blueprint.append('data-%d' % len(data)) else: # FIXME: Only parse sub-scripts if script is expected try: if _level >= 1: blueprint.append('data-%d' % len(data)) else: s2 = Script.parse_bytes(data, _level=_level+1) commands.pop() commands += s2.commands blueprint += s2.blueprint keys += s2.keys signatures += s2.signatures redeemscript = s2.redeemscript sigs_required = s2.sigs_required except (ScriptError, IndexError): blueprint.append('data-%d' % len(data)) else: # Other opcode commands.append(ch) blueprint.append(ch) s = cls(commands, message, keys=keys, signatures=signatures, blueprint=blueprint, tx_data=tx_data, hash_type=hash_type) script.seek(0) s._raw = script.read() s.script_types = _get_script_types(blueprint) if 'unknown' in s.script_types: s.script_types = ['unknown'] # Extract extra information from script data for st in s.script_types[:1]: if st == 'multisig': s.redeemscript = s.raw s.sigs_required = s.commands[0] - 80 if s.sigs_required > len(keys): raise ScriptError("Number of signatures required (%d) is higher then number of keys (%d)" % (s.sigs_required, len(keys))) if len(s.keys) != s.commands[-2] - 80: raise ScriptError("%d keys found but %d keys expected" % (len(s.keys), s.commands[-2] - 80)) elif st in ['p2wpkh', 'p2wsh', 'p2sh'] and len(s.commands) > 1: s.public_hash = s.commands[1] elif st == 'p2pkh' and len(s.commands) > 2: s.public_hash = s.commands[2] s.redeemscript = redeemscript if redeemscript else s.redeemscript if s.redeemscript and 'redeemscript' not in s.tx_data: s.tx_data['redeemscript'] = s.redeemscript s.sigs_required = sigs_required if sigs_required else s.sigs_required return s
[docs] @classmethod def parse_hex(cls, script, message=None, tx_data=None, strict=True, _level=0): """ Parse raw script and return Script object. Extracts script commands, keys, signatures and other data. Wrapper for the :func:`parse_bytesio` method. Convert hexadecimal string script to BytesIO. >>> Script.parse_hex('76a914af8e14a2cecd715c363b3a72b55b59a31e2acac988ac') <Script([op.op_dup, op.op_hash160, data-20, op.op_equalverify, op.op_checksig])> :param script: Raw script to parse in hexadecimal string format :type script: str :param message: Signed message to verify, normally a transaction hash :type message: bytes :param tx_data: Dictionary with extra information needed to verify script. Such as 'redeemscript' for multisignature scripts and 'blockcount' for time locked scripts :type tx_data: dict :param strict: Raise exception when script is malformed, incomplete or not understood. Default is True :type strict: bool :param _level: Internal argument used to avoid recursive depth :type _level: int :return Script: """ return cls.parse_bytesio(BytesIO(bytes.fromhex(script)), message, tx_data, strict, _level)
[docs] @classmethod def parse_bytes(cls, script, message=None, tx_data=None, strict=True, _level=0): """ Parse raw script and return Script object. Extracts script commands, keys, signatures and other data. Wrapper for the :func:`parse_bytesio` method. Convert bytes script to BytesIO. :param script: Raw script to parse in bytes format :type script: bytes :param message: Signed message to verify, normally a transaction hash :type message: bytes :param tx_data: Dictionary with extra information needed to verify script. Such as 'redeemscript' for multisignature scripts and 'blockcount' for time locked scripts :type tx_data: dict :param strict: Raise exception when script is malformed or incomplete :type strict: bool :param _level: Internal argument used to avoid recursive depth :type _level: int :return Script: """ return cls.parse_bytesio(BytesIO(script), message, tx_data, strict, _level)
def __repr__(self): s_items = [] for command in self.blueprint: if isinstance(command, int): s_items.append('op.' + opcodenames.get(command, 'unknown-op-%s' % command).lower()) else: s_items.append(command) return '<Script([' + ', '.join(s_items) + '])>' def __str__(self): s_items = [] for command in self.blueprint: if isinstance(command, int): s_items.append(opcodenames.get(command, 'unknown-op-%s' % command)) else: s_items.append(command) return ' '.join(s_items) def __add__(self, other): self.commands += other.commands self._raw += other.raw if other.message and not self.message: self.message = other.message self.is_locking = None self.keys += other.keys self.signatures += other.signatures self._blueprint += other._blueprint self.script_types = _get_script_types(self._blueprint) if other.tx_data and not self.tx_data: self.tx_data = other.tx_data if other.redeemscript and not self.redeemscript: self.redeemscript = other.redeemscript return self def __bool__(self): return bool(self.commands) def __hash__(self): return hash160(self.raw) @property def blueprint(self): # TODO: create blueprint from commands if empty return self._blueprint @property def raw(self): if not self._raw: self._raw = self.serialize() return self._raw
[docs] def serialize(self): """ Serialize script. Return all commands and data as bytes >>> s = Script.parse_hex('76a914af8e14a2cecd715c363b3a72b55b59a31e2acac988ac') >>> s.serialize().hex() '76a914af8e14a2cecd715c363b3a72b55b59a31e2acac988ac' :return bytes: """ raw = b'' for cmd in self.commands: if isinstance(cmd, int): raw += bytes([cmd]) else: raw += data_pack(cmd) self._raw = raw return raw
[docs] def serialize_list(self): """ Serialize script and return commands and data as list >>> s = Script.parse_hex('76a9') >>> s.serialize_list() [b'v', b'\\xa9'] :return list of bytes: """ clist = [] for cmd in self.commands: if isinstance(cmd, int): clist.append(bytes([cmd])) else: clist.append(bytes(cmd)) return clist
[docs] def evaluate(self, message=None, tx_data=None): """ Evaluate script, run all commands and check if it is valid >>> s = Script([op.op_2, op.op_4, op.op_add]) >>> s <Script([op.op_2, op.op_4, op.op_add])> >>> s.blueprint [82, 84, 147] >>> s.evaluate() True >>> lock_script = bytes.fromhex('76a914f9cc73824051cc82d64a716c836c54467a21e22c88ac') >>> unlock_script = bytes.fromhex('483045022100ba2ec7c40257b3d22864c9558738eea4d8771ab97888368124e176fdd6d7cd8602200f47c8d0c437df1ea8f9819d344e05b9c93e38e88df1fc46abb6194506c50ce1012103e481f20561573cfd800e64efda61405917cb29e4bd20bed168c52b674937f535') >>> s = Script.parse_bytes(unlock_script + lock_script) >>> transaction_hash = bytes.fromhex('12824db63e7856d00ee5e109fd1c26ac8a6a015858c26f4b336274f6b52da1c3') >>> s.evaluate(message=transaction_hash) True :param message: Signed message to verify, normally a transaction hash. Leave empty to use Script.message. If supplied Script.message will be ignored. :type message: bytes :param tx_data: Dictionary with extra information needed to verify script. Such as 'redeemscript' for multisignature scripts and 'blockcount' for time locked scripts. Leave emtpy to use Script.tx_data. If supplied Script.tx_data will be ignored :return bool: Valid or not valid """ self.message = self.message if message is None else message self.tx_data = self.tx_data if tx_data is None else tx_data self.stack = Stack() commands = self.commands[:] while len(commands): command = commands.pop(0) if isinstance(command, int): if command == op.op_0: # OP_0 self.stack.append(encode_num(0)) elif command == op.op_1negate: # OP_1NEGATE self.stack.append(encode_num(-1)) elif op.op_1 <= command <= op.op_16: # OP_1 to OP_16 self.stack.append(encode_num(command-80)) elif command == op.op_if or command == op.op_notif: method = opcodenames[command].lower() method = getattr(self.stack, method) if not method(commands): return False else: method_name = opcodenames[command].lower() if method_name not in dir(self.stack): raise ScriptError("Method %s not found" % method_name) try: method = getattr(self.stack, method_name) if method_name == 'op_checksig' or method_name == 'op_checksigverify': res = method(self.message) elif method_name == 'op_checkmultisig' or method_name == 'op_checkmultisigverify': res = method(self.message, self.tx_data) elif method_name == 'op_checklocktimeverify': res = self.stack.op_checklocktimeverify( self.tx_data['sequence'], self.tx_data.get('locktime')) elif method_name == 'op_checksequenceverify': res = self.stack.op_checksequenceverify(self.tx_data['sequence'], self.tx_data['version']) else: res = method() if res is False: return False except Exception as e: _logger.warning("Stack evaluate error: %s" % e) return False else: self.stack.append(command) if len(self.stack) == 0: return False if self.stack.pop() == b'': return False return True
[docs]class Stack(list): """ The Stack object is a child of the Python list object with extra operational (OP) methods. The operations as used in the Script language can be used to manipulate the stack / list. For documentation of the op-methods you could check https://en.bitcoin.it/wiki/Script """
[docs] @classmethod def from_ints(cls, list_ints): """ Create a Stack item with a list of integers. >>> Stack.from_ints([1, 2]) [b'\\x01', b'\\x02'] :param list_ints: :return: """ return Stack([encode_num(n) for n in list_ints])
[docs] def as_ints(self): """ Return the Stack as list of integers >>> st = Stack.from_ints([1, 2]) >>> st.as_ints() [1, 2] :return list of int: """ # TODO: What to do with data/hashes? return Stack([decode_num(x) for x in self])
[docs] def pop_as_number(self): """ Pop the latest item from the list and decode as number >>> st = Stack.from_ints([1, 2]) >>> st.pop_as_number() 2 :return int: """ return decode_num(self.pop())
[docs] def is_arithmetic(self, items=1): """ Check if top stack item is or last stock are arithmetic and has no more then 4 bytes :return bool: """ if len(self) < items: raise IndexError("Not enough items in list to run operation. Items %d, expected %d" % (len(self), items)) for i in self[-items:]: if len(i) > 4: return False return True
# def op_ver() # unused
[docs] def op_if(self, commands): true_items = [] false_items = [] current_array = true_items found = False num_endifs_needed = 1 while len(commands) > 0: item = commands.pop(0) if item in (99, 100): # nested if, we have to go another endif num_endifs_needed += 1 current_array.append(item) elif num_endifs_needed == 1 and item == 103: current_array = false_items elif item == 104: if num_endifs_needed == 1: found = True break else: num_endifs_needed -= 1 current_array.append(item) else: current_array.append(item) if not found: return False element = self.pop() if decode_num(element) == 0: commands[:0] = false_items else: commands[:0] = true_items return True
[docs] def op_notif(self, commands): element = self.pop() if decode_num(element) == 0: self.append(b'\1') else: self.append(b'\0') return self.op_if(commands)
[docs] def op_nop(self): return True
[docs] def op_verify(self): if self.pop() == b'': return False return True
[docs] @staticmethod def op_return(): return False
[docs] def op_2drop(self): self.pop() self.pop() return True
[docs] def op_2dup(self): if len(self) < 2: raise ValueError("Stack op_2dup method requires minimum of 2 stack items") self.extend(self[-2:]) return True
[docs] def op_3dup(self): if len(self) < 3: raise ValueError("Stack op_3dup method requires minimum of 3 stack items") self.extend(self[-3:]) return True
[docs] def op_2over(self): if len(self) < 4: raise ValueError("Stack op_2over method requires minimum of 4 stack items") self.extend(self[-4:-2]) return True
[docs] def op_2rot(self): self.extend([self.pop(-6), self.pop(-5)]) return True
[docs] def op_2swap(self): self[-2:-2] = [self.pop(), self.pop()] return True
[docs] def op_ifdup(self): if not len(self): raise ValueError("Stack op_ifdup method requires minimum of 1 stack item") if self[-1] != b'': self.append(self[-1]) return True
[docs] def op_depth(self): self.append(encode_num(len(self))) return True
[docs] def op_drop(self): self.pop() return True
[docs] def op_dup(self): if not len(self): return False self.append(self[-1]) return True
[docs] def op_nip(self): self.pop(-2) return True
[docs] def op_over(self): if len(self) < 2: raise ValueError("Stack op_over method requires minimum of 2 stack items") self.append(self[-2]) return True
[docs] def op_pick(self): self.append(self[-self.pop_as_number()]) return True
[docs] def op_roll(self): self.append(self.pop(-self.pop_as_number())) return True
[docs] def op_rot(self): self.append(self.pop(-3)) return True
[docs] def op_swap(self): self.append(self.pop(-2)) return True
[docs] def op_tuck(self): self.append(self[-2]) return True
[docs] def op_size(self): self.append(encode_num(len(self[-1]))) return True
[docs] def op_equal(self): self.append(b'\x01' if self.pop() == self.pop() else b'') return True
[docs] def op_equalverify(self): self.op_equal() return self.op_verify()
# # 'op_reserved1': unused # # 'op_reserved2': unused
[docs] def op_1add(self): if not self.is_arithmetic(): return False self.append(encode_num(self.pop_as_number() + 1)) return True
[docs] def op_1sub(self): if not self.is_arithmetic(): return False self.append(encode_num(self.pop_as_number() - 1)) return True
[docs] def op_negate(self): if not self.is_arithmetic(): return False self.append(encode_num(-self.pop_as_number())) return True
[docs] def op_abs(self): if not self.is_arithmetic(): return False self.append(encode_num(abs(self.pop_as_number()))) return True
[docs] def op_not(self): if not self.is_arithmetic(): return False self.append(b'\1' if self.pop() == b'' else b'') return True
[docs] def op_0notequal(self): if not self.is_arithmetic(): return False self.append(b'' if self.pop() == b'' else b'\1') return True
[docs] def op_add(self): if not self.is_arithmetic(2): return False self.append(encode_num(self.pop_as_number() + self.pop_as_number())) return True
[docs] def op_sub(self): if not self.is_arithmetic(2): return False self.append(encode_num(self.pop_as_number() - self.pop_as_number())) return True
[docs] def op_booland(self): if not self.is_arithmetic(2): return False a = self.pop() b = self.pop() if a != b'' and b != b'': self.append(b'\1') else: self.append(b'') return True
[docs] def op_boolor(self): if not self.is_arithmetic(2): return False a = self.pop() b = self.pop() if a != b'' or b != b'': self.append(b'\1') else: self.append(b'') return True
[docs] def op_numequal(self): if not self.is_arithmetic(2): return False if self.pop() == self.pop(): self.append(b'\1') else: self.append(b'') return True
[docs] def op_numequalverify(self): self.op_numequal() return self.op_verify()
[docs] def op_numnotequal(self): if not self.is_arithmetic(2): return False if self.pop() != self.pop(): self.append(b'\1') else: self.append(b'') return True
[docs] def op_numlessthan(self): if not self.is_arithmetic(2): return False if self.pop_as_number() < self.pop_as_number(): self.append(b'\1') else: self.append(b'') return True
[docs] def op_numgreaterthan(self): if not self.is_arithmetic(2): return False if self.pop_as_number() > self.pop_as_number(): self.append(b'\1') else: self.append(b'') return True
[docs] def op_numlessthanorequal(self): if not self.is_arithmetic(2): return False if self.pop_as_number() <= self.pop_as_number(): self.append(b'\1') else: self.append(b'') return True
[docs] def op_numgreaterthanorequal(self): if not self.is_arithmetic(2): return False if self.pop_as_number() >= self.pop_as_number(): self.append(b'\1') else: self.append(b'') return True
[docs] def op_min(self): if not self.is_arithmetic(2): return False a = self.pop_as_number() b = self.pop_as_number() self.append(encode_num(a) if a < b else encode_num(b)) return True
[docs] def op_max(self): if not self.is_arithmetic(2): return False a = self.pop_as_number() b = self.pop_as_number() self.append(encode_num(a) if a > b else encode_num(b)) return True
[docs] def op_within(self): if not self.is_arithmetic(3): return False x = self.pop_as_number() vmin = self.pop_as_number() vmax = self.pop_as_number() if vmin <= x < vmax: self.append(b'\1') else: self.append(b'') return True
[docs] def op_ripemd160(self): self.append(hashlib.new('ripemd160', self.pop()).digest()) return True
[docs] def op_sha1(self): self.append(hashlib.sha1(self.pop()).digest()) return True
[docs] def op_sha256(self): self.append(hashlib.sha256(self.pop()).digest()) return True
[docs] def op_hash160(self): self.append(hash160(self.pop())) return True
[docs] def op_hash256(self): self.op_sha256() self.op_sha256() return True
[docs] def op_checksig(self, message, _=None): public_key = self.pop() signature = self.pop() signature = Signature.parse_bytes(signature, public_key=public_key) if signature.verify(message, public_key): self.append(b'\1') else: self.append(b'') return True
[docs] def op_checksigverify(self, message, _=None): return self.op_checksig(message, None) and self.op_verify()
[docs] def op_checkmultisig(self, message, data=None): n = decode_num(self.pop()) pubkeys = [] for _ in range(n): pubkeys.append(self.pop()) m = decode_num(self.pop()) signatures = [] for _ in range(m): signatures.append(self.pop()) if len(self): # OP_CHECKMULTISIG bug self.pop() sigcount = 0 for pubkey in pubkeys: s = Signature.parse_bytes(signatures[sigcount]) if s.verify(message, pubkey): sigcount += 1 if sigcount >= len(signatures): break if sigcount == len(signatures): if data and 'redeemscript' in data: self.append(data['redeemscript']) else: self.append(b'\1') else: self.append(b'') return True
[docs] def op_checkmultisigverify(self, message, data=None): return self.op_checkmultisig(message, data) and self.op_verify()
[docs] def op_nop1(self): return True
[docs] def op_checklocktimeverify(self, sequence, tx_locktime): """ Implements CHECKLOCKTIMEVERIFY opcode (CLTV) as defined in BIP65. CLTV is an absolute timelock and is added to an output locking script. It locks an output until a certain time or block. :param sequence: Sequence value from the transaction. Must be 0xffffffff to be valid :type sequence: int :param tx_locktime: The nLocktime value from the transaction in blocks or as Median Time Past timestamp :type tx_locktime: int :return bool: """ # TODO: Check, add to Script/Transaction and add unittests if not tx_locktime: return False if sequence == 0xffffffff: return False locktime = decode_num(self[-1]) if locktime < 0: return False if locktime < 50000000 < tx_locktime or locktime > 50000000 > tx_locktime: return False if tx_locktime < locktime: return False return True
[docs] def op_checksequenceverify(self, sequence, version): """ Implements CHECKSEQUENCEVERIFY opcode (CSV) as defined in BIP112 CSV is a relative timelock and is added to an output locking script. It locks an output for a certain number of blocks or time. :param sequence: Sequence value from the transaction :type sequence: int :param version: Transaction verion. Must be 2 or higher :type version: int :return bool: """ # TODO: Implement # if sequence == 0xffffffff: # return False # locktime = decode_num(self[-1]) # if locktime < 0: # return False # if locktime != 0xffffffff: # if version < 2: # return False # return True return NotImplementedError
[docs] def op_nop4(self): return True
[docs] def op_nop5(self): return True
[docs] def op_nop6(self): return True
[docs] def op_nop7(self): return True
[docs] def op_nop8(self): return True
[docs] def op_nop9(self): return True
[docs] def op_nop10(self): return True
# # 'op_invalidopcode':
[docs]def encode_num(num): """ Encode number as byte used in Script language. Bitcoin specific little endian format with sign for negative integers. >>> encode_num(0) b'' >>> encode_num(1) b'\\x01' >>> encode_num(1000) b'\\xe8\\x03' >>> encode_num(1000000) b'@B\\x0f' :param num: number to represent :type num: int :return bytes: """ if num == 0: return b'' abs_num = abs(num) negative = num < 0 length = (num.bit_length() + 7) // 8 encoded = abs_num.to_bytes(length, byteorder='little') if encoded[-1] & 0x80: if negative: encoded += b'\x80' else: encoded += b'\0' elif negative: encoded = encoded[:-1] + (encoded[-1] + 0x80).to_bytes(1, 'big') return encoded
[docs]def decode_num(encoded): """ Decode byte representation of number used in Script language to integer. >>> decode_num(b'') 0 >>> decode_num(b'@B\\x0f') 1000000 :param encoded: Number as bytes :type encoded: bytes :return int: """ if encoded == b'': return 0 negative = False if encoded[-1] & 0x80: negative = True element = encoded[:-1] + (encoded[-1] & 0x7f).to_bytes(1, 'big') num = int.from_bytes(element, 'little') if negative: return -num else: return num