Source code for pymech.core

"""Core data structures for pymech"""
import copy
from textwrap import dedent, indent
import itertools
from itertools import product
from functools import reduce, partial

import numpy as np
from pymech.log import logger

"""Repeat N times. Pythonic idiom to use when the iterated value is discarded.

Instead of:

>>> [0 for _ in range(10)]

You could use:

>>> [0 for _ in repeat(10)]

repeat = partial(itertools.repeat, None)

# ==============================================================================
[docs]class DataLims: """A class containing the extrema of all quantities stored in the mesh Attributes ---------- - pos: x,y,z min,max - vel: u,v,w min,max - pres: p min,max - temp: T min,max - scal: s_i min,max """ def __init__(self, elements): self._variables = ("pos", "vel", "pres", "temp", "scal") aggregated_lims = reduce(self._lims_aggregator, elements) for var in self._variables: agg_lims_var = aggregated_lims[var] # set minimum, maximum of variables as a nested tuple setattr(self, var, tuple(zip(*agg_lims_var))) # prevent further mutation of attributes via __setattr__ self._initialized = True def __repr__(self): return dedent( f"""\ * x: {self.pos[0]} * y: {self.pos[1]} * z: {self.pos[2]}""" ) def __setattr__(self, name, value): if hasattr(self, "_initialized") and self._initialized: raise AttributeError(f"Setting attribute {name} is not permitted") else: super().__setattr__(name, value) def _lims_per_element(self, elem): """Get local limits for a given element.""" if isinstance(elem, dict): return elem axis = (1, 2, 3) elem_lims = { var: (getattr(elem, var).min(axis), getattr(elem, var).max(axis)) for var in self._variables } return elem_lims def _lims_aggregator(self, elem1, elem2): """Reduce local limits to global limits.""" l1 = self._lims_per_element(elem1) l2 = self._lims_per_element(elem2) aggregated_lims = { var: ( np.minimum(l1[var][0], l2[var][0]), np.maximum(l1[var][1], l2[var][1]), ) for var in self._variables } return aggregated_lims
# ==============================================================================
[docs]class Elem: """A class containing one hexahedral element of Nek5000/SIMSON flow field. Parameters ---------- var : iterable Iterable of integers of size 5, indicating how many variables are to be initialized lr1 : iterable Iterable of integers of size 3, defining the shape of an element as ``(lx, ly, lz)`` nbc : int Number of boundary conditions dtype : str Floating point data type. Typical values are 'f4' or 'float32' for single precision, 'f8' or 'float64' for double precision """ def __init__(self, var, lr1, nbc, dtype="float64"): # x,y,z lz ly lx self.pos = np.zeros((3, lr1[2], lr1[1], lr1[0]), dtype=dtype) # one per edge self.curv = np.zeros((12, 5), dtype=dtype) # curvature type self.ccurv = ["" for _ in repeat(12)] # u,v,w lz ly lx self.vel = np.zeros((3, lr1[2], lr1[1], lr1[0]), dtype=dtype) # p lz ly lx self.pres = np.zeros((var[2], lr1[2], lr1[1], lr1[0]), dtype=dtype) # T lz ly lx self.temp = np.zeros((var[3], lr1[2], lr1[1], lr1[0]), dtype=dtype) # s_i lz ly lx self.scal = np.zeros((var[4], lr1[2], lr1[1], lr1[0]), dtype=dtype) # list of 8 parameters, one per face # one column for velocity, one for temperature, and one for each scalar self.bcs = np.zeros((nbc, 6), dtype="U3, i4, i4" + f", {dtype}" * 5) def __repr__(self): message = f"<elem centered at {self.centroid}>" return message @property def centroid(self): return self.pos.mean(axis=(1, 2, 3))
[docs] def smallest_edge(self): """returns the length of the smallest edge, neglecting curvature""" # get coordinates of points x1, y1, z1 = self.pos[:, 0, 0, 0] x2, y2, z2 = self.pos[:, 0, 0, 1] x3, y3, z3 = self.pos[:, 0, 1, 0] x4, y4, z4 = self.pos[:, 0, 1, 1] # compute squares of edges lengths edges_l2 = np.zeros((12,)) edges_l2[0] = (x2 - x1) ** 2 + (y2 - y1) ** 2 + (z2 - z1) ** 2 edges_l2[1] = (x3 - x2) ** 2 + (y3 - y2) ** 2 + (z3 - z2) ** 2 edges_l2[2] = (x4 - x3) ** 2 + (y4 - y3) ** 2 + (z4 - z3) ** 2 edges_l2[3] = (x1 - x4) ** 2 + (y1 - y4) ** 2 + (z1 - z4) ** 2 # the dimension is not stored but we can cheat if self.pos.shape[1] > 1: ndim = 3 else: ndim = 2 if ndim > 2: # in 3D, do the same for the upper face, and also the side edges x5, y5, z5 = self.pos[:, 1, 0, 0] x6, y6, z6 = self.pos[:, 1, 0, 1] x7, y7, z7 = self.pos[:, 1, 1, 0] x8, y8, z8 = self.pos[:, 1, 1, 1] edges_l2[4] = (x6 - x5) ** 2 + (y6 - y5) ** 2 + (z6 - z5) ** 2 edges_l2[5] = (x7 - x6) ** 2 + (y7 - y6) ** 2 + (z7 - z6) ** 2 edges_l2[6] = (x8 - x7) ** 2 + (y8 - y7) ** 2 + (z8 - z7) ** 2 edges_l2[7] = (x5 - x8) ** 2 + (y5 - y8) ** 2 + (z5 - z8) ** 2 edges_l2[8] = (x5 - x1) ** 2 + (y5 - y1) ** 2 + (z5 - z1) ** 2 edges_l2[9] = (x6 - x2) ** 2 + (y6 - y2) ** 2 + (z6 - z2) ** 2 edges_l2[10] = (x7 - x3) ** 2 + (y7 - y3) ** 2 + (z7 - z3) ** 2 edges_l2[11] = (x8 - x4) ** 2 + (y8 - y4) ** 2 + (z8 - z4) ** 2 return np.sqrt(edges_l2.min()) else: return np.sqrt(edges_l2[:4].min())
[docs] def face_center(self, i): """Return the coordinates (x, y, z) of the center of the face number i""" if i == 0: kx1, ky1, kz1 = 0, 0, 0 kx2, ky2, kz2 = 0, 0, -1 kx3, ky3, kz3 = -1, 0, 0 kx4, ky4, kz4 = -1, 0, -1 elif i == 1: kx1, ky1, kz1 = -1, 0, 0 kx2, ky2, kz2 = -1, 0, -1 kx3, ky3, kz3 = -1, -1, 0 kx4, ky4, kz4 = -1, -1, -1 elif i == 2: kx1, ky1, kz1 = 0, -1, 0 kx2, ky2, kz2 = 0, -1, -1 kx3, ky3, kz3 = -1, -1, 0 kx4, ky4, kz4 = -1, -1, -1 elif i == 3: kx1, ky1, kz1 = 0, 0, 0 kx2, ky2, kz2 = 0, 0, -1 kx3, ky3, kz3 = 0, -1, 0 kx4, ky4, kz4 = 0, -1, -1 elif i == 4: kx1, ky1, kz1 = 0, 0, 0 kx2, ky2, kz2 = 0, -1, 0 kx3, ky3, kz3 = -1, 0, 0 kx4, ky4, kz4 = -1, -1, 0 elif i == 5: kx1, ky1, kz1 = 0, 0, -1 kx2, ky2, kz2 = 0, -1, -1 kx3, ky3, kz3 = -1, 0, -1 kx4, ky4, kz4 = -1, -1, -1 else: logger.error(f"Invalid face number {i} (must be between 0 and 5)") (x1, y1, z1) = self.pos[:, kz1, ky1, kx1] (x2, y2, z2) = self.pos[:, kz2, ky2, kx2] (x3, y3, z3) = self.pos[:, kz3, ky3, kx3] (x4, y4, z4) = self.pos[:, kz4, ky4, kx4] return ( 0.25 * (x1 + x2 + x3 + x4), 0.25 * (y1 + y2 + y3 + y4), 0.25 * (z1 + z2 + z3 + z4), )
# ==============================================================================
[docs]class HexaData: """A class containing data related to a hexahedral mesh""" def __init__(self, ndim, nel, lr1, var, nbc=0, dtype="float64"): self.ndim = ndim self.nel = nel self.ncurv = [] self.nbc = nbc self.var = var self.lr1 = lr1 self.time = [] self.istep = [] self.wdsz = [] self.endian = [] if isinstance(dtype, type): # For example np.float64 -> "float64" dtype = dtype.__name__ self.elem = [Elem(var, lr1, nbc, dtype) for _ in repeat(nel)] self.elmap = np.linspace(1, nel, nel, dtype=np.int32) def __repr__(self): representation = dedent( f"""\ <pymech.core.HexaData> Dimensions: {self.ndim} Precision: {self.wdsz} bytes Mesh limits:\n{indent(repr(self.lims), " "*10)} Time: * time: {self.time} * istep: {self.istep} Elements: * nel: {self.nel} * elem: [{self.elem[0]} ... {self.elem[-1]}] """ ) return representation @property def lims(self): return DataLims(self.elem)
[docs] def check_connectivity(self, tol=1e-3): """Check element connectivity, specifically for matching boundary conditions and geometry. Errors are reported as logging messages. Parameters ---------- tol : float relative tolerance (compared to the smallest edge of adjacent elements) for detecting whether faces are at the same location """ dim = self.ndim err = False for (iel, el), ibc, iface in product( enumerate(self.elem), range(self.nbc), range(2 * dim) ): cbc = el.bcs[ibc, iface][0] if cbc == "E" or cbc == "P": connected_iel = int(el.bcs[ibc, iface][3]) - 1 connected_face = int(el.bcs[ibc, iface][4]) - 1 xc, yc, zc = el.face_center(iface) if connected_iel < 0 or connected_iel >= self.nel: err = True logger.error( f"face {iface} of element {iel} is connected ('{cbc}') to face " f"{connected_face} of the nonexistent element {connected_iel}" ) logger.error(f"face center: ({xc:.6e} {yc:.6e} {zc:.6e})") else: cbc1 = self.elem[connected_iel].bcs[ibc, connected_face][0] iel1 = int(self.elem[connected_iel].bcs[ibc, connected_face][3]) - 1 iface1 = ( int(self.elem[connected_iel].bcs[ibc, connected_face][4]) - 1 ) xc1, yc1, zc1 = self.elem[connected_iel].face_center(connected_face) if cbc1 != cbc or iel1 != iel or iface1 != iface: err = True logger.error( "mismatched boundary conditions: " f"face {iface + 1} of element {iel + 1} with " f"condition '{cbc}' is connected to face {connected_face + 1} " f"of element {connected_iel + 1}, which has condition '{cbc1}' " f"and points to face {iface1} of element {iel1}" ) logger.error( f"face centers: ({xc:.6e} {yc:.6e} {zc:.6e}), ({xc1:.6e} {yc1:.6e} {zc1:.6e})" ) elif cbc == "E": # no check for 'P' yet, but it should be possible dist = np.sqrt( (xc - xc1) ** 2 + (yc - yc1) ** 2 + (zc - zc1) ** 2 ) max_dist = tol * min( el.smallest_edge(), self.elem[connected_iel].smallest_edge() ) if dist > max_dist: err = True logger.error( "mismatched face locations: " f"face {iface + 1} of element {iel + 1} and " f"face {connected_face + 1} of element {connected_iel + 1} " "are connected but are not at the same location" ) logger.error( f"face centers: ({xc:.6e} {yc:.6e} {zc:.6e}), ({xc1:.6e} {yc1:.6e} {zc1:.6e})" ) return not err
[docs] def check_bcs_present(self): """ Returns True if and only if all faces of all elements have boundary conditions applied. Note that this function returning False does not mean the mesh is invalid: it is not mandatory to define internal boundary conditions for Nek5000. """ res = True for (iel, el), ibc, iface in product( enumerate(self.elem), range(self.nbc), range(2 * self.ndim) ): if el.bcs[ibc, iface][0] == "": res = False logger.error( f"missing boundary condition at element {iel}, face {iface}, field {ibc}" ) return res
[docs] def merge(self, other, tol=1e-2, ignore_empty=True, ignore_all_bcs=False): """ Merges another :class:`pymech.core.HexaData` into the current one and connects it Parameters ---------- other: :class:`pymech.core.HexaData` mesh to merge into self tol: float maximum distance, relative to the smallest edge of neighbouring elements, at which faces are considered touching ignore_empty: bool if True, the faces with an empty boundary condition ('') will be treated as internal faces and will not be merged. This is useful if internal boundary conditions are not defined and will make the operation *much* faster, but requires boundary conditions to be defined on the faces to be merged. ignore_all_bcs: bool if True, the boundary conditions will not be changed. This is likely to result in invalid boundary conditions at the interface between the merged meshes. This option is intended for fast merging in a situation in which the boundary conditions are either irrelevant or will be defined or corrected later. """ # perform some consistency checks if self.ndim != other.ndim: logger.error( f"Cannot merge meshes of dimensions {self.ndim} and {other.ndim}!" ) return -1 if self.lr1[0] != other.lr1[0]: logger.error( "Cannot merge meshes of different polynomial orders ({} != {})".format( self.lr1[0], other.lr1[0] ) ) return -2 # add the new elements (in an inconsistent state if there are internal boundary conditions) nel1 = self.nel self.nel = self.nel + other.nel self.ncurv = self.ncurv + other.ncurv self.elmap = np.concatenate((self.elmap, other.elmap + nel1)) # the deep copy is required here to avoid leaving the 'other' mesh in an inconsistent state by modifying its elements mesh_add = copy.deepcopy(other) mesh_add.offset_connectivity(nel1) self.elem = self.elem + mesh_add.elem # check how many boundary condition fields we have nbc = min(self.nbc, other.nbc) # glue common faces together # only look for the neighbour in the first BC field because it should be the same in all fields. nfaces = 2 * self.ndim nchanges = 0 # counter for the boundary conditions connected if nbc == 0 or ignore_all_bcs: # Quickly exit the function logger.debug("no pairs of faces to merge") return nchanges for iel0, iface0 in product(range(nel1, self.nel), range(nfaces)): elem0 = self.elem[iel0] bc0 = elem0.bcs[0, iface0][0] if bc0 != "E" and not (ignore_empty and bc0 == ""): # boundary element, look if it can be connected to something for iel1, iface1 in product(range(nel1), range(nfaces)): elem1 = self.elem[iel1] bc1 = elem1.bcs[0, iface1][0] if bc1 != "E" and not (ignore_empty and bc1 == ""): # if the centers of the faces are close, connect them together x0, y0, z0 = elem0.face_center(iface0) x1, y1, z1 = elem1.face_center(iface1) dist = np.sqrt((x1 - x0) ** 2 + (y1 - y0) ** 2 + (z1 - z0) ** 2) dist_ref = min(elem0.smallest_edge(), elem1.smallest_edge()) if dist <= tol * dist_ref: # reconnect the periodic faces together (assumes that all fields are periodic) if bc0 == bc1 == "P": iel_p0 = int(elem0.bcs[0, iface0][3]) - 1 iel_p1 = int(elem1.bcs[0, iface1][3]) - 1 iface_p0 = int(elem0.bcs[0, iface0][4]) - 1 iface_p1 = int(elem1.bcs[0, iface1][4]) - 1 for ibc in range(nbc): elem_p0_bcs = self.elem[iel_p0].bcs[ibc, iface_p0] elem_p1_bcs = self.elem[iel_p1].bcs[ibc, iface_p1] elem_p0_bcs[0] = "P" elem_p1_bcs[0] = "P" elem_p0_bcs[3] = iel_p1 + 1 elem_p1_bcs[3] = iel_p0 + 1 elem_p0_bcs[4] = iface_p1 + 1 elem_p1_bcs[4] = iface_p0 + 1 for ibc in range(nbc): elem0.bcs[ibc, iface0][0] = "E" elem1.bcs[ibc, iface1][0] = "E" elem0.bcs[ibc, iface0][3] = iel1 + 1 elem1.bcs[ibc, iface1][3] = iel0 + 1 elem0.bcs[ibc, iface0][4] = iface1 + 1 elem1.bcs[ibc, iface1][4] = iface0 + 1 nchanges = nchanges + 1 logger.debug(f"merged {nchanges} pairs of faces") return nchanges
[docs] def get_points(self): """ Returns an array containing the coordinates of all the points in the mesh as a (nel, lx1*ly1*lz1, 3) array """ lx1, ly1, lz1 = self.lr1 nxyz = lx1 * ly1 * lz1 xyz = np.zeros((self.nel, nxyz, 3)) for el, lxyz in zip(self.elem, xyz): for idim in range(3): lxyz[:, idim] = el.pos[idim, ...].ravel() return xyz
[docs] def update_ncurv(self): """ Updates the metadata `ncurv` integer to match the actual number of curved faces present in the mesh """ ncurv = 0 for el in self.elem: for iedge in range(12): if el.ccurv[iedge] != "": ncurv = ncurv + 1 self.ncurv = ncurv
[docs] def offset_connectivity(self, offset: int, iel_min=0): """ Adds a value to the index of the elements connected via internal or periodic boundary conditions to elements of the mesh. This is used to keep the connectivity valid when deleting or inserting elements in the mesh. Parameters ---------- offset : int The value by which to offset the indices iel_min : int The first element (in zero-based indexing) to offset """ for el, ibc, iface in product(self.elem, range(self.nbc), range(2 * self.ndim)): bc = el.bcs[ibc, iface][0] if bc == "E" or bc == "P": if ( int(el.bcs[ibc, iface][3]) > iel_min ): # the connected element number is 1-indexed el.bcs[ibc, iface][3] += offset # also fix the index of the elements themselves in its their BC if relevant for el, ibc, iface in product( self.elem[iel_min:], range(self.nbc), range(2 * self.ndim) ): if int(el.bcs[ibc, iface][1]) > iel_min: el.bcs[ibc, iface][1] += offset