💻 Python collections
基础
python内置库。
collections库是一个内置模块,提供了一系列高效且实用的数据结构,作为标准数据类型(如列表、字典、元组)的补充和增强。这些数据结构在特定场景下能显著提高代码的效率和可读性。
ChainMap
ChainMap用于将多个字典(或其他映射)组合在一起,形成一个逻辑上的单一字典视图。它按照传入映射的顺序查找键,优先返回第一个匹配的键值对。
ChainMap在Python3.3及以上版本中可用。
优点:
- 动态视图:ChainMap 创建的是一个动态视图,不复制数据,节省内存。
- 优先级查找:按顺序查找键,适合处理配置、命令行参数等场景。
- 上下文管理:可以用于管理不同作用域的变量,如局部变量和全局变量。
使用场景:
- 合并多个字典并保持查找顺序。
- 管理多个配置源,如默认配置、用户配置和命令行参数。
- 实现嵌套作用域的变量查找,如在编程语言解释器中模拟局部和全局变量。
常用方法和属性:
maps
:一个列表,包含了 ChainMap 中所有映射的引用。new_child(m=None)
:返回一个新的 ChainMap,包含一个新的映射(默认为空字典)在原有映射之前。parents
:返回一个新的 ChainMap,包含除第一个映射外的所有映射。__getitem__(key)
:按顺序在映射中查找键。__setitem__(key, value)
:在第一个映射中设置键值对。__delitem__(key)
:在第一个映射中删除键。get(key, default=None)
:获取键的值,如果键不存在,返回默认值。keys()
:返回所有映射中键的并集。values()
:返回所有映射中值的列表(可能包含重复)。items()
:返回所有映射中键值对的列表(可能包含重复)。
注意:
- ChainMap 是一个视图,不复制数据,对原始映射的修改会反映在ChainMap中。【ChainMap 并不是通过复制原始数据来创建一个新的独立对象,而是直接引用传入的原始映射(通常是字典),并将它们组合成一个逻辑上的视图。因此,任何对原始映射的修改都会直接反映在 ChainMap 中,反之亦然。这样的好处是:节省内存(不需要额外的存储空间来复制数据)和动态更新(原始映射的任何变化会实时反映在ChainMap中)】
- 修改操作(如
__setitem__
和__delitem__
)只影响第一个映射。- keys()、values() 和 items() 方法返回所有映射的并集,可能包含重复的键。
- ChainMap 不是线程安全的,多线程使用时需自行加锁。
使用Demo:
In [1]: from collections import ChainMap
In [2]: dict1 = {'a': 1, 'b': 2}
In [3]: dict2 = {'b': 3, 'c': 4}
In [4]: cm = ChainMap(dict1, dict2)
In [5]: cm
Out[5]: ChainMap({'a': 1, 'b': 2}, {'b': 3, 'c': 4})
# 查找键 'b',优先返回 dict1 中的值
In [6]: cm['b']
Out[6]: 2
# 查找键 'c',在 dict2 中找到
In [7]: cm['c']
Out[7]: 4
# 添加新键值对,只影响第一个映射(dict1)
In [8]: cm['d'] = 5
In [9]: cm
Out[9]: ChainMap({'a': 1, 'b': 2, 'd': 5}, {'b': 3, 'c': 4})
# 删除键 'a',只影响第一个映射(dict1)
In [10]: del cm['a']
In [11]: cm
Out[11]: ChainMap({'b': 2, 'd': 5}, {'b': 3, 'c': 4})
# 使用 new_child() 创建一个新的 ChainMap
In [12]: child_cm = cm.new_child({'e': 6})
In [13]: child_cm
Out[13]: ChainMap({'e': 6}, {'b': 2, 'd': 5}, {'b': 3, 'c': 4})
In [14]: child_cm['e']
Out[14]: 6
In [15]: child_cm['b']
Out[15]: 2
# 使用 parents 属性获取除第一个映射外的 ChainMap
In [16]: child_cm.parents['b']
Out[16]: 2
高级使用(Advanced):
① 配置管理
ChainMap 可以用于合并多个配置源,如默认配置、用户配置和命令行参数,优先级从高到低排列。
default_config = {'theme': 'light', 'language': 'en'}
user_config = {'theme': 'dark'}
command_line_config = {'language': 'fr'}
config = ChainMap(command_line_config, user_config, default_config)
print(config['theme']) # 输出: 'dark' (来自 user_config)
print(config['language']) # 输出: 'fr' (来自 command_line_config)
② 模拟嵌套作用域
在编程语言解释器或模板引擎中,ChainMap 可以模拟局部和全局变量的作用域。
global_vars = {'x': 10, 'y': 20}
local_vars = {'x': 100, 'z': 30}
scope = ChainMap(local_vars, global_vars)
print(scope['x']) # 输出: 100 (局部变量)
print(scope['y']) # 输出: 20 (全局变量)
③ 上下文管理 使用 new_child()
方法可以在函数调用时创建新的局部变量作用域。
base = ChainMap({'a': 1})
local = base.new_child({'b': 2})
print(local['a']) # 输出: 1
print(local['b']) # 输出: 2
④ 实现默认值
ChainMap 可以与 defaultdict 结合,为缺失的键提供默认值。
from collections import ChainMap, defaultdict
default_values = defaultdict(lambda: 'default')
cm = ChainMap({}, default_values)
print(cm['missing']) # 输出: 'default'
⑤ 线程安全的 ChainMap
ChainMap 本身不是线程安全的,可以通过加锁实现线程安全。
import threading
from collections import ChainMap
lock = threading.Lock()
cm = ChainMap({})
def thread_safe_set(key, value):
with lock:
cm[key] = value
def thread_safe_get(key):
with lock:
return cm.get(key)
⑥ 实现 LRU 缓存
ChainMap 可以通过与 OrderedDict 结合实现简单的 LRU 缓存。
from collections import ChainMap, OrderedDict
cache = ChainMap(OrderedDict())
def lru_get(key):
if key in cache:
value = cache[key]
del cache[key]
cache[key] = value
return value
return None
LRU:Least Recently Used,最近最少使用
实现:将元素按照访问顺序存储在队列中,当缓存满时,删除最久未使用的元素。
Counter
UserDict
UserList
UserString
defaultdict
内置dict字典的子类。
它的主要特点是为不存在的键自动提供一个默认值,从而避免在访问缺失键时抛出KeyError。
defaultdict在需要动态初始化键值对的场景中非常实用。
优点:
- 避免 KeyError:无需显式检查键是否存在,直接访问即可得到默认值
- 简化代码:在需要累加、分组或初始化复杂数据结构的场景中,代码更简洁
- 高效灵活:默认值通过工厂函数动态生成,支持各种类型(如 int、list、set 等)
使用场景:
- 统计元素出现次数(类似 Counter)
- 分组数据(例如按键收集值列表)
- 动态构建嵌套数据结构(如字典中的列表或集合)
常用方法:
- 继承了dict的所有方法,如get()、set()、update()、pop()、keys()、values()、items()等。
- 新增了:’
- default_factory:默认值工厂函数。
__missing__(key)
:当键不存在时,返回默认值(使用default_factory生成默认值赋值给该键)
注意:
- default_factory 必须是可调用对象:可以是内置类型(如 int、list)、自定义函数或 lambda 表达式,但不能直接是值(例如 0 或 [])。
- 初始化时赋值:在构造 defaultdict 时传入初始键值对,不会调用 default_factory
dd = defaultdict(int, {'a': 5}) print(dd['a']) # 输出: 5 print(dd['b']) # 输出: 0
- 与普通字典的转换:可以用 dict(dd) 将 defaultdict 转为普通字典
Demo:
In [17]: from collections import defaultdict
# 使用int作为default_factory,默认值为0
In [18]: dd = defaultdict(int)
In [19]: dd['a'] += 1
In [20]: dd['a']
Out[20]: 1
In [21]: dd['b']
Out[21]: 0
# 使用list做默认值:
In [22]: d2 = defaultdict(list)
In [23]: d2['fruits'].append('apple')
In [24]: d2['fruits'].append('banana')
In [25]: d2['veggies'].append('carrot')
In [26]: d2
Out[26]: defaultdict(list, {'fruits': ['apple', 'banana'], 'veggies': ['carrot']})
In [27]: d2['rice']
Out[27]: []
# 可以自定义默认值:
In [28]: def default_value():
...: return 'N/A'
...:
In [29]: d3 = defaultdict(default_value)
In [30]: d3['name']
Out[30]: 'N/A'
In [31]: d3['age']
Out[31]: 'N/A'
高级使用(Advanced):
① 统计频率
defaultdict(int) 可以用来统计元素出现次数:
from collections import defaultdict
text = "hello world"
freq = defaultdict(int)
for char in text:
freq[char] += 1
print(freq) # 输出: defaultdict(<class 'int'>, {'h': 1, 'e': 1, 'l': 3, 'o': 2, ' ': 1, 'w': 1, 'r': 1, 'd': 1})
② 分组数据
使用 defaultdict(list) 将数据按键分组
from collections import defaultdict
data = [('fruit', 'apple'), ('veggie', 'carrot'), ('fruit', 'banana')]
groups = defaultdict(list)
for category, item in data:
groups[category].append(item)
print(groups) # 输出: defaultdict(<class 'list'>, {'fruit': ['apple', 'banana'], 'veggie': ['carrot']})
③ 嵌套结构
创建嵌套的 defaultdict,用于多级数据
from collections import defaultdict
# 嵌套 defaultdict
nested_dd = defaultdict(lambda: defaultdict(int))
nested_dd['user1']['login'] += 1
nested_dd['user2']['logout'] += 1
print(nested_dd) # 输出: defaultdict(<function <lambda> at ...>, {'user1': defaultdict(<class 'int'>, {'login': 1}), 'user2': defaultdict(<class 'int'>, {'logout': 1})})
④ 与 ChainMap 结合
将 defaultdict 作为 ChainMap 的默认层,提供兜底值
from collections import ChainMap, defaultdict
defaults = defaultdict(lambda: "Unknown")
cm = ChainMap({'name': 'Alice'}, defaults)
print(cm['name']) # 输出: Alice
print(cm['address']) # 输出: Unknown
⑤ 线程安全考虑
defaultdict 本身不是线程安全的,多线程环境下需要加锁
import threading
from collections import defaultdict
dd = defaultdict(int)
lock = threading.Lock()
def thread_safe_increment(key):
with lock:
dd[key] += 1
⑥ 实现计数器替代品
可以用 defaultdict 代替 Counter,虽然功能不如 Counter 丰富,但可以实现类似的功能
from collections import defaultdict
items = ['a', 'b', 'a', 'c']
counter = defaultdict(int)
for item in items:
counter[item] += 1
print(counter['a']) # 输出: 2
deque
double-ended queue 双端队列(缩写 -> deque)
class collections.deque([iterable[, maxlen]])
即:可以在队列的两端进行添加和删除元素的操作。
比普通列表更高效的插入和删除,在实现栈、队列等数据结构时非常有用。
常用方法:
- append(x)
- appendleft(x)
- clear()
- copy()
- count(x)
- extend(iterable)
- extendleft(iterable)
- index(x, [start[, stop]])
- insert(i, x)
- pop()
- popleft()
- revmoe(value)
- reverse()
- rotate(n=1)
- maxlen: 用于限制队列的最大长度。如果队列已满,再添加元素时,会从队列的左侧删除元素。
rotate()
用于将队列中的元素向左或向右循环移动。它的作用是将队列中的元素移动指定的步数,如果步数为正数,则向右移动,如果步数为负数,则向左移动。移动后,队列中的元素会重新排列,但不会改变队列中元素的数量。
from collections import deque
# 创建一个 deque
d = deque([1, 2, 3, 4, 5])
# 向右移动 2 步
d.rotate(2)
# 打印移动后的 deque
print(d) # 输出:deque([4, 5, 1, 2, 3])
# 向左移动 3 步
d.rotate(-3)
# 打印移动后的 deque
print(d) # 输出:deque([2, 3, 4, 5, 1])
Demo:
>>> from collections import deque
>>> d = deque('ghi') # make a new deque with three items
>>> for elem in d: # iterate over the deque's elements
print(elem.upper())
G
H
I
>>> d.append('j') # add a new entry to the right side
>>> d.appendleft('f') # add a new entry to the left side
>>> d # show the representation of the deque
deque(['f', 'g', 'h', 'i', 'j'])
>>> d.pop() # return and remove the rightmost item
'j'
>>> d.popleft() # return and remove the leftmost item
'f'
>>> list(d) # list the contents of the deque
['g', 'h', 'i']
>>> d[0] # peek at leftmost item
'g'
>>> d[-1] # peek at rightmost item
'i'
>>> list(reversed(d)) # list the contents of a deque in reverse
['i', 'h', 'g']
>>> 'h' in d # search the deque
True
>>> d.extend('jkl') # add multiple elements at once
d
>>> deque(['g', 'h', 'i', 'j', 'k', 'l'])
>>> d.rotate(1) # right rotation
>>> d
deque(['l', 'g', 'h', 'i', 'j', 'k'])
>>> d.rotate(-1) # left rotation
>>> d
deque(['g', 'h', 'i', 'j', 'k', 'l'])
>>> deque(reversed(d)) # make a new deque in reverse order
deque(['l', 'k', 'j', 'i', 'h', 'g'])
>>> d.clear() # empty the deque
>>> d.pop() # cannot pop from an empty deque
Traceback (most recent call last):
File "<pyshell#6>", line 1, in -toplevel-
d.pop()
IndexError: pop from an empty deque
>>> d.extendleft('abc') # extendleft() reverses the input order
>>> d
deque(['c', 'b', 'a'])
一些使用:
- 返回一个deque对象,包含了指定文件的最后n行(调用者可以使用for循环来遍历队列中的元素,并对他们进行处理)(类似于linux tail 命令)
def tail(filename, n=10): 'Return the last n lines of a file' with open(filename) as f: return deque(f, n)
- 计算给定可迭代对象的移动平均值
使用deque对象来存储最近n个元素,并使用sum()函数计算这些元素的总和。然后,它从可迭代对象中获取下一个元素,并将其添加到队列的右侧。接下来,它从队列的左侧删除最旧的元素,并将其从总和中减去。然后,它将新元素添加到队列的右侧,并将更新后的总和除以n,得到当前的移动平均值,并使用yield语句将其返回。
itertools.isclie()
: 从可迭代对象中获取前 n-1 个元素,并将它们添加到队列中def moving_average(iterable, n=3): # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0 # https://en.wikipedia.org/wiki/Moving_average it = iter(iterable) d = deque(itertools.islice(it, n-1)) d.appendleft(0) s = sum(d) for elem in it: s += elem - d.popleft() d.append(elem) yield s / n
- round-robin scheduler 轮循
将多个可迭代对象中的元素交替返回,直到所有可迭代对象都被耗尽。这个函数的实现基于一个算法,称为“轮流取元素”(Round-robin)。在这个算法中,我们从每个可迭代对象中取出一个元素,然后再从下一个可迭代对象中取出一个元素,直到所有可迭代对象都被耗尽。map()
:将可迭代对象转换为迭代器def roundrobin(*iterables): "roundrobin('ABC', 'D', 'EF') --> A D E B F C" iterators = deque(map(iter, iterables)) while iterators: try: while True: yield next(iterators[0]) iterators.rotate(-1) except StopIteration: # Remove an exhausted iterator. iterators.popleft()
- del d[n]的双端队列实现
def delete_nth(d, n): d.rotate(-n) d.popleft() d.rotate(n) # [1, 2, 3, 4, 5] # delete_nth(5, 2) # [3, 4, 5, 1, 2] # [4, 5, 1, 2] # [1, 2, 4, 5]
高级使用(Advanced):
① 实现循环缓冲区(Circular Buffer):使用 deque 可以轻松地实现循环缓冲区,即一个固定大小的缓冲区,可以在其中添加和删除元素,当缓冲区已满时,新元素会覆盖最旧的元素。可以使用 deque 的 maxlen 参数来限制队列的最大长度,从而实现循环缓冲区的功能。
② 实现 LRU 缓存(Least Recently Used Cache):LRU 缓存是一种常用的缓存算法,用于限制缓存的大小,并自动删除最近最少使用的元素。可以使用 deque 来实现 LRU 缓存,将最近使用的元素添加到队列的右侧,将最近最少使用的元素从队列的左侧删除。
③ 实现滑动窗口(Sliding Window):滑动窗口是一种常见的数据处理技术,用于在序列中滑动一个固定大小的窗口,并对窗口中的元素进行处理。可以使用 deque 来实现滑动窗口,将窗口中的元素存储在队列中,并使用 rotate() 方法来移动窗口。
④ 实现线程安全的队列:deque 不是线程安全的,但可以使用 queue 模块中的 Queue 类来创建一个线程安全的队列,该类基于 deque 实现,并提供了锁机制来保证线程安全。
之所以说deque是非线程安全 → 准确说是部分线程安全,因为:append()、appendleft()、pop()、popleft() 是线程安全,剩下的操作,尤其是迭代是非线程安全。
注意:在未来,collections.deque会变成线程安全的。 --disable-gil
提案。3.13版本。 demo: 如下,会出现很多错误:untimeError: deque mutated during iteration
,原因是在producer线程进行append()时,是线程安全,同时consumer进行迭代,非线程安全,就会报错。 \
import threading
from collections import deque
import time
def producer(d):
for i in range(1000):
d.append(i)
time.sleep(0.001)
def consumer(d):
while True:
try:
for item in d:
print(item)
d.popleft() # This can cause RuntimeError
except RuntimeError:
print("RuntimeError: deque mutated during iteration")
time.sleep(0.001)
d = deque()
producer_thread = threading.Thread(target=producer, args=(d,))
consumer_thread = threading.Thread(target=consumer, args=(d,))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
# run:
0
RuntimeError: deque mutated during iteration
1
RuntimeError: deque mutated during iteration
2
RuntimeError: deque mutated during iteration
3
RuntimeError: deque mutated during iteration
4
RuntimeError: deque mutated during iteration
5
...
...
解决办法是:添加同步机制,如加锁,保证在同一时间,只有一个线程可以访问到deque。
如下demo,符合预期。
import threading
from collections import deque
import time
def producer(d, lock):
for i in range(1000):
with lock:
d.append(i)
time.sleep(0.001)
def consumer(d, lock):
while True:
try:
with lock:
items = list(d) # Create a copy to iterate over
for item in items:
print(item)
d.popleft()
except IndexError:
pass # deque is empty
time.sleep(0.001)
d = deque()
lock = threading.Lock()
producer_thread = threading.Thread(target=producer, args=(d, lock))
consumer_thread = threading.Thread(target=consumer, args=(d, lock))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
# run:
0
1
2
3
4
5
...
...
⑤ 实现优先级队列:可以使用 deque 来实现优先级队列,将元素按照优先级顺序存储在队列中。
如下是collections.deque实现的优先级序列demo,实际上有很多实现方式,这里只是展示一种使用方式。
复杂度:O(n)
insert(): 插入元素时,按照优先级顺序插入
pop(): 弹出元素时,弹出优先级最高的元素
peek(): 查看优先级最高的元素
from collections import deque
class PriorityQueue:
def __init__(self):
self.queue = deque()
def is_empty(self):
return len(self.queue) == 0
def insert(self, priority, value):
# Create a new item as a tuple (priority, value)
item = (priority, value)
# If the queue is empty, simply append the item
if self.is_empty():
self.queue.append(item)
else:
# Iterate through the queue to find the correct position for the new item
for index, current_item in enumerate(self.queue):
if current_item[0] > priority:
# Insert the new item before the current item
self.queue.insert(index, item)
break
else:
# If we reach the end of the queue, append the item
self.queue.append(item)
def pop(self):
if self.is_empty():
raise IndexError("pop from an empty priority queue")
# Pop the item with the highest priority (which is at the front of the deque)
return self.queue.popleft()
def peek(self):
if self.is_empty():
raise IndexError("peek from an empty priority queue")
# Peek at the item with the highest priority (which is at the front of the deque)
return self.queue[0]
def __str__(self):
return str(self.queue)
# Example usage:
pq = PriorityQueue()
pq.insert(3, 'task 3')
pq.insert(1, 'task 1')
pq.insert(2, 'task 2')
pq.insert(1, 'task 1 again')
print("Priority Queue:", pq)
print("Peek:", pq.peek())
print("Pop:", pq.pop())
print("Priority Queue after pop:", pq)
# run:
Peek: (1, 'task 1')
Pop: (1, 'task 1')
Priority Queue after pop: deque([(1, 'task 1 again'), (2, 'task 2'), (3, 'task 3')])
还可以通过heapq来实现优先级队列,复杂度O(log n),参见heapq模块。
⑥ 实现双端队列的切片操作:可以使用 deque 的 rotate() 方法和切片操作来实现双端队列的切片操作,从而实现类似列表的切片操作。例如,可以使用 rotate() 方法将队列中的元素向左或向右移动,然后使用切片操作获取需要的元素。
⑦ 实现数据结构的快速删除:可以使用 deque 来实现一些数据结构的快速删除操作,例如删除链表中的中间节点、删除二叉树中的节点等。这些操作可以使用 deque 来存储需要删除的元素,并使用 popleft() 方法来从队列的左侧删除元素,从而实现快速删除。