Python:常见算法题的工程价值,双指针、滑动窗口、递归、回溯、动态规划怎么入门

提到算法,最容易出现的误判通常是两种:

  • 一种是把它当成面试题,写完就结束
  • 另一种是觉得项目里用不到,所以直接跳过

真正写脚本、做数据处理、补测试工具、做任务调度之后,这几类算法会反复出现,只是名字不一定写在代码旁边:

  • 两份有序数据合并去重,常见是双指针
  • 一段连续时间里的异常统计,常见是滑动窗口
  • 扫目录、扫树形配置、扫依赖关系,常见是递归
  • 枚举补跑方案、组合任务、挑参数,常见是回溯
  • 在时间、容量、成本约束下选最优方案,常见是动态规划

这一篇不按题型分类去背,而是直接围绕一个实际脚本展开:夜间失败任务分析与补跑规划脚本

这个脚本要做五件事:

  1. 递归扫描 reports/ 目录,收集多个子目录下的失败任务文件
  2. 把两个来源的失败任务 ID 合并去重
  3. 找出 10 分钟内失败最密集的时间窗口
  4. 在 30 分钟补跑窗口内,枚举可执行的任务组合
  5. 当任务规模变大时,用动态规划求出收益更高的补跑方案

用这个场景把五类常见算法串起来,比单独背题更容易形成直觉:
算法不是额外一层知识,而是脚本处理链里的一种写法。

一、这篇文章要解决什么问题

读完这一篇,应该能独立完成这些动作:

  • 看出一个数据处理问题适不适合用双指针或滑动窗口
  • 写出最小递归函数去扫描目录或树形数据
  • 在小规模组合问题里用回溯枚举方案
  • 在规模变大之后把回溯改成动态规划
  • 识别几类典型错误,例如双指针漏处理尾部元素、回溯路径污染、0/1 背包写成完全背包

如果这些动作能独立做出来,算法就不再只是“会看题解”,而是开始能落进脚本和工具代码里。

二、先看这个实际脚本要处理什么事

假设现在有一个夜间任务系统,每天凌晨会跑一批数据同步任务。第二天上班前,需要一份失败任务分析与补跑建议。

目录结构大概像这样:

1
2
3
4
5
6
7
8
retry_assistant/
├── analyze_retry_plan.py
└── reports/
├── source_a/
│ ├── failed_01.json
│ └── failed_02.json
└── source_b/
└── failed_01.json

每个 JSON 文件里会放失败任务记录:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[
{
"task_id": 1001,
"failed_at": "2018-08-08 01:03:00",
"duration": 8,
"score": 12
},
{
"task_id": 1002,
"failed_at": "2018-08-08 01:08:00",
"duration": 6,
"score": 10
}
]

这里的字段含义很简单:

  • task_id:失败任务 ID
  • failed_at:失败时间
  • duration:补跑需要多少分钟
  • score:补跑成功后的业务收益分值

这篇文章不是讲怎么建平台,而是把下面这条最小处理链真正写出来:

  1. 先把文件找出来
  2. 再把任务读出来
  3. 再分析失败聚集时段
  4. 最后给出补跑建议

三、先给一个最小可运行示例

先把一份最小可运行脚本放出来,后面的每个知识点都围绕它展开。

analyze_retry_plan.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
from datetime import datetime
from pathlib import Path
import json


def collect_json_files(base_dir):
base_path = Path(base_dir)
result = []

def walk(current_path):
for item in current_path.iterdir():
if item.is_dir():
walk(item)
elif item.suffix == ".json":
result.append(item)

walk(base_path)
return sorted(result)


def merge_failed_ids(sorted_a, sorted_b):
i = 0
j = 0
merged = []

while i < len(sorted_a) and j < len(sorted_b):
if sorted_a[i] == sorted_b[j]:
if not merged or merged[-1] != sorted_a[i]:
merged.append(sorted_a[i])
i += 1
j += 1
elif sorted_a[i] < sorted_b[j]:
if not merged or merged[-1] != sorted_a[i]:
merged.append(sorted_a[i])
i += 1
else:
if not merged or merged[-1] != sorted_b[j]:
merged.append(sorted_b[j])
j += 1

