Source code for polymath.qube

##########################################################################################
# polymath/qube.py: Base class for all PolyMath subclasses.
##########################################################################################

import numpy as np
import numbers

from polymath.unit import Unit


class Qube(object):
    """The base class for all PolyMath subclasses.

    The PolyMath subclasses, e.g., Scalar, Vector3, Matrix3, etc., define one or more
    possibly multidimensional items. Unlike NumPy ndarrays, this class makes a clear
    distinction between the dimensions associated with the items and any additional,
    leading dimensions that define an array of such items.

    The "shape" is defined by the leading axes only, so a 2x2 array of 3x3 matrices would
    have shape (2,2,3,3) according to NumPy but has shape (2,2) according to PolyMath.
    Standard NumPy rules of broadcasting apply, but only on the array dimensions, not on
    the item dimensions. In other words, you can multiply a (2,2) array of 3x3 matrices
    by a (5,1,2) array of 3-vectors, yielding a (5,2,2) array of 3-vectors.

    PolyMath objects are designed as lightweight wrappers on NumPy ndarrays. All standard
    mathematical operators and indexing/slicing options are defined. One can generally
    mix PolyMath arithmetic with scalars, NumPy ndarrays, NumPy MaskedArrays, or anything
    array-like.

    In every object, a boolean mask is maintained in order to identify undefined array
    elements. Operations that would otherwise raise errors such as 1/0 and sqrt(-1) are
    masked out so that run-time errors can be avoided. See more about masks below.

    PolyMath objects also support embedded units using the Unit class. However, the
    internal values in a PolyMath object are always held in standard units of kilometers,
    seconds and radians, or arbitrary combinations thereof. The unit is primarily used to
    affect the appearance of numbers during input and output.

    PolyMath objects can be either read-only or read-write. Read-only objects are
    prevented from modification to the extent that Python makes this possible. Operations
    on read-only objects should always return read-only objects.

    PolyMath objects can track associated derivatives and partial derivatives, which are
    represented by other PolyMath objects. Mathematical operations generally carry all
    derivatives along so that, for example, if x.d_dt is the derivative of x with respect
    to t, then x.sin().d_dt will be the derivative of sin(x) with respect to t.

    The denominators of partial derivatives are represented by splitting the item shape
    into a numerator shape plus a denominator shape. As a result, for example, the
    partial derivatives of a Vector3 object (item shape (3,)) with respect to a Pair
    (item shape (2,)) will have overall item shape (3,2).

    The PolyMath subclasses generally do not constrain the shape of the denominator, just
    the numerator. As a result, the aforementioned partial derivatives can still be
    represented by a Vector3 object.

    Properties:
        shape (tuple): The leading axes of the object, i.e., those that are not
            considered part of the items.
        rank (int): The number of axes belonging to the items.
        nrank (int): The number of numerator axes associated with the items.
        drank (int): The number of denominator axes associated with the items.
        item (tuple): The shape of the individual items.
        numer (tuple): The shape of the numerator items.
        denom (tuple): The shape of the denominator items.
        values (numpy.ndarray, float, int, or bool): The object's data, with shape
            object.shape + object.item. If the object has a unit, then the values are in
            default units (km, sec, etc.) rather than in the specified unit.
        vals (numpy.ndarray, float, int, or bool): Alternative name for `values`.
        mask (numpy.ndarray or bool): The array's mask. A scalar False means the object
            is entirely unmasked; a scalar True means it is entirely masked. Otherwise,
            it is a boolean array of shape object.shape.
        unit (Unit or None): The unit of the array, if any. None indicates no unit.
        derivs (dict): A dictionary of the names and values of any derivatives, each
            represented by an additional PolyMath object.
        readonly (bool): True if the object cannot (or at least should not) be modified.
            A determined user may be able to alter a read-only object, but the API makes
            this more difficult.
        size (int): The number of elements in the shape.
        isize (int): The number of elements in each item.
        nsize (int): The number of elements in the numerator of the items.
        dsize (int): The number of elements in the denominator of the items.
    """

    # This prevents binary operations of the form:
    #   <np.ndarray> <op> <Qube>
    # from executing the ndarray operation instead of the polymath operation
    __array_priority__ = 1

    # Global attribute to be used for testing
    _DISABLE_CACHE = False

    # If this global is set to True, the shrink/unshrink methods are disabled.
    # Calculations done with and without shrinking should always produce the same
    # results, although they may be slower with shrinking disabled. Used for testing and
    # debugging.
    _DISABLE_SHRINKING = False

    # If this global is set to True, the unshrunk method will ignore any cached value of
    # its un-shrunken equivalent. Used for testing and debugging.
    _IGNORE_UNSHRUNK_AS_CACHED = False

    # Default class constants, to be overridden as needed by subclasses...
    _NRANK = None       # The number of numerator axes; None to leave this unconstrained.
    _NUMER = None       # Shape of the numerator; None to leave unconstrained.
    _FLOATS_OK = True   # True to allow floating-point numbers.
    _INTS_OK = True     # True to allow integers.
    _BOOLS_OK = True    # True to allow booleans.
    _UNITS_OK = True    # True to allow units; False to disallow them.
    _DERIVS_OK = True   # True to allow derivatives and denominators; False to disallow.
[docs] def __new__(subtype, *values, **keywords): """Create a new, un-initialized object given a Qube subclass.""" return object.__new__(subtype)
def __init__(self, arg, mask=False, *, derivs={}, unit=None, nrank=None, drank=None,
             example=None, default=None, op=''):
    """Default constructor.

    Parameters:
        arg (Qube, array-like, float, int, or bool): An object to define the numeric
            value(s) of the returned object. If this object is read-only, then the
            returned object will be entirely read-only. Otherwise, the object will be
            read-writable. The values are generally given in standard units of km,
            seconds and radians, regardless of the specified unit.
        mask (Boolean, array-like, or bool, optional): The mask for the object. Use None
            to copy the mask from the example object. False (the default) leaves the
            object un-masked.
        derivs (dict, optional): Derivatives represented as PolyMath objects. Use None to
            make a copy of the derivs attribute of `arg` when `arg` is a Qube, or {}
            (the default) for no derivatives. All derivatives are broadcasted to the
            shape of the object if necessary. The default dictionary is never modified
            by this method.
        unit (Unit, optional): The unit of the object. Use None to infer the unit from
            `arg` (if it is a Qube) or from the example object; use False to suppress
            the unit.
        nrank (int, optional): The number of numerator axes in the returned object; None
            to derive the rank from the input data and/or the subclass.
        drank (int, optional): The number of denominator axes in the returned object;
            None to derive it from the input data and/or the subclass.
        example (Qube, optional): Another Qube object from which to copy any input
            arguments except derivs that have not been explicitly specified.
        default (array-like, float, int, or bool): Value to use where masked. This is
            typically a constant that will not "break" most arithmetic calculations. If
            it is an array, it must be of the same shape as the items.
        op (str, optional): Name of an operation to include in an error message if
            something goes wrong.

    Raises:
        TypeError: If the data type of `arg` or `mask` is invalid.
        TypeError: If `example` is not an instance of Qube.
        TypeError: If `unit` is specified but is disallowed by the Qube subclass.
        ValueError: If the shape of `mask` is incompatible with object.
        ValueError: If `derivs` are specified but are disallowed by the Qube subclass.
        ValueError: If `nrank` is incompatible with the Qube subclass.
        ValueError: If `drank` is specified but the Qube subclass disallows derivatives.
        ValueError: If the dimensions of `arg` are incompatible with the subclass.
    """

    opstr = Qube._opstr(self, op)

    # Set defaults based on a Qube input
    if isinstance(arg, Qube):

        if derivs is None:
            derivs = arg._derivs.copy()     # shallow copy

        if unit is None:
            unit = arg._unit

        if nrank is None:
            nrank = arg._nrank
        elif nrank != arg._nrank:           # nranks _must_ be compatible
            # self is handed to the error helper, so record the requested rank on it
            # first (presumably so the helper can report it -- confirm in the helper)
            self._nrank = nrank
            Qube._raise_incompatible_numers(op, self, arg)

        if drank is None:
            drank = arg._drank
        elif drank != arg._drank:           # dranks _must_ be compatible
            self._drank = drank
            Qube._raise_incompatible_denoms(op, self, arg)

        if default is None:
            default = arg._default

    # Set defaults based on an example object
    if example is not None:

        if not isinstance(example, Qube):
            raise TypeError(f'{opstr} example value is not a Qube subclass')

        if mask is None:
            mask = example._mask

        # The example's unit is only inherited if this subclass permits units
        if unit is None and self._UNITS_OK:
            unit = example._unit

        # The example's nrank is only inherited if this subclass leaves it unconstrained
        if nrank is None and self._NRANK is None:
            nrank = example._nrank

        if drank is None:
            drank = example._drank

        if default is None:
            default = example._default

    # Validate inputs
    # A nonzero nrank determined above wins, then the class constant, then zero
    nrank = nrank or self._NRANK or 0
    drank = drank or 0
    rank = nrank + drank

    if derivs and not self._DERIVS_OK:
        raise ValueError(f'{opstr} derivatives are disallowed')

    if unit and not self._UNITS_OK:
        raise TypeError(f'{opstr} unit is disallowed: {unit}')

    if self._NRANK is not None:
        if nrank is not None and nrank != self._NRANK:
            raise ValueError(f'invalid {opstr} numerator rank: {nrank}')

    if drank and not self._DERIVS_OK:
        raise ValueError(f'{opstr} denominators are disallowed')

    # Get the value and check its shape
    (values, arg_mask) = Qube._as_values_and_mask(arg, opstr=opstr)
    full_shape = np.shape(values)
    if len(full_shape) < rank:
        raise ValueError(f'invalid {opstr} array shape {full_shape}: '
                         f'minimum rank = {nrank} + {drank}')

    # Split the full array shape into (shape, numer, denom); the trailing `drank` axes
    # are the denominator and the `nrank` axes before those are the numerator
    dd = len(full_shape) - drank
    nn = dd - nrank
    denom = full_shape[dd:]
    numer = full_shape[nn:dd]
    item = full_shape[nn:]
    shape = full_shape[:nn]

    # Fill in the values
    self._values = self._suitable_value(values, numer=numer, denom=denom, opstr=opstr)
    self._is_array = isinstance(self._values, np.ndarray)
    self._is_scalar = not self._is_array

    # Get the mask and check its shape
    # The mask carried by `arg` itself is merged with the `mask` input
    mask = Qube.or_(arg_mask, Qube._as_mask(mask, opstr=opstr))
    collapse = isinstance(arg, np.ma.MaskedArray)
    self._mask = Qube._suitable_mask(mask, shape=shape, broadcast=True,
                                     collapse=collapse, check=False, opstr=opstr)

    # Fill in the remaining shape info
    self._shape = shape
    self._ndims = len(shape)
    self._rank = rank
    self._nrank = nrank
    self._drank = drank
    self._item = item
    self._numer = numer
    self._denom = denom
    self._size = int(np.prod(shape))
    self._isize = int(np.prod(item))
    self._nsize = int(np.prod(numer))
    self._dsize = int(np.prod(denom))

    # Fill in the unit
    self._unit = None if Qube.is_one_false(unit) else unit

    # The object is read-only if the values array is read-only
    self._readonly = Qube._array_is_readonly(self._values)
    if self._readonly:
        Qube._array_to_readonly(self._mask)

    # Used for anything we want to cache in association with an object. This cache
    # will be cleared whenever the object is modified in any way.
    self._cache = {}

    # Install the derivs (converting to read-only if necessary)
    self._derivs = {}
    if derivs:
        self.insert_derivs(derivs)

    # Used only for if clauses; filled in when needed
    self._truth_if_any = False
    self._truth_if_all = False

    # Fill in the default
    # A caller-supplied default is kept only if its shape matches the item shape
    if default is not None and np.shape(default) == item:
        pass
    elif hasattr(self, '_DEFAULT_VALUE') and drank == 0:
        default = self._DEFAULT_VALUE
    elif item:
        default = np.ones(item)
    else:
        default = 1

    dtype = Qube._dtype(self._values)
    self._default = Qube._casted_to_dtype(default, dtype)
