💻 Python collections

基础

python内置库。

collections库是一个内置模块,提供了一系列高效且实用的数据结构,作为标准数据类型(如列表、字典、元组)的补充和增强。这些数据结构在特定场景下能显著提高代码的效率和可读性。

ChainMap

ChainMap用于将多个字典(或其他映射)组合在一起,形成一个逻辑上的单一字典视图。它按照传入映射的顺序查找键,优先返回第一个匹配的键值对。
ChainMap在Python3.3及以上版本中可用。

优点:

  • 动态视图:ChainMap 创建的是一个动态视图,不复制数据,节省内存。
  • 优先级查找:按顺序查找键,适合处理配置、命令行参数等场景。
  • 上下文管理:可以用于管理不同作用域的变量,如局部变量和全局变量。

使用场景:

  • 合并多个字典并保持查找顺序。
  • 管理多个配置源,如默认配置、用户配置和命令行参数。
  • 实现嵌套作用域的变量查找,如在编程语言解释器中模拟局部和全局变量。

常用方法和属性:

  • maps:一个列表,包含了 ChainMap 中所有映射的引用。
  • new_child(m=None):返回一个新的 ChainMap,包含一个新的映射(默认为空字典)在原有映射之前。
  • parents:返回一个新的 ChainMap,包含除第一个映射外的所有映射。
  • __getitem__(key):按顺序在映射中查找键。
  • __setitem__(key, value):在第一个映射中设置键值对。
  • __delitem__(key):在第一个映射中删除键。
  • get(key, default=None):获取键的值,如果键不存在,返回默认值。
  • keys():返回所有映射中键的并集。
  • values():返回所有映射中值的列表(可能包含重复)。
  • items():返回所有映射中键值对的列表(可能包含重复)。

注意:

  • ChainMap 是一个视图,不复制数据,对原始映射的修改会反映在ChainMap中。【ChainMap 并不是通过复制原始数据来创建一个新的独立对象,而是直接引用传入的原始映射(通常是字典),并将它们组合成一个逻辑上的视图。因此,任何对原始映射的修改都会直接反映在 ChainMap 中,反之亦然。这样的好处是:节省内存(不需要额外的存储空间来复制数据)和动态更新(原始映射的任何变化会实时反映在ChainMap中)】
  • 修改操作(如 __setitem____delitem__)只影响第一个映射。
  • keys()、values() 和 items() 方法返回所有映射的并集,可能包含重复的键。
  • ChainMap 不是线程安全的,多线程使用时需自行加锁。

使用Demo:

In [1]: from collections import ChainMap

In [2]: dict1 = {'a': 1, 'b': 2}

In [3]: dict2 = {'b': 3, 'c': 4}

In [4]: cm = ChainMap(dict1, dict2)

In [5]: cm
Out[5]: ChainMap({'a': 1, 'b': 2}, {'b': 3, 'c': 4})

# 查找键 'b',优先返回 dict1 中的值
In [6]: cm['b']
Out[6]: 2

# 查找键 'c',在 dict2 中找到
In [7]: cm['c']
Out[7]: 4

# 添加新键值对,只影响第一个映射(dict1)
In [8]: cm['d'] = 5

In [9]: cm
Out[9]: ChainMap({'a': 1, 'b': 2, 'd': 5}, {'b': 3, 'c': 4})

# 删除键 'a',只影响第一个映射(dict1)
In [10]: del cm['a']

In [11]: cm
Out[11]: ChainMap({'b': 2, 'd': 5}, {'b': 3, 'c': 4})

# 使用 new_child() 创建一个新的 ChainMap
In [12]: child_cm = cm.new_child({'e': 6})

In [13]: child_cm
Out[13]: ChainMap({'e': 6}, {'b': 2, 'd': 5}, {'b': 3, 'c': 4})

In [14]: child_cm['e']
Out[14]: 6

In [15]: child_cm['b']
Out[15]: 2

# 使用 parents 属性获取除第一个映射外的 ChainMap
In [16]: child_cm.parents['b']
Out[16]: 2

高级使用(Advanced)

配置管理
ChainMap 可以用于合并多个配置源,如默认配置、用户配置和命令行参数,优先级从高到低排列。

default_config = {'theme': 'light', 'language': 'en'}
user_config = {'theme': 'dark'}
command_line_config = {'language': 'fr'}

config = ChainMap(command_line_config, user_config, default_config)
print(config['theme'])  # 输出: 'dark' (来自 user_config)
print(config['language'])  # 输出: 'fr' (来自 command_line_config)

