import numpy as np
from pyfr.solvers.baseadvec import BaseAdvectionElements
class BaseFluidElements:
@staticmethod
def privars(ndims, cfg):
if ndims == 2:
return ['rho', 'u', 'v', 'p']
elif ndims == 3:
return ['rho', 'u', 'v', 'w', 'p']
@staticmethod
def convars(ndims, cfg):
if ndims == 2:
return ['rho', 'rhou', 'rhov', 'E']
elif ndims == 3:
return ['rho', 'rhou', 'rhov', 'rhow', 'E']
dualcoeffs = convars
@staticmethod
def visvars(ndims, cfg):
if ndims == 2:
return {
'density': ['rho'],
'velocity': ['u', 'v'],
'pressure': ['p']
}
elif ndims == 3:
return {
'density': ['rho'],
'velocity': ['u', 'v', 'w'],
'pressure': ['p']
}
@staticmethod
def pri_to_con(pris, cfg):
rho, p = pris[0], pris[-1]
# Multiply velocity components by rho
rhovs = [rho*c for c in pris[1:-1]]
# Compute the energy
gamma = cfg.getfloat('constants', 'gamma')
E = p/(gamma - 1) + 0.5*rho*sum(c*c for c in pris[1:-1])
return [rho, *rhovs, E]
@staticmethod
def con_to_pri(cons, cfg):
rho, E = cons[0], cons[-1]
# Divide momentum components by rho
vs = [rhov/rho for rhov in cons[1:-1]]
# Compute the pressure
gamma = cfg.getfloat('constants', 'gamma')
p = (gamma - 1)*(E - 0.5*rho*sum(v**2 for v in vs))
return [rho, *vs, p]
@staticmethod
def diff_con_to_pri(cons, diff_cons, cfg):
rho, *rhouvw = cons[:-1]
diff_rho, *diff_rhouvw, diff_E = diff_cons
# Divide momentum components by ρ
uvw = [rhov / rho for rhov in rhouvw]
# Velocity gradients: ∂u⃗ = 1/ρ·[∂(ρu⃗) - u⃗·∂ρ]
diff_uvw = [(diff_rhov - v*diff_rho) / rho
for diff_rhov, v in zip(diff_rhouvw, uvw)]
# Pressure gradient: ∂p = (γ - 1)·[∂E - 1/2*(u⃗·∂(ρu⃗) + ρu⃗·∂u⃗)]
gamma = cfg.getfloat('constants', 'gamma')
diff_p = diff_E - 0.5*(sum(u*dru for u, dru in zip(uvw, diff_rhouvw)) +
sum(ru*du for ru, du in zip(rhouvw, diff_uvw)))
diff_p *= gamma - 1
return [diff_rho, *diff_uvw, diff_p]
@staticmethod
def validate_formulation(ctrl):
shock_capturing = ctrl.cfg.get('solver', 'shock-capturing', 'none')
if shock_capturing == 'entropy-filter':
if ctrl.formulation == 'dual':
raise ValueError('Entropy filtering not compatible with '
'dual time stepping.')
elif ctrl.controller_has_variable_dt:
raise ValueError('Entropy filtering not compatible with '
'adaptive time stepping.')
def set_backend(self, *args, **kwargs):
super().set_backend(*args, **kwargs)
# Can elide shock-capturing at p = 0
shock_capturing = self.cfg.get('solver', 'shock-capturing', 'none')
# Modified entropy filtering method using specific physical
# entropy (without operator splitting for Navier-Stokes)
# doi:10.1016/j.jcp.2022.111501
if shock_capturing == 'entropy-filter' and self.basis.order != 0:
self._be.pointwise.register(
'pyfr.solvers.euler.kernels.entropylocal'
)
self._be.pointwise.register(
'pyfr.solvers.euler.kernels.entropyfilter'
)
# Template arguments
fpts_in_upts = self.basis.fpts_in_upts
self.nefpts = self.nupts if fpts_in_upts else self.nupts + self.nfpts
ub = self.basis.ubasis
meanwts = ub.invvdm[:, 0] / np.sum(ub.invvdm[:, 0])
eftplargs = {
'ndims': self.ndims, 'nupts': self.nupts,
'nfpts': self.nfpts, 'nefpts': self.nefpts,
'nvars': self.nvars, 'nfaces': self.nfaces,
'c': self.cfg.items_as('constants', float),
'order': self.basis.order, 'fpts_in_upts': fpts_in_upts,
'meanwts': meanwts
}
# Check to see if running anti-aliasing
if self.antialias:
raise ValueError('Entropy filter not compatible with '
'anti-aliasing.')
# Minimum density/pressure constraints
eftplargs['d_min'] = self.cfg.getfloat('solver-entropy-filter',
'd-min', 1e-6)
eftplargs['p_min'] = self.cfg.getfloat('solver-entropy-filter',
'p-min', 1e-6)
# Entropy tolerance
eftplargs['e_tol'] = self.cfg.getfloat('solver-entropy-filter',
'e-tol', 1e-6)
# Inner solver parameters
eftplargs['f_tol'] = self.cfg.getfloat('solver-entropy-filter',
'f-tol', 1e-4)
eftplargs['niters'] = self.cfg.getfloat('solver-entropy-filter',
'niters', 2)
# Use linearised constraints/limiting kernel approach from
# Ching et al. (doi:10.1016/j.jcp.2024.112881)
form = self.cfg.get('solver-entropy-filter', 'formulation',
'nonlinear')
eftplargs['linearise'] = form == 'linearised'
# Precompute basis orders for filter
ubdegs = self.basis.ubasis.degrees
eftplargs['ubdegs'] = [int(max(dd)) for dd in ubdegs]
eftplargs['order'] = self.basis.order
# Compute local entropy bounds
self.kernels['local_entropy'] = lambda uin: self._be.kernel(
'entropylocal', tplargs=eftplargs, dims=[self.neles],
u=self.scal_upts[uin], entmin_int=self.entmin_int,
m0=self.m0
)
# Apply entropy filter
self.kernels['entropy_filter'] = lambda uin: self._be.kernel(
'entropyfilter', tplargs=eftplargs, dims=[self.neles],
u=self.scal_upts[uin], entmin_int=self.entmin_int,
vdm=self.vdm_ef, invvdm=self.invvdm, m0=self.m0
)
[docs]
class EulerElements(BaseFluidElements, BaseAdvectionElements):
[docs]
def set_backend(self, *args, **kwargs):
super().set_backend(*args, **kwargs)
# Can elide interior flux calculations at p = 0
if self.basis.order == 0:
return
# Register our flux kernels
self._be.pointwise.register('pyfr.solvers.euler.kernels.tflux')
# Template parameters for the flux kernels
tplargs = {
'ndims': self.ndims,
'nvars': self.nvars,
'nverts': len(self.basis.linspts),
'c': self.cfg.items_as('constants', float),
'jac_exprs': self.basis.jac_exprs
}
# Helpers
tdisf = []
c, l = 'curved', 'linear'
r, s = self._mesh_regions, self._slice_mat
slicedk = self._make_sliced_kernel
if c in r and 'flux' not in self.antialias:
tdisf.append(lambda uin: self._be.kernel(
'tflux', tplargs=tplargs | {'ktype': 'curved'},
dims=[self.nupts, r[c]], u=s(self.scal_upts[uin], c),
f=s(self._vect_upts, c), smats=self.curved_smat_at('upts')
))
elif c in r:
tdisf.append(lambda: self._be.kernel(
'tflux', tplargs=tplargs | {'ktype': 'curved'},
dims=[self.nqpts, r[c]], u=s(self._scal_qpts, c),
f=s(self._vect_qpts, c), smats=self.curved_smat_at('qpts')
))
if l in r and 'flux' not in self.antialias:
tdisf.append(lambda uin: self._be.kernel(
'tflux', tplargs=tplargs | {'ktype': 'linear'},
dims=[self.nupts, r[l]], u=s(self.scal_upts[uin], l),
f=s(self._vect_upts, l), verts=self.ploc_at('linspts', l),
upts=self.upts
))
elif l in r:
tdisf.append(lambda: self._be.kernel(
'tflux', tplargs=tplargs | {'ktype': 'linear'},
dims=[self.nqpts, r[l]], u=s(self._scal_qpts, l),
f=s(self._vect_qpts, l), verts=self.ploc_at('linspts', l),
upts=self.qpts
))
if 'flux' not in self.antialias:
self.kernels['tdisf'] = lambda uin: slicedk(k(uin) for k in tdisf)
else:
self.kernels['tdisf'] = lambda: slicedk(k() for k in tdisf)