###################################################################################### # Builtin type support ###################################################################################### _PREFER_BUILTIN_TYPES = False
[docs] @staticmethod def prefer_builtins(status=None): """Set a global flag defining whether certain functions return a Python builtin type, rather than a Qube subclass, if possible. Parameters: status (bool, optional): True to favor Python builtin types; False otherwise. Omit this input to leave the global setting unchanged (but return it). Returns: bool: True if builtins are globally preferred; False otherwise. """ if status is not None: Qube._PREFER_BUILTIN_TYPES = status return Qube._PREFER_BUILTIN_TYPES
def as_builtin(self, masked=None):
    """This object as a Python builtin (float, int, or bool) when nothing is lost.

    Parameters:
        masked (float, int, or bool, optional): Value to return if the values of this
            object have shape () and the object is masked.

    Returns:
        (Qube, float, int, bool, or None): The builtin equivalent of this object's
        values if they have shape (), the object is unmasked, and it is unitless; the
        value of `masked` (or this object, if `masked` is None) if the values have
        shape () and the object is masked; otherwise, this object.
    """

    data = self._values

    # Empty or non-scalar data cannot be expressed as a single builtin value
    if np.size(data) == 0 or np.shape(data):
        return self

    # From here on, the data is known to have shape ()
    if self._mask:
        return masked if masked is not None else self

    if not self.is_unitless():
        return self

    # Order matters: bool must be tested before Integral, and Integral before Real
    conversions = (
        ((bool, np.bool_), bool),
        (numbers.Integral, int),
        (numbers.Real, float),
    )
    for kinds, convert in conversions:
        if isinstance(data, kinds):
            return convert(data)

    return self     # This shouldn't happen
######################################################################################
# Support functions
######################################################################################

@staticmethod
def _has_qube(arg):
    """True if the given list or tuple contains a Qube somewhere within."""

    if isinstance(arg, (list, tuple)):
        return (any(isinstance(item, Qube) for item in arg)
                or any(Qube._has_qube(item) for item in arg))

    return False

@staticmethod
def _has_masked_array(arg):
    """True if the given list or tuple contains a MaskedArray somewhere within."""

    if isinstance(arg, (list, tuple)):
        return (any(isinstance(item, np.ma.MaskedArray) for item in arg)
                or any(Qube._has_masked_array(item) for item in arg))

    return False

@staticmethod
def _as_values_and_mask(arg, opstr=''):
    """This object converted to a scalar or Numpy array with optional mask.

    Parameters:
        arg: object to convert to a scalar or array.
        opstr (str, optional): Name of operation string to include in any error
            message.

    Returns:
        tuple: (`value`, `mask`) as inferred from `arg`.

    Raises:
        TypeError: If the data type of `arg` is invalid.
    """

    if isinstance(arg, numbers.Real):
        return (arg, False)

    # MaskedArray is a subclass of ndarray, so it must be tested first
    if isinstance(arg, np.ma.MaskedArray):
        return (arg.data, arg.mask)

    if isinstance(arg, np.ndarray):
        return (arg, False)

    if isinstance(arg, Qube):
        return (arg._values, arg._mask)

    if isinstance(arg, (list, tuple)):
        if Qube._has_qube(arg):
            merged = Qube.stack(*arg)
            return (merged._values, merged._mask)
        elif Qube._has_masked_array(arg):
            # np.ma.stack takes the sequence itself; unpacking it would pass the
            # second array as the `axis` argument
            merged = np.ma.stack(arg)
            return (merged.data, merged.mask)
        else:
            merged = np.array(arg)
            return (merged, False)

    if isinstance(arg, np.bool_):
        return (bool(arg), False)

    _opstr = ' ' + opstr if opstr else ''
    raise TypeError(f'invalid{_opstr} data type: {type(arg)}')

@staticmethod
def _as_mask(arg, *, invert=False, masked_value=True, opstr=''):
    """This argument converted to a scalar bool or boolean Numpy array.

    Parameters:
        arg: The object to convert to a mask.
        invert (bool, optional): True to return the logical not of the mask.
        masked_value (bool, optional): The value to use where the input argument is
            masked. This value is used _after_ `invert` is applied.
        opstr (str, optional): Name of operation to include in any error message.

    Returns:
        (bool or NumPy.ndarray): bool or boolean array suitable for use as a mask.

    Raises:
        TypeError: If the data type of `arg` is invalid for a mask.
    """

    # Handle most common cases first
    if isinstance(arg, (numbers.Real, np.bool_, type(None))):
        return bool(arg) != invert

    if type(arg) is np.ndarray:         # exact type, not a subclass
        if arg.dtype.kind == 'b' and not invert:
            return arg
        elif invert:
            return arg == 0
        else:
            return arg != 0

    # Convert a list or tuple to something else
    if isinstance(arg, (list, tuple)):
        if Qube._has_qube(arg):
            arg = Qube.stack(*arg)
        elif Qube._has_masked_array(arg):
            arg = np.ma.stack(arg)      # the sequence is a single argument
        else:
            arg = np.array(arg)

        return Qube._as_mask(arg, invert=invert, masked_value=masked_value,
                             opstr=opstr)

    # Handle an object with a possible mask
    if isinstance(arg, Qube):
        mask = arg._mask
        arg = arg._values
    elif isinstance(arg, np.ma.MaskedArray):
        mask = arg.mask
        arg = arg.data
    else:
        _opstr = ' ' + opstr if opstr else ''
        raise TypeError(f'invalid{_opstr} mask type: {type(arg).__name__}')

    # Handle a shapeless mask
    if isinstance(mask, (bool, np.bool_)):
        if mask:        # entirely masked
            return bool(masked_value)
        else:           # entirely unmasked
            return Qube._as_mask(arg, invert=invert, masked_value=masked_value,
                                 opstr=opstr)

    # Copy the arg and merge the mask; the comparison always creates a new array
    if invert:
        merged = (arg == 0)
    else:
        merged = (arg != 0)

    merged[mask] = masked_value
    return merged

@staticmethod
def _suitable_mask(arg, shape, *, collapse=False, broadcast=False, invert=False,
                   masked_value=True, check=False, opstr=''):
    """This argument converted to a scalar bool or boolean Numpy array of suitable
    shape to use as a mask.

    Parameters:
        arg: The object to convert to a mask.
        shape (tuple): Shape of the required mask.
        collapse (bool, optional): True to merge the extraneous axes of a mask if its
            rank is greater than that of the given shape.
        broadcast (bool, optional): True to broadcast this mask to the given shape if
            it does not already match. A broadcasted mask is returned read-only.
        invert (bool, optional): True to return the logical not of the mask.
        masked_value (bool, optional): The value to use where the input argument is
            masked. This value is used _after_ `invert` is applied.
        check (bool, optional): True to check for an array containing all False
            values, and if so, replace it with a single value of False.
        opstr (str, optional): Name of operation to include in any error message.

    Returns:
        (bool or NumPy.ndarray): bool or boolean mask array.

    Raises:
        TypeError: If the data type of `arg` is invalid for a mask.
        ValueError: If the mask is incompatible with the specified `shape`.
    """

    mask = Qube._as_mask(arg, invert=invert, masked_value=masked_value, opstr=opstr)
    if isinstance(mask, bool):
        return mask

    if mask.shape == shape:
        if check and not np.any(mask):
            return False
        return mask

    # Merge any trailing axes beyond the required rank
    new_rank = len(shape)
    if collapse and mask.ndim > new_rank:
        axes = tuple(range(new_rank, mask.ndim))
        mask = np.any(mask, axis=axes)
        if not isinstance(mask, np.ndarray):
            return bool(mask)
        if mask.shape == shape:
            return mask

    if broadcast:
        try:
            mask = np.broadcast_to(mask, shape)
        except ValueError:
            pass        # fall through to the shape-mismatch error below
        else:
            Qube._array_to_readonly(mask)
            return mask

    opstr_ = opstr + ' ' if opstr else ''
    raise ValueError(f'{opstr_}object and mask shape mismatch: '
                     f'{shape}, {mask.shape}')

@staticmethod
def _dtype_and_value(arg, masked_value=0, opstr=''):
    """Tuple (dtype, value), where dtype is one of "float", "int", or "bool".

    The value is converted to a builtin type if it is scalar; otherwise it is returned
    as an array with its original dtype.

    Parameters:
        arg (Qube, array-like, float, int, or bool): Object to interpret.
        masked_value (float, int, or bool): Value to use where `arg` is masked.
        opstr (str, optional): Name of operation to include in any error message.

    Returns:
        tuple: (`dtype`, `value`), where `dtype` is one of "float", "int", or "bool",
        and `value` is the result of converting `arg` to a NumPy.ndarray, float, int,
        or bool.

    Raises:
        TypeError: If the type of `arg` is invalid.
        ValueError: If `arg` is an array with an unsupported dtype.
    """

    # Handle the easy and common cases first
    if isinstance(arg, (bool, np.bool_)):
        return ('bool', bool(arg))
    if isinstance(arg, numbers.Integral):
        return ('int', int(arg))
    if isinstance(arg, numbers.Real):
        return ('float', float(arg))

    # A MaskedArray is also an ndarray; it is excluded here so that its mask is
    # applied further below instead of being passed through untouched
    if isinstance(arg, np.ndarray) and not isinstance(arg, np.ma.MaskedArray):
        if arg.shape == ():     # shapeless array
            return Qube._dtype_and_value(arg[()], opstr=opstr)

        kind = arg.dtype.kind
        if kind == 'f':
            return ('float', arg)
        if kind in ('i', 'u'):
            return ('int', arg)
        if kind == 'b':
            return ('bool', arg)

        _opstr = ' ' + opstr if opstr else ''
        raise ValueError(f'unsupported{_opstr} dtype: {arg.dtype}')

    # Convert a list or tuple to something else
    if isinstance(arg, (list, tuple)):
        if Qube._has_qube(arg):
            arg = Qube.stack(*arg)
        elif Qube._has_masked_array(arg):
            arg = np.ma.stack(arg)      # the sequence is a single argument
        else:
            arg = np.array(arg)

        # Carry masked_value along so it applies to the merged object's mask
        return Qube._dtype_and_value(arg, masked_value=masked_value, opstr=opstr)

    # Handle an object with a possible mask
    if isinstance(arg, Qube):
        mask = arg._mask
        arg = arg._values
    elif isinstance(arg, np.ma.MaskedArray):
        mask = arg.mask
        arg = arg.data
    else:
        _opstr = ' ' + opstr if opstr else ''
        raise TypeError(f'unsupported{_opstr} data type: {type(arg)}')

    # Interpret the argument ignoring its mask
    (dtype, arg) = Qube._dtype_and_value(arg, opstr=opstr)

    # Handle a shapeless mask (bool, np.bool_, or a zero-dimensional array)
    if np.shape(mask) == ():
        if mask:        # entirely masked
            return (dtype, Qube._casted_to_dtype(masked_value, dtype))
        else:           # entirely unmasked
            return (dtype, arg)

    # Mask an array value, working on a copy so the input is not modified
    arg = arg.copy()
    arg[mask] = masked_value
    return (dtype, arg)

@staticmethod
def _dtype(arg):
    """dtype of the given argument, one of "float", "int", or "bool"."""

    return Qube._dtype_and_value(arg)[0]

