Python 算法:DAG 关键路径(Critical Path Method, CPM)
作者:野牛程序员:2025-12-22 11:25:15python阅读 1996
Python 算法:DAG 关键路径(Critical Path Method, CPM)
# /*
# Python 算法:DAG 关键路径(Critical Path Method, CPM)
# --------------------------------------------------------
# 原理概要:
# 关键路径(Critical Path)表示在 DAG 中所有依赖链中
# “整体耗时最长的路径”,决定项目最短完工时间。
#
# CPM 由两次拓扑动态规划构成:
#
# 1) 正向拓扑松弛 —— 计算最早开始时间 ES(Earliest Start)
# ES[v] = max( ES[u] + duration(u, v) )
#
# 2) 反向拓扑松弛 —— 计算最迟开始时间 LS(Latest Start)
# LS[u] = min( LS[v] - duration(u, v) )
#
# 关键路径性质:
# - ES == LS 的节点属于关键节点;
# - 延迟任一关键节点,将导致整体完工时间延迟;
# - 关键路径的总时长即项目总工期。
#
# 典型应用:
# - 工程施工排期 CPM/PERT
# - 任务工作流调度
# - 软件任务依赖图
# - 多阶段项目时间管理
# */
# --------------------------------------------------------
from collections import defaultdict, deque
class DAG_CriticalPath:
def __init__(self):
self.graph = defaultdict(list) # u -> [(v, w)]
self.rev_graph = defaultdict(list) # v -> [(u, w)]
self.nodes = set()
# 添加边 u → v,耗时 w
def add_edge(self, u, v, w):
self.graph[u].append((v, w))
self.rev_graph[v].append((u, w))
self.nodes.add(u)
self.nodes.add(v)
# --------------------------------------------------------
# 拓扑排序(Kahn)
def topological_sort(self):
indegree = defaultdict(int)
for u in self.graph:
for v, _ in self.graph[u]:
indegree[v] += 1
queue = deque([n for n in self.nodes if indegree[n] == 0])
topo = []
while queue:
u = queue.popleft()
topo.append(u)
for v, _ in self.graph[u]:
indegree[v] -= 1
if indegree[v] == 0:
queue.append(v)
return topo
# 反拓扑序
def reverse_topological_sort(self, topo):
return topo[::-1]
# --------------------------------------------------------
# 关键路径算法 CPM
def critical_path(self):
topo = self.topological_sort()
# ------------------------------
# 1) 正向 DP:最早开始时间 ES
ES = {n: float("-inf") for n in self.nodes}
# 无入边节点 ES = 0
for n in self.nodes:
if not self.rev_graph[n]:
ES[n] = 0
for u in topo:
if ES[u] != float("-inf"):
for v, w in self.graph[u]:
ES[v] = max(ES[v], ES[u] + w)
# ------------------------------
# 2) 反向 DP:最迟开始时间 LS
LS = {n: float("inf") for n in self.nodes}
# 项目总工期 = 所有 ES 最大值
project_time = max(ES.values())
# 无出边节点 LS = 项目总工期
for n in self.nodes:
if not self.graph[n]:
LS[n] = project_time
for u in self.reverse_topological_sort(topo):
for v, w in self.graph[u]:
LS[u] = min(LS[u], LS[v] - w)
# ------------------------------
# 3) ES == LS → 关键节点
critical_nodes = [n for n in self.nodes if ES[n] == LS[n]]
return ES, LS, project_time, critical_nodes
# --------------------------------------------------------
# 示例 DAG(任务依赖关系 + 耗时)
G = DAG_CriticalPath()
G.add_edge("A", "B", 3)
G.add_edge("A", "C", 2)
G.add_edge("B", "D", 4)
G.add_edge("C", "D", 3)
G.add_edge("D", "E", 6)
G.add_edge("C", "F", 5)
G.add_edge("F", "E", 2)
# 运行 CPM
ES, LS, total, critical = G.critical_path()
# --------------------------------------------------------
# 输出结果
print("最早开始时间 ES:")
for k in ES:
print(k, ":", ES[k])
print("\n最迟开始时间 LS:")
for k in LS:
print(k, ":", LS[k])
print("\n项目总工期:", total)
print("\n关键路径节点:", critical)
# --------------------------------------------------------
# 预期输出示例:
# 最早开始时间 ES:
# A : 0
# C : 2
# B : 3
# D : 7
# F : 7
# E : 13
#
# 最迟开始时间 LS:
# A : 0
# C : 2
# B : 3
# D : 7
# F : 8
# E : 13
#
# 项目总工期: 13
#
# 关键路径节点: ['A', 'C', 'D', 'E']
#
# 要点总结:
# - ES/LS 用于判断关键路径;
# - 关键路径上的节点满足 ES == LS;
# - CPM 是工程管理与任务调度的重要基础工具。
# */野牛程序员教少儿编程与信息学奥赛-微信|电话:15892516892