模拟嵌套作用域
在编程语言解释器或模板引擎中,ChainMap 可以模拟局部和全局变量的作用域。

global_vars = {'x': 10, 'y': 20}
local_vars = {'x': 100, 'z': 30}

scope = ChainMap(local_vars, global_vars)
print(scope['x'])  # 输出: 100 (局部变量)
print(scope['y'])  # 输出: 20 (全局变量)

上下文管理 使用 new_child() 方法可以在函数调用时创建新的局部变量作用域。

base = ChainMap({'a': 1})
local = base.new_child({'b': 2})
print(local['a'])  # 输出: 1
print(local['b'])  # 输出: 2

实现默认值
ChainMap 可以与 defaultdict 结合,为缺失的键提供默认值。

from collections import ChainMap, defaultdict

default_values = defaultdict(lambda: 'default')
cm = ChainMap({}, default_values)
print(cm['missing'])  # 输出: 'default'

线程安全的 ChainMap
ChainMap 本身不是线程安全的,可以通过加锁实现线程安全。

import threading
from collections import ChainMap

lock = threading.Lock()
cm = ChainMap({})

def thread_safe_set(key, value):
    with lock:
        cm[key] = value

def thread_safe_get(key):
    with lock:
        return cm.get(key)

实现 LRU 缓存
ChainMap 可以通过与 OrderedDict 结合实现简单的 LRU 缓存。

from collections import ChainMap, OrderedDict

cache = ChainMap(OrderedDict())
def lru_get(key):
    if key in cache:
        value = cache[key]
        del cache[key]
        cache[key] = value
        return value
    return None

LRU:Least Recently Used,最近最少使用
实现:将元素按照访问顺序存储在队列中,当缓存满时,删除最久未使用的元素。

Counter

UserDict

UserList

UserString

defaultdict

内置dict字典的子类。
它的主要特点是为不存在的键自动提供一个默认值,从而避免在访问缺失键时抛出KeyError。
defaultdict在需要动态初始化键值对的场景中非常实用。

优点:

  • 避免 KeyError:无需显式检查键是否存在,直接访问即可得到默认值
  • 简化代码:在需要累加、分组或初始化复杂数据结构的场景中,代码更简洁
  • 高效灵活:默认值通过工厂函数动态生成,支持各种类型(如 int、list、set 等)

使用场景:

  • 统计元素出现次数(类似 Counter)
  • 分组数据(例如按键收集值列表)
  • 动态构建嵌套数据结构(如字典中的列表或集合)

常用方法:

  • 继承了dict的所有方法,如get()、set()、update()、pop()、keys()、values()、items()等。
  • 新增了:’
    • default_factory:默认值工厂函数。
    • __missing__(key):当键不存在时,返回默认值(使用default_factory生成默认值赋值给该键)

注意:

  • default_factory 必须是可调用对象:可以是内置类型(如 int、list)、自定义函数或 lambda 表达式,但不能直接是值(例如 0 或 [])。
  • 初始化时赋值:在构造 defaultdict 时传入初始键值对,不会调用 default_factory
    dd = defaultdict(int, {'a': 5})
    print(dd['a'])  # 输出: 5
    print(dd['b'])  # 输出: 0
    
  • 与普通字典的转换:可以用 dict(dd) 将 defaultdict 转为普通字典

Demo:

In [17]: from collections import defaultdict

# 使用int作为default_factory,默认值为0
In [18]: dd = defaultdict(int)

In [19]: dd['a'] += 1

In [20]: dd['a']
Out[20]: 1

In [21]: dd['b']
Out[21]: 0


# 使用list做默认值:
In [22]: d2 = defaultdict(list)

In [23]: d2['fruits'].append('apple')

In [24]: d2['fruits'].append('banana')

In [25]: d2['veggies'].append('carrot')

In [26]: d2
Out[26]: defaultdict(list, {'fruits': ['apple', 'banana'], 'veggies': ['carrot']})

In [27]: d2['rice']
Out[27]: []


# 可以自定义默认值:
In [28]: def default_value():
    ...:     return 'N/A'
    ...:

In [29]: d3 = defaultdict(default_value)

In [30]: d3['name']
Out[30]: 'N/A'

In [31]: d3['age']
Out[31]: 'N/A'

高级使用(Advanced)

统计频率
defaultdict(int) 可以用来统计元素出现次数:

from collections import defaultdict

text = "hello world"
freq = defaultdict(int)
for char in text:
    freq[char] += 1
