# Copyright (C) 2024-2026 Benjamin Pachev
#
# This file is part of cuDOLFINX
#
# SPDX-License-Identifier: LGPL-3.0-or-later
"""GPU-accelerated assembly routines."""
from __future__ import annotations
import tempfile
import typing
from petsc4py import PETSc
import numpy as np
from cudolfinx import cpp as _cucpp
from cudolfinx.bcs import CUDADirichletBC
from cudolfinx.context import get_cuda_context
from cudolfinx.form import BlockCUDAForm, CUDAForm
from cudolfinx.la import CUDAMatrix, CUDAVector
from dolfinx.fem.bcs import DirichletBC
from dolfinx.fem.forms import Form
from dolfinx.fem.function import Function, FunctionSpace
__all__ = [
"CUDAAssembler",
"create_petsc_cuda_vector",
]
[docs]
def create_petsc_cuda_vector(L: Form) -> PETSc.Vec:
"""Create PETSc Vector on device."""
index_map = L.function_spaces[0].dofmap.index_map
bs = L.function_spaces[0].dofmap.index_map_bs
size = (index_map.size_local * bs, index_map.size_global * bs)
# we need to provide at least the local CPU array
arr = np.zeros(size[0])
return PETSc.Vec().createCUDAWithArrays(cpuarray=arr, size=size, bsize=bs, comm=index_map.comm)
[docs]
class CUDAAssembler:
"""Class for assembly on the GPU."""
def __init__(self):
"""Initialize the assembler."""
self._ctx = get_cuda_context()
self._tmpdir = tempfile.TemporaryDirectory()
self._cpp_object = _cucpp.fem.CUDAAssembler(self._ctx, self._tmpdir.name)
[docs]
def assemble_matrix(self,
a: CUDAForm,
mat: _cucpp.fem.CUDAMatrix | None = None,
bcs: list[DirichletBC] | CUDADirichletBC | None = [],
diagonal: float = 1.0,
constants: list | None = None,
coeffs: list | None = None,
zero: bool = True,
):
"""Assemble bilinear form into a matrix on the GPU.
Args:
a: The bilinear form to assemble.
mat: Matrix in which to store assembled values.
bcs: Boundary conditions that affect the assembled matrix.
Degrees-of-freedom constrained by a boundary condition will
have their rows/columns zeroed and the value ``diagonal``
set on on the matrix diagonal.
diagonal: Value to set on the diagonal.
constants: Constants that appear in the form. If not provided,
any required coefficients will be computed.
coeffs: Optional list of form coefficients to repack.
If not specified, all will be repacked.
If an empty list is passed, no repacking will be performed.
zero: Whether to zero the matrix prior to assembly
Returns:
Matrix representation of the bilinear form ``a``.
Note:
The returned matrix is not finalised, i.e. ghost values are not
accumulated.
"""
if not isinstance(a, CUDAForm):
raise TypeError("Expected CUDAForm, got '{type(a)}'")
if mat is None:
mat = self.create_matrix(a)
if type(bcs) is list:
bc_collection = self.pack_bcs(bcs)
elif type(bcs) is CUDADirichletBC:
bc_collection = bcs
else:
raise TypeError(
f"Expected either a list of DirichletBC's or a CUDADirichletBC, got '{type(bcs)}'"
)
_bc0 = bc_collection._get_cpp_bcs(a.dolfinx_form.function_spaces[0])
_bc1 = bc_collection._get_cpp_bcs(a.dolfinx_form.function_spaces[1])
# For now always re-copy to device on assembly
# This assumes something has changed on the host
a.to_device()
self.pack_coefficients(a, coeffs)
if zero:
_cucpp.fem.zero_matrix_entries(self._ctx, self._cpp_object, mat._cpp_object)
_cucpp.fem.assemble_matrix_on_device(
self._ctx, self._cpp_object, a.cuda_form,
a.cuda_mesh, mat._cpp_object, _bc0, _bc1
)
return mat
[docs]
def assemble_matrix_block(self,
a: BlockCUDAForm,
mat: CUDAMatrix | None = None,
bcs: list[typing.Any] | None = None,
coeffs: list[list[typing.Any]] | None = None,
constants: list[list[typing.Any]] | None = None,
zero: bool = True
):
"""Assemble block form into a matrix on the GPU."""
if mat is None:
mat = self.create_matrix_block(a)
if zero:
_cucpp.fem.zero_matrix_entries(self._ctx, self._cpp_object, mat._cpp_object)
_bc0, _bc1 = a.make_block_bc(bcs)
for i, row in enumerate(a.forms):
for j, form in enumerate(row):
_coeffs = coeffs[i][j] if coeffs is not None else None
_constants = constants[i][j] if constants is not None else None
form.to_device()
self.pack_coefficients(form, coeffs)
_cucpp.fem.assemble_matrix_on_device(
self._ctx, self._cpp_object, form.cuda_form,
form.cuda_mesh, mat._cpp_object, _bc0, _bc1
)
return mat
[docs]
def assemble_vector(self,
b: CUDAForm,
vec: CUDAVector | None = None,
constants=None, coeffs=None, zero=True
):
"""Assemble linear form into vector on GPU.
Args:
b: the linear form to use for assembly
vec: the vector to assemble into. Created if it doesn't exist
constants: Form constants
coeffs: Optional list of form coefficients to repack.
If not specified, all will be repacked.
If an empty list is passed, no repacking will be performed.
zero: Whether to zero vector entries before assembly (defaults to true)
"""
if not isinstance(b, CUDAForm):
raise TypeError(f"Expected CUDAForm, got '{type(b)}'")
if vec is None:
vec = self.create_vector(b)
# For now always re-copy to device on assembly
# This assumes something has changed on the host
b.to_device()
self.pack_coefficients(b, coeffs)
if zero:
_cucpp.fem.zero_vector_entries(self._ctx, self._cpp_object, vec._cpp_object)
_cucpp.fem.assemble_vector_on_device(self._ctx, self._cpp_object, b.cuda_form,
b.cuda_mesh, vec._cpp_object)
return vec
[docs]
def assemble_vector_block(self,
b: BlockCUDAForm,
vec: CUDAVector | None = None,
constants=None, coeffs=None, zero=True
):
"""Assemble block linear form into vector on GPU.
Args:
b: the block linear form to use for assembly
vec: the vector to assemble into. Created if not provided
constants: List of constants for each sub form
coeffs: Optional list of form coefficients to repack.
If not specified, all will be repacked.
If an empty list is passed, no repacking will be performed.
zero: Whether to zero vector entries before assembly (defaults to true)
"""
if not isinstance(b, BlockCUDAForm):
raise TypeError(f"Expected BlockCUDAForm, got '{type(b)}'")
if vec is None:
vec = self.create_vector_block(b)
num_forms = len(b.forms)
_constants = [None]*num_forms if constants is None else constants
_coeffs = [None]*num_forms if coeffs is None else coeffs
if len(_constants) != num_forms:
raise ValueError(
f"Expected constants to be None or a list of length: '{num_forms}',"
f" got '{len(_constants)}' instead!"
)
if len(_coeffs) != num_forms:
raise ValueError(
f"Expected coeffs to be None or a list of length: '{num_forms}',"
f" got '{len(_coeffs)}' instead!"
)
if zero:
_cucpp.fem.zero_vector_entries(self._ctx, self._cpp_object, vec._cpp_object)
for form, form_constants, form_coeffs in zip(b.forms, _constants, _coeffs):
self.assemble_vector(
form,
vec=vec,
constants=form_constants,
coeffs=form_coeffs,
zero=False
)
[docs]
def assemble_scalar(self,
b: CUDAForm,
constants=None, coeffs=None
):
"""Assemble scalar integral on GPU.
Args:
b: the functional to use for assembly
constants: Form constants
coeffs: Optional list of form coefficients to repack.
If not specified, all will be repacked.
If an empty list is passed, no repacking will be performed.
"""
if not isinstance(b, CUDAForm):
raise TypeError(f"Expected CUDAForm, got '{type(b)}'")
# For now always re-copy to device on assembly
# This assumes something has changed on the host
b.to_device()
self.pack_coefficients(b, coeffs)
return _cucpp.fem.assemble_scalar_on_device(self._ctx, self._cpp_object, b.cuda_form,
b.cuda_mesh)
[docs]
def create_matrix(self, a: CUDAForm) -> CUDAMatrix:
"""Create a CUDAMatrix from a given form."""
if not isinstance(a, CUDAForm):
raise TypeError(f"Expected CUDAForm, got type '{type(a)}'.")
petsc_mat = _cucpp.fem.petsc.create_cuda_matrix(a.dolfinx_form._cpp_object)
return CUDAMatrix(self._ctx, petsc_mat)
[docs]
def create_matrix_block(self, a: BlockCUDAForm) -> CUDAMatrix:
"""Create a block matrix from a block form."""
if not isinstance(a, BlockCUDAForm):
raise TypeError(f"Expected BlockCUDAForm, got type '{type(a)}'")
_cpp_forms = [[cuda_form.cuda_form for cuda_form in row] for row in a.forms]
petsc_mat = _cucpp.fem.petsc.create_cuda_matrix_block(_cpp_forms)
return CUDAMatrix(self._ctx, petsc_mat)
[docs]
def create_vector(self, b: CUDAForm) -> CUDAVector:
"""Create a CUDAVector from a given form."""
if not isinstance(b, CUDAForm):
raise TypeError(f"Expected CUDAForm, got type '{type(b)}'.")
petsc_vec = create_petsc_cuda_vector(b.dolfinx_form)
return CUDAVector(self._ctx, petsc_vec)
[docs]
def create_vector_block(self, b: BlockCUDAForm) -> CUDAVector:
"""Create a CUDAVector from a given form."""
if not isinstance(b, BlockCUDAForm):
raise TypeError(f"Expected BlockCUDAForm, got type '{type(b)}')")
petsc_vec = PETSc.Vec().createCUDAWithArrays(
cpuarray=np.zeros(b.local_size),
size=(b.local_size, b.global_size)
)
return CUDAVector(self._ctx, petsc_vec)
[docs]
def pack_bcs(self, bcs: list[DirichletBC]) -> CUDADirichletBC:
"""Pack boundary conditions into a single object for use in assembly.
The returned object is of type CUDADirichletBC and can be used in place of a list of
regular DirichletBCs. This is more efficient when performing multiple operations
with the same list of boundary conditions, or when
boundary condition values need to change over time.
"""
return CUDADirichletBC(self._ctx, bcs)
[docs]
def pack_coefficients(self, a: CUDAForm, coefficients: list[Function] | None=None):
"""Pack coefficients on device."""
if not isinstance(a, CUDAForm):
raise TypeError(f"Expected CUDAForm, got type '{type(a)}'.")
if coefficients is None:
_cucpp.fem.pack_coefficients(self._ctx, self._cpp_object, a.cuda_form)
else:
# perform a repacking with only the indicated coefficients
_coefficients = [c._cpp_object for c in coefficients]
_cucpp.fem.pack_coefficients(self._ctx, self._cpp_object, a.cuda_form, _coefficients)
[docs]
def apply_lifting(self,
b: CUDAVector,
a: list[CUDAForm],
bcs: list[list[DirichletBC] | CUDADirichletBC],
x0: list[CUDAVector] | None = None,
scale: float = 1.0,
coeffs: list[list[Function]] | None = None
):
"""GPU equivalent of apply_lifting.
Args:
b: CUDAVector to modify
a: list of forms to lift
bcs: list of boundary condition lists
x0: optional list of shift vectors for lifting
scale: scale of lifting
coeffs: coefficients to (re-)pack
"""
if len(a) != len(bcs):
raise ValueError("Lengths of forms and bcs must match!")
if x0 is not None and len(x0) != len(a):
raise ValueError("Lengths of forms and x0 must match!")
_x0 = [] if x0 is None else [x._cpp_object for x in x0]
bc_collections = []
for bc_collection in bcs:
if type(bc_collection) is list:
bc_collections.append(self.pack_bcs(bc_collection))
elif type(bc_collection) is CUDADirichletBC:
bc_collections.append(bc_collection)
else:
raise TypeError(
f"Expected either a list of DirichletBC's or a CUDADirichletBC,"
f" got '{type(bc_collection)}'"
)
if coeffs is None:
coeffs = [None] * len(a)
elif not len(coeffs):
coeffs = [[] for form in a]
cuda_forms = []
cuda_mesh = None
_bcs = []
for form, bc_collection, form_coeffs in zip(a, bc_collections, coeffs):
self.pack_coefficients(form, form_coeffs)
cuda_forms.append(form.cuda_form)
if cuda_mesh is None:
cuda_mesh = form.cuda_mesh
_bcs.append(bc_collection._get_cpp_bcs(form.function_spaces[1]))
_cucpp.fem.apply_lifting_on_device(
self._ctx, self._cpp_object,
cuda_forms, cuda_mesh,
b._cpp_object, _bcs, _x0, scale
)
[docs]
def apply_lifting_block(self,
b: CUDAVector,
a: BlockCUDAForm,
bcs: list[DirichletBC] | typing.Any,
x0: CUDAVector | None = None,
alpha: float = 1.0,
set_bcs=True
):
"""Apply lifting with a single block form."""
_x0 = [] if x0 is None else x0._cpp_object
if type(bcs) is list:
block_bc = a.make_block_bc(bcs)[0]
elif type(bcs) in (_cucpp.fem.CUDADirichletBC_float32, _cucpp.fem.CUDADirichletBC_float64):
block_bc = bcs
else:
raise TypeError(
f"Expected either a list of DirichletBCs or"
f" a _cpp.fem_CUDADirichletBC_float64/32, got '{type(bcs)}'"
)
cuda_mesh = None
for row in a.forms:
_bcs = []
_x0_list = []
_cuda_forms = []
for form in row:
self.pack_coefficients(form)
if cuda_mesh is None:
cuda_mesh = form.cuda_mesh
_cuda_forms.append(form.cuda_form)
_bcs.append(block_bc)
if x0 is not None:
_x0_list.append(_x0)
_cucpp.fem.apply_lifting_on_device(
self._ctx, self._cpp_object,
_cuda_forms, cuda_mesh,
b._cpp_object, _bcs, _x0_list, alpha
)
if x0 is not None:
_cucpp.fem.set_bc_on_device(self._ctx, self._cpp_object,
b._cpp_object, block_bc, _x0, alpha)
else:
_cucpp.fem.set_bc_on_device(self._ctx, self._cpp_object,
b._cpp_object, block_bc, alpha)
[docs]
def set_bc(self,
b: CUDAVector,
bcs: list[DirichletBC] | CUDADirichletBC,
V: FunctionSpace,
x0: CUDAVector | None = None,
scale: float = 1.0,
):
"""Set boundary conditions on device.
Args:
b: vector to modify
bcs: collection of bcs
V: FunctionSpace to which bcs apply
x0: optional shift vector
scale: scaling factor
"""
if type(bcs) is list:
bc_collection = self.pack_bcs(bcs)
elif type(bcs) is CUDADirichletBC:
bc_collection = bcs
else:
raise TypeError(
f"Expected either a list of DirichletBC's or a CUDADirichletBC, got '{type(bcs)}'"
)
if hasattr(V, '_cpp_object'):
_cppV = V._cpp_object
else:
_cppV = V
_bcs = bc_collection._get_cpp_bcs(_cppV)
if x0 is None:
_cucpp.fem.set_bc_on_device(
self._ctx, self._cpp_object,
b._cpp_object, _bcs, scale
)
else:
_cucpp.fem.set_bc_on_device(
self._ctx, self._cpp_object,
b._cpp_object, _bcs, x0._cpp_object, scale
)