while i < len(sorted_a):
if not merged or merged[-1] != sorted_a[i]:
merged.append(sorted_a[i])
i += 1

while j < len(sorted_b):
if not merged or merged[-1] != sorted_b[j]:
merged.append(sorted_b[j])
j += 1

return merged


def parse_time(text):
return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


def find_busiest_window(failed_times, window_minutes=10):
times = sorted(parse_time(item) for item in failed_times)
left = 0
best_count = 0
best_range = None

for right in range(len(times)):
while (times[right] - times[left]).total_seconds() > window_minutes * 60:
left += 1

current_count = right - left + 1
if current_count > best_count:
best_count = current_count
best_range = (times[left], times[right])

return best_count, best_range


def choose_tasks_by_backtracking(tasks, time_limit):
best_score = 0
best_plan = []
path = []

def dfs(index, used_minutes, current_score):
nonlocal best_score, best_plan

if used_minutes > time_limit:
return

if current_score > best_score:
best_score = current_score
best_plan = path[:]

if index == len(tasks):
return

path.append(tasks[index])
dfs(
index + 1,
used_minutes + tasks[index]["duration"],
current_score + tasks[index]["score"],
)
path.pop()

dfs(index + 1, used_minutes, current_score)

dfs(0, 0, 0)
return best_score, best_plan


def choose_tasks_by_dp(tasks, time_limit):
dp = [0] * (time_limit + 1)
pick = [[] for _ in range(time_limit + 1)]

for task in tasks:
duration = task["duration"]
score = task["score"]

for minute in range(time_limit, duration - 1, -1):
candidate_score = dp[minute - duration] + score
if candidate_score > dp[minute]:
dp[minute] = candidate_score
pick[minute] = pick[minute - duration] + [task]

return dp[time_limit], pick[time_limit]


def main():
source_a_ids = [1001, 1002, 1004, 1008]
source_b_ids = [1002, 1003, 1008, 1010]
merged_ids = merge_failed_ids(source_a_ids, source_b_ids)

failed_times = [
"2018-08-08 01:01:00",
"2018-08-08 01:03:00",
"2018-08-08 01:07:00",
"2018-08-08 01:08:00",
"2018-08-08 01:16:00",
"2018-08-08 01:19:00",
]
window_count, window_range = find_busiest_window(failed_times, window_minutes=10)

tasks = [
{"task_id": 1001, "duration": 8, "score": 12},
{"task_id": 1002, "duration": 6, "score": 10},
{"task_id": 1003, "duration": 7, "score": 13},
{"task_id": 1004, "duration": 9, "score": 15},
]

backtracking_score, backtracking_plan = choose_tasks_by_backtracking(tasks, 20)
dp_score, dp_plan = choose_tasks_by_dp(tasks, 20)

print("合并后的失败任务ID:", merged_ids)
print(
"10分钟内最密集失败窗口:",
window_count,
window_range[0].strftime("%H:%M:%S"),
"->",
window_range[1].strftime("%H:%M:%S"),
)
print("回溯最优收益:", backtracking_score)
print("回溯最优任务:", [item["task_id"] for item in backtracking_plan])
print("动态规划最优收益:", dp_score)
print("动态规划最优任务:", [item["task_id"] for item in dp_plan])


if __name__ == "__main__":
main()

执行命令:

1
python3 analyze_retry_plan.py

输出:

1
2
3
4
5
6
合并后的失败任务ID: [1001, 1002, 1003, 1004, 1008, 1010]
10分钟内最密集失败窗口: 4 01:01:00 -> 01:08:00
回溯最优收益: 35
回溯最优任务: [1001, 1003, 1004]
动态规划最优收益: 35
动态规划最优任务: [1001, 1003, 1004]

这个输出已经能对应到五类常见算法:

  • collect_json_files():递归
  • merge_failed_ids():双指针
  • find_busiest_window():滑动窗口
  • choose_tasks_by_backtracking():回溯
  • choose_tasks_by_dp():动态规划