print(freq)  # 输出: defaultdict(<class 'int'>, {'h': 1, 'e': 1, 'l': 3, 'o': 2, ' ': 1, 'w': 1, 'r': 1, 'd': 1})

分组数据
使用 defaultdict(list) 将数据按键分组

from collections import defaultdict

data = [('fruit', 'apple'), ('veggie', 'carrot'), ('fruit', 'banana')]
groups = defaultdict(list)
for category, item in data:
    groups[category].append(item)
print(groups)  # 输出: defaultdict(<class 'list'>, {'fruit': ['apple', 'banana'], 'veggie': ['carrot']})

嵌套结构
创建嵌套的 defaultdict,用于多级数据

from collections import defaultdict

# 嵌套 defaultdict
nested_dd = defaultdict(lambda: defaultdict(int))
nested_dd['user1']['login'] += 1
nested_dd['user2']['logout'] += 1
print(nested_dd)  # 输出: defaultdict(<function <lambda> at ...>, {'user1': defaultdict(<class 'int'>, {'login': 1}), 'user2': defaultdict(<class 'int'>, {'logout': 1})})

与 ChainMap 结合
将 defaultdict 作为 ChainMap 的默认层,提供兜底值

from collections import ChainMap, defaultdict

defaults = defaultdict(lambda: "Unknown")
cm = ChainMap({'name': 'Alice'}, defaults)
print(cm['name'])      # 输出: Alice
print(cm['address'])   # 输出: Unknown

线程安全考虑
defaultdict 本身不是线程安全的,多线程环境下需要加锁

import threading
from collections import defaultdict

dd = defaultdict(int)
lock = threading.Lock()

def thread_safe_increment(key):
    with lock:
        dd[key] += 1

实现计数器替代品
可以用 defaultdict 代替 Counter,虽然功能不如 Counter 丰富,但可以实现类似的功能

from collections import defaultdict

items = ['a', 'b', 'a', 'c']
counter = defaultdict(int)
for item in items:
    counter[item] += 1
print(counter['a'])  # 输出: 2

deque

double-ended queue 双端队列(缩写 -> deque)
class collections.deque([iterable[, maxlen]])
即:可以在队列的两端进行添加和删除元素的操作。
比普通列表更高效的插入和删除,在实现栈、队列等数据结构时非常有用。

常用方法:

  • append(x)
  • appendleft(x)
  • clear()
  • copy()
  • count(x)
  • extend(iterable)
  • extendleft(iterable)
  • index(x, [start[, stop]])
  • insert(i, x)
  • pop()
  • popleft()
  • revmoe(value)
  • reverse()
  • rotate(n=1)
  • maxlen: 用于限制队列的最大长度。如果队列已满,再添加元素时,会从队列的左侧删除元素。

rotate()
用于将队列中的元素向左或向右循环移动。它的作用是将队列中的元素移动指定的步数,如果步数为正数,则向右移动,如果步数为负数,则向左移动。移动后,队列中的元素会重新排列,但不会改变队列中元素的数量。

from collections import deque
# 创建一个 deque
d = deque([1, 2, 3, 4, 5])
# 向右移动 2 步
d.rotate(2)
# 打印移动后的 deque
print(d)  # 输出:deque([4, 5, 1, 2, 3])
# 向左移动 3 步
d.rotate(-3)
# 打印移动后的 deque
print(d)  # 输出:deque([2, 3, 4, 5, 1])

Demo:

>>> from collections import deque
>>> d = deque('ghi')                 # make a new deque with three items
>>> for elem in d:                   # iterate over the deque's elements
        print(elem.upper())
G
H
I
>>> d.append('j')                    # add a new entry to the right side
>>> d.appendleft('f')                # add a new entry to the left side
>>> d                                # show the representation of the deque
deque(['f', 'g', 'h', 'i', 'j'])
>>> d.pop()                          # return and remove the rightmost item
'j'
>>> d.popleft()                      # return and remove the leftmost item
'f'
>>> list(d)                          # list the contents of the deque
['g', 'h', 'i']
>>> d[0]                             # peek at leftmost item
'g'
>>> d[-1]                            # peek at rightmost item
'i'
>>> list(reversed(d))                # list the contents of a deque in reverse
['i', 'h', 'g']
>>> 'h' in d                         # search the deque
True
>>> d.extend('jkl')                  # add multiple elements at once
d
>>> deque(['g', 'h', 'i', 'j', 'k', 'l'])
>>> d.rotate(1)                      # right rotation
>>> d
deque(['l', 'g', 'h', 'i', 'j', 'k'])
>>> d.rotate(-1)                     # left rotation
>>> d
deque(['g', 'h', 'i', 'j', 'k', 'l'])
>>> deque(reversed(d))               # make a new deque in reverse order
deque(['l', 'k', 'j', 'i', 'h', 'g'])
>>> d.clear()                        # empty the deque
>>> d.pop()                          # cannot pop from an empty deque
Traceback (most recent call last):
    File "<pyshell#6>", line 1, in -toplevel-
        d.pop()