@staticmethod
def _casted_to_dtype(arg, dtype, masked_value=0):
    """This value casted to the specified dtype, one of "float", "int", or "bool".

    An object that is already of the requested type is returned unchanged. Note that
    converting floats to ints is always a "floor" operation, so -1.5 -> -2.

    Parameters:
        arg (Qube, array-like, float, int, or bool): Object to cast.
        dtype (str): dtype to cast to, one of "float", "int", or "bool".
        masked_value (float, int, or bool): Value to assign to a masked item in the
            case where the input argument is a Qube or MaskedArray.

    Returns:
        (numpy.ndarray, float, int, or bool): The result of the cast.
    """

    if isinstance(arg, (list, tuple)):
        arg = np.array(arg)

    if isinstance(arg, Qube):
        if arg._mask is False:
            arg = arg._values
        else:
            mask = arg._mask
            arg = arg.without_mask(recursive=False).copy()
            arg[mask] = masked_value
            arg = arg._values

    elif isinstance(arg, np.ma.MaskedArray):
        # NOTE(review): NumPy's `nomask` appears to be np.bool_(False) rather than the
        # builtin False, so this identity test presumably never succeeds; the else
        # branch still produces correct values, working on a copy of the data.
        if arg.mask is False:
            arg = arg.data
        else:
            mask = arg.mask
            arg = arg.data.copy()
            arg[mask] = masked_value

    if isinstance(arg, np.ndarray):
        if arg.shape == ():
            return Qube._casted_to_dtype(arg[()], dtype)

        if dtype == 'float':
            if arg.dtype.kind == 'f':
                return arg
            return np.asarray(arg, dtype=np.double)

        if dtype == 'int':
            if arg.dtype.kind in ('i', 'u'):
                return arg
            return (arg // 1).astype('int')     # floor, not truncation

        # must be bool
        if arg.dtype.kind == 'b':
            return arg
        return (arg != 0)

    # Handle shapeless
    if dtype == 'float':
        return float(arg)

    if dtype == 'int':
        if isinstance(arg, numbers.Integral):
            return int(arg)
        return int(arg // 1)                    # floor, not truncation

    # bool case
    if isinstance(arg, (bool, np.bool_)):
        return bool(arg)

    # Wrap in bool() so a NumPy scalar input still yields a builtin bool
    return bool(arg != 0)

@classmethod
def _suitable_dtype(cls, dtype='float', opstr=''):
    """The dtype for this Qube subclass closest to a given dtype.

    Parameters:
        cls (class): Qube subclass.
        dtype (str, optional): Default dtype, one of "float", "int", or "bool", to
            return if it is compatible with the subclass. Anything else that NumPy can
            interpret as a float, integer, or boolean dtype is also accepted.
        opstr (str, optional): Name of the operation to include in any error message.

    Returns:
        str: One of "float", "int", or "bool".

    Raises:
        ValueError: If a suitable dtype cannot be determined.
    """

    if dtype == 'float':
        if cls._FLOATS_OK:
            return 'float'
        elif cls._INTS_OK:
            return 'int'
        else:
            return 'bool'

    elif dtype == 'int':
        if cls._INTS_OK:
            return 'int'
        elif cls._FLOATS_OK:
            return 'float'
        else:
            return 'bool'

    elif dtype == 'bool':
        if cls._BOOLS_OK:
            return 'bool'
        elif cls._INTS_OK:
            return 'int'
        else:
            return 'float'

    # Handle a NumPy dtype
    try:
        kind = np.dtype(dtype).kind
    except (TypeError, ValueError):
        pass            # not a dtype NumPy understands; report it below
    else:
        if kind == 'f':
            return cls._suitable_dtype('float', opstr=opstr)
        if kind in ('i', 'u'):
            return cls._suitable_dtype('int', opstr=opstr)
        if kind == 'b':
            return cls._suitable_dtype('bool', opstr=opstr)

    _in_opstr = ' in ' + opstr if opstr else ''
    raise ValueError(f'invalid dtype{_in_opstr}: "{dtype}"')

@classmethod
def _suitable_numer(cls, numer=None, opstr=''):
    """The given numerator made suitable for this class; ValueError otherwise.

    Parameters:
        cls (class): Qube subclass.
        numer (tuple, optional): Numerator shape to make suitable for use; None to
            return the default numerator shape for this Qube subclass.
        opstr (str, optional): Name of operation to include in any error message.

    Returns:
        tuple: Numerator shape.

    Raises:
        ValueError: If `numer` is unspecified and `cls` does not have a default.
        ValueError: If `numer` is incompatible with `cls`.
    """

    if numer is None:
        if cls._NUMER is not None:
            return cls._NUMER

        # An unconstrained or zero numerator rank defaults to an empty numerator
        if not cls._NRANK:
            return ()

        _in_opstr = ' in ' + opstr if opstr else ''
        raise ValueError(f'class {cls} does not have a default numerator{_in_opstr}')

    numer = tuple(numer)
    opstr = opstr or cls.__name__
    if ((cls._NUMER is not None and numer != cls._NUMER)
            or (cls._NRANK is not None and len(numer) != cls._NRANK)):     # noqa
        raise ValueError(f'invalid {opstr} numerator shape {numer}; '
                         f'must be {cls._NUMER}')

    return numer

@classmethod
def _suitable_value(cls, arg, *, numer=None, denom=(), expand=True, opstr=''):
    """This argument converted to a suitable value for this class.

    Parameters:
        cls (class): Qube subclass.
        arg (Qube, array-like, float, int, or bool): Object to be made suitable.
        numer (tuple, optional): Numerator shape; None for class default.
        denom (tuple, optional): Denominator shape.
        expand (bool, optional): True to expand the shape of the returned argument to
            the minimum required for the class; False to leave it with its original
            shape.
        opstr (str, optional): Name of operation to include in any error message.

    Returns:
        (numpy.ndarray, float, int, or bool): The value made suitable for `cls`.

    Raises:
        ValueError: If `arg` is incompatible with `cls`.
    """

    # Convert arg to a valid dtype
    (old_dtype, arg) = Qube._dtype_and_value(arg, opstr=opstr)
    new_dtype = cls._suitable_dtype(old_dtype, opstr=opstr)
    if new_dtype != old_dtype:
        arg = Qube._casted_to_dtype(arg, new_dtype)

    # Without expansion, we're done
    if not expand:
        return arg

    # Get the valid numerator
    numer = cls._suitable_numer(numer, opstr=opstr)

    # Expand the arg shape if necessary; tuple() lets denom be any sequence
    item = numer + tuple(denom)
    if len(np.shape(arg)) < len(item):
        temp = np.empty(item, dtype=new_dtype)
        temp[...] = arg
        arg = temp

    return arg
[docs] @staticmethod def or_(*masks): """The logical "or" of two or more masks, avoiding array operations if possible. Parameters: *masks (array-like or bool): One or more boolean masks. Returns: (np.ndarray or bool): New mask array or bool. """ # Two inputs is most common if len(masks) == 2: mask0 = masks[0] mask1 = masks[1] if isinstance(mask0, (bool, np.bool_)): if mask0: return True else: return mask1 if isinstance(mask1, (bool, np.bool_)): if mask1: return True else: return mask0 if mask0 is mask1: # can happen when objects share masks return mask0 return mask0 | mask1 # Handle one input if len(masks) == 1: return masks[0] # Handle three or more by recursion return Qube.or_(masks[0], Qube.or_(*masks[1:]))
[docs] @staticmethod def and_(*masks): """The logical "and" of two or more masks, avoiding array operations if possible. Parameters: *masks (array-like or bool): One or more boolean masks. Returns: (np.ndarray or bool): New mask array or bool. """ # Two inputs is most common if len(masks) == 2: mask0 = masks[0] mask1 = masks[1] if isinstance(mask0, (bool, np.bool_)): if mask0: return mask1 else: return False if isinstance(mask1, (bool, np.bool_)): if mask1: return mask0 else: return False if mask0 is mask1: # can happen when objects share masks return mask0 return mask0 & mask1 # Handle one input if len(masks) == 1: return masks[0] # Handle three or more by recursion return Qube.and_(masks[0], Qube.and_(*masks[1:]))
###################################################################################### # Alternative constructors ######################################################################################
[docs] def clone(self, *, recursive=True, preserve=[], retain_cache=False): """Fast construction of a shallow copy. Parameters: recursive (bool, optional): True to clone the derivatives of this object; False to ignore them. preserve (list, optional): Name(s) of derivatives to include even if `recursive` is False. retain_cache (bool, optional): True to retain cache except "unshrunk" and "wod"; False to return clone with an empty cache. Returns: Qube: The shallow clone. """ obj = Qube.__new__(type(self)) # Transfer attributes other than derivatives and cache for attr, value in self.__dict__.items(): if attr in ('_derivs', '_cache'): obj.__dict__[attr] = {} elif attr.startswith('d_d'): continue elif isinstance(value, dict): obj.__dict__[attr] = value.copy() else: obj.__dict__[attr] = value # Handle derivatives recursively if recursive: new_keys = set(self._derivs.keys()) elif preserve: if isinstance(preserve, str): new_keys = {preserve} else: new_keys = set(preserve) else: new_keys = set() for key in new_keys: deriv = self._derivs[key] new_deriv = deriv.clone(recursive=False, retain_cache=retain_cache) obj.insert_deriv(key, new_deriv) # Handle cache if retain_cache: obj._cache = self._cache.copy() if 'shrunk' in obj._cache: del obj._cache['shrunk'] if 'wod' in obj._cache: del obj._cache['wod'] else: obj._cache = {} return obj
@classmethod
def zeros(cls, shape, dtype='float', *, numer=None, denom=(), mask=False):
    """New object of this class and shape, filled with zeros.

    Parameters:
        shape (tuple or list): Shape of the object.
        dtype (str, optional): One of "bool", "int", or "float", defining the data
            type. It is replaced by the closest dtype that `cls` allows.
        numer (tuple, optional): Numerator shape; None to use default for `cls`.
        denom (tuple, optional): Denominator shape.
        mask (array-like or bool, optional): Mask to apply.

    Returns:
        Qube: The new object.

    Raises:
        ValueError: If `numer` is incompatible with `cls`, or is None when `cls` has no
            default numerator.
    """

    dtype = cls._suitable_dtype(dtype)
    numer = cls._suitable_numer(numer)
    denom = tuple(denom)
    shape = tuple(shape)

    # nrank is passed explicitly so that the numerator axes are treated as item axes
    # even when the class itself leaves the numerator rank unconstrained
    obj = Qube.__new__(cls)
    obj.__init__(np.zeros(shape + numer + denom, dtype=dtype), mask=mask,
                 nrank=len(numer), drank=len(denom))
    return obj
@classmethod
def ones(cls, shape, dtype='float', *, numer=None, denom=(), mask=False):
    """New object of this class and shape, filled with ones.

    Parameters:
        shape (tuple or list): Shape of the object.
        dtype (str, optional): One of "bool", "int", or "float", defining the data
            type. It is replaced by the closest dtype that `cls` allows.
        numer (tuple, optional): Numerator shape; None to use default for `cls`.
        denom (tuple, optional): Denominator shape.
        mask (array-like or bool, optional): Mask to apply.

    Returns:
        Qube: The new object.

    Raises:
        ValueError: If `numer` is incompatible with `cls`, or is None when `cls` has no
            default numerator.
    """

    dtype = cls._suitable_dtype(dtype)
    numer = cls._suitable_numer(numer)
    denom = tuple(denom)
    shape = tuple(shape)

    # nrank is passed explicitly so that the numerator axes are treated as item axes
    # even when the class itself leaves the numerator rank unconstrained
    obj = Qube.__new__(cls)
    obj.__init__(np.ones(shape + numer + denom, dtype=dtype), mask=mask,
                 nrank=len(numer), drank=len(denom))
    return obj
@classmethod
def filled(cls, shape, fill=0, *, numer=None, denom=(), mask=False):
    """New object of this class and shape, filled with a constant.

    Parameters:
        shape (tuple or list): Shape of the object. If it is empty, a single item is
            returned.
        fill (Qube, array-like, float, int, or bool, optional): The constant value of
            each item. It is cast to a dtype that `cls` allows and expanded to the item
            shape if necessary.
        numer (tuple, optional): Numerator shape; None to use default for `cls`.
        denom (tuple, optional): Denominator shape.
        mask (array-like or bool, optional): Mask to apply.

    Returns:
        Qube: The new object.

    Raises:
        ValueError: If `fill` is not compatible with the `cls`.
    """

    numer = cls._suitable_numer(numer)
    denom = tuple(denom)

    # Create example object with shape == (). nrank is passed explicitly so that the
    # numerator axes are treated as item axes even when the class itself leaves the
    # numerator rank unconstrained.
    example = Qube.__new__(cls)
    example.__init__(cls._suitable_value(fill, numer=numer, denom=denom),
                     nrank=len(numer), drank=len(denom))

    # For a shapeless object, return the example
    if not shape:
        if not mask:
            return example

        example = example.remask(mask)
        return example

    # Return the filled object
    shape = tuple(shape)
    vals = np.empty(shape + example._item, dtype=example.dtype())
    vals[...] = example._values

    obj = Qube.__new__(cls)
    obj.__init__(vals, mask=mask, example=example, nrank=len(numer),
                 drank=len(denom))
    return obj
######################################################################################
# Low-level access
######################################################################################

def _set_values(self, values, mask=None, *, antimask=None, retain_cache=False):
    """Low-level method to update the values of an array.

    The read-only status of the object is defined by that of the given value.

    Parameters:
        values (array-like, float, int, or bool): New values. Their shape must equal
            the shape of the current values.
        mask (array-like or bool, optional): New mask. If it has a shape, that shape
            must equal the shape of the current values.
        antimask (array-like or bool, optional): If provided, then only the array
            locations associated with the antimask are modified.
        retain_cache (bool, optional): If True, the cache values are retained except
            for "unshrunk". This only applies when no mask is given; otherwise the
            cache is cleared.

    Returns:
        Qube: This object, updated.

    Raises:
        ValueError: If the shape of `values`, `mask`, or `antimask` is invalid.
    """

    # Confirm shapes
    shape = np.shape(self._values)
    shape1 = np.shape(values)
    if shape1 != shape:
        raise ValueError(f'value shape mismatch: {shape1}, {shape}')

    if mask is not None:
        mshape = np.shape(mask)
        # A shapeless (scalar) mask is always accepted
        if mshape and mshape != shape:
            raise ValueError(f'mask shape mismatch: {mshape}, {shape}')

    # Update values
    if antimask is not None:
        ashape = np.shape(antimask)
        if ashape != shape:
            raise ValueError(f'antimask shape mismatch: {ashape}, {shape}')
        # In-place update of the selected locations only
        self._values[antimask] = values[antimask]
    else:
        # NumPy scalars are replaced by the equivalent Python built-in type
        if isinstance(values, np.generic):
            if isinstance(values, np.floating):
                values = float(values)
            elif isinstance(values, np.integer):
                values = int(values)
            else:
                values = bool(values)
        self._values = values
        self._readonly = Qube._array_is_readonly(self._values)

    # Update the mask if necessary
    if mask is not None:
        if antimask is None:
            self._mask = mask
        elif isinstance(mask, np.ndarray):
            self._mask[antimask] = mask[antimask]
        else:
            # A single mask value must be written into an array, so expand the
            # existing mask to a full array first if it is not one already
            if not isinstance(self._mask, np.ndarray):
                old_mask = self._mask
                self._mask = np.empty(self._shape, dtype=np.bool_)
                self._mask.fill(old_mask)
            self._mask[antimask] = mask

    # Handle the cache
    if retain_cache and mask is None:
        if 'unshrunk' in self._cache:
            del self._cache['unshrunk']
    else:
        self._cache.clear()

    # Set the readonly state based on the values given
    if np.shape(self._mask):
        if self._readonly:
            self._mask = Qube._array_to_readonly(self._mask)
        elif Qube._array_is_readonly(self._mask):
            # A writeable object needs its own writeable mask
            self._mask = self._mask.copy()

    return self

def _new_values(self):
    """Low-level method to indicate that values have changed.

    This means "unshrunk" will be deleted from the cache if present.
    """

    if 'unshrunk' in self._cache:
        del self._cache['unshrunk']

def _set_mask(self, mask, *, antimask=None, check=False):
    """Low-level method to update the mask of an array.

    The read-only status of the object will be preserved. The cache is always cleared.

    Parameters:
        mask (array-like or bool, optional): New mask.
        antimask (array-like or bool, optional): If provided, then only the array
            locations associated with the antimask are modified.
        check (bool, optional): True to check for an array containing all False
            values, and if so, replace it with a single value of False.

    Returns:
        Qube: This object, updated.

    Raises:
        TypeError: If the type of `mask` is invalid.
        ValueError: If the mask is incompatible with the required shape.
    """

    # Cast the mask and confirm the shape
    mask = Qube._suitable_mask(mask, self._shape, check=check)
    is_readonly = self._readonly

    if antimask is None:
        self._mask = mask
    elif isinstance(mask, np.ndarray):
        self._mask[antimask] = mask[antimask]
    else:
        # Expand a single-valued existing mask to an array before the partial update
        if not isinstance(self._mask, np.ndarray):
            old_mask = self._mask
            self._mask = np.empty(self._shape, dtype=np.bool_)
            self._mask.fill(old_mask)
        self._mask[antimask] = mask

    self._cache.clear()

    # Make the writeability of the mask array agree with that of the object
    if isinstance(self._mask, np.ndarray):
        if is_readonly:
            self._mask = Qube._array_to_readonly(self._mask)
        elif Qube._array_is_readonly(self._mask):
            self._mask = self._mask.copy()

    return self

######################################################################################
# Properties
######################################################################################

@property
def values(self):
    """The value of this object as a numpy.ndarray, float, int, or bool."""
    return self._values

@property
def vals(self):
    """The value of this object as a numpy.ndarray, float, int, or bool."""
    return self._values     # Handy shorthand

@property
def mvals(self):
    """This object as a NumPy ma.MaskedArray.

    A masked object without array values is returned as `numpy.ma.masked`.
    """

    # Deal with a scalar
    if self._is_scalar:
        if self._mask:
            return np.ma.masked
        else:
            return np.ma.MaskedArray(self._values)

    # Deal with a scalar mask
    if isinstance(self._mask, (bool, np.bool_)):
        if self._mask:
            return np.ma.MaskedArray(self._values, True)
        else:
            return np.ma.MaskedArray(self._values)

    # For zero rank, the mask is already the right size
    if self._rank == 0:
        return np.ma.MaskedArray(self._values, self._mask)

    # Expand the mask: append one unit axis per item axis, then broadcast it over the
    # full shape of the values
    mask = self._mask.reshape(self._shape + self._rank * (1,))
    mask = np.broadcast_to(mask, self._values.shape)
    return np.ma.MaskedArray(self._values, mask)

@property
def mask(self):
    """The boolean mask of this object as a NumPy.ndarray or bool."""
    return self._mask

@property
def antimask(self):
    """The inverse of the mask of this object, True wherever an element is valid.

    The result is stored in the cache under "antimask"; the stored value is reused
    unless `Qube._DISABLE_CACHE` is set.
    """

    if not Qube._DISABLE_CACHE and 'antimask' in self._cache:
        return self._cache['antimask']

    # Array mask: element-wise inverse
    if isinstance(self._mask, np.ndarray):
        antimask = np.logical_not(self._mask)
        self._cache['antimask'] = antimask
        return antimask

    # Single-valued mask: plain boolean inverse
    antimask = not self._mask
    self._cache['antimask'] = antimask
    return antimask

@property
def default(self):
    """The default element value for this object."""
    return self._default

@property
def unit_(self):
    """The Unit of this object."""
    return self._unit

@property
def units(self):
    """The Unit of this object; alternative name for `unit_`."""
    return self._unit

@property
def derivs(self):
    """The dictionary of derivatives of this object."""
    return self._derivs

@property
def shape(self):
    """The shape of this object as a tuple."""
    return self._shape

@property
def ndims(self):
    """The number of dimensions in this object (excluding items)."""
    return self._ndims

# alternative name
@property
def ndim(self):
    """The number of dimensions in this object (excluding items)."""
    return self._ndims

@property
def rank(self):
    """The rank of this object."""
    return self._rank

@property
def nrank(self):
    """The rank of the element numerator in this object."""
    return self._nrank

@property
def drank(self):
    """The rank of the element denominator in this object."""
    return self._drank

@property
def item(self):
    """The shape of the elements in this object as a tuple."""
    return self._item

@property
def numer(self):
    """The shape of the element numerator in this object as a tuple."""
    return self._numer

@property
def denom(self):
    """The shape of the element denominator in this object as a tuple."""
    return self._denom

@property
def size(self):
    """The number of elements in this object's shape."""
    return self._size

@property
def isize(self):
    """The number of components in this object's items."""
    return self._isize

@property
def nsize(self):
    """The number of numerator components in this object's items."""
    return self._nsize

@property
def dsize(self):
    """The number of denominator components in this object's items."""
    return self._dsize
@property
def readonly(self):
    """Whether this object is read-only (True) or writeable (False)."""
    return self._readonly

######################################################################################
# Cache support
######################################################################################

def _clear_cache(self):
    """Discard every entry currently held in the cache."""
    self._cache.clear()

def _find_corners(self):
    """Locate the smallest "hypercube" such that everything outside it is masked.

    Returns:
        (tuple, tuple) or None: The lower and upper (exclusive) indices along each
        axis; None if this object has no dimensions. If everything is masked, both
        tuples contain only zeros.
    """

    ndims = self._ndims
    if ndims == 0:
        return None

    origin = ndims * (0,)

    # A single-valued mask means that either nothing or everything is included
    if isinstance(self._mask, (bool, np.bool_)):
        return (origin, origin) if self._mask else (origin, self._shape)

    lower = []
    upper = []
    antimask = self.antimask
    for axis in range(ndims):

        # Collapse every axis but this one to find the occupied indices along it
        other_axes = tuple(k for k in range(ndims) if k != axis)
        occupied = np.any(antimask, other_axes)
        indices = np.where(occupied)[0]

        # Nothing is unmasked, so the hypercube is empty
        if len(indices) == 0:
            return (origin, origin)

        lower.append(indices[0])
        upper.append(indices[-1] + 1)

    return (tuple(lower), tuple(upper))

@property
def corners(self):
    """Corners of a "hypercube" that contain all the unmasked array elements.

    The value is stored in the cache under "corners".

    Returns:
        (tuple, tuple): The first tuple defines the lower coordinates of the unmasked
        region, and the second tuple defines the upper coordinates.
    """

    cache_enabled = not Qube._DISABLE_CACHE
    if cache_enabled and 'corners' in self._cache:
        return self._cache['corners']

    found = self._find_corners()
    self._cache['corners'] = found
    return found

@staticmethod
def _slicer_from_corners(corners):
    """A tuple of slice objects, one per axis, based on a pair of corner indices."""

    lower = corners[0]
    upper = corners[1]
    return tuple(slice(lower[axis], upper[axis]) for axis in range(len(lower)))

@staticmethod
def _shape_from_corners(corners):
    """Array shape based on corner indices."""

    lower = corners[0]
    upper = corners[1]
    return tuple(upper[axis] - lower[axis] for axis in range(len(lower)))

@property
def _slicer(self):
    """A slice object containing all the array elements inside the current corners.

    The value is stored in the cache under "slicer".
    """

    cache_enabled = not Qube._DISABLE_CACHE
    if cache_enabled and 'slicer' in self._cache:
        return self._cache['slicer']

    slices = Qube._slicer_from_corners(self.corners)
    self._cache['slicer'] = slices
    return slices

######################################################################################
# Derivative operations
######################################################################################
def insert_deriv(self, key, deriv, *, override=True):
    """Insert or replace a derivative in this object.

    The derivative is stored without any derivatives of its own, which prevents
    recursion, and it is converted to floating-point. If this object is read-only, a
    writeable derivative is replaced by a read-only clone. A derivative whose shape
    differs from that of this object is broadcasted to this object's shape.

    Replacing a pre-existing derivative of a read-only object requires `override` to
    be True, which is the default for this method. Inserting a new derivative into a
    read-only object is not prevented.

    Parameters:
        key (str): The name of the derivative. Each derivative also becomes accessible
            as an object attribute with "d_d" in front of the name. For example, the
            time-derivative of this object might be keyed by "t", in which case it can
            also be accessed as attribute "d_dt".
        deriv (Qube): The derivative. Derivatives must have the same numerator as the
            object; denominator items are used for partial derivatives.
        override (bool, optional): True to allow the value of a pre-existing
            derivative in a read-only object to be replaced.

    Returns:
        Qube: This object after the derivative has been inserted.

    Raises:
        TypeError: If the derivative class is invalid or if derivatives are disallowed
            for the object class.
        ValueError: If the numerator shape is invalid, or if the key already exists in
            a read-only object when `override` is False.
    """

    owner = type(self).__name__

    if not self._DERIVS_OK:
        raise TypeError(f'derivatives are disallowed in class {owner}')

    # Make sure the derivative is compatible with the object
    if not isinstance(deriv, Qube):
        raise TypeError(f'invalid class for derivative "{key}" in '
                        f'{owner} object: {type(deriv).__name__}')

    if self._numer != deriv._numer:
        raise ValueError(f'shape mismatch for numerator of derivative "{key}" in '
                         f'{owner} object: '
                         f'{deriv._numer}, {self._numer}')

    # A read-only object only gives up an existing derivative on request
    if self.readonly and (key in self._derivs):
        if not override:
            raise ValueError(f'derivative "{key}" cannot be replaced in '
                             f'{owner} object; is read-only')

    # Prevent recursion, convert to floating point
    stripped = deriv.wod.as_float()

    # Match readonly of parent if necessary
    if self._readonly and not stripped._readonly:
        stripped = stripped.clone(recursive=False).as_readonly()

    # Match the shape of the parent if necessary
    if stripped._shape != self._shape:
        stripped = stripped.broadcast_to(self._shape)

    # Save in the derivative dictionary and as an attribute
    self._derivs[key] = stripped
    setattr(self, 'd_d' + key, stripped)

    self._cache.clear()
    return self
def insert_derivs(self, derivs, *, override=False):
    """Insert or replace the derivatives in this object from a dictionary.

    You cannot replace the pre-existing values of any derivative in a read-only object
    unless you explicitly set override=True. However, inserting a new derivative into
    a read-only object is not prevented.

    All keys are checked before any derivative is inserted, so a disallowed
    replacement leaves this object unchanged.

    Parameters:
        derivs (dict): The dictionary of derivatives keyed by their names.
        override (bool, optional): True to allow the value of a pre-existing
            derivative to be replaced.

    Returns:
        Qube: This object after the derivatives have been inserted.

    Raises:
        TypeError: If a derivative class is invalid or if derivatives are disallowed
            for the object class.
        ValueError: If a shape is invalid, or if a key already exists in a read-only
            object when `override` is False.
    """

    # Check every insert before proceeding with any
    if self.readonly and not override:
        for key in derivs:
            if key in self._derivs:
                # Both halves of the message must be f-strings; otherwise the class
                # name placeholder would appear literally in the message
                raise ValueError(f'derivative "{key}" cannot be replaced in '
                                 f'{type(self).__name__} object; object is read-only')

    # Insert derivatives
    for key, deriv in derivs.items():
        self.insert_deriv(key, deriv, override=override)

    return self
def delete_deriv(self, key, *, override=False):
    """Delete a single derivative from this object, given the key.

    Both the dictionary entry and the matching "d_d" attribute are removed.
    Derivatives cannot be deleted from a read-only object without explicitly setting
    override=True.

    Parameters:
        key (str): The key of the derivative to remove. If the key does not exist, no
            derivative is removed.
        override (bool, optional): True to allow the deleting of derivatives from a
            read-only object.

    Raises:
        ValueError: If this object is read-only and `override` is False.
    """

    if not override:
        self.require_writeable()

    derivs = self._derivs
    if key in derivs:
        del derivs[key]
        del self.__dict__['d_d' + key]

    self._cache.clear()
def delete_derivs(self, *, override=False, preserve=None):
    """Delete all derivatives from this object.

    Derivatives cannot be deleted from a read-only object without explicitly setting
    `override=True`.

    Parameters:
        override (bool, optional): True to allow the deleting of derivatives from a
            read-only object.
        preserve (str, list, tuple or set, optional): The name(s) of derivatives to
            retain. All others are removed.

    Raises:
        ValueError: If this object is read-only and `override` is False.
    """

    if not override:
        self.require_writeable()

    # If something is being preserved...
    if preserve:
        # A single name must not be treated as a sequence of characters
        if isinstance(preserve, str):
            preserve = [preserve]

        # Delete derivatives not on the list. The keys are collected first because
        # the dictionary cannot change size while it is being iterated. Writeability
        # was already confirmed above, so the individual deletions can override.
        deletions = [key for key in self._derivs if key not in preserve]
        for key in deletions:
            self.delete_deriv(key, override=True)

        return

    # Delete all derivatives
    for key in self._derivs:
        delattr(self, 'd_d' + key)

    self._derivs = {}
    self._cache.clear()
def without_derivs(self, *, preserve=None):
    """This object without derivatives, or with only selected derivatives.

    If this object has no derivatives, it is returned as is. If nothing is preserved,
    the `wod` property of this object is returned. Otherwise, the return value is a
    clone from which all derivatives not named in `preserve` have been deleted.

    Parameters:
        preserve (str, list, tuple, or set, optional): The name(s) of derivatives to
            retain. All others are removed.

    Returns:
        Qube: The result, with the same subclass as self.
    """

    if not self._derivs:
        return self

    # Without anything to preserve, the derivative-free version is all we need
    if not preserve:
        return self.wod

    if isinstance(preserve, str):
        preserve = [preserve]

    # Same for the case where none of the preserved names is actually present
    present = [name for name in preserve if name in self._derivs]
    if not any(present):
        return self.wod

    # Create a fast copy with derivatives
    obj = self.clone(recursive=True)

    # Delete derivatives not on the list; collect the keys before deleting
    unwanted = [key for key in obj._derivs if key not in preserve]
    for key in unwanted:
        obj.delete_deriv(key, override=True)

    return obj
@property
def wod(self):
    """A shallow clone without derivatives, cached.

    If this object has no derivatives, it is returned as is. Otherwise, a new object
    of the same class is built from the same values and mask, and it is stored in the
    cache under "wod" for later re-use.
    """

    if not self._derivs:
        return self

    if not Qube._DISABLE_CACHE and 'wod' in self._cache:
        return self._cache['wod']

    wod = Qube.__new__(type(self))
    wod.__init__(self._values, self._mask, example=self)

    # Carry over every instance attribute except the "d_d" derivative attributes;
    # attributes that are themselves Qubes are replaced by their own "wod" versions
    for key, attr in self.__dict__.items():
        if key.startswith('d_d'):
            pass
        elif isinstance(attr, Qube):
            wod.__dict__[key] = attr.wod
        else:
            wod.__dict__[key] = attr

    # NOTE(review): the loop above copies attributes by reference, so if `_cache` is
    # an instance attribute, both objects presumably share one cache dictionary from
    # here on -- confirm that this is intended.
    wod._derivs = {}
    wod._cache['wod'] = wod
    self._cache['wod'] = wod
    return wod
def without_deriv(self, key):
    """A shallow copy of this object without a particular derivative.

    If the derivative does not exist, this object is returned as is.

    Parameters:
        key (str): The key of the derivative to remove.

    Returns:
        Qube: The copy, with the same subclass as self.
    """

    if key not in self._derivs:
        return self

    result = self.clone(recursive=True)

    # Use delete_deriv() rather than deleting the dictionary entry directly, so that
    # the matching "d_d" attribute is removed and the cache is cleared as well. The
    # override is needed because the clone might be read-only.
    result.delete_deriv(key, override=True)
    return result
def with_deriv(self, key, value, *, method='insert'):
    """A shallow copy of this object with a derivative inserted or added.

    Parameters:
        key (str): The key of the derivative to insert.
        value (Qube): The value for this derivative.
        method (str): How to insert the derivative, one of these options:

            * "`insert`": Insert the new derivative; raise a ValueError if a
              derivative of the same name already exists.
            * "`replace`": Replace an existing derivative of the same name.
            * "`add`": Add this derivative to an existing derivative of the same name.

    Returns:
        Qube: The copy, with the same subclass as self.

    Raises:
        ValueError: If `method` is not one of the options above, or if `method` is
            "insert" and a derivative of the given name already exists.
    """

    result = self.clone(recursive=True)

    if method not in ('insert', 'replace', 'add'):
        raise ValueError('invalid with_deriv method: ' + repr(method))

    already_present = key in result._derivs

    if already_present and method == 'insert':
        raise ValueError(f'derivative "{key}" already exists in '
                         f'{type(self).__name__} object')

    # For "add", combine the new value with the one already in place
    if already_present and method == 'add':
        value = value + result._derivs[key]

    result.insert_deriv(key, value)
    return result
def rename_deriv(self, key, new_key, *, method='insert'):
    """A shallow copy of this object with a derivative renamed.

    The derivative is first inserted under its new name using `with_deriv()`, and is
    then removed under its old name using `without_deriv()`.

    Parameters:
        key (str): The current key of the derivative.
        new_key (str): The new name of the derivative.
        method (str): How to rename the derivative, one of these options:

            * "`insert`": Insert the new derivative; raise a ValueError if a
              derivative of the same name already exists.
            * "`replace`": Replace an existing derivative of the same name.
            * "`add`": Add this derivative to an existing derivative of the same name.

    Returns:
        Qube: The copy, with the same subclass as self.

    Raises:
        KeyError: If the `key` derivative does not exist.
        ValueError: If `method` is "insert" and a derivative of the given name already
            exists.
    """

    deriv = self._derivs[key]
    renamed = self.with_deriv(new_key, deriv, method=method)
    return renamed.without_deriv(key)
def unique_deriv_name(self, key, *objects):
    """A unique name for a derivative to apply to one or more objects.

    Parameters:
        key (str): The name to use, with a suffix appended if needed.
        *objects (Qube): One or more Qube objects. Anything without a "derivs"
            attribute is ignored.

    Returns:
        str: The given key, or with a numeric suffix if needed to make it unique
        among the derivative names of this object and the given objects.
    """

    # Gather the derivative keys of this object and of every object given
    taken = set(self._derivs.keys())
    for obj in objects:
        if hasattr(obj, 'derivs'):
            taken.update(obj._derivs.keys())

    # Return the proposed key if it is unused
    if key not in taken:
        return key

    # Otherwise, tack on a number and iterate until the name is unique
    suffix = 0
    candidate = key + str(suffix)
    while candidate in taken:
        suffix += 1
        candidate = key + str(suffix)

    return candidate
###################################################################################### # Unit operations ######################################################################################
def set_unit(self, unit, *, override=False):
    """Set the unit of this object.

    For a class that disallows units, a unitless value is silently ignored.

    Parameters:
        unit (Unit or None): The new unit.
        override (bool, optional): If True, the unit can be modified on a read-only
            object.

    Raises:
        TypeError: If units are disallowed in this class and `unit` is not unitless.
        ValueError: If this object is read-only and `override` is False.
    """

    # Classes without unit support only accept "no unit"
    if not self._UNITS_OK:
        if not Unit.is_unitless(unit):
            raise TypeError(f'units are disallowed in class {type(self).__name__}')
        return

    if not override:
        self.require_writeable()

    new_unit = Unit.as_unit(unit)
    Unit.require_compatible(new_unit, self._unit)

    self._unit = new_unit
    self._cache.clear()
def without_unit(self, *, recursive=True):
    """A shallow copy of this object without units.

    If this object has neither a unit nor derivatives, it is returned as is.
    Otherwise, a clone is returned whose own unit has been reset to None. Note that
    this method itself only resets the unit of the clone; it does not touch the units
    of any derivatives that the clone carries.

    Parameters:
        recursive (bool, optional): Passed to `clone()`; True to include derivatives
            in the clone, False to omit all derivatives.

    Returns:
        Qube: A shallow copy of this object with the unit stripped.
    """

    nothing_to_strip = self._unit is None and not self._derivs
    if nothing_to_strip:
        return self

    stripped = self.clone(recursive=recursive)
    stripped._unit = None
    return stripped
def into_unit(self, recursive=False):
    """The values property of this object, converted to its unit.

    Parameters:
        recursive (bool, optional): If True, also return the derivatives converted to
            their units.

    Returns:
        (numpy.ndarray, float, int, bool, or tuple): The values attribute of this
        object, converted to this object's units. If `recursive` is True, it returns
        a tuple (`values`, `derivs`), where `derivs` is a dictionary of the derivative
        values converted to their units.
    """

    unit = self._unit

    # No conversion is needed without a unit or when the unit's factor is one
    if unit is None or unit.into_unit_factor == 1.:
        values = self._values
    else:
        values = Unit.into_unit(unit, self._values)

    if not recursive:
        return values

    derivs = {key: Unit.into_unit(deriv._unit, deriv._values)
              for key, deriv in self._derivs.items()}
    return (values, derivs)
def confirm_unit(self, unit):
    """Raise a ValueError if the unit is not compatible with this object.

    Parameters:
        unit (Unit or None): The new unit.

    Returns:
        Qube: This object.

    Raises:
        ValueError: If the unit of this object cannot be matched to the new unit.
    """

    if Unit.can_match(self._unit, unit):
        return self

    raise ValueError(f'units are not compatible with {type(self).__name__} '
                     f'object: {unit}, {self._unit}')
def is_unitless(self):
    """True if this object is unitless, as determined by `Unit.is_unitless()`."""

    unitless = Unit.is_unitless(self._unit)
    return unitless
def _require_unitless(self, op=''):
    """Raise a ValueError if this object is not unitless.

    Parameters:
        op (str, optional): Operation name to embed into the error message.

    Raises:
        ValueError: If units are present.
    """

    # Only hand off to the Unit class when the check has already failed
    if not self.is_unitless():
        Unit.require_unitless(self._unit, info=self._opstr(op))

def _require_angle(self, op=''):
    """Raise a ValueError if this object is not either unitless or has a dimension of
    angle.

    Parameters:
        op (str, optional): Operation name to embed into the error message.

    Raises:
        ValueError: If units are not compatible with an angle.
    """

    # Only hand off to the Unit class when the check has already failed
    if not Unit.is_angle(self._unit):
        Unit.require_angle(self._unit, info=self._opstr(op))

def _require_compatible_units(self, arg, op=''):
    """Raise a ValueError if these objects do not have compatible units.

    Parameters:
        arg (object): The object to compare with this one. Anything that is not a
            Qube is always regarded as compatible.
        op (str, optional): Operation name to embed into the error message.

    Returns:
        bool: True if the units are compatible.

    Raises:
        ValueError: If units are not compatible.
    """

    compatible = (not isinstance(arg, Qube)) or Unit.can_match(self._unit, arg._unit)
    if compatible:
        return True

    Unit.require_compatible(self._unit, arg._unit, info=self._opstr(op))

######################################################################################
# Read-only/read-write operations
######################################################################################

@staticmethod
def _array_is_readonly(arg):
    """True if the argument is a read-only NumPy ndarray.

    False means that it is either a writable array or a scalar.
    """

    return isinstance(arg, np.ndarray) and not arg.flags['WRITEABLE']

@staticmethod
def _array_to_readonly(arg):
    """Make the given argument read-only if it is a NumPy ndarray; then return it.

    The array is modified in place; anything that is not an ndarray is returned
    unchanged.
    """

    if isinstance(arg, np.ndarray):
        arg.flags['WRITEABLE'] = False

    return arg
def as_readonly(self, *, recursive=True):
    """Convert this object to read-only. It is modified in place and returned.

    If this object is already read-only, it is returned as is. Otherwise, the internal
    _values and _mask arrays are modified as necessary. Once this happens, the
    internal arrays will also cease to be writable in any other object that shares
    them. Any Qube held in the cache is converted to read-only as well.

    Note that `as_readonly()` cannot be undone. Use `copy()` to create a writable copy
    of a readonly object.

    Parameters:
        recursive (bool, optional): True also to convert the derivatives to read-only.
            If False, this method leaves the derivatives untouched.

    Returns:
        Qube: This object, converted to read-only if necessary.
    """

    # If it is already read-only, return
    if self._readonly:
        return self

    # Update the values and the mask wherever they are arrays
    for array in (self._values, self._mask):
        Qube._array_to_readonly(array)

    self._readonly = True

    # Update anything cached
    if not Qube._DISABLE_CACHE:
        for name, cached in self._cache.items():
            if isinstance(cached, Qube):
                self._cache[name] = cached.as_readonly(recursive=recursive)

    # Update the derivatives
    if recursive:
        for deriv in self._derivs.values():
            deriv.as_readonly()

    return self
def match_readonly(self, arg):
    """Make the read-only status of this object equal to that of another.

    Parameters:
        arg (Qube): An existing Qube subclass.

    Returns:
        Qube: This object, converted to read-only if `arg` is read-only.

    Raises:
        ValueError: If this object is read-only but the `arg` is not.
    """

    if arg._readonly:
        return self.as_readonly()

    # A read-only object cannot be made to match a writeable one
    if self._readonly:
        raise ValueError(f'{type(self).__name__} object is read-only')

    return self
def require_writeable(self, force=False):
    """Ensure that this object is writeable.

    Parameters:
        force (bool, optional): True to return a new copy if this object is read-only;
            otherwise, if this object is not writeable, raise a ValueError.

    Returns:
        Qube: This object if already writeable; otherwise a new writeable copy.

    Raises:
        ValueError: If this object is read-only but `force` is False.
    """

    if self._readonly:
        if force:
            # The copy must be writeable; a read-only copy would defeat the purpose
            return self.copy(recursive=True, readonly=False)

        raise ValueError(f'{type(self).__name__} object is read-only')

    # Sometimes the array is writeable but a shared mask is not. Replace the mask by
    # a private, writeable copy in place.
    if np.shape(self._mask) and not self._mask.flags['WRITEABLE']:
        self._mask = self._mask.copy()

    # It's possible that a derivative is read-only. Replace it under an existing key
    # and keep the matching "d_d" attribute in sync with the dictionary.
    for key, deriv in self._derivs.items():
        if deriv._readonly:
            writeable = deriv.copy(recursive=False, readonly=False)
            self._derivs[key] = writeable
            setattr(self, 'd_d' + key, writeable)

    return self
def require_writable(self, force=False):
    """Ensure that this object is writeable.

    DEPRECATED NAME; use require_writeable(), to which this method forwards.

    Parameters:
        force (bool, optional): True to return a new copy if this object is read-only;
            otherwise, if this object is not writeable, raise a ValueError.

    Returns:
        Qube: This object if already writeable; otherwise a new writeable copy.

    Raises:
        ValueError: If this object is read-only but `force` is False.
    """

    result = self.require_writeable(force=force)
    return result
###################################################################################### # Copying operations and conversions ######################################################################################
def copy(self, *, recursive=True, readonly=False):
    """Deep copy operation with additional options.

    Parameters:
        recursive (bool, optional): True to copy the derivatives; False, to return
            an object without derivatives.
        readonly (bool, optional): True to return a read-only copy; if this object is
            already read-only, the result is then a shallow clone rather than a deep
            copy. Otherwise, this return is guaranteed to be an entirely new copy,
            independent of this object and suitable for modification.

    Returns:
        Qube: A copy of this object.
    """

    # Copying a readonly object is easy: nothing can be modified, so a shallow clone
    # is sufficient. The derivatives are only stripped if the caller asked for that.
    if self._readonly and readonly:
        return self.clone(recursive=recursive)

    # Create a shallow copy without derivatives
    obj = self.clone(recursive=False)

    # Copy the values
    if self._is_array:
        obj._values = self._values.copy()
    else:
        obj._values = self._values

    # Copy the mask
    if isinstance(self._mask, np.ndarray):
        obj._mask = self._mask.copy()
    else:
        obj._mask = self._mask

    obj._cache = {}

    # Set the read-only state
    if readonly:
        obj.as_readonly()
    else:
        obj._readonly = False

    # Copy the derivatives, each with the requested read-only state
    if recursive:
        for key, deriv in self._derivs.items():
            obj.insert_deriv(key, deriv.copy(recursive=False, readonly=readonly))

    return obj
# Python-standard copy function
[docs] def __copy__(self): """An independent, writeable copy of this object.""" return self.copy(recursive=True, readonly=False)
###################################################################################### # Value tests ######################################################################################
[docs] @staticmethod def as_one_bool(value): """Convert a single value to a bool; leave other values unchanged.""" if not isinstance(value, np.ndarray): return bool(value) return value
[docs] @staticmethod def is_one_true(value): """True if the value is a single boolean True.""" if isinstance(value, (bool, np.bool_)): return bool(value) return False
[docs] @staticmethod def is_one_false(value): """True if the value is a single boolean False.""" if isinstance(value, (bool, np.bool_)): return not bool(value) return False
@staticmethod def _is_one_value(value): """True if the value is a Python numeric or a NumPy numeric scalar.""" return isinstance(value, numbers.Real) ###################################################################################### # Conversions ######################################################################################
def dtype(self):
    """One of "float", "int", or "bool", depending on this object's value."""

    kind = Qube._dtype(self._values)
    return kind
def is_numeric(self):
    """True if this object contains numbers; False if boolean."""

    vals = self._values

    # An array is numeric unless its data type is boolean
    if isinstance(vals, np.ndarray):
        return vals.dtype.kind != 'b'

    # A single value is numeric unless it is a Python or NumPy boolean
    return not isinstance(vals, (bool, np.bool_))
def as_numeric(self, *, recursive=True):
    """A numeric version of this object.

    Booleans are converted to Scalars holding integers.

    Parameters:
        recursive (bool, optional): True to include any derivatives; False to remove
            them.

    Returns:
        Qube: This object if it is already numeric, without its derivatives if
        `recursive` is False; a Boolean is converted to a Scalar.
    """

    if self.is_numeric():
        if recursive:
            return self
        return self.wod

    # Convert the boolean value(s) to integers
    if self._is_scalar:
        values = int(self._values)
    else:
        values = self._values.astype(np.int8)

    return Qube._SCALAR_CLASS(values, self._mask, example=self, op='as_numeric()')
def is_float(self):
    """True if this object contains floats; False if ints or booleans."""

    vals = self._values
    if isinstance(vals, np.ndarray):
        return vals.dtype.kind == 'f'

    return isinstance(vals, float)
def as_float(self, *, recursive=True, copy=False, builtins=False):
    """A floating-point version of this object.

    Booleans are converted to Scalars.

    Parameters:
        recursive (bool, optional): True to include any derivatives; False to remove
            them.
        copy (bool, optional): True to ensure that a new object with an independent
            copy of the values is returned.
        builtins (bool, optional): True to return a Python float if the returned value
            has shape (), is unmasked, and has no derivatives.

    Returns:
        Qube or float: The result.

    Raises:
        TypeError: If this object cannot contain floats.
    """

    if (builtins and self._is_scalar and not self._mask
            and not (recursive and self._derivs)):
        return float(self._values)

    # An array of floats needs no conversion
    if isinstance(self._values, np.ndarray) and self._values.dtype.kind == 'f':
        if copy:
            # copy() accepts the `recursive` option; __copy__() takes no arguments
            return self.copy(recursive=recursive)
        return self if recursive else self.wod

    # A Boolean has to change class in order to hold floats
    cls = type(self)
    if cls is Qube._BOOLEAN_CLASS:
        cls = Qube._SCALAR_CLASS

    if not cls._FLOATS_OK:
        raise TypeError(f'{cls.__name__} object cannot contain floats')

    if self._is_scalar:
        values = float(self._values)
    else:
        values = self._values.astype(np.float64)

    derivs = self._derivs if recursive else {}

    obj = Qube.__new__(cls)
    obj.__init__(values, self._mask, derivs=derivs, example=self, op='as_float()')
    return obj
def is_int(self):
    """True if this object contains ints; False if floats or booleans."""

    vals = self._values

    # Arrays: signed and unsigned integer types both count
    if isinstance(vals, np.ndarray):
        return vals.dtype.kind in 'iu'

    # A Python bool is also an int, so it has to be excluded explicitly
    return isinstance(vals, int) and not isinstance(vals, bool)
def as_int(self, copy=False, builtins=False):
    """An integer version of this object.

    Floating-point values are rounded downward. Booleans are converted to Scalars.

    Parameters:
        copy (bool, optional): True to ensure that a new object with an independent
            copy of the values is returned.
        builtins (bool, optional): True to return a Python int if the returned value
            has shape () and is unmasked.

    Returns:
        Qube or int: The result.

    Raises:
        TypeError: If this object cannot contain integers.
    """

    if builtins and self._is_scalar and not self._mask:
        return int(self._values)

    vals = self._values

    # An array of integers needs no conversion
    if isinstance(vals, np.ndarray) and vals.dtype.kind in 'iu':
        if copy:
            return self.__copy__()
        return self

    # A Boolean has to change class in order to hold integers
    cls = type(self)
    if cls is Qube._BOOLEAN_CLASS:
        cls = Qube._SCALAR_CLASS

    if not cls._INTS_OK:
        raise TypeError(f'{cls.__name__} object cannot contain ints')

    # Floor division by one rounds toward minus infinity
    if self._is_scalar:
        values = int(vals // 1)
    elif vals.dtype.kind == 'b':
        values = vals.astype(np.int8)
    else:
        values = (vals // 1).astype(np.int64)

    obj = Qube.__new__(cls)
    obj.__init__(values, self._mask, example=self, op='as_int()')
    return obj
def is_bool(self):
    """True if this object contains booleans; False otherwise.

    A single value counts as boolean if it is either a Python bool or a NumPy boolean
    scalar, which is the same rule that `is_numeric()` applies.
    """

    if isinstance(self._values, np.ndarray):
        return self._values.dtype.kind == 'b'

    return isinstance(self._values, (bool, np.bool_))
def as_bool(self, copy=False, builtins=False):
    """A boolean version of this object.

    Scalars are converted to Booleans.

    Parameters:
        copy (bool, optional): True to ensure that a new object with an independent
            copy of the values is returned.
        builtins (bool, optional): True to return a Python bool if the returned value
            has shape () and is unmasked.

    Returns:
        Qube or bool: A copy of object converted to bools; if the values are already
        an array of bools and `copy` is False, this object is returned unchanged.

    Raises:
        TypeError: If this object cannot contain bools.
    """

    if builtins and self._is_scalar and not self._mask:
        return bool(self._values)

    vals = self._values

    # An array of booleans needs no conversion
    if isinstance(vals, np.ndarray) and vals.dtype.kind == 'b':
        if copy:
            return self.__copy__()
        return self

    # A Scalar has to change class in order to hold booleans
    cls = type(self)
    if cls is Qube._SCALAR_CLASS:
        cls = Qube._BOOLEAN_CLASS

    # NOTE(review): this tests the class's `_INTS_OK` flag although the message is
    # about bools; it looks like it was carried over from as_int() -- confirm whether
    # a dedicated flag for booleans should be tested here instead.
    if not cls._INTS_OK:
        raise TypeError(f'{cls.__name__} object cannot contain bools')

    if self._is_scalar:
        values = bool(vals)
    else:
        values = vals.astype(np.bool_)

    obj = Qube.__new__(cls)
    obj.__init__(values, self._mask, example=self, op='as_bool()')
    return obj
def as_this_type(self, arg, *, recursive=True, coerce=True, op=''):
    """The argument converted to this class and data type.

    If the argument is already of the correct class and type, it is returned unchanged.

    Parameters:
        arg (array-like, float, int, or bool): The object to convert to the class of
            this object. If the argument is a scalar or NumPy ndarray, a new instance
            of this object's class is created.
        recursive (bool, optional): True to convert the derivatives as well.
        coerce (bool, optional): True to coerce the data type silently; False to leave
            the data type unchanged.
        op (str, optional): Name of operator to use in an error message.

    Returns:
        Qube: The argument converted to the type of this object.
    """

    # An argument of exactly this class is a candidate to be returned as is
    result = arg if type(arg) is type(self) else None

    # Anything that is not a Qube is wrapped first
    if not isinstance(arg, Qube):
        arg = Qube(arg, example=self, op=op)

    if arg._nrank != self._nrank:
        Qube._raise_incompatible_numers(op, self, arg)

    # Gather the attributes of the argument
    values = arg._values
    mask = arg._mask
    unit = arg._unit
    has_derivs = bool(arg._derivs)
    is_readonly = arg._readonly

    # Cast the values to this object's dtype if requested
    must_rebuild = False
    if coerce:
        casted = Qube._casted_to_dtype(values, Qube._dtype(self._values))
        must_rebuild = casted is not values
        values = casted

    # Drop the unit if this class does not allow one
    if unit and not self._UNITS_OK:
        unit = None
        must_rebuild = True

    # Derivatives that are disallowed or unwanted also force a new object
    if has_derivs and not (self._DERIVS_OK and recursive):
        must_rebuild = True

    # Build the new object if anything changed or the class differs
    if must_rebuild or result is None:
        result = Qube.__new__(type(self))
        result.__init__(values, mask, unit=unit, drank=arg._drank, example=self)
        is_readonly = False

    # Convert the derivatives if necessary
    if recursive and has_derivs:
        any_deriv_changed = False
        converted = {}
        for key, deriv in arg._derivs.items():
            new_deriv = self.as_this_type(deriv, recursive=False, coerce=False, op=op)
            if new_deriv is not deriv:
                any_deriv_changed = True
            converted[key] = new_deriv

        if any_deriv_changed or (arg is not result):
            # A read-only result is copied before derivatives are inserted
            if is_readonly:
                result = result.copy(recursive=False)
            result.insert_derivs(converted)

    return result
def cast(self, classes):
    """A shallow copy of this object casted to another Qube subclass.

    Parameters:
        classes (class or list): A Qube subclass or list of subclasses. The object will
            be casted to the first suitable class in the list.

    Returns:
        Qube: A shallow copy of this object. If the object is already of the selected
        class or if no suitable class is found, it is returned without modification.
    """

    # A lone class is treated as a one-element sequence
    candidates = (classes,) if isinstance(classes, type) else classes
    own_class = type(self)

    for candidate in candidates:

        # Nothing to do if this object is already an instance of the candidate
        if candidate is own_class:
            return self

        # Skip candidates whose numerator shape or rank is constrained and differs
        if not (candidate._NUMER is None or candidate._NUMER == self._numer):
            continue
        if not (candidate._NRANK is None or candidate._NRANK == self._nrank):
            continue

        # The first compatible candidate wins
        casted = Qube.__new__(candidate)
        casted.__init__(self._values, self._mask, derivs=self._derivs, example=self)
        return casted

    # No candidate was suitable
    return self
def as_all_constant(self, constant=None, *, recursive=True):
    """A shallow, read-only copy of this object with constant values.

    Derivatives are all set to zero. The mask is unchanged.

    Parameters:
        constant (array-like, float, int, or bool, optional): The constant value for
            each item. This must have the same shape as this object's items. Use None
            for values of zero appropriate to the Qube subclass.
        recursive (bool, optional): True to include derivatives, each replaced by its
            own all-constant version; False to return an object without derivatives.

    Returns:
        Qube: A shallow copy of this object with constant values.
    """

    # Default to this subclass's zero, then match this object's class and type
    fill = self.zero() if constant is None else constant
    fill = self.as_this_type(fill, recursive=False)

    # Replace the values of a derivative-free clone by the broadcasted constant
    result = self.clone(recursive=False)
    broadcasted = Qube.broadcast(fill, result)
    result._set_values(broadcasted[0]._values)
    result.as_readonly()

    if not recursive:
        return result

    for key, deriv in self._derivs.items():
        result.insert_deriv(key, deriv.as_all_constant(recursive=False))

    return result
def as_size_zero(self, axis=0, *, recursive=True):
    """A shallow, read-only copy of this object with size zero.

    Parameters:
        axis (int, optional): The axis index (positive or negative) of this object's
            shape to collapse to length zero; the other axes are left unchanged. Use
            None for an object of shape (0,).
        recursive (bool, optional): True to include size-zero versions of the
            derivatives; False to return an object without derivatives.

    Returns:
        Qube: A shallow copy of this object with size zero.

    Raises:
        ValueError: If `axis` is out of range for this object's shape.
    """

    obj = Qube.__new__(type(self))
    rank = len(self._shape)

    if rank == 0:
        # Prepend an axis, then empty it
        new_values = np.array([self._values])[:0]
        new_mask = np.array([self._mask])[:0]

    elif axis is None:
        # One leading axis of length zero; the item axes must survive, so the values
        # cannot simply be raveled.
        new_values = np.empty((0,) + tuple(self._item), dtype=self._values.dtype)
        new_mask = np.asarray(self._mask).ravel()[:0]

    else:
        self._require_axis_in_range(axis, rank, 'as_size_zero()')

        # Index the requested axis of the object's shape. Counting from the front
        # keeps trailing item axes out of the way, which an Ellipsis-based index on
        # the last axis of the value array would not.
        indx = (slice(None),) * (axis % rank) + (slice(0, 0),)
        new_values = self._values[indx]

        # Expand a single-valued mask to the full shape so it collapses the same way;
        # the copy is empty, so it costs nothing and is writable.
        new_mask = np.broadcast_to(self._mask, self._shape)[indx].copy()

    obj.__init__(new_values, new_mask, example=self)

    if recursive:
        for key, deriv in self._derivs.items():
            obj.insert_deriv(key, deriv.as_size_zero(axis=axis, recursive=False))

    return obj
###################################################################################### # Object mask operations ######################################################################################
def is_all_masked(self):
    """True if this object is entirely masked."""

    # Works for a single boolean as well as for a boolean array
    mask = self._mask
    return np.all(mask)
def count_masked(self):
    """The number of masked items in this object."""

    mask = self._mask

    # A single mask value applies to every item
    if not isinstance(mask, np.ndarray):
        return self._size if mask else 0

    return np.sum(mask)
def count_unmasked(self):
    """The number of unmasked items in this object."""

    mask = self._mask

    # A single mask value applies to every item
    if not isinstance(mask, np.ndarray):
        return 0 if mask else self._size

    return self._size - np.sum(mask)
def masked_single(self, *, recursive=True):
    """An object of this subclass containing one masked value.

    Parameters:
        recursive (bool, optional): True to include a masked single value for each
            derivative of this object; False to return an object without derivatives.

    Returns:
        Qube: A read-only object holding this object's default value, masked.
    """

    # The default is copied when this object has nonzero rank
    value = self._default if not self._rank else self._default.copy()

    single = Qube.__new__(type(self))
    single.__init__(value, True, example=self)

    if recursive and self._derivs:
        for key, deriv in self._derivs.items():
            single.insert_deriv(key, deriv.masked_single(recursive=False))

    single.as_readonly()
    return single
def without_mask(self, *, recursive=True):
    """A shallow copy of this object without its mask.

    Note that masked values will be revealed.

    Parameters:
        recursive (bool, optional): True to unmask any derivatives; False to strip
            derivatives.

    Returns:
        Qube: This object without a mask.
    """

    unmasked = self.clone(recursive=recursive)
    unmasked._set_mask(False)

    if not recursive:
        return unmasked

    # Replace each derivative by its own unmasked version
    for key, deriv in self._derivs.items():
        unmasked.insert_deriv(key, deriv.without_mask())

    return unmasked
def as_all_masked(self, *, recursive=True):
    """A shallow copy of this object with everything masked.

    Parameters:
        recursive (bool, optional): True to mask any derivatives; False to strip
            derivatives.

    Returns:
        Qube: This object but fully masked.
    """

    masked = self.clone(recursive=recursive)
    masked._set_mask(True)

    if not recursive:
        return masked

    # Replace each derivative by its own fully masked version
    for key, deriv in self._derivs.items():
        masked.insert_deriv(key, deriv.as_all_masked(recursive=False))

    return masked
def as_one_masked(self, *, recursive=True):
    """This object reduced to shape () and masked.

    Parameters:
        recursive (bool, optional): True to mask any derivatives; False to strip
            derivatives.

    Returns:
        Qube: This object but fully masked and with shape ().
    """

    # Take the first element of the flattened object, then mask it. The `recursive`
    # option must be forwarded or derivatives could never be stripped.
    first = self.flatten()[0]
    return first.as_all_masked(recursive=recursive)
def remask(self, mask, *, recursive=True, check=True):
    """A shallow copy of this object with a replaced mask.

    This is much quicker than masked_where(), for cases where only the mask of this
    object is changing.

    Parameters:
        mask (array-like or bool): The new mask to be applied to the object.
        recursive (bool, optional): True to apply the same mask to any derivatives.
        check (bool, optional): True to check for an array containing all False values,
            and if so, replace it with a single value of False.

    Returns:
        Qube: A shallow copy of this object with a new mask.

    Raises:
        TypeError: If the data type of `mask` is invalid.
        ValueError: If the mask is incompatible with the required shape.
    """

    new_mask = Qube._suitable_mask(mask, self._shape, check=check)

    # Start from a clone without derivatives and swap in the new mask
    remasked = self.clone(recursive=False)
    remasked._set_mask(new_mask)

    if not recursive:
        return remasked

    # The mask has already been validated, so derivatives skip the check
    for key, deriv in self._derivs.items():
        remasked.insert_deriv(key, deriv.remask(new_mask, recursive=False, check=False))

    return remasked
def remask_or(self, mask, *, recursive=True, check=True):
    """A shallow copy of this object, in which the current mask is "or-ed" with the
    given mask.

    This is much quicker than masked_where(), for cases where only the mask is
    changing.

    Parameters:
        mask (array-like or bool): The mask to be merged into the mask of the object.
        recursive (bool, optional): True to apply the same merged mask to any
            derivatives.
        check (bool, optional): True to check for an array containing all False values,
            and if so, replace it with a single value of False.

    Returns:
        Qube: A shallow copy of this object with a new mask.

    Raises:
        TypeError: If the data type of `mask` is invalid for a mask.
        ValueError: If the mask is incompatible with the required shape.
    """

    mask = Qube._suitable_mask(mask, self._shape, check=check)
    merged = Qube.or_(self._mask, mask)

    # Construct the new object
    obj = self.clone(recursive=False)
    obj._set_mask(merged)

    if recursive:
        # Derivatives receive the merged mask, not just the new one; otherwise the
        # elements that were masked before this call would be unmasked in the
        # derivatives while remaining masked in the parent.
        for key, deriv in self._derivs.items():
            obj.insert_deriv(key, deriv.remask(merged, recursive=False, check=False))

    return obj
def expand_mask(self, *, recursive=True):
    """A shallow copy where a single mask value of True or False is converted to an
    array.

    If the object's mask is already an array, it is returned unchanged.

    Parameters:
        recursive (bool, optional): True to expand the mask of any derivatives.

    Returns:
        Qube: A shallow copy of this object with an expanded mask.
    """

    # Quick exit: the mask is an array and there are no derivatives to inspect
    if np.shape(self._mask) and not (recursive and self._derivs):
        return self

    # Clone this object only if its own mask needs to be expanded
    expanded = None
    if not isinstance(self._mask, np.ndarray):
        expanded = self.clone(recursive=True)
        filler = np.ones if expanded._mask else np.zeros
        expanded._set_mask(filler(self._shape, dtype=np.bool_))

    # Collect only the derivatives whose masks actually changed
    replaced = {}
    if recursive:
        for key, deriv in self._derivs.items():
            mask_before = deriv._mask
            candidate = deriv.expand_mask(recursive=False)
            if mask_before is not candidate._mask:
                replaced[key] = candidate

    if expanded is None:
        # Nothing changed at all
        if not replaced:
            return self

        # Only derivatives changed
        expanded = self.clone(recursive=True)

    for key, deriv in replaced.items():
        expanded.insert_deriv(key, deriv, override=True)

    return expanded
def collapse_mask(self, *, recursive=True):
    """A shallow copy where a mask entirely containing either True or False is
    converted to a single boolean.

    Parameters:
        recursive (bool, optional): True to collapse the mask of any derivatives.

    Returns:
        Qube: A shallow copy of this object with a collapsed mask.
    """

    # Quick exit: the mask is not an array and there are no derivatives to inspect
    if not isinstance(self._mask, np.ndarray) and not (recursive and self._derivs):
        return self

    # Clone this object only if its own mask is uniform
    collapsed = None
    if np.shape(self._mask):
        if not np.any(self._mask):
            uniform = False
        elif np.all(self._mask):
            uniform = True
        else:
            uniform = None

        if uniform is not None:
            collapsed = self.clone(recursive=True)
            collapsed._set_mask(uniform)

    # Collect only the derivatives whose masks actually changed
    replaced = {}
    if recursive:
        for key, deriv in self._derivs.items():
            mask_before = deriv._mask
            candidate = deriv.collapse_mask(recursive=False)
            if mask_before is not candidate._mask:
                replaced[key] = candidate

    if collapsed is None:
        # Nothing changed at all
        if not replaced:
            return self

        # Only derivatives changed
        collapsed = self.clone(recursive=True)

    for key, deriv in replaced.items():
        collapsed.insert_deriv(key, deriv, override=True)

    return collapsed
def as_mask_where_nonzero(self):
    """A boolean scalar or NumPy ndarray where values are nonzero and unmasked."""

    is_nonzero = self._values != 0
    return is_nonzero & self.antimask
def as_mask_where_zero(self):
    """A boolean scalar or NumPy ndarray where values are zero and unmasked."""

    is_zero = self._values == 0
    return is_zero & self.antimask
def as_mask_where_nonzero_or_masked(self):
    """A boolean scalar or NumPy ndarray where values are nonzero or masked."""

    is_nonzero = self._values != 0
    return is_nonzero | self._mask
def as_mask_where_zero_or_masked(self):
    """A boolean scalar or NumPy ndarray where values are zero or masked."""

    is_zero = self._values == 0
    return is_zero | self._mask
###################################################################################### # I/O operations ######################################################################################
[docs] def __repr__(self): """Express the value as a string. The format of the returned string is `Class([value, value, ...], suffixes, ...)`, where the quanity inside square brackets is the result of str() applied to a NumPy ndarray. The suffixes are, in order... * "denom=(shape)" if the object has a denominator; * "mask" if the object has a mask * the name of the unit of the object has a unit * the names of all the derivatives in alphabetical order Returns: str: String representation """ return self.__str__()
[docs] def __str__(self): """Express the value as a string. The format of the returned string is `Class([value, value, ...], suffixes, ...)`, where the quanity inside square brackets is the result of str() applied to a NumPy ndarray. The suffixes are, in order... * "denom=(shape)" if the object has a denominator; * "mask" if the object has a mask * the name of the unit of the object has a unit * the names of all the derivatives in alphabetical order Returns: str: String representation """ suffix = [] # Indicate the denominator shape if necessary if self._denom != (): suffix += ['denom=' + str(self._denom)] # Masked objects have a suffix ', mask' is_masked = np.any(self._mask) if is_masked: suffix += ['mask'] # Objects with a unit include the unit in the suffix if not self.is_unitless(): suffix += [str(self._unit)] # Objects with derivatives include a list of the names if self._derivs: keys = list(self._derivs.keys()) keys.sort() for key in keys: suffix += ['d_d' + key] # Generate the value string scaled = self.into_unit(recursive=False) # apply the unit if self._is_scalar: if is_masked: string = '--' else: string = str(scaled) elif is_masked: temp = Qube(scaled, self._mask, example=self, derivs={}) string = str(temp.mvals)[1:-1] else: string = str(scaled)[1:-1] # Add an extra set of brackets around derivatives if self._denom: string = '[' + string + ']' # Concatenate the results if len(suffix) == 0: suffix = '' else: suffix = '; ' + ', '.join(suffix) return type(self).__name__ + '(' + string + suffix + ')'
def _opstr(self, /, op): """An operation string to use in an error message for this class. Parameters: op (str): Name of the operation. Returns: str: The class name followed by the operation, updated for an error message. """ name = self.__name__ if isinstance(self, type) else type(self).__name__ if not op: return name if op[0].isalpha(): return name + '.' + op return name + ' "' + op + '"' def _disallow_denom(self, op): """Raise ValueError if this object has a denominator. Parameters: op (str): Name of the operation to appear in the error message. """ if self._drank: raise ValueError(self._opstr(op) + ' does not support denominators') def _require_scalar(self, op): """Raise ValueError if this object has rank > 0. Parameters: op (str): Name of the operation to appear in the error message. """ if self._nrank: raise ValueError(self._opstr(op) + ' requires scalar items') def _require_axis_in_range(self, axis, rank, op, name='axis'): """Raise ValueError if a given axis index is out of range. Parameters: axis (int): Axis index, positive or negative. rank (int): Rank of an array for indexing. op (str): Name of the operation to appear in the error message. name (str, optional): Name of axis variable. Raises: ValueError: If axis < -rank or >= rank. """ if axis < -rank or axis >= rank: opstr = self._opstr(op) raise ValueError(f'{opstr} {name} is out of range ({-rank},{rank}): {axis}') ###################################################################################### # from_scalars() special method ######################################################################################
@classmethod
def from_scalars(cls, *scalars, recursive=True, readonly=False, classes=()):
    """A new instance constructed from Scalars or arrays given as arguments.

    Defined as a class method so it can also be used to generate instances of any 1-D
    subclass.

    Parameters:
        *scalars (Qube, array-like, float, or int): One or more Scalars or objects that
            can be converted to Scalars.
        recursive (bool, optional): True to construct the derivatives as the union of
            the derivatives of all the components. False to return an object without
            derivatives.
        readonly (bool, optional): True to return a read-only object; False (the
            default) to return something potentially writable.
        classes (class or sequence of classes, optional): The preferred classes of the
            returned object. The first suitable class will be used; if none is
            suitable or none is given, the object is an instance of `cls`.

    Returns:
        Qube: A new object constructed from the inputs and using the first suitable
        class within `classes`.

    Raises:
        ValueError: If two of the `scalars` have incompatible denominators.
    """

    # Convert to scalars and broadcast to the same shape
    args = [Qube._SCALAR_CLASS.as_scalar(arg) for arg in scalars]
    scalars = Qube.broadcast(*args, recursive=recursive)

    # Tabulate the properties and construct the value array
    new_unit = None
    new_denom = None

    arrays = []
    masks = []
    deriv_dicts = []
    has_derivs = False
    dtype = np.int64
    for scalar in scalars:
        arrays.append(scalar._values)
        masks.append(scalar._mask)

        new_unit = new_unit or scalar._unit
        Unit.require_match(new_unit, scalar._unit)

        if new_denom is None:
            new_denom = scalar._denom
        elif new_denom != scalar._denom:
            raise ValueError(f'incompatible denominators in {cls}.from_scalars(): '
                             f'{scalar._denom}, {new_denom}')

        deriv_dicts.append(scalar._derivs)
        if len(scalar._derivs):
            has_derivs = True

        # Remember any floats encountered
        if scalar.is_float():
            dtype = np.float64

    # Construct the values array; the new component axis is moved from the front to
    # just before the denominator axes
    new_drank = len(new_denom)
    new_values = np.array(arrays, dtype=dtype)
    new_values = np.rollaxis(new_values, 0, new_values.ndim - new_drank)

    # Construct the mask (scalar or array)
    masks = Qube.broadcast(*masks)
    new_mask = Qube.or_(*masks)

    # Construct the object
    obj = Qube.__new__(cls)
    obj.__init__(new_values, new_mask, unit=new_unit, nrank=scalars[0]._nrank + 1,
                 drank=new_drank)
    obj = obj.cast(classes)

    # Insert derivatives if necessary
    if recursive and has_derivs:
        new_derivs = {}

        # Find one example of each derivative
        examples = {}
        for deriv_dict in deriv_dicts:
            for key, deriv in deriv_dict.items():
                examples[key] = deriv

        for key, example in examples.items():

            # Components lacking this derivative contribute zeros of the same item
            # shape
            if example._item:
                missing_deriv = Qube(np.zeros(example._item), nrank=example._nrank,
                                     drank=example._drank, op='from_scalars()')
            else:
                missing_deriv = 0.

            items = [deriv_dict.get(key, missing_deriv) for deriv_dict in deriv_dicts]
            new_derivs[key] = Qube.from_scalars(*items, recursive=False,
                                                readonly=readonly, classes=classes)

        obj.insert_derivs(new_derivs)

    # Honor the request for a read-only result; this is done last, after any
    # derivatives have been inserted.
    if readonly:
        obj.as_readonly()

    return obj
##########################################################################################