核心任务描述
给定 个三维空间内的点,每个点坐标 均为非负整数。同时确保每个点坐标唯一,且任意两个点之间的欧氏距离互不相等。试计算:
- 所有点对之间的前 短距离,并输出将其作为边连接后,前三大连通分量的大小之积;
- 按照边的添加顺序,输出在所有点被连接之前,最后一次添加边时,该边所连接的两个点的 坐标之积。
思考过程
拆解问题
这个题目的需求比较复杂,我们首先需要将其拆解为几个子问题。对于第一问,我们需要:
- 找出所有点对之间的欧氏距离中前 短距离;
- 使用这些距离作为边,构建图并找到连通分量;
- 计算前三大连通分量的大小之积。
对于第二问,我们需要:
- 按照边的添加顺序,逐步连接点;
- 在所有点被连接之前,记录最后一次添加边时所连接的两个点的 坐标之积。
不难发现,第一问中我们可以用并查集来管理连通分量,而第二问中我们也同样需要使用并查集来检查点的连接状态。这并不是什么难事,因此最大的挑战均在于每个问题的第一步:如何高效地找到所有点对之间的前 短距离或者将点按距离顺序连接起来。
并行优化
对于找寻前 短距离以及前 大连通分量,我们可以用最大/最小堆来辅助计算。当然,最耗时的步骤是计算所有点对之间的距离。为此,我们可以使用一些简单的并行优化手段:
- 替代欧氏距离:由于我们只需要比较距离的大小关系,而不需要具体的距离值,我们可以使用欧氏距离的平方来代替欧氏距离本身进行比较。这样我们只需要使用整数的乘法和加法,避免了浮点数的开方运算,从而提升计算速度。
- 避免重复计算:由于欧氏距离是对称的,我们只需要计算每对点一次,避免重复计算。我们可以只计算 的点对 。
- 并行处理:利用并行计算库,可以将所有需要计算的任务拆分为 份,每一份由一个线程处理,最后将结果合并。这可以显著减少计算时间,尤其是在多核处理器上。
在并行计算中,
并行归约
顶层节点为需要归约的输入数据,每一层中将相邻的两个节点进行归约操作,最终得到一个输出结果
并行归约
顶层节点为需要归约的输入数据,每一层中将相邻的两个节点进行归约操作,最终得到一个输出结果
如果是
这样的话我们就已经能够解决第一问了:
let mut dsu = DisjointSet::new(self.nodes.nrows());(0..self.nodes.nrows()) .into_par_iter() .flat_map_iter(|i| {
((i + 1)..self.nodes.nrows()) .map(|j| (i, j)) .collect::<Vec<_>>() })
.fold(BinaryHeap::new, |mut heap, (i, j)| { let d = self.dist(i, j); heap.push((d, i, j)); if heap.len() > self.max_steps { heap.pop(); } heap })
.reduce(BinaryHeap::new, |mut acc, mut heap| { acc.extend(heap.drain()); while acc.len() > self.max_steps { acc.pop(); } acc }) .into_iter() .for_each(|(_, i, j)| dsu.union(i, j));dsu.sizes .values()
.fold(BinaryHeap::new(), |mut heap, &size| { heap.push(Reverse(size)); if heap.len() > 3 { heap.pop(); } heap }) .iter() .map(|&Reverse(x)| x) .product::<u64>() .to_string()在第二问中,直接计算出所有边并排序显然是不现实的,因为我们需要处理的边数将是 级别的。为此,我们需要一些优化来减少计算。不难发现,由于我们只关心最后一条将所有点连成一个连通分量的边,因此我们实际上并不需要关心那些不会连接不同连通分量的边。基于这一点,我们可以使用如下策略:
- 基于点的最短边集:对于每个点,我们只计算其与属于不同连通分量的点之间的最短边,并将这些边加入候选边集中。这样可以大幅减少需要计算的边数。
- 动态更新边集:在每次从边集中取走最短边 后,我们需要重新计算从 出发,连接到其他连通分量的最短边,并将这条边加入边集中。这样可以确保尽在需要时更新边集,避免不必要的计算。
- 循环直到所有点连通:重复上述过程,直到所有点都属于同一个连通分量为止。在这个过程中,我们记录最后一条连接不同连通分量的边 ,并输出其 坐标之积。
根据这个思路,我们可以实现如下代码:
let mut closest_neighbor = (0..self.nodes.nrows()) .into_par_iter() .map(|i| { (0..self.nodes.nrows()) .filter_map(|j| (j != i).then_some((self.dist(i, j), i, j))) .min_by_key(|&(dist, _, _)| dist) .map_or_else( || unreachable!("There should be at least one other node"), Reverse, ) }) .collect::<BinaryHeap<_>>();let mut dsu = DisjointSet::new(self.nodes.nrows());loop { let Some(Reverse((_, i, j))) = closest_neighbor.pop() else { panic!("No more edges to process"); };
let root_i = dsu.find(i); let root_j = dsu.find(j); if root_i != root_j { dsu.union(i, j); }
if dsu.sizes.len() == 1 { return (self.nodes[[i, 0]] * self.nodes[[j, 0]]).to_string(); }
closest_neighbor.push( (0..self.nodes.nrows()) .filter_map(|k| (root_i != dsu.find(k)).then_some((self.dist(i, k), i, k))) .min_by_key(|&(dist, _, _)| dist) .map_or_else( || unreachable!("At least one different component should exist"), Reverse, ), );}八叉树表示
在上述方法中,尽管我们利用并行计算和一些动态更新策略减少了计算时间,但计算量仍然是 级别的,面对非常大的数据集时,仍然可能无法在合理时间内完成计算。即便是如本题中仅有 ,使用上述算法也需要 和 的时间。为此,我们需要进一步优化算法。
对于空间中的点对距离问题,我们很自然可以想到使用八叉树来划分空间。在八叉树中,空间将会被递归地划分为八个象限(如下图所示),从而使得我们可以快速地定位到某个点所在的空间区域,并且理论上可以高效地查询某个点在空间中的邻近点。
八叉树划分空间
不同颜色的立方体表示不同层级的八叉树节点,每个节点包含其子节点所覆盖的空间
八叉树划分空间
不同颜色的立方体表示不同层级的八叉树节点,每个节点包含其子节点所覆盖的空间
值得注意的是,在本题中,由于每个点的坐标均为整数,我们实际上可以使用 Morton 编码,也即
Morton 编码
将 坐标的二进制位交错排列,生成 Morton 编码值
Morton 编码
将 坐标的二进制位交错排列,生成 Morton 编码值
正如 Jeroen Baert 在他的博客中所描述的那样,我们可以使用一些“魔法位运算”来高效地
const fn interleave_bits_by_3(x: i64) -> u64 { let mut x = x.cast_unsigned() & 0x1ff_fff; x = (x | (x << 32)) & 0x1f_000_000_00f_fff; x = (x | (x << 16)) & 0x1f_000_0ff_000_0ff; x = (x | (x << 8)) & 0x1_00f_00f_00f_00f_00f; x = (x | (x << 4)) & 0x1_0c3_0c3_0c3_0c3_0c3; x = (x | (x << 2)) & 0x1_249_249_249_249_249; x}魔法位运算
通过位移和掩码操作,可以在数个步骤内实现二进制位的交织,而不需要循环
魔法位运算
通过位移和掩码操作,可以在数个步骤内实现二进制位的交织,而不需要循环
利用 Morton 编码,我们可以将空间中的点按照其 Morton 编码值进行排序,此时我们可以得到一条类似于下面这样的分形曲线:
Z-序曲线
按照 Morton 编码值排序后的点在空间中的分布构成一个遍历八个象限的分形曲线
Z-序曲线
按照 Morton 编码值排序后的点在空间中的分布构成一个遍历八个象限的分形曲线
Morton 编码值实际上隐含了对一个多维空间中的层级划分。由于我们将三维坐标的二进制位交错排列,因此在编码中每三位可以看作是一个层级,每个层级中的三位二进制数表示该层级中所在的八个象限之一。因此,我们实际上通过按照 Morton 编码排序已经隐式构建了一个八叉树结构,任意一个八叉树节点都可以表示为一个 Morton 编码值的前缀和/或一个对应的数组区间。
分支定界
在有了八叉树结构后,我们可以利用空间划分来加速最近邻搜索,也就是
由于在这两问中,我们都需要考虑所有点之间的距离,因此我们可以改用
- 对于当前的节点对 ,计算其覆盖空间之间的最小距离 ;
- (定界) 如果 大于当前已知的第 大距离,则跳过该节点对;
- (批次计算) 否则,如果 和 的大小低于某个阈值,则直接计算 和 中所有点对之间的距离,并更新最近邻堆;
- (分支) 否则,按照以下规则产出新的节点对并继续遍历:
- 如果 和 是同一个节点,则产出所有不重复的子节点两两组合的节点对();
- 否则,分裂较大的节点,产出其子节点与另一个节点的所有组合的节点对。
在计算 时,我们可以利用 Morton 编码的性质来快速估计两个节点覆盖空间之间的最小距离。具体来说,每个节点对应一个 Morton 编码值的前缀,其所有子节点对应的 Morton 编码值均以该前缀开头。因此我们可以从该前缀中提取出对应的坐标范围,从而得到该节点对应的
由前缀估计得到的 AABB 为理论上最大的包围盒,实际节点所包含的点集可能分布得更紧凑,从而导致实际 AABB 更小,如下图所示:
前缀估计的 AABB 与实际 AABB
(左)前缀估计的 AABB
(右)实际点集的 AABB
前缀估计的 AABB 与实际 AABB
(左)前缀估计的 AABB
(右)实际点集的 AABB
当点集分布较为均匀时,这种差异通常较小;但当点集分布极为不均匀时,差异可能会很大,从而影响剪枝效果。因此,在实际实现中,我们可以在构建八叉树时,预先计算出每个节点的实际 AABB,从而提升剪枝路径的命中率。
这样一来,我们就可以高效地计算所有点对之间的前 短距离,进而解决第一问。但是第二问呢?由于第二问中我们需要动态地更新连通分量,因此无法像第一问那样知道需要计算多少条边,从而无法直接使用双树遍历的方法。为此,我们还需要进一步改进。
最小生成树
实际上,我们可以重新审视第二问的需求。在之前的分析中,我们已经得出结论:我们只需要关注那些连接不同连通分量的边。这个过程实际上就是 Kruskal 算法构建
对于一个加权无向图 ,如果所有边的权重均不相等,那么 的最小生成树是唯一的。
反证法证明:假设 上存在两棵不同的最小生成树 和 ,。由于它们不同,必然存在一条边 属于 但不属于 ()。如果将 从 中移除,则 会变成两个连通分量 和 。
由于 是连通的,必然存在一条边 属于 但不属于 (),且连接 和 。这是因为,如果这样的边不存在,即 和 在 中也完全使用来自 的边连接,那么原先 中就已经存在一个环路,与 为生成树矛盾。
此时,我们可以比较 和 的权重。由于所有边的权重均不相等,必然有 。假设 ,则我们可以构造一棵新的生成树 ,其总权重小于 的总权重,这与 为最小生成树矛盾。同理,如果 ,则我们可以构造一棵新的生成树 ,其总权重小于 的总权重,这与 为最小生成树矛盾。
因此,假设不成立, 的最小生成树是唯一的。
事实上,我们确实有更快的构建最小生成树的算法,比如 Borůvka 算法。该算法的核心思想是:
- 初始化时,每个点作为一个独立的连通分量;
- 对于每个连通分量,找到其与其他任意连通分量之间的最短边;
- 将这些边加入最小生成树中,同时合并这些连通分量;
- 重复步骤 2 和 3,直到所有点都属于同一个连通分量为止。
利用和八叉树结合的双树遍历方法,我们可以高效地找到每个连通分量的最短边,从而快速构建最小生成树。并且我们可以添加一个额外的剪枝条件:如果当前处理的两个节点所属的连通分量已经相同,则跳过该节点对。完整的算法流程如下:
- 初始化所有点为独立的连通分量;
- 初始化每个连通分量的对短距离为无穷大;
- 将根节点对 入栈;
- 重复以下步骤直到栈为空:
- 对于栈顶的节点对 ,计算其覆盖空间之间的最小距离 ;
- 如果 大于 和 中包含的连通分量的最短距离的最大值,则跳过该节点对;
- 如果 和 所属的连通分量相同,则跳过该节点对;
- 如果 的大小低于某个阈值,则直接计算 和 中所有点对之间的距离,并更新每个连通分量的最短距离;
- 否则,按照以下规则产出新的节点对入栈:
- 如果 和 是同一个节点,则产出所有不重复的子节点两两组合的节点对();
- 否则,分裂较大的节点,产出其子节点与另一个节点的所有组合的节点对。
- 使用每个连通分量的最短边合并连通分量,并记录更新记录最长的边。
- 重复步骤 2 到 5,直到所有点都属于同一个连通分量为止。
核心算法实现
在本题的实际实现中,由于点集规模较小,我们实际可以采用一些细节上的优化:
- 完整构建八叉树,确定每个节点对应的点集区间,避免在遍历过程中频繁地进行点集划分;
- 预计算每个节点的实际边界盒,从而得到比依赖 Morton 编码前缀估计的更紧凑的边界盒,提升剪枝路径命中率
- 第二问中,每次循环时,预先计算出所有节点包含的连通分量,避免在遍历过程中频繁地进行并查集查询。
impl Puzzle {
fn deeper_search_nodes( &self, left_node_idx: usize, right_node_idx: usize, ) -> Vec<(usize, usize)> {
if left_node_idx == right_node_idx { let children = &self.octree_children[left_node_idx]; return children .iter() .enumerate() .flat_map(|(idx, &i)| children[idx..].iter().map(move |&j| (i, j))) .collect(); }
if self.octree[left_node_idx].size() > self.octree[right_node_idx].size() { self.octree_children[left_node_idx] .iter() .map(|&child| (child, right_node_idx)) .collect() } else { self.octree_children[right_node_idx] .iter() .map(|&child| (left_node_idx, child)) .collect() } }}
impl Solution for Puzzle { fn part1(&self) -> String { let mut min_dist = BinaryHeap::new(); let mut search_stack = vec![(0, 0)]; while let Some((left_node_idx, right_node_idx)) = search_stack.pop() { let left_node = self.octree[left_node_idx]; let right_node = self.octree[right_node_idx]; if let Some(&(delta, _, _)) = min_dist.peek() {
let (left_min, left_max) = self.octree_bounding_boxes[left_node_idx]; let (right_min, right_max) = self.octree_bounding_boxes[right_node_idx]; let mut min_possible_dist = 0; for dim in 0..3 { if left_max[dim] < right_min[dim] { min_possible_dist += (right_min[dim] - left_max[dim]).pow(2); } else if right_max[dim] < left_min[dim] { min_possible_dist += (left_min[dim] - right_max[dim]).pow(2); } } if min_possible_dist >= delta { continue; } }
if left_node.size() * right_node.size() <= self.max_steps { let pairs = if left_node_idx == right_node_idx {
(left_node.start..left_node.end) .flat_map(|i| (i + 1..left_node.end).map(move |j| (i, j))) .collect::<Vec<_>>() } else {
(left_node.start..left_node.end) .flat_map(|i| (right_node.start..right_node.end).map(move |j| (i, j))) .collect::<Vec<_>>() }; for (i, j) in pairs { let d = Self::dist(&self.nodes[i], &self.nodes[j]); if d > min_dist.peek().map_or(i64::MAX, |&(d, _, _)| d) { continue; } min_dist.push((d, i, j)); if min_dist.len() > self.max_steps { min_dist.pop(); } } continue; } search_stack.extend(self.deeper_search_nodes(left_node_idx, right_node_idx)); } let mut dsu = DisjointSet::new(self.nodes.len()); for (_, i, j) in min_dist { dsu.union(i, j); } dsu.sizes .values() .fold(BinaryHeap::new(), |mut heap, &size| { heap.push(Reverse(size)); if heap.len() > 3 { heap.pop(); } heap }) .iter() .map(|&Reverse(x)| x) .product::<u64>() .to_string() }
fn part2(&self) -> String { let mut dsu = DisjointSet::new(self.nodes.len()); let mut last_edge = (i64::MIN, 0, 0); let mut components = vec![BTreeSet::new(); self.octree.len()]; while dsu.sizes.len() > 1 {
let mut min_edges = dsu.sizes.keys().fold(HashMap::new(), |mut map, &comp| { map.insert(comp, (i64::MAX, 0, 0)); map });
self.compute_components(&mut dsu, &mut components); let mut search_stack = vec![(0, 0)]; while let Some((left_node_idx, right_node_idx)) = search_stack.pop() { let left_node = self.octree[left_node_idx]; let right_node = self.octree[right_node_idx];
if components[left_node_idx].len() == 1 && components[left_node_idx] == components[right_node_idx] { continue; }
let max_min_edge = components[left_node_idx] .iter() .chain(components[right_node_idx].iter()) .fold(i64::MIN, |max_edge, &comp| { let Some((d, _, _)) = min_edges.get(&comp) else { unreachable!("Every component should have an entry in min_edges") }; max_edge.max(*d) });
if max_min_edge < i64::MAX {
let (left_min, left_max) = self.octree_bounding_boxes[left_node_idx]; let (right_min, right_max) = self.octree_bounding_boxes[right_node_idx]; let mut min_possible_dist = 0; for dim in 0..3 { if left_max[dim] < right_min[dim] { min_possible_dist += (right_min[dim] - left_max[dim]).pow(2); } else if right_max[dim] < left_min[dim] { min_possible_dist += (left_min[dim] - right_max[dim]).pow(2); } } if min_possible_dist >= max_min_edge { continue; } }
if left_node.size() * right_node.size() <= 64 { let pairs = if left_node_idx == right_node_idx {
let nodes = (left_node.start..left_node.end) .map(|i| (i, dsu.find(self.nodes[i].index))) .collect::<Vec<_>>(); nodes .iter() .enumerate() .flat_map(|(idx, &i)| { nodes[idx + 1..] .iter() .filter_map(move |&j| (i.1 != j.1).then_some((i, j))) }) .collect::<Vec<_>>() } else {
let left_nodes = (left_node.start..left_node.end) .map(|i| (i, dsu.find(self.nodes[i].index))) .collect::<Vec<_>>(); let right_nodes = (right_node.start..right_node.end) .map(|i| (i, dsu.find(self.nodes[i].index))) .collect::<Vec<_>>();
left_nodes .into_iter() .flat_map(|i| { right_nodes .iter() .filter_map(move |&j| (i.1 != j.1).then_some((i, j))) }) .collect::<Vec<_>>() }; for ((i, ci), (j, cj)) in pairs { let d = Self::dist(&self.nodes[i], &self.nodes[j]); min_edges.entry(ci).and_modify(|entry| { if d < entry.0 { *entry = (d, i, j); } }); min_edges.entry(cj).and_modify(|entry| { if d < entry.0 { *entry = (d, i, j); } }); } continue; } search_stack.extend(self.deeper_search_nodes(left_node_idx, right_node_idx)); } for (_, (d, i, j)) in min_edges { dsu.union(self.nodes[i].index, self.nodes[j].index); if d > last_edge.0 { last_edge = (d, i, j); } } } (self.nodes[last_edge.1].coordinate[0] * self.nodes[last_edge.2].coordinate[0]).to_string() }}总结
完整代码见此处。
通过以上分析与实现,我们成功地将解法耗时降低到 和 ,尽管没有使用并行计算,仍然实现了 10-20 倍的性能提升。这很好地展示了利用八叉树的空间划分和分支定界策略能够有效地减少计算量,使得我们能够高效地处理点对之间的距离计算问题。当然,这种方法的实现相对复杂,需要仔细处理八叉树的构建、节点遍历以及剪枝条件等细节,这就需要根据具体的场景进行权衡和选择了。