四、双指针为什么会反复出现在脚本里

双指针很适合处理这类问题:

  • 两份有序数据合并
  • 两端收缩
  • 原地去重
  • 两个列表同步推进

在这个脚本里,两个来源的失败任务 ID 已经是升序的:

1
2
source_a_ids = [1001, 1002, 1004, 1008]
source_b_ids = [1002, 1003, 1008, 1010]

这时如果直接把它们拼起来再转 set,当然也能得到结果:

1
2
result = sorted(set(source_a_ids + source_b_ids))
print(result)

输出:

1
[1001, 1002, 1003, 1004, 1008, 1010]

这个写法在数据量不大时没问题。
但如果数据已经有序,而且后面还想顺手做去重、对齐、差异分析,双指针通常更自然,因为它保留了“同步推进”的过程。

最关键的部分是这段:

1
2
3
4
5
6
7
while i < len(sorted_a) and j < len(sorted_b):
if sorted_a[i] == sorted_b[j]:
...
elif sorted_a[i] < sorted_b[j]:
...
else:
...

这类代码真正要理解的不是模板,而是三个事实:

  1. 两个指针都只往前走,不回头
  2. 当前较小的那个值,如果不先处理,后面也不会再有更好的时机
  3. 相等时两个指针一起前进,能顺手完成去重

一个实际错误:忘了处理尾部剩余元素

错误写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def bad_merge_failed_ids(sorted_a, sorted_b):
i = 0
j = 0
merged = []

while i < len(sorted_a) and j < len(sorted_b):
if sorted_a[i] < sorted_b[j]:
merged.append(sorted_a[i])
i += 1
else:
merged.append(sorted_b[j])
j += 1

return merged


print(bad_merge_failed_ids([1, 3, 5], [2, 4, 6, 7]))

输出:

1
[1, 2, 3, 4, 5]

7 丢了。原因不是 Python 出问题,而是:

  • while 结束时,只代表有一边走完了
  • 另一边还没处理的尾部元素必须补进去

修复后就要补这两段:

1
2
3
4
5
6
7
while i < len(sorted_a):
merged.append(sorted_a[i])
i += 1

while j < len(sorted_b):
merged.append(sorted_b[j])
j += 1

这个写法在项目里通常会在哪出现

  • 合并两套来源的任务 ID
  • 合并两个升序日志偏移量列表
  • 对齐两个版本的测试用例清单
  • 比较新旧配置项差异

只要看到“两个有序列表同步前进”,就应该先想到双指针。

五、滑动窗口为什么适合做连续时间段统计

滑动窗口最适合的问题通常是:

  • 连续子数组
  • 固定时间窗口
  • 最长、最短、最多、最少这一类区间统计

在这个脚本里,值班最关心的问题通常不是“总共失败了几次”,而是:

某个连续 10 分钟里失败是不是突然扎堆了。

如果失败时间是:

1
2
3
4
5
6
7
8
failed_times = [
"2018-08-08 01:01:00",
"2018-08-08 01:03:00",
"2018-08-08 01:07:00",
"2018-08-08 01:08:00",
"2018-08-08 01:16:00",
"2018-08-08 01:19:00",
]

最密集的 10 分钟窗口就是:

1
01:01:00 -> 01:08:00,共 4 次失败

这段代码的关键只有一句:

1
2
while (times[right] - times[left]).total_seconds() > window_minutes * 60:
left += 1

它表达的是:

  • right 不断往右扩
  • 只要窗口已经超过 10 分钟,就把 left 往右收
  • 这样每次都能保证 [left, right] 是一个合法窗口

一个实际错误:拿字符串直接比较时间差

错误写法:

1
2
3
4
failed_times = ["01:01:00", "01:03:00", "01:15:00"]

if failed_times[2] - failed_times[0] > 600:
print("窗口过大")

执行:

1
python3 bad_window.py

输出:

1
TypeError: unsupported operand type(s) for -: 'str' and 'str'

修复方式不是硬猜字符串切片,而是先转成 datetime

