Source code for ancb

from numpy import ndarray, asarray, empty, empty_like  # type: ignore
from typing import Tuple, NoReturn
from typing import Union

from functools import reduce
from itertools import zip_longest, starmap

from numpy import (
    matmul, add, subtract, multiply, divide, mod, floor_divide, power,
    negative, positive, absolute,
    right_shift, left_shift, bitwise_and, invert, bitwise_or, bitwise_xor
)

import numpy as np


[docs]def can_broadcast(shape1, shape2) -> bool: """ Check if shapes shape1 and shape2 can be broadcast together. :param Tuple shape1: first shape to parse :param Tuple shape2: second shape to parse :returns: True if arr1 and arr2 can be broadcast together, False otherwise :rtype: bool """ return( reduce( lambda a, b: a and b, starmap( lambda a, b: (a == b or (a == 1 or b == 1)), zip_longest(shape1, shape2, fillvalue=1) ) ) )
[docs]def star_can_broadcast(starexpr) -> bool: """ Check if shapes shape1 and shape2 can be broadcast together from a tuple of zip_longest(shape1, shape2, fillvalue=1) called the "starexpr" :param Tuple starexpr: starexpr to parse :returns: True if shape1 and shape2 can be broadcast together, False otherwise :rtype: bool """ return ( reduce( lambda a, b: a and b, starmap( lambda a, b: (a == b or (a == 1 or b == 1)), starexpr ) ) )
[docs]class NumpyCircularBuffer(ndarray): """ Implements a circular (ring) buffer using a numpy array. This implmentation uses an internal size count and capacity count so that the data region is fully utilized. """ def __new__(cls, data, bounds: Tuple[int, int] = (0, 0)): """ Generate a circular buffer over existing array. The dimension 0 is used to index elements of the buffer. For example, for an ndim array of shape (N, a, b, c), the size of the buffer is interpretted to be N and the elements are arrays of shape (a, b, c). :param data: Data backing the buffer. Interpretted as a numpy array. :param Tuple[int, int] bounds: tuple of (begin, end) indices expressing where the buffer begins and ends over the data. For example, for a buffer [0, 1, 2, -1, -1] choosing (0, 2) would say [0, 1, 2] is in the buffer. For [1, 2, -1, -1, 0] choosing (4, 2) would select [0, 1, 2]. """ obj = asarray(data).view(cls) obj._begin = bounds[0] obj._end = bounds[1] obj._capacity = obj.shape[0] if (obj._begin < obj._end): obj._size = (obj._end - obj._begin) + 1 elif (obj._begin > obj._end): # ((obj._capacity - 1) - obj._begin) + obj._end + 1 obj._size = obj._capacity - obj._begin + obj._end else: obj._size = 0 return (obj) def __init__(self, *args, **kwargs): # This is here for typing info for linters, not anything else super().__init__() self._begin: int self._end: int self._capacity: int def __matmul__(self, x): x = asarray(x) if (x.ndim == 0): ValueError( "matmul: Input operand 1 does not have enough dimensions " "(has 0, gufunc core with signature (n?,k),(k,m?)->(n?,m?) " "requires 1" ) if x.ndim == 1 and self.ndim == 1: # Dot product if x.shape[0] == self._size: if self.fragmented: k = self._capacity - self._begin # fragmentation index out = matmul(self[self._begin:], x[:k]).view(ndarray) out += matmul(self[:self._end], x[k:]).view(ndarray) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] out = matmul(part, x).view(ndarray) return(out) else: raise ValueError( "matmul: Input operand 1 has a mismatch in its core " "dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?)" " (size {n} is different from {m})".format( n=self._size, m=x.shape[0] ) ) elif self.ndim == 1 and x.ndim > 1: if self._size == x.shape[-2]: out = empty(*x.shape[:-2], x.shape[-1]) if self.fragmented: k = self._capacity - self._begin # fragmentation index matmul(self[self._begin:], x[..., :k, :], out) out += matmul(self[:self._end], x[..., k:, :]).view( ndarray ) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] matmul(part, x, out).view(ndarray) return(out) else: raise ValueError( "matmul: Input operand 1 has a mismatch in its core " "dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?)" " (size {n} is different from {m})".format( n=self.shape[-2], m=x.shape[0] ) ) elif self.ndim == 2: if (self.shape[-1] == x.shape[-2]): out = empty( (*x.shape[:-2], self.shape[-1], x.shape[-2]) ) if self.fragmented: k = self._capacity - self._begin # fragmentation index matmul(self[self._begin:], x, out[..., :k, :]) matmul(self[:self._end], x, out[..., k:, :]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] matmul(part, x, out) return(out.view(ndarray)) else: raise ValueError( ( "matmul: Input operand 1 has a mismatch in its core " "dimension 0, with gufunc signature (n?,k),(k,m?)->" "(n?,m?) (size {n} is different from {m})" ).format( n=self.shape[-1], m=x.shape[-2] ) ) else: if (self.shape[-1] == x.shape[-2]): self_shape = (self._size, *self.shape[1:-2]) starexpr = tuple( zip_longest(self_shape, x.shape[:-2], fillvalue=1) ) if star_can_broadcast(starexpr): broadcast_shape = tuple( starmap( lambda a, b: max(a, b), starexpr ) ) out = empty( (*broadcast_shape, self.shape[-2], x.shape[-1]) ) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim > 2: matmul(self[self._begin:], x[:k], out[:k]) matmul(self[:self._end], x[k:], out[k:]) else: matmul(self[self._begin:], x, out[:k]) matmul(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] matmul(part, x, out) return(out.view(ndarray)) else: raise ValueError( ( "operands could not be broadcast together with" "remapped shapes [original->remapped]: " "{shape_b}->({shape_bn}, newaxis,newaxis) " "{shape_a}->({shape_an}, newaxis,newaxis) " "and requested shape ({n},{m})" ).format( shape_a=self_shape, shape_b=x.shape, shape_an=self.shape[:-2].__str__()[:-1], shape_bn=x.shape[:-2].__str__()[:-1], n=self.shape[-1], m=x.shape[-2] ) ) else: raise ValueError( ( "matmul: Input operand 1 has a mismatch in its core " "dimension 0, with gufunc signature (n?,k),(k,m?)->" "(n?,m?) (size {n} is different from {m})" ).format( n=self.shape[-1], m=x.shape[-2] ) ) def __imatmul__(self, x): raise TypeError("In-place matrix multiplication is not (yet) " "supported. Use 'a = a @ b' instead of 'a @= b'") def __rmatmul__(self, x): x = asarray(x) if (x.ndim == 0): ValueError( "matmul: Input operand 1 does not have enough dimensions " "(has 0, gufunc core with signature (n?,k),(k,m?)->(n?,m?) " "requires 1" ) if x.ndim == 1 and self.ndim == 1: # Dot product if x.shape[0] == self._size: if self.fragmented: k = self._capacity - self._begin # fragmentation index out = matmul(x[:k], self[self._begin:]).view(ndarray) out += matmul(x[k:], self[:self._end]).view(ndarray) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] out = matmul(x, part) return(out.view(ndarray)) else: raise ValueError( "matmul: Input operand 1 has a mismatch in its core " "dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?)" " (size {n} is different from {m})".format( n=self._size, m=x.shape[0] ) ) elif x.ndim == 1 and self.ndim > 1: if x.shape[0] == self.shape[-2]: if self.ndim == 2: out = empty(self.shape[-1]) if self.fragmented: k = self._capacity - self._begin # fragmentation index matmul(x[:k], self[self._begin:], out) out += matmul(x[k:], self[:self._end]).view(ndarray) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] matmul(x, part, out) return(out) else: out = empty( (self._size, *self.shape[1:-2], self.shape[-1]) ) if self.fragmented: k = self._capacity - self._begin # fragmentation index matmul(x, self[self._begin:], out[:k]) matmul(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] matmul(x, part, out) return(out) else: raise ValueError( "matmul: Input operand 1 has a mismatch in its core " "dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?)" " (size {n} is different from {m})".format( n=self.shape[-2], m=x.shape[0] ) ) elif x.ndim > 1 and self.ndim == 1: if x.shape[-1] == self.shape[0]: out = empty(x.shape[:-1]) if self.fragmented: k = self._capacity - self._begin # fragmentation index matmul(x[..., :, :k], self[self._begin:], out) out += matmul(x[..., :, k:], self[:self._end]).view( ndarray ) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] matmul(x, part, out) return(out) else: raise ValueError( "matmul: Input operand 1 has a mismatch in its core " "dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?)" " (size {n} is different from {m})".format( n=self.shape[-2], m=x.shape[0] ) ) elif self.ndim == 2: if (x.shape[-1] == self.shape[-2]): out = empty( (*x.shape[:-1], self.shape[-2]) ) if self.fragmented: k = self._capacity - self._begin # fragmentation index matmul(x[..., :, :k], self[self._begin:], out) out += matmul(x[..., :, k:], self[:self._end]).view( ndarray ) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] matmul(x, part, out) return(out.view(ndarray)) else: raise ValueError( ( "matmul: Input operand 1 has a mismatch in its core " "dimension 0, with gufunc signature (n?,k),(k,m?)->" "(n?,m?) (size {n} is different from {m})" ).format( n=self.shape[-1], m=x.shape[-2] ) ) else: if (x.shape[-1] == self.shape[-2]): self_shape = (self._size, *self.shape[1:-2]) starexpr = tuple( zip_longest(self_shape, x.shape[:-2], fillvalue=1) ) if star_can_broadcast(starexpr): broadcast_shape = tuple( starmap( lambda a, b: max(a, b), starexpr ) ) out = empty( (*broadcast_shape, x.shape[-2], self.shape[-1]) ) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim > 2: matmul(x[:k], self[self._begin:], out[:k]) matmul(x[k:], self[:self._end], out[k:]) else: matmul(x, self[self._begin:], out[:k]) matmul(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] matmul(x, part, out) return(out.view(ndarray)) else: raise ValueError( ( "operands could not be broadcast together with" "remapped shapes [original->remapped]: " "{shape_b}->({shape_bn}, newaxis,newaxis) " "{shape_a}->({shape_an}, newaxis,newaxis) " "and requested shape ({n},{m})" ).format( shape_a=self_shape, shape_b=x.shape, shape_an=self.shape[:-2].__str__()[:-1], shape_bn=x.shape[:-2].__str__()[:-1], n=self.shape[-1], m=x.shape[-2] ) ) else: raise ValueError( ( "matmul: Input operand 1 has a mismatch in its core " "dimension 0, with gufunc signature (n?,k),(k,m?)->" "(n?,m?) (size {n} is different from {m})" ).format( n=self.shape[-1], m=x.shape[-2] ) )
[docs] def get_partions(self) -> Union[ndarray, Tuple[ndarray, ndarray]]: """ Gets a slice of the buffer between the beginning and end indices. If the buffer is fragmented, a tuple of two slices of the two fragments sequentially. Concatenating the slices in the order they are in the tuple will return a list of elements in the correct order. Time complexity: O(1) :returns: slice or tuple of slices of the array elements in order :rtype: Union[ndarray, Tuple[ndarray, ndarray]] """ if self.fragmented: return (self[self._begin:], self[:self._end]) else: return self[self._begin:self._end]
def __add__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: add(self[self._begin:], x[:k], out[:k]) add(self[:self._end], x[k:], out[k:]) else: add(self[self._begin:], x, out[:k]) add(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] add(part, x, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __radd__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: add(x[:k], self[self._begin:], out[:k]) add(x[k:], self[:self._end], out[k:]) else: add(x, self[self._begin:], out[:k]) add(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] add(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __iadd__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: add(self[self._begin:], x[:k], self[self._begin:]) add(self[:self._end], x[k:], self[:self._end]) else: add(self[self._begin:], x, self[self._begin:]) add(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] add(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __sub__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: subtract(self[self._begin:], x[:k], out[:k]) subtract(self[:self._end], x[k:], out[k:]) else: subtract(self[self._begin:], x, out[:k]) subtract(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] subtract(part, x, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __rsub__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: subtract(x[:k], self[self._begin:], out[:k]) subtract(x[k:], self[:self._end], out[k:]) else: subtract(x, self[self._begin:], out[:k]) subtract(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] subtract(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __isub__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: subtract(self[self._begin:], x[:k], self[self._begin:]) subtract(self[:self._end], x[k:], self[:self._end]) else: subtract(self[self._begin:], x, self[self._begin:]) subtract(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] subtract(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __mul__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: multiply(self[self._begin:], x[:k], out[:k]) multiply(self[:self._end], x[k:], out[k:]) else: multiply(self[self._begin:], x, out[:k]) multiply(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] multiply(part, x, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __rmul__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: multiply(x[:k], self[self._begin:], out[:k]) multiply(x[k:], self[:self._end], out[k:]) else: multiply(x, self[self._begin:], out[:k]) multiply(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] multiply(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __imul__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: multiply(self[self._begin:], x[:k], self[self._begin:]) multiply(self[:self._end], x[k:], self[:self._end]) else: multiply(self[self._begin:], x, self[self._begin:]) multiply(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] multiply(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __truediv__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: divide(self[self._begin:], x[:k], out[:k]) divide(self[:self._end], x[k:], out[k:]) else: divide(self[self._begin:], x, out[:k]) divide(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] divide(part, x, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __rtruediv__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: divide(x[:k], self[self._begin:], out[:k]) divide(x[k:], self[:self._end], out[k:]) else: divide(x, self[self._begin:], out[:k]) divide(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] divide(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __itruediv__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: divide(self[self._begin:], x[:k], self[self._begin:]) divide(self[:self._end], x[k:], self[:self._end]) else: divide(self[self._begin:], x, self[self._begin:]) divide(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] divide(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __floordiv__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: floor_divide(self[self._begin:], x[:k], out[:k]) floor_divide(self[:self._end], x[k:], out[k:]) else: floor_divide(self[self._begin:], x, out[:k]) floor_divide(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] floor_divide(part, x, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __rfloordiv__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: floor_divide(x[:k], self[self._begin:], out[:k]) floor_divide(x[k:], self[:self._end], out[k:]) else: floor_divide(x, self[self._begin:], out[:k]) floor_divide(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] floor_divide(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __ifloordiv__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: floor_divide(self[self._begin:], x[:k], self[self._begin:]) floor_divide(self[:self._end], x[k:], self[:self._end]) else: floor_divide(self[self._begin:], x, self[self._begin:]) floor_divide(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] floor_divide(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __mod__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: mod(self[self._begin:], x[:k], out[:k]) mod(self[:self._end], x[k:], out[k:]) else: mod(self[self._begin:], x, out[:k]) mod(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] mod(part, x, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __rmod__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: mod(x[:k], self[self._begin:], out[:k]) mod(x[k:], self[:self._end], out[k:]) else: mod(x, self[self._begin:], out[:k]) mod(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] mod(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __imod__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: mod(self[self._begin:], x[:k], self[self._begin:]) mod(self[:self._end], x[k:], self[:self._end]) else: mod(self[self._begin:], x, self[self._begin:]) mod(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] mod(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __pow__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: power(self[self._begin:], x[:k], out[:k]) power(self[:self._end], x[k:], out[k:]) else: power(self[self._begin:], x, out[:k]) power(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] power(part, x, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __rpow__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: power(x[:k], self[self._begin:], out[:k]) power(x[k:], self[:self._end], out[k:]) else: power(x, self[self._begin:], out[:k]) power(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] power(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __ipow__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: power(self[self._begin:], x[:k], self[self._begin:]) power(self[:self._end], x[k:], self[:self._end]) else: power(self[self._begin:], x, self[self._begin:]) power(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] power(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __and__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: bitwise_and(self[self._begin:], x[:k], out[:k]) bitwise_and(self[:self._end], x[k:], out[k:]) else: bitwise_and(self[self._begin:], x, out[:k]) bitwise_and(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] bitwise_and(part, x, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __rand__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: bitwise_and(self[self._begin:], x[:k], out[:k]) bitwise_and(self[:self._end], x[k:], out[k:]) else: bitwise_and(self[self._begin:], x, out[:k]) bitwise_and(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] bitwise_and(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __iand__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: bitwise_and(self[self._begin:], x[:k], self[self._begin:]) bitwise_and(self[:self._end], x[k:], self[:self._end]) else: bitwise_and(self[self._begin:], x, self[self._begin:]) bitwise_and(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] bitwise_and(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __or__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: bitwise_or(x[:k], self[self._begin:], out[:k]) bitwise_or(x[k:], self[:self._end], out[k:]) else: bitwise_or(x, self[self._begin:], out[:k]) bitwise_or(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] bitwise_or(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __ror__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: bitwise_or(self[self._begin:], x[:k], out[:k]) bitwise_or(self[:self._end], x[k:], out[k:]) else: bitwise_or(self[self._begin:], x, out[:k]) bitwise_or(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] bitwise_or(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __ior__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: bitwise_or(self[self._begin:], x[:k], self[self._begin:]) bitwise_or(self[:self._end], x[k:], self[:self._end]) else: bitwise_or(self[self._begin:], x, self[self._begin:]) bitwise_or(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] bitwise_or(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __xor__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: bitwise_xor(x[:k], self[self._begin:], out[:k]) bitwise_xor(x[k:], self[:self._end], out[k:]) else: bitwise_xor(x, self[self._begin:], out[:k]) bitwise_xor(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] bitwise_xor(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __rxor__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: bitwise_xor(self[self._begin:], x[:k], out[:k]) bitwise_xor(self[:self._end], x[k:], out[k:]) else: bitwise_xor(self[self._begin:], x, out[:k]) bitwise_xor(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] bitwise_xor(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast" "together with shapes {} {}".format( x.shape, (self._size, *self.shape[1:]) ) ) def __ixor__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: bitwise_xor(self[self._begin:], x[:k], self[self._begin:]) bitwise_xor(self[:self._end], x[k:], self[:self._end]) else: bitwise_xor(self[self._begin:], x, self[self._begin:]) bitwise_xor(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] bitwise_xor(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __rshift__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: right_shift(self[self._begin:], x[:k], out[:k]) right_shift(self[:self._end], x[k:], out[k:]) else: right_shift(self[self._begin:], x, out[:k]) right_shift(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] right_shift(part, x, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __rrshift__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: right_shift(x[:k], self[self._begin:], out[:k]) right_shift(x[k:], self[:self._end], out[k:]) else: right_shift(x, self[self._begin:], out[:k]) right_shift(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] right_shift(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __irshift__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: right_shift(self[self._begin:], x[:k], self[self._begin:]) right_shift(self[:self._end], x[k:], self[:self._end]) else: right_shift(self[self._begin:], x, self[self._begin:]) right_shift(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] right_shift(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __lshift__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: left_shift(self[self._begin:], x[:k], out[:k]) left_shift(self[:self._end], x[k:], out[k:]) else: left_shift(self[self._begin:], x, out[:k]) left_shift(self[:self._end], x, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] left_shift(part, x, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __rlshift__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): out = empty(tuple(starmap(lambda a, b: max(a, b), starexpr))) if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: left_shift(x[:k], self[self._begin:], out[:k]) left_shift(x[k:], self[:self._end], out[k:]) else: left_shift(x, self[self._begin:], out[:k]) left_shift(x, self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] left_shift(x, part, out) return(out.view(ndarray)) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __ilshift__(self, x): x = asarray(x) self_shape = (self._size, *self.shape[1:]) starexpr = tuple(zip_longest(self_shape, x.shape, fillvalue=1)) if star_can_broadcast(starexpr): if self.fragmented: k = self._capacity - self._begin # fragmentation index if x.ndim >= 1: left_shift(self[self._begin:], x[:k], self[self._begin:]) left_shift(self[:self._end], x[k:], self[:self._end]) else: left_shift(self[self._begin:], x, self[self._begin:]) left_shift(self[:self._end], x, self[:self._end]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] left_shift(part, x, part) else: raise ValueError( "operands could not be broadcast " "together with shapes {} {}".format( (self._size, *self.shape[1:]), x.shape ) ) def __invert__(self): out = empty((self._size, *self.shape[1:])) if self.fragmented: k = self._capacity - self._begin # fragmentation index invert(self[self._begin:], out[:k]) invert(self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] invert(part, out) return(out.view(ndarray)) def __inv__(self): return self.__invert__() def __abs__(self): out = empty((self._size, *self.shape[1:])) if self.fragmented: k = self._capacity - self._begin # fragmentation index absolute(self[self._begin:], out[:k]) absolute(self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] absolute(part, out) return(out.view(ndarray)) def __neg__(self): out = empty((self._size, *self.shape[1:])) if self.fragmented: k = self._capacity - self._begin # fragmentation index negative(self[self._begin:], out[:k]) negative(self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] negative(part, out) return(out.view(ndarray)) def __pos__(self): out = empty((self._size, *self.shape[1:])) if self.fragmented: k = self._capacity - self._begin # fragmentation index positive(self[self._begin:], out[:k]) positive(self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] positive(part, out) return(out.view(ndarray)) @property def fragmented(self) -> bool: """ Property that returns True if the buffer is fragmented (the beginning index is greater than the end index), False otherwise. Time complexity: O(1) :returns: True if buffer is fragmented, False otherwise. :rtype: bool """ return not ( self._begin < self._end or self._end == 0 ) @property def full(self) -> bool: """ Property that returns True if the buffer is full, False otherwise. Time complexity: O(1) :returns: True if buffer is full, False otherwise. :rtype: bool See Also -------- :func:`NumpyCircularBuffer.empty` """ return (self._size == self._capacity) @property def empty(self) -> bool: """ Property that returns True if the buffer is empty, False otherwise. Time complexity: O(1) :returns: True if buffer is empty, False otherwise. :rtype: bool See Also -------- :func:`NumpyCircularBuffer.full` """ return (self._size == 0)
[docs] def reset(self): """ Empties all elements from the buffer. Time complexity: O(1) :returns: True if buffer is empty, False otherwise. :rtype: bool """ self._begin = 0 self._end = 0 self._size = 0
[docs] def append(self, value): """ Append a value to the buffer on the right. If the buffer is full, the buffer will advance forward (wrapping around at the ends) and overwrite an element. Time complexity: O(1) See Also -------- :func:`NumpyCircularBuffer.pop` :func:`NumpyCircularBuffer.peek` """ self[self._end] = value self._end = (self._end + 1) % self._capacity if self.full: self._begin = (self._begin + 1) % self._capacity else: self._size += 1
[docs] def pop(self): """ Gets the element at the start of the buffer and advances the start of the buffer by one, consuming the element returned. Time complexity: O(1) :raises: :class:`ValueError` if buffer is empty :returns: element at the start of the buffer See Also -------- :func:`NumpyCircularBuffer.peek` :func:`NumpyCircularBuffer.append` """ if not self.empty: i = self._begin self._begin = (self._begin + 1) % self._capacity self._size -= 1 return (self[i]) else: raise ValueError
[docs] def peek(self): """ Gets the element at the start of the buffer without advancing the start of the buffer. Time complexity: O(1) :raises: :class:`ValueError` if buffer is empty :returns: element at the start of the buffer See Also -------- :func:`NumpyCircularBuffer.pop` :func:`NumpyCircularBuffer.append` """ if not self.empty: return (self[self._begin]) else: raise ValueError
[docs] def all(self, *args, **kwargs): """ Returns True if all elements evaluate to True. :returns: True if all elements evaluate to True, False otherwise. See Also -------- :func:`ndarray.all` """ if self.fragmented: return ( np.all(self[self._begin:].view(ndarray), *args, **kwargs) and np.all(self[:self._end].view(ndarray), *args, **kwargs) ) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] return (np.all(part.view(ndarray), *args, **kwargs))
[docs] def any(self, *args, **kwargs): """ Returns True if any elements evaluate to True. :returns: True if any elements evaluate to True, False otherwise. See Also -------- :func:`ndarray.any` """ if self.fragmented: return ( np.any(self[self._begin:].view(ndarray), *args, **kwargs) and np.any(self[:self._end].view(ndarray), *args, **kwargs) ) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] return (np.any(part.view(ndarray), *args, **kwargs))
[docs] def argmax(self, *args, **kwargs): # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def argmin(self, *args, **kwargs): # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
def argpartition(self, *args, **kwargs) -> NoReturn: """ :raises NotImplementedError: This function has no plan for implementation as of this version. """ raise NotImplementedError def argsort(self, *args, **kwargs) -> NoReturn: """ :raises NotImplementedError: This function has no plan for implementation as of this version. """ raise NotImplementedError def astype(self, *args, **kwargs) -> NoReturn: # TODO: Considered """ :raises NotImplementedError: This function is being considered for implementation in the future """ raise NotImplementedError
[docs] def byteswap(self, inplace=False): """ Swap the bytes of the array elements over the valid range of the buffer Toggle between low-endian and big-endian data representation by returning a byteswapped array, optionally swapped in-place. Arrays of byte-strings are not swapped. The real and imaginary parts of a complex number are swapped individually. See Also -------- :func:`ndarray.byteswap` """ if inplace: if self.fragmented: (self[self._begin:].view(ndarray)).byteswap(inplace) (self[:self._end].view(ndarray)).byteswap(inplace) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] (part.view(ndarray)).byteswap(inplace) return self.view(ndarray) else: out = empty_like(self) if self.fragmented: k = self._capacity - self._begin # fragmentation index out[:k] = (self[self._begin:].view(ndarray)).byteswap(inplace) out[k:] = (self[:self._end].view(ndarray)).byteswap(inplace) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] out = (part.view(ndarray)).byteswap(inplace) return (out)
[docs] def choose(self, choices, out=None, mode='raise'): # TODO: Considered """ :raises NotImplementedError: This function is being considered for implementation in the future """ raise NotImplementedError
[docs] def clip(self, min=None, max=None, out=None, **kwargs): """ Return an array whose values are limited to [min, max] over the valid range of the buffer. One of max or min must be given. See Also -------- :func:`numpy.clip` """ if min is None and max is None: raise ValueError("One of max or min must be given") if out is None: out = empty_like(self) if self.fragmented: k = self._capacity - self._begin # fragmentation index if out is self: np.clip( min, max, self[self._begin:], self[self._begin:], **kwargs ) np.clip( min, max, self[:self._end], self[:self._end], **kwargs ) else: np.clip(min, max, self[self._begin:], out[:k], **kwargs) np.clip(min, max, self[:self._end], out[k:], **kwargs) return(out.view(ndarray)) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] np.clip(min, max, part, out, **kwargs) return (out.view(ndarray))
[docs] def conj(self): """ Complex-conjugate all elements over the valid range of the buffer. See Also -------- :func:`numpy.conjugate()` """ out = empty((self._size, *self.shape[1:]), self.dtype) if self.fragmented: k = self._capacity - self._begin # fragmentation index np.conjugate(self[self._begin:], out[:k]) np.conjugate(self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] np.conjugate(part, out) return(out.view(ndarray))
[docs] def conjugate(self): """ Complex-conjugate all elements over the valid range of the buffer. See Also -------- :func:`numpy.conjugate()` """ out = empty((self._size, *self.shape[1:]), self.dtype) if self.fragmented: k = self._capacity - self._begin # fragmentation index np.conjugate(self[self._begin:], out[:k]) np.conjugate(self[:self._end], out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] np.conjugate(part, out) return(out)
[docs] def copy(self, order='C', defrag=False): """ Return a copy of the array over the valid range of the buffer. See Also -------- :func:`ndarray.copy` """ out = empty((self._size, *self.shape[1:]), self.dtype, order) if self.fragmented: if defrag: k = self._capacity - self._begin # fragmentation index np.copyto(out[:k], self[self._begin:], casting='no') np.copyto(out[k:], self[:self._end], casting='no') else: np.copyto(out, self) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] np.copyto(out, part, casting='no') return(out)
[docs] def cumprod(self, axis=None, dtype=None, out=None) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def cumsum(self, axis=None, dtype=None, out=None) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def diagonal(self, offset=0, axis1=0, axis2=1) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def dot(self, b, out=None) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def dump(self, b, out=None) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def dumps(self, b, out=None) -> NoReturn: # type: ignore # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def fill(self, value): """ Fill the valid region of the buffer with a scalar value. :param (scalar) value: All elements of *a* will be assigned this value. See Also -------- :func:`ndarray.fill` """ if self.fragmented: (self[self._begin:].view(ndarray)).fill(value) (self[:self._end].view(ndarray)).fill(value) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] (part.view(ndarray)).fill(value)
[docs] def flatten(self, order='C', defrag=False): """ Return a copy of the array collapsed into one dimension. See Also -------- :func:`ndarray.flatten` """ if self.fragmented: if defrag: out = empty(self.size, self.dtype, order) # fragmentation index k = np.product(self.shape[1:]) * (self._capacity - self._begin) out[:k] = (self[self._begin:].view(ndarray)).flat out[k:] = (self[:self._end].view(ndarray)).flat else: out = (self.view(ndarray)).flatten() else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] out = (part.view(ndarray)).flatten() return (out)
[docs] def getfield(dtype, offset=0) -> NoReturn: # type: ignore # TODO: Considered """ :raises NotImplementedError: This function is being considered for implementation in the future """ raise NotImplementedError
[docs] def item(self, *args) -> NoReturn: # TODO: Considered """ :raises NotImplementedError: This function is being considered for implementation in the future """ raise NotImplementedError
[docs] def itemset(self, *args) -> NoReturn: # TODO: Consider this for proper overloading """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def max(self, axis=None, out=None, keepdims=False, initial=None, where=True) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def min(self, axis=None, out=None, keepdims=False, initial=None, where=True) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def newbyteorder(self, new_order='S') -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def nonzero(self) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function has no plan for implementation as of this version. """ raise NotImplementedError
def partition( # type: ignore kth, axis=-1, kind='introselect', order=None ) -> NoReturn: """ :raises NotImplementedError: This function has no plan for implementation as of this version. """ raise NotImplementedError
[docs] def prod( # type: ignore axis=None, # type: ignore dtype=None, out=None, keepdims=False, initial=1, where=True ) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def ptp(axis=None, out=None, keepdims=False) -> NoReturn: # type: ignore # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def put(indices, values, mode='raise') -> NoReturn: # type: ignore """ :raises NotImplementedError: This function has no plan for implementation as of this version. """ raise NotImplementedError
[docs] def ravel(order) -> NoReturn: # type: ignore # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def repeat(order) -> NoReturn: # type: ignore # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
def reshape(shape, order) -> NoReturn: # type: ignore """ :raises NotImplementedError: This function has no plan for implementation as of this version. """ raise NotImplementedError def resize(shape, order) -> NoReturn: # type: ignore """ :raises NotImplementedError: This function has no plan for implementation as of this version. """ raise NotImplementedError
[docs] def round(self, decimals=0, out=None): """ Return a copy of the valid region of the buffer with each element rounded to the given number of decimals. See Also -------- :func:`numpy.around` """ if out is None: out = empty_like( self, subok=False, shape=(self._size, *self.shape[1:]) ) if self.fragmented: if out is self: np.around( self[self._begin:].view(ndarray), decimals, self[self._begin:] ) np.around( self[:self._end].view(ndarray), decimals, self[:self._end] ) else: k = self._capacity - self._begin # fragmentation index np.around(self[self._begin:].view(ndarray), decimals, out[:k]) np.around(self[:self._end].view(ndarray), decimals, out[k:]) else: if self._begin < self._end: part = self[self._begin:self._end] elif self._end == 0: part = self[self._begin:] np.around(part.view(ndarray), decimals, out) return(out)
[docs] def searchsorted(self, v, side='left', sorter=None) -> NoReturn: """ :raises NotImplementedError: This function has no plan for implementation as of this version. """ raise NotImplementedError
[docs] def setfield(self, val, dtype, offset=0) -> NoReturn: """ :raises NotImplementedError: This function has no plan for implementation as of this version. """ raise NotImplementedError
def sort(self, axis=-1, kind=None, order=None) -> NoReturn: """ :raises NotImplementedError: This function has no plan for implementation as of this version. """ raise NotImplementedError
[docs] def squeeze(self, axis=-1) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def std(self, axis=-1) -> NoReturn: # type: ignore # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def sum(self, axis=-1, dtype=None, out=None, keepdims=False, initial=0, where=True) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def swapaxes(self, axis1, axis2) -> NoReturn: # TODO: Considered """ :raises NotImplementedError: This function is being considered for implementation in the future """ raise NotImplementedError
[docs] def take(self, indices, axis=None, out=None, mode='raise') -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def tobytes(self, order='C') -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def tofile(self, fid, sep="", format="%s") -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def tolist(self) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def tostring(self, order='C') -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError
[docs] def transpose(self, *axes) -> NoReturn: """ :raises NotImplementedError: This function has no plan for implementation as of this version. """ raise NotImplementedError
[docs] def var(self, axis=None, dtype=None, out=None, ddof=0, keepdims=False, *, where=True) -> NoReturn: # TODO: Flagged for implementation """ :raises NotImplementedError: This function will be implemented in the future """ raise NotImplementedError