IndexError: pop from an empty deque
>>> d.extendleft('abc')              # extendleft() reverses the input order
>>> d
deque(['c', 'b', 'a'])

一些使用:

  • 返回一个deque对象,包含了指定文件的最后n行(调用者可以使用for循环来遍历队列中的元素,并对他们进行处理)(类似于linux tail 命令)
    def tail(filename, n=10):
      'Return the last n lines of a file'
      with open(filename) as f:
          return deque(f, n)
    
  • 计算给定可迭代对象的移动平均值
    使用deque对象来存储最近n个元素,并使用sum()函数计算这些元素的总和。然后,它从可迭代对象中获取下一个元素,并将其添加到队列的右侧。接下来,它从队列的左侧删除最旧的元素,并将其从总和中减去。然后,它将新元素添加到队列的右侧,并将更新后的总和除以n,得到当前的移动平均值,并使用yield语句将其返回。
    itertools.isclie(): 从可迭代对象中获取前 n-1 个元素,并将它们添加到队列中
    def moving_average(iterable, n=3):
      # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
      # https://en.wikipedia.org/wiki/Moving_average
      it = iter(iterable)
      d = deque(itertools.islice(it, n-1))
      d.appendleft(0)
      s = sum(d)
      for elem in it:
          s += elem - d.popleft()
          d.append(elem)
          yield s / n
    
  • round-robin scheduler 轮循
    将多个可迭代对象中的元素交替返回,直到所有可迭代对象都被耗尽。这个函数的实现基于一个算法,称为“轮流取元素”(Round-robin)。在这个算法中,我们从每个可迭代对象中取出一个元素,然后再从下一个可迭代对象中取出一个元素,直到所有可迭代对象都被耗尽。 map():将可迭代对象转换为迭代器
    def roundrobin(*iterables):
      "roundrobin('ABC', 'D', 'EF') --> A D E B F C"
      iterators = deque(map(iter, iterables))
      while iterators:
          try:
              while True:
                  yield next(iterators[0])
                  iterators.rotate(-1)
          except StopIteration:
              # Remove an exhausted iterator.
              iterators.popleft()
    
  • del d[n]的双端队列实现
    def delete_nth(d, n):
      d.rotate(-n)
      d.popleft()
      d.rotate(n)
      # [1, 2, 3, 4, 5]  
      # delete_nth(5, 2)
      # [3, 4, 5, 1, 2]
      # [4, 5, 1, 2]
      # [1, 2, 4, 5]
    

高级使用(Advanced)
实现循环缓冲区(Circular Buffer):使用 deque 可以轻松地实现循环缓冲区,即一个固定大小的缓冲区,可以在其中添加和删除元素,当缓冲区已满时,新元素会覆盖最旧的元素。可以使用 deque 的 maxlen 参数来限制队列的最大长度,从而实现循环缓冲区的功能。

实现 LRU 缓存(Least Recently Used Cache):LRU 缓存是一种常用的缓存算法,用于限制缓存的大小,并自动删除最近最少使用的元素。可以使用 deque 来实现 LRU 缓存,将最近使用的元素添加到队列的右侧,将最近最少使用的元素从队列的左侧删除。

实现滑动窗口(Sliding Window):滑动窗口是一种常见的数据处理技术,用于在序列中滑动一个固定大小的窗口,并对窗口中的元素进行处理。可以使用 deque 来实现滑动窗口,将窗口中的元素存储在队列中,并使用 rotate() 方法来移动窗口。

实现线程安全的队列:deque 不是线程安全的,但可以使用 queue 模块中的 Queue 类来创建一个线程安全的队列,该类基于 deque 实现,并提供了锁机制来保证线程安全。
之所以说deque是非线程安全 → 准确说是部分线程安全,因为:append()、appendleft()、pop()、popleft() 是线程安全,剩下的操作,尤其是迭代是非线程安全。
注意:在未来,collections.deque会变成线程安全的。 --disable-gil提案。3.13版本。 demo: 如下,会出现很多错误:untimeError: deque mutated during iteration,原因是在producer线程进行append()时,是线程安全,同时consumer进行迭代,非线程安全,就会报错。 \

