当前位置:首页python > 正文

Python 算法:DAG 关键路径(Critical Path Method, CPM)

作者:野牛程序员:2025-12-22 11:25:15python阅读 1996
Python 算法:DAG 关键路径(Critical Path Method, CPM)
# /*
# Python 算法:DAG 关键路径(Critical Path Method, CPM)
# --------------------------------------------------------
# 原理概要:
# 关键路径(Critical Path)表示在 DAG 中所有依赖链中
# “整体耗时最长的路径”,决定项目最短完工时间。
#
# CPM 由两次拓扑动态规划构成:
#
# 1) 正向拓扑松弛 —— 计算最早开始时间 ES(Earliest Start)
#       ES[v] = max( ES[u] + duration(u, v) )
#
# 2) 反向拓扑松弛 —— 计算最迟开始时间 LS(Latest Start)
#       LS[u] = min( LS[v] - duration(u, v) )
#
# 关键路径性质:
# - ES == LS 的节点属于关键节点;
# - 延迟任一关键节点,将导致整体完工时间延迟;
# - 关键路径的总时长即项目总工期。
#
# 典型应用:
# - 工程施工排期 CPM/PERT
# - 任务工作流调度
# - 软件任务依赖图
# - 多阶段项目时间管理
# */

# --------------------------------------------------------
from collections import defaultdict, deque

class DAG_CriticalPath:
    def __init__(self):
        self.graph = defaultdict(list)      # u -> [(v, w)]
        self.rev_graph = defaultdict(list)  # v -> [(u, w)]
        self.nodes = set()

    # 添加边 u → v,耗时 w
    def add_edge(self, u, v, w):
        self.graph[u].append((v, w))
        self.rev_graph[v].append((u, w))
        self.nodes.add(u)
        self.nodes.add(v)

    # --------------------------------------------------------
    # 拓扑排序(Kahn)
    def topological_sort(self):
        indegree = defaultdict(int)

        for u in self.graph:
            for v, _ in self.graph[u]:
                indegree[v] += 1

        queue = deque([n for n in self.nodes if indegree[n] == 0])
        topo = []

        while queue:
            u = queue.popleft()
            topo.append(u)

            for v, _ in self.graph[u]:
                indegree[v] -= 1
                if indegree[v] == 0:
                    queue.append(v)

        return topo

    # 反拓扑序
    def reverse_topological_sort(self, topo):
        return topo[::-1]

    # --------------------------------------------------------
    # 关键路径算法 CPM
    def critical_path(self):
        topo = self.topological_sort()

        # ------------------------------
        # 1) 正向 DP:最早开始时间 ES
        ES = {n: float("-inf") for n in self.nodes}

        # 无入边节点 ES = 0
        for n in self.nodes:
            if not self.rev_graph[n]:
                ES[n] = 0

        for u in topo:
            if ES[u] != float("-inf"):
                for v, w in self.graph[u]:
                    ES[v] = max(ES[v], ES[u] + w)

        # ------------------------------
        # 2) 反向 DP:最迟开始时间 LS
        LS = {n: float("inf") for n in self.nodes}

        # 项目总工期 = 所有 ES 最大值
        project_time = max(ES.values())

        # 无出边节点 LS = 项目总工期
        for n in self.nodes:
            if not self.graph[n]:
                LS[n] = project_time

        for u in self.reverse_topological_sort(topo):
            for v, w in self.graph[u]:
                LS[u] = min(LS[u], LS[v] - w)

        # ------------------------------
        # 3) ES == LS → 关键节点
        critical_nodes = [n for n in self.nodes if ES[n] == LS[n]]

        return ES, LS, project_time, critical_nodes


# --------------------------------------------------------
# 示例 DAG(任务依赖关系 + 耗时)
G = DAG_CriticalPath()
G.add_edge("A", "B", 3)
G.add_edge("A", "C", 2)
G.add_edge("B", "D", 4)
G.add_edge("C", "D", 3)
G.add_edge("D", "E", 6)
G.add_edge("C", "F", 5)
G.add_edge("F", "E", 2)

# 运行 CPM
ES, LS, total, critical = G.critical_path()

# --------------------------------------------------------
# 输出结果
print("最早开始时间 ES:")
for k in ES:
    print(k, ":", ES[k])

print("\n最迟开始时间 LS:")
for k in LS:
    print(k, ":", LS[k])

print("\n项目总工期:", total)

print("\n关键路径节点:", critical)

# --------------------------------------------------------
# 预期输出示例:
# 最早开始时间 ES:
# A : 0
# C : 2
# B : 3
# D : 7
# F : 7
# E : 13
#
# 最迟开始时间 LS:
# A : 0
# C : 2
# B : 3
# D : 7
# F : 8
# E : 13
#
# 项目总工期: 13
#
# 关键路径节点: ['A', 'C', 'D', 'E']
#
# 要点总结:
# - ES/LS 用于判断关键路径;
# - 关键路径上的节点满足 ES == LS;
# - CPM 是工程管理与任务调度的重要基础工具。
# */


野牛程序员教少儿编程与信息学奥赛-微信|电话:15892516892
野牛程序员教少儿编程与信息学竞赛-微信|电话:15892516892
  • Python 算法:DAG 关键路径(Critical Path Method
  • CPM)
  • 相关推荐

    最新推荐

    热门点击