1
2
3
from datetime import datetime

times = [datetime.strptime(item, "%H:%M:%S") for item in failed_times]

这个写法在项目里通常会在哪出现

  • 连续 5 分钟内错误数是否超阈值
  • 最近 1 小时内用户请求是否突然增多
  • 流式日志里慢请求最密集区间
  • 一段连续数据里最长稳定片段、最短覆盖片段

只要问题里有“连续区间”和“窗口大小变化”,就应该先想到滑动窗口。

六、递归不是神秘写法,它经常只是把目录一层层走完

第一次看递归时,很容易卡在“函数为什么又调用自己”。
放到目录扫描场景里通常就没那么抽象了。

当前脚本里,需要递归扫描 reports/ 目录下所有子目录中的 JSON 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
def collect_json_files(base_dir):
base_path = Path(base_dir)
result = []

def walk(current_path):
for item in current_path.iterdir():
if item.is_dir():
walk(item)
elif item.suffix == ".json":
result.append(item)

walk(base_path)
return sorted(result)

这个函数的思路很朴素:

  1. 先看当前目录里有什么
  2. 如果是子目录,就继续往下走
  3. 如果是 JSON 文件,就收集起来

执行命令:

1
python3 -c "from analyze_retry_plan import collect_json_files; print([str(p) for p in collect_json_files('reports')])"

输出大概像这样:

1
['reports/source_a/failed_01.json', 'reports/source_a/failed_02.json', 'reports/source_b/failed_01.json']

一个实际错误:递归里把结果变量写成默认参数

错误写法:

1
2
3
4
5
6
7
def bad_collect_json_files(base_dir, result=[]):
for item in Path(base_dir).iterdir():
if item.is_dir():
bad_collect_json_files(item, result)
elif item.suffix == ".json":
result.append(item)
return result

这个写法的坑点在默认参数 result=[]
函数多次调用时,这个列表会被复用,后面的调用可能带着上一次的结果继续跑。

修复方式有两个:

  • 要么像本文一样,把 result 放到外层函数局部变量里
  • 要么把默认参数写成 None,在函数内部初始化

这个写法在项目里通常会在哪出现

  • 扫目录收集测试数据文件
  • 扫树形配置
  • 遍历组织结构树
  • 遍历依赖关系或评论树

只要结构是“节点里还套节点”,递归通常就是自然写法。

七、回溯适合做小规模组合枚举

回溯经常出现在这类问题里:

  • 从一堆选项里找可行组合
  • 枚举所有路径
  • 选或不选
  • 放或不放

放到这个脚本里,最直接的场景就是:

现在只有 20 分钟补跑窗口,哪些失败任务值得优先补。

任务样例:

1
2
3
4
5
6
tasks = [
{"task_id": 1001, "duration": 8, "score": 12},
{"task_id": 1002, "duration": 6, "score": 10},
{"task_id": 1003, "duration": 7, "score": 13},
{"task_id": 1004, "duration": 9, "score": 15},
]

回溯版本做的事很直接:

  • 当前任务选,往下走
  • 当前任务不选,也往下走
  • 超时就剪枝返回
  • 每次记录更优解

关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def choose_tasks_by_backtracking(tasks, time_limit):
best_score = 0
best_plan = []
path = []

def dfs(index, used_minutes, current_score):
nonlocal best_score, best_plan

if used_minutes > time_limit:
return

if current_score > best_score:
best_score = current_score
best_plan = path[:]

if index == len(tasks):
return

path.append(tasks[index])
dfs(index + 1, used_minutes + tasks[index]["duration"], current_score + tasks[index]["score"])
path.pop()

dfs(index + 1, used_minutes, current_score)

dfs(0, 0, 0)
return best_score, best_plan

这个函数一眼看上去比双指针复杂,但真正要抓住的只有两件事:

  1. path 代表当前路径
  2. 每个任务都有“选”和“不选”两种分支

一个实际错误:忘记 pop() 导致路径污染

错误写法:

1
2
3
path.append(tasks[index])
dfs(index + 1, used_minutes + tasks[index]["duration"], current_score + tasks[index]["score"])
dfs(index + 1, used_minutes, current_score)

