数据结构和算法最容易被写成两种极端:
- 一种是只背定义,写成概念表
- 另一种是只刷题,写完也不知道项目里什么时候会遇到
真正开始写脚本、接口工具、任务执行器之后,这些内容会反复出现,而且不是以“考试题”的方式出现,而是以这些问题出现:
- 一批任务读进来之后,先放哪
- 等待执行的任务应该先进先出,还是后进先出
- 某个任务是否已经处理过,怎么快速判断
- 最近处理过的几条任务怎么保留下来
- 一段处理逻辑为什么顺序错了、重复了、越跑越慢了
这一篇不按定义背,而是直接做一件实际的事:写一个任务补偿执行脚本。
这个脚本要完成这些事:
- 读取一批待补偿任务
- 按顺序执行任务
- 跳过重复任务
- 任务失败时执行回滚动作
- 保存最近处理过的几条任务,方便排查
用这一条链,把五种内容串起来:
list 负责承接原始批量数据
queue 负责等待执行的先进先出任务
stack 负责失败时的回滚动作
hash 负责去重和快速索引
linked list 负责理解节点串接和最近记录保存
一、这篇文章要解决什么问题
读完这一篇,应该能独立完成这些动作:
- 说清楚列表、链表、栈、队列、哈希分别适合解决什么问题
- 在一个实际脚本里把它们组合起来,而不是各写各的
- 知道为什么 Python 队列场景优先用
deque 而不是 list.pop(0)
- 知道为什么哈希表的键必须可哈希
- 看懂一个最小链表实现,不再把“节点指针”当成完全陌生的东西
- 给这类脚本补最小测试
- 遇到顺序错乱、重复执行、键错误时,知道从哪排查
如果这些动作能独立做出来,数据结构与算法就不再只是“学过”,而是开始进入实际开发阶段。
二、先看这篇文章要完成的实际场景
假设现在有一个最简单的补偿任务文件 tasks.txt:
1 2 3 4 5
| T-100,sync_user,alice T-101,send_email,bob T-100,sync_user,alice T-102,rebuild_cache,carol T-103,fail,dave
|
每一行代表一条补偿任务,格式固定为:
这篇文章要把下面这条执行链写出来:
- 先把文件内容读成一个列表
- 再把任务放进等待队列
- 用哈希结构判断重复任务
- 每执行一步,就把回滚动作压入栈
- 保存最近处理过的三条任务,方便排查
执行结果大概像这样:
1 2 3 4 5 6 7
| 执行 T-100 owner=alice action=sync_user 执行 T-101 owner=bob action=send_email 跳过重复任务 T-100 执行 T-102 owner=carol action=rebuild_cache 执行 T-103 失败,执行回滚 cleanup:T-103 最近处理任务: ['T-103', 'T-102', 'T-101'] 已处理任务数: 4
|
这个需求很小,但已经足够把五种结构都带出来。
三、先给一个最小可运行示例
先不读文件,先把最小版本跑通。
新建 task_runner.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| from collections import deque
tasks = [ {"id": "T-100", "action": "sync_user", "owner": "alice"}, {"id": "T-101", "action": "send_email", "owner": "bob"}, {"id": "T-100", "action": "sync_user", "owner": "alice"}, ]
waiting_queue = deque(tasks) processed_ids = set()
while waiting_queue: task = waiting_queue.popleft()
if task["id"] in processed_ids: print("跳过重复任务", task["id"]) continue
processed_ids.add(task["id"]) print("执行", task["id"], task["action"])
|
执行命令:
输出:
1 2 3
| 执行 T-100 sync_user 执行 T-101 send_email 跳过重复任务 T-100
|
这个最小示例已经带出了两件很重要的事:
- 等待执行的任务适合用队列
- 已处理任务 ID 适合用哈希集合做去重
后面只是把这个最小版本继续补完整。
四、先把五种结构对应到实际角色
先不要急着背定义,先把角色和代码位置对上。
1. 列表:承接一批原始数据
从文件读出来的一批任务,天然就是一批有顺序的数据:
1 2 3 4
| tasks = [ {"id": "T-100", "action": "sync_user", "owner": "alice"}, {"id": "T-101", "action": "send_email", "owner": "bob"}, ]
|
列表最适合做这层工作,因为:
- 它擅长保存一批元素
- 保留原始顺序
- 适合遍历、切片、排序、批量处理
列表更像“装着一批数据的盒子”。
2. 队列:负责等待执行的任务
任务执行通常要求先进先出。
先来的任务先处理,后来的排后面,这就是队列:
1 2 3 4
| from collections import deque
waiting_queue = deque(tasks) task = waiting_queue.popleft()
|
这里不只是语义合适,时间复杂度也合适:
deque.popleft() 是 O(1)
list.pop(0) 是 O(n)
如果任务量稍大,用列表头删很快就会拖慢脚本。
3. 栈:负责回滚动作
回滚最典型的特征就是后进先出。
最后做的那一步,应该最先撤销。这正是栈:
1 2 3 4 5
| rollback_stack = [] rollback_stack.append("cleanup:T-100") rollback_stack.append("cleanup:T-101")
print(rollback_stack.pop())
|
输出:
4. 哈希:负责去重和快速索引
判断任务是否处理过,最自然的结构不是列表,而是哈希集合:
1 2
| processed_ids = {"T-100", "T-101"} print("T-100" in processed_ids)
|
输出:
如果还想通过任务 ID 快速拿到整条任务数据,就用哈希字典:
1 2 3 4
| task_index = { "T-100": {"id": "T-100", "action": "sync_user", "owner": "alice"}, "T-101": {"id": "T-101", "action": "send_email", "owner": "bob"}, }
|
5. 链表:理解节点串接和最近记录
Python 日常开发里不一定经常手写链表,但它还是必须理解,因为它会反复出现在这些地方:
deque 这类双端队列的内部思路
- LRU 缓存
- 节点拼接、摘除、插入
- 最近访问记录、任务链、对象链
链表和列表最大的差异是:
- 列表靠“下标”找元素
- 链表靠“节点指向下一个节点”串起来
下面会用一个最小链表结构来保存最近处理过的三条任务。
五、把列表先用在第一层数据准备
先把文件读取动作写出来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| from pathlib import Path
def load_tasks(file_path: str) -> list[dict]: tasks = []
for line_no, raw_line in enumerate(Path(file_path).read_text().splitlines(), start=1): line = raw_line.strip() if not line: continue
parts = [part.strip() for part in line.split(",")] if len(parts) != 3: raise ValueError(f"line {line_no} format error: {raw_line}")
task_id, action, owner = parts tasks.append({ "id": task_id, "action": action, "owner": owner, })
return tasks
|
调用:
1 2
| tasks = load_tasks("tasks.txt") print(tasks)
|
输出:
1
| [{'id': 'T-100', 'action': 'sync_user', 'owner': 'alice'}, {'id': 'T-101', 'action': 'send_email', 'owner': 'bob'}, {'id': 'T-100', 'action': 'sync_user', 'owner': 'alice'}, {'id': 'T-102', 'action': 'rebuild_cache', 'owner': 'carol'}, {'id': 'T-103', 'action': 'fail', 'owner': 'dave'}]
|
这一层用列表很合适,因为它就是“把一批任务先装进来”。
这一步不要急着上队列,因为:
- 文件读取结束后,通常还要做排序、过滤、预校验
- 这些动作对列表更自然
所以更常见的工程写法是:
- 先读成列表
- 完成预处理
- 再放进队列执行
六、队列为什么更适合待执行任务
很多新手第一反应会这样写:
1 2 3 4
| tasks = ["T-100", "T-101", "T-102"]
while tasks: print(tasks.pop(0))
|
输出:
顺序看起来没问题,但这段代码有一个实际问题:
任务量小时感觉不到,任务量大一点就会开始慢。
更合适的写法是:
1 2 3 4 5 6
| from collections import deque
queue = deque(["T-100", "T-101", "T-102"])
while queue: print(queue.popleft())
|
输出:
顺序一样,但结构更合适。
如果这个脚本是实际运行在定时任务里,任务量从几十条涨到几千条时,这种差异就会开始明显。
七、栈为什么适合回滚
假设执行任务时,每成功一步都要准备一个对应的回滚动作:
1 2 3 4 5 6 7 8
| rollback_stack = []
rollback_stack.append("cleanup:T-100") rollback_stack.append("cleanup:T-101") rollback_stack.append("cleanup:T-102")
print(rollback_stack.pop()) print(rollback_stack.pop())
|
输出:
1 2
| cleanup:T-102 cleanup:T-101
|
这正好符合回滚顺序:
如果这里错误地用队列来存回滚动作,撤销顺序就会反过来,容易留下脏状态。
这也是为什么事务回滚、表达式求值、函数调用栈这些场景会反复碰到“栈”。
八、哈希为什么最适合做去重和索引
先看最直观的去重场景。
1. 用集合判断任务是否已经处理过
1 2 3 4 5 6 7 8 9
| processed_ids = set()
for task_id in ["T-100", "T-101", "T-100"]: if task_id in processed_ids: print("重复任务:", task_id) continue
processed_ids.add(task_id) print("开始执行:", task_id)
|
输出:
1 2 3
| 开始执行: T-100 开始执行: T-101 重复任务: T-100
|
2. 用字典快速拿整条任务
1 2 3 4 5 6 7
| tasks = [ {"id": "T-100", "action": "sync_user"}, {"id": "T-101", "action": "send_email"}, ]
task_index = {task["id"]: task for task in tasks} print(task_index["T-101"])
|
输出:
1
| {'id': 'T-101', 'action': 'send_email'}
|
这里的“哈希”不是一个抽象词,而是一个很实际的工程能力:
如果用列表做同样的事,就要反复线性扫描。
九、链表为什么不常直接写,但必须理解
链表最大的价值不在“项目里每天手写”,而在于理解一类结构的运行方式。
先给一个最小链表节点:
1 2 3 4
| class Node: def __init__(self, value): self.value = value self.next = None
|
如果要把三条任务串成一个链:
1 2 3 4 5 6
| first = Node("T-100") second = Node("T-101") third = Node("T-102")
first.next = second second.next = third
|
这时候结构就是:
链表和列表的差异马上就能看出来:
- 列表可以直接
tasks[1]
- 链表不能直接按下标跳过去,只能顺着
next 往后走
它的代价是按位置查找慢。
它的好处是某些插入、摘除动作很直接。
为了让这个概念真正落地,下面用一个最小单链表,保存最近处理过的三条任务。
十、完整脚本:把五种结构串成一条执行链
task_runner.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| from collections import deque from pathlib import Path
class Node: def __init__(self, value): self.value = value self.next = None
class RecentTaskList: def __init__(self, limit=3): self.limit = limit self.head = None self.tail = None self.size = 0
def push_front(self, value): node = Node(value) node.next = self.head self.head = node
if self.tail is None: self.tail = node
self.size += 1
if self.size > self.limit: self._drop_tail()
def _drop_tail(self): if self.head is None: return
if self.head is self.tail: self.head = None self.tail = None self.size = 0 return
current = self.head while current.next is not self.tail: current = current.next
current.next = None self.tail = current self.size -= 1
def to_list(self): values = [] current = self.head
while current is not None: values.append(current.value) current = current.next
return values
def load_tasks(file_path: str) -> list[dict]: tasks = []
for line_no, raw_line in enumerate(Path(file_path).read_text().splitlines(), start=1): line = raw_line.strip() if not line: continue
parts = [part.strip() for part in line.split(",")] if len(parts) != 3: raise ValueError(f"line {line_no} format error: {raw_line}")
task_id, action, owner = parts tasks.append({ "id": task_id, "action": action, "owner": owner, })
return tasks
def run_tasks(tasks: list[dict]) -> dict: waiting_queue = deque(tasks) processed_ids = set() task_index = {task["id"]: task for task in tasks} rollback_stack = [] recent_tasks = RecentTaskList(limit=3) logs = []
while waiting_queue: task = waiting_queue.popleft()
if task["id"] in processed_ids: logs.append(f"跳过重复任务 {task['id']}") continue
processed_ids.add(task["id"]) recent_tasks.push_front(task["id"]) rollback_stack.append(f"cleanup:{task['id']}")
if task["action"] == "fail": rollback_action = rollback_stack.pop() logs.append(f"执行 {task['id']} 失败,执行回滚 {rollback_action}") continue
logs.append( f"执行 {task['id']} owner={task['owner']} action={task['action']}" )
return { "logs": logs, "processed_count": len(processed_ids), "recent_tasks": recent_tasks.to_list(), "task_index_keys": sorted(task_index.keys()), }
if __name__ == "__main__": sample_path = Path("tasks.txt") sample_path.write_text( "\n".join([ "T-100,sync_user,alice", "T-101,send_email,bob", "T-100,sync_user,alice", "T-102,rebuild_cache,carol", "T-103,fail,dave", ]) )
tasks = load_tasks("tasks.txt") report = run_tasks(tasks)
for line in report["logs"]: print(line)
print("最近处理任务:", report["recent_tasks"]) print("已处理任务数:", report["processed_count"]) print("任务索引键:", report["task_index_keys"])
|
这段代码里五种结构的角色已经很清楚:
tasks:列表,承接原始输入
waiting_queue:队列,按顺序消费任务
rollback_stack:栈,保存回滚动作
processed_ids 和 task_index:哈希集合 / 哈希字典,负责去重和索引
RecentTaskList:链表,保存最近处理任务
十一、实际执行命令
把上面的脚本保存后,执行:
如果要补最小测试,再执行:
十二、实际输出结果
python task_runner.py 输出:
1 2 3 4 5 6 7 8
| 执行 T-100 owner=alice action=sync_user 执行 T-101 owner=bob action=send_email 跳过重复任务 T-100 执行 T-102 owner=carol action=rebuild_cache 执行 T-103 失败,执行回滚 cleanup:T-103 最近处理任务: ['T-103', 'T-102', 'T-101'] 已处理任务数: 4 任务索引键: ['T-100', 'T-101', 'T-102', 'T-103']
|
这个输出已经能直接验证几个关键点:
- 队列保证了执行顺序
- 哈希集合挡住了重复任务
- 栈在失败时弹出了最后一个回滚动作
- 链表保留了最近三条处理记录
十三、错误示例与修复示例
下面不写抽象“注意事项”,直接看实际会出现的错误。
1. 错误示例:把队列写成了栈
错误写法:
1 2 3 4
| tasks = ["T-100", "T-101", "T-102"]
while tasks: print(tasks.pop())
|
输出:
这已经不是先进先出,而是后进先出。
修复写法:
1 2 3 4 5 6
| from collections import deque
queue = deque(["T-100", "T-101", "T-102"])
while queue: print(queue.popleft())
|
输出:
2. 错误示例:把列表当成哈希键
错误写法:
1 2 3
| route_count = {} route = ["GET", "/orders"] route_count[route] = 1
|
输出:
1
| TypeError: unhashable type: 'list'
|
原因很直接:
修复写法:
1 2 3 4
| route_count = {} route = ("GET", "/orders") route_count[route] = 1 print(route_count)
|
输出:
3. 错误示例:链表首节点处理漏掉了尾指针
错误写法:
1 2 3 4 5 6 7 8 9
| class BrokenRecentTaskList: def __init__(self): self.head = None self.tail = None
def push_front(self, value): node = Node(value) node.next = self.head self.head = node
|
如果第一条数据压进去后没有同时维护 tail,后面删尾时就会出错。
修复写法就是完整处理首节点场景:
1 2
| if self.tail is None: self.tail = node
|
链表问题的难点通常不在定义,而在边界:
十四、怎么测试这个知识点
数据结构文章也不能停在“跑通了”,最少要补两类测试:
- 正常路径
- 边界和坏数据
test_task_runner.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import pytest
from task_runner import RecentTaskList, load_tasks, run_tasks
def test_run_tasks_dedup_and_recent_order(tmp_path): file_path = tmp_path / "tasks.txt" file_path.write_text( "\n".join([ "T-100,sync_user,alice", "T-101,send_email,bob", "T-100,sync_user,alice", "T-102,rebuild_cache,carol", "T-103,fail,dave", ]) )
tasks = load_tasks(str(file_path)) report = run_tasks(tasks)
assert report["processed_count"] == 4 assert report["recent_tasks"] == ["T-103", "T-102", "T-101"] assert report["task_index_keys"] == ["T-100", "T-101", "T-102", "T-103"]
def test_load_tasks_raise_for_bad_line(tmp_path): file_path = tmp_path / "tasks.txt" file_path.write_text("bad-line")
with pytest.raises(ValueError): load_tasks(str(file_path))
def test_recent_task_list_keep_only_latest_three(): recent = RecentTaskList(limit=3) recent.push_front("T-100") recent.push_front("T-101") recent.push_front("T-102") recent.push_front("T-103")
assert recent.to_list() == ["T-103", "T-102", "T-101"]
|
执行:
输出:
1 2
| ... [100%] 3 passed in 0.03s
|
这三条测试分别覆盖了:
- 队列 + 栈 + 哈希 + 链表组合后的正常结果
- 坏数据输入
- 链表的容量边界
十五、一个实际排错场景
下面看一个这类脚本里很常见的实际问题。
现象
某次补偿任务执行后,日志里出现了这样的顺序:
1 2 3 4
| 执行 T-103 执行 T-102 执行 T-101 执行 T-100
|
文件里的原始顺序明明是从 T-100 到 T-103,结果完全反过来了。
第一反应
先别猜是不是文件读取顺序错了,先看“消费待执行任务”这一步到底用了什么结构。
排查
打开脚本,看到了这一段:
1 2 3 4 5
| tasks = load_tasks("tasks.txt")
while tasks: task = tasks.pop() print("执行", task["id"])
|
问题已经很清楚:
修复
改成:
1 2 3 4 5 6 7
| from collections import deque
waiting_queue = deque(load_tasks("tasks.txt"))
while waiting_queue: task = waiting_queue.popleft() print("执行", task["id"])
|
回归验证
再补一条最小测试,确保顺序不会再反:
1 2 3 4 5 6 7 8 9 10 11 12
| def test_task_order_is_fifo(): tasks = [ {"id": "T-100", "action": "a", "owner": "alice"}, {"id": "T-101", "action": "b", "owner": "bob"}, {"id": "T-102", "action": "c", "owner": "carol"}, ]
report = run_tasks(tasks)
assert report["logs"][0].startswith("执行 T-100") assert report["logs"][1].startswith("执行 T-101") assert report["logs"][2].startswith("执行 T-102")
|
这个排错过程里,真正起作用的不是“多打印几行日志”,而是先把现象映射回结构选择:
- 顺序反了,先怀疑队列和栈有没有用错
- 重复执行了,先怀疑哈希去重有没有生效
- 最近记录不对,先怀疑链表头尾有没有维护好
十六、这一节学完之后还要继续补什么
把这一篇真正吃透之后,下一步最适合继续补的是:
- 排序和查找
- 双指针和滑动窗口
- 树、堆、优先队列
- 图和依赖关系
因为脚本和服务继续往前写,很快会碰到:
这一篇只是先把最基础的五块骨架搭起来。
十七、一个实际练习
不要只停在读懂代码,直接做下面这个练习。
练习目标:
- 在现有任务执行脚本上,增加“重试两次后再失败”的能力
要求:
- 等待重试的任务仍然进入队列
- 每个任务的重试次数用哈希字典记录
- 最近失败任务继续保存在链表里
- 回滚动作仍然使用栈
建议输出:
1 2 3
| 执行 T-103 失败,第 1 次重试 执行 T-103 失败,第 2 次重试 执行 T-103 最终失败,执行回滚 cleanup:T-103
|
如果这个练习能独立写出来,说明这几种结构已经不再是孤立名词,而是真的进入了代码组织能力。
十八、小结
这篇文章里,列表、链表、栈、队列、哈希没有分开各讲一遍,而是放进了一条实际执行链。
回头看这条链:
- 一批输入任务先落到列表
- 待执行任务进队列
- 回滚动作进栈
- 去重和索引用哈希
- 最近记录用链表维护
这也是学习数据结构与算法更稳的方式:
- 不从抽象定义起步
- 不停留在刷题答案
- 直接问它在实际代码里承担什么角色
能把“结构”和“场景”连起来,后面再学排序、树、堆、图,理解速度会快很多。