Source code for pyfr.solvers.euler.elements

import numpy as np

from pyfr.solvers.baseadvec import BaseAdvectionElements


class BaseFluidElements:
    privarmap = {2: ['rho', 'u', 'v', 'p'],
                 3: ['rho', 'u', 'v', 'w', 'p']}

    convarmap = {2: ['rho', 'rhou', 'rhov', 'E'],
                 3: ['rho', 'rhou', 'rhov', 'rhow', 'E']}

    dualcoeffs = convarmap

    visvarmap = {
        2: [('density', ['rho']),
            ('velocity', ['u', 'v']),
            ('pressure', ['p'])],
        3: [('density', ['rho']),
            ('velocity', ['u', 'v', 'w']),
            ('pressure', ['p'])]
    }

    @staticmethod
    def pri_to_con(pris, cfg):
        rho, p = pris[0], pris[-1]

        # Multiply velocity components by rho
        rhovs = [rho*c for c in pris[1:-1]]

        # Compute the energy
        gamma = cfg.getfloat('constants', 'gamma')
        E = p/(gamma - 1) + 0.5*rho*sum(c*c for c in pris[1:-1])

        return [rho, *rhovs, E]

    @staticmethod
    def con_to_pri(cons, cfg):
        rho, E = cons[0], cons[-1]

        # Divide momentum components by rho
        vs = [rhov/rho for rhov in cons[1:-1]]

        # Compute the pressure
        gamma = cfg.getfloat('constants', 'gamma')
        p = (gamma - 1)*(E - 0.5*rho*sum(v*v for v in vs))

        return [rho, *vs, p]

    @staticmethod
    def diff_con_to_pri(cons, diff_cons, cfg):
        rho, *rhouvw = cons[:-1]
        diff_rho, *diff_rhouvw, diff_E = diff_cons

        # Divide momentum components by ρ
        uvw = [rhov / rho for rhov in rhouvw]

        # Velocity gradients: ∂u⃗ = 1/ρ·[∂(ρu⃗) - u⃗·∂ρ]
        diff_uvw = [(diff_rhov - v*diff_rho) / rho
                    for diff_rhov, v in zip(diff_rhouvw, uvw)]

        # Pressure gradient: ∂p = (γ - 1)·[∂E - 1/2*(u⃗·∂(ρu⃗) - ρu⃗·∂u⃗)]
        gamma = cfg.getfloat('constants', 'gamma')
        diff_p = diff_E - 0.5*(np.einsum('ijk,ijk->jk', uvw,  diff_rhouvw) +
                               np.einsum('ijk,ijk->jk', rhouvw, diff_uvw))
        diff_p *= gamma - 1

        return [diff_rho, *diff_uvw, diff_p]

    @staticmethod
    def validate_formulation(ctrl):
        shock_capturing = ctrl.cfg.get('solver', 'shock-capturing', 'none')
        if shock_capturing == 'entropy-filter':
            if ctrl.formulation == 'dual':
                raise ValueError('Entropy filtering not compatible with '
                                 'dual time stepping.')
            elif ctrl.controller_has_variable_dt:
                raise ValueError('Entropy filtering not compatible with '
                                 'adaptive time stepping.')

    def set_backend(self, *args, **kwargs):
        super().set_backend(*args, **kwargs)

        # Can elide shock-capturing at p = 0
        shock_capturing = self.cfg.get('solver', 'shock-capturing', 'none')

        # Modified entropy filtering method using specific physical
        # entropy (without operator splitting for Navier-Stokes)
        # doi:10.1016/j.jcp.2022.111501
        if shock_capturing == 'entropy-filter' and self.basis.order != 0:
            self._be.pointwise.register(
                'pyfr.solvers.euler.kernels.entropylocal'
            )
            self._be.pointwise.register(
                'pyfr.solvers.euler.kernels.entropyfilter'
            )

            # Template arguments
            eftplargs = {
                'ndims': self.ndims,
                'nupts': self.nupts,
                'nfpts': self.nfpts,
                'nvars': self.nvars,
                'nfaces': self.nfaces,
                'c': self.cfg.items_as('constants', float),
                'order': self.basis.order
            }

            # Check to see if running anti-aliasing
            if self.antialias:
                raise ValueError('Entropy filter not compatible with '
                                 'anti-aliasing.')

            # Check to see if running collocated solution/flux points
            m0 = self.basis.m0
            mrowsum = np.max(np.abs(np.sum(m0, axis=1) - 1.0))
            if np.min(m0) < -1e-8 or mrowsum > 1e-8:
                raise ValueError('Entropy filter requires flux points to be a '
                                 'subset of solution points or a convex '
                                 'combination thereof.')

            # Minimum density/pressure constraints
            eftplargs['d_min'] = self.cfg.getfloat('solver-entropy-filter',
                                                   'd-min', 1e-6)
            eftplargs['p_min'] = self.cfg.getfloat('solver-entropy-filter',
                                                   'p-min', 1e-6)

            # Entropy tolerance
            eftplargs['e_tol'] = self.cfg.getfloat('solver-entropy-filter',
                                                   'e-tol', 1e-6)

            # Hidden kernel parameters
            eftplargs['f_tol'] = self.cfg.getfloat('solver-entropy-filter',
                                                   'f-tol', 1e-4)
            eftplargs['niters'] = self.cfg.getfloat('solver-entropy-filter',
                                                    'niters', 20)
            efunc = self.cfg.get('solver-entropy-filter', 'e-func',
                                 'numerical')
            eftplargs['e_func'] = efunc
            if efunc not in {'numerical', 'physical'}:
                raise ValueError(f'Unknown entropy functional: {efunc}')

            # Precompute basis orders for filter
            ubdegs = self.basis.ubasis.degrees
            eftplargs['ubdegs'] = [int(max(dd)) for dd in ubdegs]
            eftplargs['order'] = self.basis.order

            # Compute local entropy bounds
            self.kernels['local_entropy'] = lambda uin: self._be.kernel(
                'entropylocal', tplargs=eftplargs, dims=[self.neles],
                u=self.scal_upts[uin], entmin_int=self.entmin_int
            )

            # Apply entropy filter
            self.kernels['entropy_filter'] = lambda uin: self._be.kernel(
                'entropyfilter', tplargs=eftplargs, dims=[self.neles],
                u=self.scal_upts[uin], entmin_int=self.entmin_int,
                vdm=self.vdm, invvdm=self.invvdm
            )