这里少了 path.pop(),后果通常是:

  • 当前分支选过的任务,会污染到后面的“不选分支”
  • 最终最优方案里可能出现并没有真正选中的任务

修复后必须恢复现场:

1
2
3
4
path.append(tasks[index])
dfs(...)
path.pop()
dfs(...)

回溯适合什么规模

回溯不是不能用,而是要看规模。

如果失败任务只有 10 个、12 个、15 个,回溯完全够用,还很好解释。
如果失败任务一下变成 80 个、100 个,这种“全组合枚举”很快就会变慢。

这时就该往动态规划过渡。

八、动态规划是在重复子问题上省时间

动态规划最容易被写成很抽象。
放到这个脚本里,它解决的其实还是上一节同一个问题:

20 分钟补跑窗口内,怎样拿到更高收益。

不同的是:

  • 回溯:把组合一个个试出来
  • 动态规划:把中间结果存下来,不重复算

这道题本质上就是一个 0/1 背包:

  • 每个任务只能选一次
  • duration 是体积
  • score 是价值
  • time_limit 是背包容量

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def choose_tasks_by_dp(tasks, time_limit):
dp = [0] * (time_limit + 1)
pick = [[] for _ in range(time_limit + 1)]

for task in tasks:
duration = task["duration"]
score = task["score"]

for minute in range(time_limit, duration - 1, -1):
candidate_score = dp[minute - duration] + score
if candidate_score > dp[minute]:
dp[minute] = candidate_score
pick[minute] = pick[minute - duration] + [task]

return dp[time_limit], pick[time_limit]

一个实际错误:分钟从小到大遍历,导致同一任务被重复使用

错误写法:

1
2
for minute in range(duration, time_limit + 1):
dp[minute] = max(dp[minute], dp[minute - duration] + score)

这个写法会把当前任务在同一轮里重复利用,等价于“一个任务可以被选无限次”。
可当前补跑任务显然只能选一次,这就错了。

修复方式是倒序遍历:

1
2
for minute in range(time_limit, duration - 1, -1):
...

这样每个任务在当前轮只会被使用一次,才符合 0/1 背包的语义。

动态规划在项目里通常会在哪出现

  • 固定时间内选择收益最高的一批任务
  • 固定容量下选择最优缓存项
  • 金额拆分、成本拆分、最少步骤
  • 配额预算、资源分配、最大收益组合

只要看到“有约束、求最优、子问题重复”,就要开始考虑动态规划。

九、把几类算法放回同一条实际处理链

单独看每个算法都不算难,真正容易断掉的是:

  • 刷题时会写
  • 一到项目里就认不出来

把这一篇里的脚本处理链重新整理一下,就能看到迁移关系:

1. 递归负责收集原始数据

先把散落在多个子目录里的任务文件找出来。

1
2
3
reports/source_a/failed_01.json
reports/source_a/failed_02.json
reports/source_b/failed_01.json

2. 双指针负责整理有序数据

把不同来源的失败任务 ID 合并去重,为后续分析准备一份稳定列表。

3. 滑动窗口负责定位异常时间段

判断故障是零散分布,还是集中在一段连续时间里爆发。

4. 回溯负责在小规模下枚举可行方案

任务数少时,直接找出所有可行补跑组合并选择最优解。

5. 动态规划负责在大规模下把最优解算快

任务数变大后,不再枚举所有组合,而是保存中间结果。

这就是为什么算法不能只背模板。
真正有用的是:看到数据形状和约束条件时,能把问题映射到合适的写法。

十、怎么测试这几个知识点

语言类文章不能只讲“能跑通”,还要讲“怎么测”。

先给一个最小测试文件:

test_analyze_retry_plan.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from analyze_retry_plan import (
merge_failed_ids,
find_busiest_window,
choose_tasks_by_dp,
)


def test_merge_failed_ids():
assert merge_failed_ids([1, 2, 4], [2, 3, 5]) == [1, 2, 3, 4, 5]


