submodule(mesh_cart_m) mesh_cart_reorder_s use screen_io_m, only : get_stderr, get_stdout implicit none !! Module implementing index-reorder functionality for grids. !!! 2020-03-10 Chichi Lalescu: !!! In this module functionality related to the `mesh%reorder` method is put together. !!! The basic pieces are the functions for computing the 2D Morton z-index, i.e. functions for !!! bit interleaving and appropriate bit masks. !!! On top of that the method `Morton_2D_shuffle_block` adds the ability to assign Morton z-indices !!! to square blocks rather than single grid nodes, which should provide much better options of !!! cache-access optimization. !!! !!! 2020-01-10 Chichi Lalescu: !!! In this module I'm putting together functionality related to the `mesh%reorder` method. !!! While I am starting with a Morton z-index, I believe in the future we will need to do something !!! slightly smarter, hence I think it's best to have this space dedicated to that. contains module subroutine reorder(self, grid_reorder_block, dbgout) class(mesh_cart_t), intent(inout) :: self integer, intent(in) :: grid_reorder_block !! Size of regularly ordered grid blocks integer, intent(in), optional :: dbgout integer :: l, in, jn, ki, kb, kg, outi integer, dimension(self%n_points) :: index_original, direct_index_permutation, & wrki, wrkj, wrkpinfo, wrkdistrict ! set output level -------------------------------------------------------------------------------------------- outi = 0 if (present(dbgout)) then if (is_master()) then outi = dbgout endif if (dbgout >= 3) then outi = dbgout ! every rank writes endif endif if (outi >= 1) then write(get_stdout(), *) & '-------------------------------------------------------' write(get_stdout(), *) 'Reordering mesh' write(get_stdout(), 403) grid_reorder_block 403 FORMAT(3X,'grid_reorder_block = ',I8) endif ! Generate permutation map ---------------------------------------------------------------- do l = 1, self%n_points index_original(l) = l enddo if (grid_reorder_block > 0) then if (outi >= 1) & write(get_stdout(), *) '...Morton Z index permutation...' call Morton_2D_shuffle_block(index_original, self%cart_i, self%cart_j, grid_reorder_block, direct_index_permutation) elseif (grid_reorder_block == 0) then if (outi >= 1) & write(get_stdout(), *) & '...permutation for consecutive ordering & &(inner, boundary, ghost)...' call permutation_consecutive(self, direct_index_permutation) elseif (grid_reorder_block == -1) then if (outi >= 1) & write(get_stdout(), *) & '...permutation for redblack ordering & &(red, black, border)...' call permutation_redblack(self, direct_index_permutation) else if (outi >= 1) & write(get_stdout(), *) & '...identity permutation (no reordering)...' direct_index_permutation = index_original endif ! Permute cartesian indices, pinfo, district !$OMP PARALLEL PRIVATE(l, in, jn) !$OMP DO do l = 1, self%n_points wrki(l) = self%cart_i(direct_index_permutation(l)) wrkj(l) = self%cart_j(direct_index_permutation(l)) wrkpinfo(l) = self%pinfo(direct_index_permutation(l)) wrkdistrict(l) = self%district(direct_index_permutation(l)) enddo !$OMP END DO !$OMP DO do l = 1, self%n_points self%cart_i(l) = wrki(l) self%cart_j(l) = wrkj(l) self%pinfo(l) = wrkpinfo(l) self%district(l) = wrkdistrict(l) enddo !$OMP END DO !$OMP END PARALLEL if (outi >= 1) write(get_stdout(), *) '...done...' if (allocated(self%cart_shaped_l)) then if (outi >= 1) then write(get_stdout(), *) & '...rebuilding shaped cartesian arrays after reordering...' endif call self%build_shaped_cart_arrays() endif ! refill inner_indices, boundary_indices, ghost_indices ki = 0 kb = 0 kg = 0 do l = 1, self%n_points if (self%pinfo(l) == PINFO_INNER) then ki = ki+1 if (ki > self%n_points_inner) then write(get_stderr(), *) & 'error(mesh_cart%reorder):: ki > n_points_inner' error stop endif self%inner_indices(ki) = l elseif (self%pinfo(l) == PINFO_BOUNDARY) then kb = kb+1 if (kb > self%n_points_boundary) then write(get_stderr(), *) & 'error(mesh_cart%reorder):: kb > n_points_boundary' error stop endif self%boundary_indices(kb) = l elseif (self%pinfo(l) == PINFO_GHOST) then kg = kg+1 if (kg > self%n_points_ghost) then write(get_stderr(), *) & 'error(mesh_cart%reorder):: kg > n_points_ghost' error stop endif self%ghost_indices(kg) = l else write(get_stderr(), *) & 'error(mesh_cart%reorder):: pinfo not valid', self%pinfo(l) error stop endif enddo if ( (ki /= self%n_points_inner) .or. & (kb /= self%n_points_boundary) .or. & (kg /= self%n_points_ghost) ) then write(get_stderr(), *) & 'error(mesh_cart%reorder):: & ki, kb or kg inconsistent after reordering', ki, kb, kg error stop endif ! Rebuild connectivity call self%build_connectivity(self%size_neighbor) ! Rebuild redblack_indices deallocate(self%redblack_indices) call self%build_redblack() if (outi >= 1) then write(get_stdout(), *) 'Reordering complete' write(get_stdout(), *) & '-------------------------------------------------------' endif end subroutine subroutine permutation_consecutive(mesh, direct_index_permutation) !! Returns permutation of mesh shuffling with indices according to consecutive order (inner, boundary, ghost) type(mesh_cart_t), intent(in) :: mesh !! Mesh integer, dimension(mesh%get_n_points()), intent(out) :: direct_index_permutation !! Permutation indices integer :: n, ki, kb, kg, l n = 0 do ki = 1, mesh%get_n_points_inner() l = mesh%inner_indices(ki) n = n+1 direct_index_permutation(n) = l enddo do kb = 1, mesh%get_n_points_boundary() l = mesh%boundary_indices(kb) n = n+1 direct_index_permutation(n) = l enddo do kg = 1, mesh%get_n_points_ghost() l = mesh%ghost_indices(kg) n = n+1 direct_index_permutation(n) = l enddo end subroutine subroutine permutation_redblack(mesh, direct_index_permutation) !! Returns permutation of mesh shuffling with indices according to red-black order (red, black, border) type(mesh_cart_t), intent(in) :: mesh !! Mesh integer, dimension(mesh%get_n_points()), intent(out) :: direct_index_permutation !! Permutation indices integer :: k do k = 1, mesh%get_n_points() direct_index_permutation(k) = mesh%redblack_indices(k) enddo end subroutine recursive subroutine quick_sort_indices(array_key, array_size, array_indices) !! Given two arrays, `key` and `indices`, sort both of them according to `key`. !! taken from https://rosettacode.org/wiki/Sorting_algorithms/Quicksort#Fortran integer, intent(in) :: array_size !! Size of array integer, dimension(array_size), intent(inout) :: array_key !! Array holding sort key, expected to hold Morton z-index values, or more generally large integers integer, dimension(array_size), intent(inout) :: array_indices !! Array holding actual indices (i.e. integers between 1 and mesh%get_n_points()) ! LOCAL VARIABLES integer :: left, right real :: random integer :: pivot integer :: temp_key, temp_index integer :: marker if (array_size > 1) then call random_number(random) pivot = array_key(int(random*real(array_size-1))+1) ! random pivot (not best performance, but avoids worst-case) left = 0 right = array_size + 1 do while (left < right) right = right - 1 do while (array_key(right) > pivot) right = right - 1 end do left = left + 1 do while (array_key(left) < pivot) left = left + 1 end do if (left < right) then temp_key = array_key(left) array_key(left) = array_key(right) array_key(right) = temp_key temp_index = array_indices(left) array_indices(left) = array_indices(right) array_indices(right) = temp_index end if end do if (left == right) then marker = left + 1 else marker = left end if call quick_sort_indices(array_key(:marker-1),marker-1, array_indices(:marker-1)) call quick_sort_indices(array_key(marker:),array_size-marker+1, array_indices(marker:)) end if end subroutine quick_sort_indices elemental integer function part1by1(x) !! Given an integer x, apply the following binary-representation-based transform: !! consider the list of its binary digits d_i !! make a new list of binary digits b_i such that !! b_{2i} = d_i !! b_{2i+1} = 0 !! convert the new list of binary digits b_i to an integer n, which !! is the output of this subroutine. !! As clear from the code, this is accomplished in practice with !! bitwise operations against magic masks. When reading up about this, !! I saw claims that the operation can be optimized, but I thought we'd !! start with something simple that works and optimize later if needed. integer, intent(in) :: x !! Number(s) whose binary digits will be interleaved with 0 integer :: n !! temporary integer n = iand(x, z'0000ffff') n = iand(ior(n, ishft(n, 8)), z'00ff00ff') n = iand(ior(n, ishft(n, 4)), z'0f0f0f0f') n = iand(ior(n, ishft(n, 2)), z'33333333') part1by1 = iand(ior(n, ishft(n, 1)), z'55555555') end function elemental integer function interleave2(ix, iy) !! Given two integers `ix` and `iy`, interleave their binary !! representations to obtain a single number `z` (this is !! the Morton z-index). !! This is achieved by first interleaving both binary representations !! with many 0 digits (see `part1by1`, then shifting one of them by one !! bit, and finally merging the results into a single binary sequence. integer, intent(in) :: ix, iy !! Number(s) whose digits must be interleaved --- pairs are meant to be !! nodes of the 2D grid !! local variables integer :: tix, tiy tix = part1by1(ix) tiy = part1by1(iy) interleave2 = ior(tix, ishft(tiy, 1)) end function subroutine Morton_2D_shuffle(a, ix, iy, a2) !! Given an array a(l) defined at locations (x(l), y(l)), shuffle it !! according to the Morton z-index. !! I.e. compute the Morton z-indices associated to the pairs !! (x(l), y(l)), and sort the array according to increasing z index. !! !! For details on Morton z-indexing, see !! https://en.wikipedia.org/wiki/Z-order_curve !! https://www.forceflow.be/2013/10/07/morton-encodingdecoding-through-bit-interleaving-implementations/ integer, intent(in) :: a(:) !! values to shuffle integer, intent(in) :: ix(:), iy(:) !! coordinates in plane integer, intent(out) :: a2(:) !! shuffled array !! local variables integer :: i integer, allocatable, dimension(:) :: z integer, allocatable, dimension(:) :: z_argsort allocate(z(size(ix))) allocate(z_argsort(size(ix))) !! initialize indices !! this is a linear operation in size(a), thread-safe, it should be parallelized do i = 1, size(z) z_argsort(i) = i end do !! compute Morton z-index !! this is a linear operation in size(a), thread-safe, it should be parallelized z = interleave2(ix, iy) !! sort in increasing z-index !! look up parallelization options call quick_sort_indices(z, size(z), z_argsort) !! copy data in shuffled order !! linear operation in size(a), it should be parallelized !! even though different threads would write at arbitrary locations !! within `a2`, they will never write to the same location, so it's !! already thread-safe. do i = 1, size(a) a2(i) = a(z_argsort(i)) end do deallocate(z_argsort) deallocate(z) end subroutine Morton_2D_shuffle subroutine Morton_2D_shuffle_block(a, ix, iy, base_block_size, a2) !! Given an array a(l) defined at locations (x(l), y(l)), shuffle it !! according to the Morton z-index. !! I.e. compute the Morton z-indices associated to the pairs !! (x(l), y(l)), and sort the array according to increasing z index. !! !! For details on Morton z-indexing, see !! https://en.wikipedia.org/wiki/Z-order_curve !! https://www.forceflow.be/2013/10/07/morton-encodingdecoding-through-bit-interleaving-implementations/ integer, intent(in) :: a(:) !! values to shuffle integer, intent(in) :: ix(:), iy(:) !! coordinates in plane integer, intent(in) :: base_block_size !! blocks will be squares of adjacent grid points, with `base_block_size` points on the side. integer, intent(out) :: a2(:) !! shuffled array !! local variables integer :: i integer, allocatable, dimension(:) :: z integer, allocatable, dimension(:) :: z_argsort allocate(z(size(ix))) allocate(z_argsort(size(ix))) !! initialize indices !$OMP PARALLEL PRIVATE(i) !$OMP DO do i = 1, size(z) z_argsort(i) = i !! compute Morton z-index for blocks of side `base_block_size` !! points within the same z block will have the same z-index z(i) = interleave2(ix(i) / base_block_size, iy(i) / base_block_size) !! enforce regular ordering within blocks !! compute regularly ordered linear index within block, and then compute !! a corresponding location in a (block-x-index, block-y-index, z-index-of-block) fortran ordering z(i) = z(i)*base_block_size**2 + (mod(iy(i), base_block_size)*base_block_size + mod(ix(i), base_block_size)) end do !$OMP END DO !$OMP END PARALLEL !! sort in increasing z-index !! look up parallelization options call quick_sort_indices(z, size(z), z_argsort) !! copy data in shuffled order !! linear operation in size(a), it should be parallelized !! even though different threads would write at arbitrary locations !! within `a2`, they will never write to the same location, so it's !! already thread-safe. !$OMP PARALLEL PRIVATE(i) !$OMP DO do i = 1, size(a) a2(i) = a(z_argsort(i)) end do !$OMP END DO !$OMP END PARALLEL deallocate(z_argsort) deallocate(z) end subroutine Morton_2D_shuffle_block end submodule