Source code for cudolfinx.assemble

# Copyright (C) 2024-2026 Benjamin Pachev
#
# This file is part of cuDOLFINX
#
# SPDX-License-Identifier:    LGPL-3.0-or-later
"""GPU-accelerated assembly routines."""

from __future__ import annotations

import tempfile
import typing

from petsc4py import PETSc

import numpy as np

from cudolfinx import cpp as _cucpp
from cudolfinx.bcs import CUDADirichletBC
from cudolfinx.context import get_cuda_context
from cudolfinx.form import BlockCUDAForm, CUDAForm
from cudolfinx.la import CUDAMatrix, CUDAVector
from dolfinx.fem.bcs import DirichletBC
from dolfinx.fem.forms import Form
from dolfinx.fem.function import Function, FunctionSpace

__all__ = [
  "CUDAAssembler",
  "create_petsc_cuda_vector",
]

[docs] def create_petsc_cuda_vector(L: Form) -> PETSc.Vec: """Create PETSc Vector on device.""" index_map = L.function_spaces[0].dofmap.index_map bs = L.function_spaces[0].dofmap.index_map_bs size = (index_map.size_local * bs, index_map.size_global * bs) # we need to provide at least the local CPU array arr = np.zeros(size[0]) return PETSc.Vec().createCUDAWithArrays(cpuarray=arr, size=size, bsize=bs, comm=index_map.comm)
[docs] class CUDAAssembler: """Class for assembly on the GPU.""" def __init__(self): """Initialize the assembler.""" self._ctx = get_cuda_context() self._tmpdir = tempfile.TemporaryDirectory() self._cpp_object = _cucpp.fem.CUDAAssembler(self._ctx, self._tmpdir.name)
[docs] def assemble_matrix(self, a: CUDAForm, mat: _cucpp.fem.CUDAMatrix | None = None, bcs: list[DirichletBC] | CUDADirichletBC | None = [], diagonal: float = 1.0, constants: list | None = None, coeffs: list | None = None, zero: bool = True, ): """Assemble bilinear form into a matrix on the GPU. Args: a: The bilinear form to assemble. mat: Matrix in which to store assembled values. bcs: Boundary conditions that affect the assembled matrix. Degrees-of-freedom constrained by a boundary condition will have their rows/columns zeroed and the value ``diagonal`` set on on the matrix diagonal. diagonal: Value to set on the diagonal. constants: Constants that appear in the form. If not provided, any required coefficients will be computed. coeffs: Optional list of form coefficients to repack. If not specified, all will be repacked. If an empty list is passed, no repacking will be performed. zero: Whether to zero the matrix prior to assembly Returns: Matrix representation of the bilinear form ``a``. Note: The returned matrix is not finalised, i.e. ghost values are not accumulated. """ if not isinstance(a, CUDAForm): raise TypeError("Expected CUDAForm, got '{type(a)}'") if mat is None: mat = self.create_matrix(a) if type(bcs) is list: bc_collection = self.pack_bcs(bcs) elif type(bcs) is CUDADirichletBC: bc_collection = bcs else: raise TypeError( f"Expected either a list of DirichletBC's or a CUDADirichletBC, got '{type(bcs)}'" ) _bc0 = bc_collection._get_cpp_bcs(a.dolfinx_form.function_spaces[0]) _bc1 = bc_collection._get_cpp_bcs(a.dolfinx_form.function_spaces[1]) # For now always re-copy to device on assembly # This assumes something has changed on the host a.to_device() self.pack_coefficients(a, coeffs) if zero: _cucpp.fem.zero_matrix_entries(self._ctx, self._cpp_object, mat._cpp_object) _cucpp.fem.assemble_matrix_on_device( self._ctx, self._cpp_object, a.cuda_form, a.cuda_mesh, mat._cpp_object, _bc0, _bc1 ) return mat
[docs] def assemble_matrix_block(self, a: BlockCUDAForm, mat: CUDAMatrix | None = None, bcs: list[typing.Any] | None = None, coeffs: list[list[typing.Any]] | None = None, constants: list[list[typing.Any]] | None = None, zero: bool = True ): """Assemble block form into a matrix on the GPU.""" if mat is None: mat = self.create_matrix_block(a) if zero: _cucpp.fem.zero_matrix_entries(self._ctx, self._cpp_object, mat._cpp_object) _bc0, _bc1 = a.make_block_bc(bcs) for i, row in enumerate(a.forms): for j, form in enumerate(row): _coeffs = coeffs[i][j] if coeffs is not None else None _constants = constants[i][j] if constants is not None else None form.to_device() self.pack_coefficients(form, coeffs) _cucpp.fem.assemble_matrix_on_device( self._ctx, self._cpp_object, form.cuda_form, form.cuda_mesh, mat._cpp_object, _bc0, _bc1 ) return mat
[docs] def assemble_vector(self, b: CUDAForm, vec: CUDAVector | None = None, constants=None, coeffs=None, zero=True ): """Assemble linear form into vector on GPU. Args: b: the linear form to use for assembly vec: the vector to assemble into. Created if it doesn't exist constants: Form constants coeffs: Optional list of form coefficients to repack. If not specified, all will be repacked. If an empty list is passed, no repacking will be performed. zero: Whether to zero vector entries before assembly (defaults to true) """ if not isinstance(b, CUDAForm): raise TypeError(f"Expected CUDAForm, got '{type(b)}'") if vec is None: vec = self.create_vector(b) # For now always re-copy to device on assembly # This assumes something has changed on the host b.to_device() self.pack_coefficients(b, coeffs) if zero: _cucpp.fem.zero_vector_entries(self._ctx, self._cpp_object, vec._cpp_object) _cucpp.fem.assemble_vector_on_device(self._ctx, self._cpp_object, b.cuda_form, b.cuda_mesh, vec._cpp_object) return vec
[docs] def assemble_vector_block(self, b: BlockCUDAForm, vec: CUDAVector | None = None, constants=None, coeffs=None, zero=True ): """Assemble block linear form into vector on GPU. Args: b: the block linear form to use for assembly vec: the vector to assemble into. Created if not provided constants: List of constants for each sub form coeffs: Optional list of form coefficients to repack. If not specified, all will be repacked. If an empty list is passed, no repacking will be performed. zero: Whether to zero vector entries before assembly (defaults to true) """ if not isinstance(b, BlockCUDAForm): raise TypeError(f"Expected BlockCUDAForm, got '{type(b)}'") if vec is None: vec = self.create_vector_block(b) num_forms = len(b.forms) _constants = [None]*num_forms if constants is None else constants _coeffs = [None]*num_forms if coeffs is None else coeffs if len(_constants) != num_forms: raise ValueError( f"Expected constants to be None or a list of length: '{num_forms}'," f" got '{len(_constants)}' instead!" ) if len(_coeffs) != num_forms: raise ValueError( f"Expected coeffs to be None or a list of length: '{num_forms}'," f" got '{len(_coeffs)}' instead!" ) if zero: _cucpp.fem.zero_vector_entries(self._ctx, self._cpp_object, vec._cpp_object) for form, form_constants, form_coeffs in zip(b.forms, _constants, _coeffs): self.assemble_vector( form, vec=vec, constants=form_constants, coeffs=form_coeffs, zero=False )
[docs] def assemble_scalar(self, b: CUDAForm, constants=None, coeffs=None ): """Assemble scalar integral on GPU. Args: b: the functional to use for assembly constants: Form constants coeffs: Optional list of form coefficients to repack. If not specified, all will be repacked. If an empty list is passed, no repacking will be performed. """ if not isinstance(b, CUDAForm): raise TypeError(f"Expected CUDAForm, got '{type(b)}'") # For now always re-copy to device on assembly # This assumes something has changed on the host b.to_device() self.pack_coefficients(b, coeffs) return _cucpp.fem.assemble_scalar_on_device(self._ctx, self._cpp_object, b.cuda_form, b.cuda_mesh)
[docs] def create_matrix(self, a: CUDAForm) -> CUDAMatrix: """Create a CUDAMatrix from a given form.""" if not isinstance(a, CUDAForm): raise TypeError(f"Expected CUDAForm, got type '{type(a)}'.") petsc_mat = _cucpp.fem.petsc.create_cuda_matrix(a.dolfinx_form._cpp_object) return CUDAMatrix(self._ctx, petsc_mat)
[docs] def create_matrix_block(self, a: BlockCUDAForm) -> CUDAMatrix: """Create a block matrix from a block form.""" if not isinstance(a, BlockCUDAForm): raise TypeError(f"Expected BlockCUDAForm, got type '{type(a)}'") _cpp_forms = [[cuda_form.cuda_form for cuda_form in row] for row in a.forms] petsc_mat = _cucpp.fem.petsc.create_cuda_matrix_block(_cpp_forms) return CUDAMatrix(self._ctx, petsc_mat)
[docs] def create_vector(self, b: CUDAForm) -> CUDAVector: """Create a CUDAVector from a given form.""" if not isinstance(b, CUDAForm): raise TypeError(f"Expected CUDAForm, got type '{type(b)}'.") petsc_vec = create_petsc_cuda_vector(b.dolfinx_form) return CUDAVector(self._ctx, petsc_vec)
[docs] def create_vector_block(self, b: BlockCUDAForm) -> CUDAVector: """Create a CUDAVector from a given form.""" if not isinstance(b, BlockCUDAForm): raise TypeError(f"Expected BlockCUDAForm, got type '{type(b)}')") petsc_vec = PETSc.Vec().createCUDAWithArrays( cpuarray=np.zeros(b.local_size), size=(b.local_size, b.global_size) ) return CUDAVector(self._ctx, petsc_vec)
[docs] def pack_bcs(self, bcs: list[DirichletBC]) -> CUDADirichletBC: """Pack boundary conditions into a single object for use in assembly. The returned object is of type CUDADirichletBC and can be used in place of a list of regular DirichletBCs. This is more efficient when performing multiple operations with the same list of boundary conditions, or when boundary condition values need to change over time. """ return CUDADirichletBC(self._ctx, bcs)
[docs] def pack_coefficients(self, a: CUDAForm, coefficients: list[Function] | None=None): """Pack coefficients on device.""" if not isinstance(a, CUDAForm): raise TypeError(f"Expected CUDAForm, got type '{type(a)}'.") if coefficients is None: _cucpp.fem.pack_coefficients(self._ctx, self._cpp_object, a.cuda_form) else: # perform a repacking with only the indicated coefficients _coefficients = [c._cpp_object for c in coefficients] _cucpp.fem.pack_coefficients(self._ctx, self._cpp_object, a.cuda_form, _coefficients)
[docs] def apply_lifting(self, b: CUDAVector, a: list[CUDAForm], bcs: list[list[DirichletBC] | CUDADirichletBC], x0: list[CUDAVector] | None = None, scale: float = 1.0, coeffs: list[list[Function]] | None = None ): """GPU equivalent of apply_lifting. Args: b: CUDAVector to modify a: list of forms to lift bcs: list of boundary condition lists x0: optional list of shift vectors for lifting scale: scale of lifting coeffs: coefficients to (re-)pack """ if len(a) != len(bcs): raise ValueError("Lengths of forms and bcs must match!") if x0 is not None and len(x0) != len(a): raise ValueError("Lengths of forms and x0 must match!") _x0 = [] if x0 is None else [x._cpp_object for x in x0] bc_collections = [] for bc_collection in bcs: if type(bc_collection) is list: bc_collections.append(self.pack_bcs(bc_collection)) elif type(bc_collection) is CUDADirichletBC: bc_collections.append(bc_collection) else: raise TypeError( f"Expected either a list of DirichletBC's or a CUDADirichletBC," f" got '{type(bc_collection)}'" ) if coeffs is None: coeffs = [None] * len(a) elif not len(coeffs): coeffs = [[] for form in a] cuda_forms = [] cuda_mesh = None _bcs = [] for form, bc_collection, form_coeffs in zip(a, bc_collections, coeffs): self.pack_coefficients(form, form_coeffs) cuda_forms.append(form.cuda_form) if cuda_mesh is None: cuda_mesh = form.cuda_mesh _bcs.append(bc_collection._get_cpp_bcs(form.function_spaces[1])) _cucpp.fem.apply_lifting_on_device( self._ctx, self._cpp_object, cuda_forms, cuda_mesh, b._cpp_object, _bcs, _x0, scale )
[docs] def apply_lifting_block(self, b: CUDAVector, a: BlockCUDAForm, bcs: list[DirichletBC] | typing.Any, x0: CUDAVector | None = None, alpha: float = 1.0, set_bcs=True ): """Apply lifting with a single block form.""" _x0 = [] if x0 is None else x0._cpp_object if type(bcs) is list: block_bc = a.make_block_bc(bcs)[0] elif type(bcs) in (_cucpp.fem.CUDADirichletBC_float32, _cucpp.fem.CUDADirichletBC_float64): block_bc = bcs else: raise TypeError( f"Expected either a list of DirichletBCs or" f" a _cpp.fem_CUDADirichletBC_float64/32, got '{type(bcs)}'" ) cuda_mesh = None for row in a.forms: _bcs = [] _x0_list = [] _cuda_forms = [] for form in row: self.pack_coefficients(form) if cuda_mesh is None: cuda_mesh = form.cuda_mesh _cuda_forms.append(form.cuda_form) _bcs.append(block_bc) if x0 is not None: _x0_list.append(_x0) _cucpp.fem.apply_lifting_on_device( self._ctx, self._cpp_object, _cuda_forms, cuda_mesh, b._cpp_object, _bcs, _x0_list, alpha ) if x0 is not None: _cucpp.fem.set_bc_on_device(self._ctx, self._cpp_object, b._cpp_object, block_bc, _x0, alpha) else: _cucpp.fem.set_bc_on_device(self._ctx, self._cpp_object, b._cpp_object, block_bc, alpha)
[docs] def set_bc(self, b: CUDAVector, bcs: list[DirichletBC] | CUDADirichletBC, V: FunctionSpace, x0: CUDAVector | None = None, scale: float = 1.0, ): """Set boundary conditions on device. Args: b: vector to modify bcs: collection of bcs V: FunctionSpace to which bcs apply x0: optional shift vector scale: scaling factor """ if type(bcs) is list: bc_collection = self.pack_bcs(bcs) elif type(bcs) is CUDADirichletBC: bc_collection = bcs else: raise TypeError( f"Expected either a list of DirichletBC's or a CUDADirichletBC, got '{type(bcs)}'" ) if hasattr(V, '_cpp_object'): _cppV = V._cpp_object else: _cppV = V _bcs = bc_collection._get_cpp_bcs(_cppV) if x0 is None: _cucpp.fem.set_bc_on_device( self._ctx, self._cpp_object, b._cpp_object, _bcs, scale ) else: _cucpp.fem.set_bc_on_device( self._ctx, self._cpp_object, b._cpp_object, _bcs, x0._cpp_object, scale )