def test_find_busiest_window():
count, best_range = find_busiest_window(
[
"2018-08-08 01:01:00",
"2018-08-08 01:03:00",
"2018-08-08 01:07:00",
"2018-08-08 01:16:00",
],
window_minutes=10,
)
assert count == 3
assert best_range[0].strftime("%H:%M:%S") == "01:01:00"
assert best_range[1].strftime("%H:%M:%S") == "01:07:00"


def test_choose_tasks_by_dp():
tasks = [
{"task_id": 1001, "duration": 8, "score": 12},
{"task_id": 1002, "duration": 6, "score": 10},
{"task_id": 1003, "duration": 7, "score": 13},
{"task_id": 1004, "duration": 9, "score": 15},
]
score, plan = choose_tasks_by_dp(tasks, 20)
assert score == 35
assert [item["task_id"] for item in plan] == [1001, 1003, 1004]

执行命令:

1
pytest -q

输出:

1
2
...                                                                  [100%]
3 passed in 0.03s

测试这几类算法时,不要只测最顺的 happy path,至少还要补:

  • 双指针:一边为空、重复值很多、尾部未对齐
  • 滑动窗口:全部都在窗口内、全部都超出窗口、只有一条记录
  • 递归:空目录、嵌套多层目录、目录里混有非 JSON 文件
  • 回溯:时间刚好够、完全不够、最优解不唯一
  • 动态规划:容量为 0、任务比容量大、正好装满

十一、一个实际排错场景

有一类问题特别像真实项目里的线上 bug:

补跑规划脚本给出的结果里,出现了同一个任务被“选了两次”。

现象:

  • 输出结果收益异常高
  • 明明只有一条任务 1002
  • 结果里却像是把 1002 重复计算了

先看异常代码:

1
2
3
4
5
6
7
8
9
10
11
def bad_choose_tasks_by_dp(tasks, time_limit):
dp = [0] * (time_limit + 1)

for task in tasks:
duration = task["duration"]
score = task["score"]

for minute in range(duration, time_limit + 1):
dp[minute] = max(dp[minute], dp[minute - duration] + score)

return dp[time_limit]

排查顺序可以这样走:

  1. 先确认输入任务列表里有没有重复任务
  2. 再打印每一轮 dp 数组变化
  3. 发现同一轮里,当前任务刚更新出来的值又被后面的 minute 继续用了
  4. 这说明问题不在输入,而在状态转移顺序

实际打印:

1
2
3
4
for minute in range(duration, time_limit + 1):
old_value = dp[minute]
dp[minute] = max(dp[minute], dp[minute - duration] + score)
print("minute=", minute, "old=", old_value, "new=", dp[minute])

看到这里,根因就很清楚了:

  • 这是把 0/1 背包写成了完全背包
  • 任务被无限复用

修复方式:

1
2
for minute in range(time_limit, duration - 1, -1):
...

修完之后再跑:

1
pytest -q

输出:

1
2
...                                                                  [100%]
3 passed in 0.03s

这类排错过程很有代表性,因为它不是“记住倒序”这么简单,而是:

  • 先看现象
  • 再核对输入
  • 再看中间状态
  • 最后定位到状态转移顺序

十二、这些算法学完之后还要继续补什么

这一篇是入门,不是终点。

如果这几类写法已经能独立跑出来,后面建议继续补这些方向:

  • 哈希表:去重、计数、索引映射
  • 栈和队列:解析表达式、任务调度、广度优先遍历
  • 堆:优先级任务、TopK、延时任务
  • 树和图:依赖关系、权限继承、节点搜索
  • 排序和二分:查找区间、排名、阈值判断

但顺序不要反过来。
更稳的路径通常是:

  1. 先把几类高频模式写顺
  2. 再把模式和真实脚本问题一一对应
  3. 最后再回头做题巩固边界条件

十三、一个实际练习

可以直接照着这篇文章做一个最小练习,不需要额外平台:

  1. 新建一个 retry_assistant/ 目录
  2. 写一个 analyze_retry_plan.py
  3. 把文中的五个函数先抄一遍跑通
  4. 再自己补三组任务数据
  5. 观察回溯和动态规划得到的结果是否一致
  6. 把双指针函数改坏一次,再用测试把问题抓出来

