mesh_cart_reorder_s.f90 Source File


Source Code

submodule(mesh_cart_m) mesh_cart_reorder_s
    use screen_io_m, only : get_stderr, get_stdout

    implicit none

    !! Module implementing index-reorder functionality for grids.
    !!! 2020-03-10 Chichi Lalescu:
    !!! In this module functionality related to the `mesh%reorder` method is put together.
    !!! The basic pieces are the functions for computing the 2D Morton z-index, i.e. functions for
    !!! bit interleaving and appropriate bit masks.
    !!! On top of that the method `Morton_2D_shuffle_block` adds the ability to assign Morton z-indices
    !!! to square blocks rather than single grid nodes, which should provide much better options of
    !!! cache-access optimization.
    !!!
    !!! 2020-01-10 Chichi Lalescu:
    !!! In this module I'm putting together functionality related to the `mesh%reorder` method.
    !!! While I am starting with a Morton z-index, I believe in the future we will need to do something
    !!! slightly smarter, hence I think it's best to have this space dedicated to that.

contains

    module subroutine reorder(self, grid_reorder_block, dbgout)
        class(mesh_cart_t), intent(inout) :: self
        integer, intent(in) :: grid_reorder_block
        !! Size of regularly ordered grid blocks
        integer, intent(in), optional :: dbgout

        integer :: l, in, jn, ki, kb, kg, outi
        integer, dimension(self%n_points) :: index_original, direct_index_permutation, &
                                             wrki, wrkj, wrkpinfo, wrkdistrict

        ! set output level --------------------------------------------------------------------------------------------
        outi = 0
        if (present(dbgout)) then
            if (is_master()) then
                outi = dbgout
            endif
            if (dbgout >= 3) then
                outi = dbgout ! every rank writes
            endif
        endif
        if (outi >= 1) then
            write(get_stdout(), *) &
                '-------------------------------------------------------'
            write(get_stdout(), *) 'Reordering mesh'
            write(get_stdout(), 403) grid_reorder_block
            403 FORMAT(3X,'grid_reorder_block   = ',I8)
        endif

        ! Generate permutation map ----------------------------------------------------------------
        do  l = 1, self%n_points
            index_original(l) = l
        enddo

        if (grid_reorder_block > 0) then
            if (outi >= 1) &
                write(get_stdout(), *) '...Morton Z index permutation...'
            call Morton_2D_shuffle_block(index_original, self%cart_i, self%cart_j, grid_reorder_block, direct_index_permutation)
        elseif (grid_reorder_block == 0) then
            if (outi >= 1) &
                write(get_stdout(), *) &
                    '...permutation for consecutive ordering &
                    &(inner, boundary, ghost)...'
            call permutation_consecutive(self, direct_index_permutation)
        elseif (grid_reorder_block == -1) then
                if (outi >= 1) &
                    write(get_stdout(), *) &
                        '...permutation for redblack ordering &
                        &(red, black, border)...'
                call permutation_redblack(self, direct_index_permutation)
        else
            if (outi >= 1) &
                write(get_stdout(), *) &
                    '...identity permutation (no reordering)...'
            direct_index_permutation = index_original
        endif


        ! Permute cartesian indices, pinfo, district
        !$OMP PARALLEL PRIVATE(l, in, jn)
        !$OMP DO
        do l = 1, self%n_points
            wrki(l) = self%cart_i(direct_index_permutation(l))
            wrkj(l) = self%cart_j(direct_index_permutation(l))
            wrkpinfo(l) = self%pinfo(direct_index_permutation(l))
            wrkdistrict(l) = self%district(direct_index_permutation(l))
        enddo
        !$OMP END DO
        !$OMP DO
        do l = 1, self%n_points
            self%cart_i(l) = wrki(l)
            self%cart_j(l) = wrkj(l)
            self%pinfo(l)  = wrkpinfo(l)
            self%district(l) = wrkdistrict(l)
        enddo
        !$OMP END DO
        !$OMP END PARALLEL
        if (outi >= 1) write(get_stdout(), *) '...done...'

        if (allocated(self%cart_shaped_l)) then
            if (outi >= 1) then
                write(get_stdout(), *) &
                    '...rebuilding shaped cartesian arrays after reordering...'
            endif
            call self%build_shaped_cart_arrays()
        endif

        ! refill inner_indices, boundary_indices, ghost_indices
        ki = 0
        kb = 0
        kg = 0
        do l = 1, self%n_points
            if (self%pinfo(l) == PINFO_INNER) then
                ki = ki+1
                if (ki > self%n_points_inner) then
                    write(get_stderr(), *) &
                        'error(mesh_cart%reorder):: ki > n_points_inner'
                    error stop
                endif
                self%inner_indices(ki) = l
            elseif (self%pinfo(l) == PINFO_BOUNDARY) then
                kb = kb+1
                if (kb > self%n_points_boundary) then
                    write(get_stderr(), *) &
                        'error(mesh_cart%reorder):: kb > n_points_boundary'
                    error stop
                endif
                self%boundary_indices(kb) = l
            elseif (self%pinfo(l) == PINFO_GHOST) then
                kg = kg+1
                if (kg > self%n_points_ghost) then
                    write(get_stderr(), *) &
                        'error(mesh_cart%reorder):: kg > n_points_ghost'
                    error stop
                endif
                self%ghost_indices(kg) = l
            else
                write(get_stderr(), *) &
                    'error(mesh_cart%reorder):: pinfo not valid', self%pinfo(l)
                error stop
            endif
        enddo

        if ( (ki /= self%n_points_inner)    .or. &
             (kb /= self%n_points_boundary) .or. &
             (kg /= self%n_points_ghost) ) then
            write(get_stderr(), *) &
                'error(mesh_cart%reorder):: &
                ki, kb or kg inconsistent after reordering', ki, kb, kg
            error stop
        endif

        ! Rebuild connectivity
        call self%build_connectivity(self%size_neighbor)

        ! Rebuild redblack_indices
        deallocate(self%redblack_indices)
        call self%build_redblack()

        if (outi >= 1) then
            write(get_stdout(), *) 'Reordering complete'
            write(get_stdout(), *) &
                '-------------------------------------------------------'
        endif

    end subroutine

    subroutine permutation_consecutive(mesh, direct_index_permutation)
        !! Returns permutation of mesh shuffling with indices according to consecutive order (inner, boundary, ghost)
        type(mesh_cart_t), intent(in) :: mesh
        !! Mesh
        integer, dimension(mesh%get_n_points()), intent(out) :: direct_index_permutation
        !! Permutation indices

        integer :: n, ki, kb, kg, l

        n = 0
        do ki = 1, mesh%get_n_points_inner()
            l = mesh%inner_indices(ki)
            n = n+1
            direct_index_permutation(n) = l
        enddo
        do kb = 1, mesh%get_n_points_boundary()
            l = mesh%boundary_indices(kb)
            n = n+1
            direct_index_permutation(n) = l
        enddo
        do kg = 1, mesh%get_n_points_ghost()
            l = mesh%ghost_indices(kg)
            n = n+1
            direct_index_permutation(n) = l
        enddo

    end subroutine

    subroutine permutation_redblack(mesh, direct_index_permutation)
        !! Returns permutation of mesh shuffling with indices according to red-black order (red, black, border)
        type(mesh_cart_t), intent(in) :: mesh
        !! Mesh
        integer, dimension(mesh%get_n_points()), intent(out) :: direct_index_permutation
        !! Permutation indices

        integer :: k
        do k = 1, mesh%get_n_points()
            direct_index_permutation(k) = mesh%redblack_indices(k)
        enddo
    end subroutine

    recursive subroutine quick_sort_indices(array_key, array_size, array_indices)
        !! Given two arrays, `key` and `indices`, sort both of them according to `key`.
        !! taken from https://rosettacode.org/wiki/Sorting_algorithms/Quicksort#Fortran
        integer, intent(in) :: array_size
        !! Size of array
        integer, dimension(array_size), intent(inout) :: array_key
        !! Array holding sort key, expected to hold Morton z-index values, or more generally large integers
        integer, dimension(array_size), intent(inout) :: array_indices
        !! Array holding actual indices (i.e. integers between 1 and mesh%get_n_points())

        ! LOCAL VARIABLES
        integer :: left, right
        real :: random
        integer :: pivot
        integer :: temp_key, temp_index
        integer :: marker

        if (array_size > 1) then

            call random_number(random)
            pivot = array_key(int(random*real(array_size-1))+1) ! random pivot (not best performance, but avoids worst-case)
            left = 0
            right = array_size + 1

            do while (left < right)
                right = right - 1
                do while (array_key(right) > pivot)
                    right = right - 1
                end do
                left = left + 1
                do while (array_key(left) < pivot)
                    left = left + 1
                end do
                if (left < right) then
                    temp_key = array_key(left)
                    array_key(left) = array_key(right)
                    array_key(right) = temp_key
                    temp_index = array_indices(left)
                    array_indices(left) = array_indices(right)
                    array_indices(right) = temp_index
                end if
            end do

            if (left == right) then
                marker = left + 1
            else
                marker = left
            end if

            call quick_sort_indices(array_key(:marker-1),marker-1, array_indices(:marker-1))
            call quick_sort_indices(array_key(marker:),array_size-marker+1, array_indices(marker:))

        end if
    end subroutine quick_sort_indices

    elemental integer function part1by1(x)
        !! Given an integer x, apply the following binary-representation-based transform:
        !! consider the list of its binary digits d_i
        !! make a new list of binary digits b_i such that
        !! b_{2i}   = d_i
        !! b_{2i+1} = 0
        !! convert the new list of binary digits b_i to an integer n, which
        !! is the output of this subroutine.
        !! As clear from the code, this is accomplished in practice with
        !! bitwise operations against magic masks. When reading up about this,
        !! I saw claims that the operation can be optimized, but I thought we'd
        !! start with something simple that works and optimize later if needed.
        integer, intent(in)  :: x
        !! Number(s) whose binary digits will be interleaved with 0

        integer :: n
        !! temporary integer

        n = iand(x, z'0000ffff')
        n = iand(ior(n, ishft(n, 8)), z'00ff00ff')
        n = iand(ior(n, ishft(n, 4)), z'0f0f0f0f')
        n = iand(ior(n, ishft(n, 2)), z'33333333')
        part1by1 = iand(ior(n, ishft(n, 1)), z'55555555')
    end function

    elemental integer function interleave2(ix, iy)
        !! Given two integers `ix` and `iy`, interleave their binary
        !! representations to obtain a single number `z` (this is
        !! the Morton z-index).
        !! This is achieved by first interleaving both binary representations
        !! with many 0 digits (see `part1by1`, then shifting one of them by one
        !! bit, and finally merging the results into a single binary sequence.
        integer, intent(in)  :: ix, iy
        !! Number(s) whose digits must be interleaved --- pairs are meant to be
        !! nodes of the 2D grid

        !! local variables
        integer :: tix, tiy

        tix = part1by1(ix)
        tiy = part1by1(iy)

        interleave2 = ior(tix, ishft(tiy, 1))
    end function

    subroutine Morton_2D_shuffle(a, ix, iy, a2)
        !! Given an array a(l) defined at locations (x(l), y(l)), shuffle it
        !! according to the Morton z-index.
        !! I.e. compute the Morton z-indices associated to the pairs
        !! (x(l), y(l)), and sort the array according to increasing z index.
        !!
        !! For details on Morton z-indexing, see
        !! https://en.wikipedia.org/wiki/Z-order_curve
        !! https://www.forceflow.be/2013/10/07/morton-encodingdecoding-through-bit-interleaving-implementations/
        integer, intent(in)  :: a(:)
        !! values to shuffle
        integer, intent(in)  :: ix(:), iy(:)
        !! coordinates in plane
        integer, intent(out) :: a2(:)
        !! shuffled array

        !! local variables
        integer :: i
        integer, allocatable, dimension(:) :: z
        integer, allocatable, dimension(:) :: z_argsort

        allocate(z(size(ix)))
        allocate(z_argsort(size(ix)))

        !! initialize indices
        !! this is a linear operation in size(a), thread-safe, it should be parallelized
        do i = 1, size(z)
            z_argsort(i) = i
        end do

        !! compute Morton z-index
        !! this is a linear operation in size(a), thread-safe, it should be parallelized
        z = interleave2(ix, iy)

        !! sort in increasing z-index
        !! look up parallelization options
        call quick_sort_indices(z, size(z), z_argsort)

        !! copy data in shuffled order
        !! linear operation in size(a), it should be parallelized
        !! even though different threads would write at arbitrary locations
        !! within `a2`, they will never write to the same location, so it's
        !! already thread-safe.
        do i = 1, size(a)
            a2(i) = a(z_argsort(i))
        end do

        deallocate(z_argsort)
        deallocate(z)
    end subroutine Morton_2D_shuffle

    subroutine Morton_2D_shuffle_block(a, ix, iy, base_block_size, a2)
        !! Given an array a(l) defined at locations (x(l), y(l)), shuffle it
        !! according to the Morton z-index.
        !! I.e. compute the Morton z-indices associated to the pairs
        !! (x(l), y(l)), and sort the array according to increasing z index.
        !!
        !! For details on Morton z-indexing, see
        !! https://en.wikipedia.org/wiki/Z-order_curve
        !! https://www.forceflow.be/2013/10/07/morton-encodingdecoding-through-bit-interleaving-implementations/
        integer, intent(in)  :: a(:)
        !! values to shuffle
        integer, intent(in)  :: ix(:), iy(:)
        !! coordinates in plane
        integer, intent(in)  :: base_block_size
        !! blocks will be squares of adjacent grid points, with `base_block_size` points on the side.
        integer, intent(out) :: a2(:)
        !! shuffled array

        !! local variables
        integer :: i
        integer, allocatable, dimension(:) :: z
        integer, allocatable, dimension(:) :: z_argsort

        allocate(z(size(ix)))
        allocate(z_argsort(size(ix)))

        !! initialize indices
        !$OMP PARALLEL PRIVATE(i)
        !$OMP DO
        do i = 1, size(z)
            z_argsort(i) = i

            !! compute Morton z-index for blocks of side `base_block_size`
            !! points within the same z block will have the same z-index
            z(i) = interleave2(ix(i) / base_block_size, iy(i) / base_block_size)

            !! enforce regular ordering within blocks
            !! compute regularly ordered linear index within block, and then compute
            !! a corresponding location in a (block-x-index, block-y-index, z-index-of-block) fortran ordering
            z(i) = z(i)*base_block_size**2 + (mod(iy(i), base_block_size)*base_block_size + mod(ix(i), base_block_size))
        end do
        !$OMP END DO
        !$OMP END PARALLEL

        !! sort in increasing z-index
        !! look up parallelization options
        call quick_sort_indices(z, size(z), z_argsort)

        !! copy data in shuffled order
        !! linear operation in size(a), it should be parallelized
        !! even though different threads would write at arbitrary locations
        !! within `a2`, they will never write to the same location, so it's
        !! already thread-safe.
        !$OMP PARALLEL PRIVATE(i)
        !$OMP DO
        do i = 1, size(a)
            a2(i) = a(z_argsort(i))
        end do
        !$OMP END DO
        !$OMP END PARALLEL

        deallocate(z_argsort)
        deallocate(z)
    end subroutine Morton_2D_shuffle_block

end submodule