[docs] class EulerElements(BaseFluidElements, BaseAdvectionElements):
[docs] def set_backend(self, *args, **kwargs): super().set_backend(*args, **kwargs) # Can elide interior flux calculations at p = 0 if self.basis.order == 0: return # Register our flux kernels self._be.pointwise.register('pyfr.solvers.euler.kernels.tflux') # Template parameters for the flux kernels tplargs = { 'ndims': self.ndims, 'nvars': self.nvars, 'nverts': len(self.basis.linspts), 'c': self.cfg.items_as('constants', float), 'jac_exprs': self.basis.jac_exprs } # Helpers tdisf = [] c, l = 'curved', 'linear' r, s = self._mesh_regions, self._slice_mat slicedk = self._make_sliced_kernel if c in r and 'flux' not in self.antialias: tdisf.append(lambda uin: self._be.kernel( 'tflux', tplargs=tplargs | {'ktype': 'curved'}, dims=[self.nupts, r[c]], u=s(self.scal_upts[uin], c), f=s(self._vect_upts, c), smats=self.curved_smat_at('upts') )) elif c in r: tdisf.append(lambda: self._be.kernel( 'tflux', tplargs=tplargs | {'ktype': 'curved'}, dims=[self.nqpts, r[c]], u=s(self._scal_qpts, c), f=s(self._vect_qpts, c), smats=self.curved_smat_at('qpts') )) if l in r and 'flux' not in self.antialias: tdisf.append(lambda uin: self._be.kernel( 'tflux', tplargs=tplargs | {'ktype': 'linear'}, dims=[self.nupts, r[l]], u=s(self.scal_upts[uin], l), f=s(self._vect_upts, l), verts=self.ploc_at('linspts', l), upts=self.upts )) elif l in r: tdisf.append(lambda: self._be.kernel( 'tflux', tplargs=tplargs | {'ktype': 'linear'}, dims=[self.nqpts, r[l]], u=s(self._scal_qpts, l), f=s(self._vect_qpts, l), verts=self.ploc_at('linspts', l), upts=self.qpts )) if 'flux' not in self.antialias: self.kernels['tdisf'] = lambda uin: slicedk(k(uin) for k in tdisf) else: self.kernels['tdisf'] = lambda: slicedk(k() for k in tdisf)