建议至少补下面三组练习:

  • 练习 1:把两个失败任务 ID 列表改成有大量重复值
  • 练习 2:把失败时间改成跨 20 分钟,观察窗口如何收缩
  • 练习 3:把补跑任务改成 12 个,先跑回溯,再跑动态规划,感受速度差异

如果还能继续往前走,可以再做一版:

  • 真实读取 reports/ 目录下的 JSON 文件
  • collect_json_files() 扫描文件
  • 把测试补到 8 条以上

这样这篇文章里的内容就不只是“看懂了”,而是真正写过一遍。

十四、这一篇最后要记住什么

这五类算法放到一起看,会更容易建立工程直觉:

  • 双指针:两个有序序列同步推进
  • 滑动窗口:连续区间统计
  • 递归:树形结构或目录结构遍历
  • 回溯:小规模组合枚举
  • 动态规划:重复子问题下的最优解

真正有价值的不是背下题型名字,而是在写脚本时看到这些信号:

  • 数据是不是有序
  • 问题是不是连续区间
  • 结构是不是一层套一层
  • 选择是不是“选或不选”
  • 最优解里是不是有重复子问题

看到这些信号,算法就会从“题库里的东西”,变成“手边能用的工具”。
这才是这类基础文章真正要建立起来的能力。

十五、一个更接近值班现场的完整案例

只看单个函数时,这几类算法很容易又被拆散。
放回一次真实的值班处理链,会更容易理解它们为什么经常一起出现。

假设凌晨 01:00 到 01:20 之间,夜间同步任务连续失败,值班人员第二天 09:00 前需要给出两件事:

  1. 故障是不是集中爆发,而不是零散波动
  2. 如果补跑窗口只有 30 分钟,先补哪一批任务收益更高

这时更顺的处理顺序通常是:

1. 先递归收集失败文件

先确认不是只看了一个目录,漏掉了另一批失败记录。

执行:

1
python3 -c "from analyze_retry_plan import collect_json_files; print([str(p) for p in collect_json_files('reports')])"

输出:

1
['reports/source_a/failed_01.json', 'reports/source_a/failed_02.json', 'reports/source_b/failed_01.json']

如果这一步只扫了 source_a/,后面所有判断都会偏掉,因为输入本身就不完整。

2. 再用双指针合并两个来源的失败任务 ID

两个来源里有不少重复任务,不能直接按条数算,不然会把同一个任务当成两次失败。

1
2
3
4
source_a_ids = [1001, 1002, 1004, 1008, 1012]
source_b_ids = [1002, 1003, 1008, 1010, 1012]

print(merge_failed_ids(source_a_ids, source_b_ids))

输出:

1
[1001, 1002, 1003, 1004, 1008, 1010, 1012]

到这里,才拿到一份能继续分析的稳定任务清单。

3. 用滑动窗口确认失败是不是集中爆发

值班现场真正关心的通常不是“总共失败了多少次”,而是:

  • 是否在一个连续时间段里突然密集失败
  • 这更像系统性故障,还是零散脏数据
1
2
3
4
5
6
7
8
9
10
11
failed_times = [
"2018-08-08 01:02:00",
"2018-08-08 01:04:00",
"2018-08-08 01:05:00",
"2018-08-08 01:06:00",
"2018-08-08 01:09:00",
"2018-08-08 01:17:00",
]

count, best_range = find_busiest_window(failed_times, window_minutes=10)
print(count, best_range[0].strftime("%H:%M:%S"), best_range[1].strftime("%H:%M:%S"))

输出:

1
5 01:02:00 01:09:00

这时结论就比较明确:

  • 失败不是均匀分布
  • 01:02:0001:09:00 有明显聚集
  • 后续排查要优先对齐这个时间窗口去看配置发布、依赖服务重启、数据库锁等待或上游限流

4. 任务规模小的时候先用回溯看全量组合