import threading
from collections import deque
import time


def producer(d):
    for i in range(1000):
        d.append(i)
        time.sleep(0.001)

def consumer(d):
    while True:
        try:
            for item in d:
                print(item)
                d.popleft()  # This can cause RuntimeError
        except RuntimeError:
            print("RuntimeError: deque mutated during iteration")
        time.sleep(0.001)

d = deque()
producer_thread = threading.Thread(target=producer, args=(d,))
consumer_thread = threading.Thread(target=consumer, args=(d,))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()

# run:
0
RuntimeError: deque mutated during iteration
1
RuntimeError: deque mutated during iteration
2
RuntimeError: deque mutated during iteration
3
RuntimeError: deque mutated during iteration
4
RuntimeError: deque mutated during iteration
5
...
...

解决办法是:添加同步机制,如加锁,保证在同一时间,只有一个线程可以访问到deque。
如下demo,符合预期。

import threading
from collections import deque
import time


def producer(d, lock):
    for i in range(1000):
        with lock:
            d.append(i)
        time.sleep(0.001)


def consumer(d, lock):
    while True:
        try:
            with lock:
                items = list(d)  # Create a copy to iterate over
                for item in items:
                    print(item)
                    d.popleft()
        except IndexError:
            pass  # deque is empty
        time.sleep(0.001)


d = deque()
lock = threading.Lock()
producer_thread = threading.Thread(target=producer, args=(d, lock))
consumer_thread = threading.Thread(target=consumer, args=(d, lock))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()

# run:
0
1
2
3
4
5
...
...

实现优先级队列:可以使用 deque 来实现优先级队列,将元素按照优先级顺序存储在队列中。
如下是collections.deque实现的优先级序列demo,实际上有很多实现方式,这里只是展示一种使用方式。
复杂度:O(n)
insert(): 插入元素时,按照优先级顺序插入
pop(): 弹出元素时,弹出优先级最高的元素
peek(): 查看优先级最高的元素

from collections import deque

class PriorityQueue:
    def __init__(self):
        self.queue = deque()
    def is_empty(self):
        return len(self.queue) == 0
    def insert(self, priority, value):
        # Create a new item as a tuple (priority, value)
        item = (priority, value)
        # If the queue is empty, simply append the item
        if self.is_empty():
            self.queue.append(item)
        else:
            # Iterate through the queue to find the correct position for the new item
            for index, current_item in enumerate(self.queue):
                if current_item[0] > priority:
                    # Insert the new item before the current item
                    self.queue.insert(index, item)
                    break
            else:
                # If we reach the end of the queue, append the item
                self.queue.append(item)
    def pop(self):
        if self.is_empty():
            raise IndexError("pop from an empty priority queue")
        # Pop the item with the highest priority (which is at the front of the deque)
        return self.queue.popleft()
    def peek(self):
        if self.is_empty():
            raise IndexError("peek from an empty priority queue")
        # Peek at the item with the highest priority (which is at the front of the deque)
        return self.queue[0]
    def __str__(self):
        return str(self.queue)


# Example usage:
pq = PriorityQueue()
pq.insert(3, 'task 3')
pq.insert(1, 'task 1')
pq.insert(2, 'task 2')
pq.insert(1, 'task 1 again')
print("Priority Queue:", pq)
print("Peek:", pq.peek())
print("Pop:", pq.pop())
print("Priority Queue after pop:", pq)

# run:
Peek: (1, 'task 1')
Pop: (1, 'task 1')
Priority Queue after pop: deque([(1, 'task 1 again'), (2, 'task 2'), (3, 'task 3')])

还可以通过heapq来实现优先级队列,复杂度O(log n),参见heapq模块。

实现双端队列的切片操作:可以使用 deque 的 rotate() 方法和切片操作来实现双端队列的切片操作,从而实现类似列表的切片操作。例如,可以使用 rotate() 方法将队列中的元素向左或向右移动,然后使用切片操作获取需要的元素。

实现数据结构的快速删除:可以使用 deque 来实现一些数据结构的快速删除操作,例如删除链表中的中间节点、删除二叉树中的节点等。这些操作可以使用 deque 来存储需要删除的元素,并使用 popleft() 方法来从队列的左侧删除元素,从而实现快速删除。

namedtuple