假设这次只剩 8 条高价值失败任务要补跑,30 分钟窗口内完全可以先用回溯把组合枚举出来。

这一步的价值不是炫技,而是:

  • 能直接验证“最优解到底是不是这组”
  • 能把组合过程讲清楚
  • 出现异常时更容易追踪是哪一层分支选错了

5. 任务规模一大,立刻切到动态规划

如果候选任务扩到 40 条、60 条、80 条,再继续全组合枚举,值班处理就会开始拖时间。

这时更合适的做法是:

  • 回溯只保留给小规模验证
  • 动态规划承担正式计算

真正落到现场的判断通常不是“哪个算法更高级”,而是:

  • 现在任务规模多大
  • 要不要在有限时间里尽快给出稳定结果
  • 是先追求可解释,还是先追求计算效率

6. 最后把算法结论翻译成排障动作

算法本身不是结论,算法只是把判断变得更快、更稳。

这类场景里最终要交付的通常是下面这些动作:

  1. 标出最密集失败窗口,方便和监控、发布记录、日志时间线对齐
  2. 给出一份去重后的失败任务清单,避免重复补跑
  3. 给出 30 分钟窗口内的优先补跑方案
  4. 说明当前方案的边界,例如“这份结果只覆盖已有失败文件,不包含实时新增任务”

到这里,双指针、滑动窗口、递归、回溯、动态规划才真正从“题型”变成“值班时能落地的处理动作”。

十六、什么时候该用,什么时候不要硬套

算法文章最容易带来的另一个误判是:学完之后看到什么都想套模板。
更稳的方式是先看问题形状,再决定要不要用。

1. 双指针不是所有合并问题都要上

更适合双指针的前提通常是:

  • 两边数据已经有序
  • 需要同步推进
  • 还想顺手做去重、对齐、差异分析

如果数据本来就是无序的,而且只想快速求并集,直接 set 往往更省事。

2. 滑动窗口只适合连续区间问题

如果题目关心的是:

  • 连续 10 分钟
  • 连续 100 条记录
  • 最长连续片段

那滑动窗口通常很合适。
如果要统计的是“任意分散的几条记录”,那就不是窗口问题,硬套只会把代码写复杂。

3. 递归适合层级结构,不适合无脑代替循环

目录、树、嵌套配置这类天然分层结构,用递归通常很顺。
但如果层级极深,或者中间状态需要精细控制,显式栈有时会比递归更容易排查。

4. 回溯适合小规模、强约束、需要看组合过程的问题

回溯的优势是:

  • 好解释
  • 好验证
  • 好看出路径选择过程

但它的边界也很明确:

  • 规模一大,速度会明显下降
  • 分支一多,日志和调试会迅速膨胀

所以它更适合小规模枚举和验证,不适合拿去硬扛大批量正式计算。

5. 动态规划适合重复子问题明显、且目标是最优解的问题

如果问题里同时出现了这些信号:

  • 每一步都有选与不选
  • 子问题会反复出现
  • 最终目标是最大、最小、最优

那就应该开始考虑动态规划。
如果只是简单过滤、去重、统计,直接上动态规划反而会让实现和排错都变重。

真正有工程价值的不是“会背多少模板”,而是:

  • 什么时候用简单写法就够了
  • 什么时候该换成更系统的写法
  • 什么时候继续优化已经不划算

十七、结语

算法题真正值得学的,不是为了把题库刷完,而是为了在写脚本、做数据处理、补测试工具、排值班问题时,能更快看出问题结构。

回头看这一篇里的五类模式:

  • 双指针处理有序数据同步推进
  • 滑动窗口处理连续区间统计
  • 递归处理层级结构遍历
  • 回溯处理小规模组合枚举
  • 动态规划处理重复子问题下的最优解

只要能把“数据形状、约束条件、目标类型”这三件事看清,算法就不会再停留在题解层,而会开始变成代码设计能力的一部分。

这也是语言类算法文章最应该建立的基础:
不是多会做题,而是开始知道代码为什么这样写、什么时候该换一种写法、出了问题该从哪一层先查。