fuzzyset源码解读

起因

最近懒癌犯的有点长,学了一些东西正好再系统梳理下。

之前需要对大量字符串进行模糊匹配,网上查了查发现fuzzyset第三方库刚好满足我的需求,而且原理简单(核心代码只有100行不到!)。但是由于业务自身需求,需要对源码进行一定的修改,这里记录下对源码的学习,方便自己以后查阅。

背景知识

Q: 如何度量两个字符串之间的相似度或者差异程度?
Levenshtein距离便是一种方法,又叫编辑距离(Edit Distance),是指两个字符串之间,通过替换插入删除的方式(每次仅限编辑一个字符),从一个转换成另一个所需的最小编辑次数。

如将abcd一字转成acdb可以有很多种方法:

abcd->accd->acdd->acdb
abcd->acd->acdb

最小次数显然是第二种,所以从abcd转成acdb的Levenshtein距离就是2,具体如何求解可通过动态规划的思想来完成,这里不深入下去。

fuzzyset算法思想

通过n-grams模型把字符串简单拆分,计算两个字符串之间的余弦相似度。若选择use_levenshtein(需要手动安装python-Levenshtein 包),则在余弦相似度的基础上再通过Levenshtein距离计算出一个新的得分,取得分最高的字符串输出(具体细节实现与技巧优化见源码分析)

源码解析

1. 切分字符串

给定一个英文单词(string),是如何切分的?作者给了例子说明,比如有个字符串michaelich,我们设置gram_size=3,则切分成如下形式:

‘-mi’
‘mic’
‘ich’
‘cha’
‘hae’
‘ael’
‘eli’
‘lic’
‘ich’
‘ch-‘

对应代码也容易懂,同时为了节省内存,产生了一个生成器。

1
2
3
4
5
6
7
def _iterate_grams(value, gram_size=2):
simplified = '-' + _non_word_re.sub('', value.lower()) + '-'
len_diff = gram_size - len(simplified)
if len_diff > 0:
simplified += '-' * len_diff
for i in range(len(simplified) - gram_size + 1):
yield simplified[i:i + gram_size]

接着,作者对每个被切分出来的最小单元进行词频统计(text='a-CAc9f'),形式如下:

{‘-a’: 1, ‘ac’: 2, ‘ca’: 1, ‘c9’: 1, ‘9f’: 1, ‘f-‘: 1}

代码easy

1
2
3
4
5
def _gram_counter(value, gram_size=2):
result = collections.defaultdict(int)
for value in _iterate_grams(value, gram_size):
result[value] += 1
return result

2. 添加语料库(字符串)

当我输入一个英文单词(str),内部发生了什么?来看代码

1
2
3
4
5
6
7
8
9
10
11
def __init__(self, iterable=(), gram_size_lower=2, gram_size_upper=3, use_levenshtein=True):
self.exact_set = {}
self.match_dict = collections.defaultdict(list)
self.items = {}
self.use_levenshtein = use_levenshtein
self.gram_size_lower = gram_size_lower
self.gram_size_upper = gram_size_upper
for i in range(gram_size_lower, gram_size_upper + 1):
self.items[i] = []
for value in iterable:
self.add(value)

这里只是初始化,那我们具体看看add()__add()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def add(self, value):
lvalue = value.lower()
if lvalue in self.exact_set:
return False
for i in range(self.gram_size_lower, self.gram_size_upper + 1):
self.__add(value, i)
def __add(self, value, gram_size):
lvalue = value.lower()
items = self.items[gram_size] #这里要注意:items的改动会传递给 self.items
idx = len(items)
items.append(0)
grams = _gram_counter(lvalue, gram_size)
norm = math.sqrt(sum(x**2 for x in grams.values()))
for gram, occ in grams.items():
self.match_dict[gram].append((idx, occ))
items[idx] = (norm, lvalue)
self.exact_set[lvalue] = value

如果默认n-grams是从2到3的情况下:

  • self.items会存放小写的英文字符串(输入)与对应被2-grams3-grams切分后的模,形式如下:

    { 2: [(3.0 , ‘a-cac9f’) , (., .) , (., .)],
    3: [(2.449489742783178 , ‘a-cac9f’) , (. , .), (. , .)]}

  • self.match_dict形式如下:

    { ‘-a’: [(0, 1)],
    ‘ac’: [(0, 2)], #注意:若之后另一个单词被同样切出’ac’,这里可能为 ‘ac’:[(0,2), (1,1)]
    ‘ca’: [(0, 1)],
    ‘c9’: [(0, 1)],
    ‘9f’: [(0, 1)],
    ‘f-‘: [(0, 1)],
    ‘-ac’: [(0, 1)],
    ‘aca’: [(0, 1)],
    ‘cac’: [(0, 1)],
    ‘ac9’: [(0, 1)],
    ‘c9f’: [(0, 1)],
    ‘9f-‘: [(0, 1)]}

  • self.exact_set为原输入的英文字符串与小写之后的字符串形式,如:

    {‘a-cac9f’: ‘a-CAc9f’}

3. 模糊匹配

我们来看一下是如何对目标字符串进行模糊匹配的,主要关注__get()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def _distance(str1, str2):
distance = Levenshtein.distance(str1, str2)
if len(str1) > len(str2):
return 1 - float(distance) / len(str1)
else:
return 1 - float(distance) / len(str2)
def __get(self, value, gram_size):
lvalue = value.lower()
matches = collections.defaultdict(float)
grams = _gram_counter(lvalue, gram_size)
items = self.items[gram_size]
norm = math.sqrt(sum(x**2 for x in grams.values()))
for gram, occ in grams.items(): #(切分最小单元,出现次数)
for idx, other_occ in self.match_dict.get(gram, ()): #属于第几个输入字符串,出现次数
matches[idx] += occ * other_occ
if not matches:
return None
# cosine similarity
results = [(match_score / (norm * items[idx][0]), items[idx][1])
for idx, match_score in matches.items()]
results.sort(reverse=True, key=operator.itemgetter(0))
if self.use_levenshtein:
results = [(_distance(matched, lvalue), matched)
for _, matched in results[:50]]
results.sort(reverse=True, key=operator.itemgetter(0))
return [(score, self.exact_set[lval]) for score, lval in results
if score == results[0][0]]

先对目标字符串进行同样的切分方式,再分别与每个输入字符串进行“点乘”,并计算出余弦相似度。

我们看到,若不使用Levenshtein距离,最后输出的是余弦相似度取值最大的一组或多组字符串。若使用Levenshtein距离,首先为了减少运算量,只取余弦相似度排名前50个字符串,再然后对其计算Levenshtein距离,并通过简单的运算得出另一个分值,最后输出得分最高的一组或多组字符串。

后续的源码不难,但还是一并贴出来比较方便看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def get(self, key, default=None):
try:
return self[key] # 调用 __getitem__
except KeyError:
return default
def __getitem__(self, value):
lvalue = value.lower()
result = self.exact_set.get(lvalue) #若完全一样,返回(1,原词语)
if result:
return [(1, result)]
for i in range(self.gram_size_upper, self.gram_size_lower - 1, -1):
results = self.__get(value, i)
if results is not None:
return results
raise KeyError(value)

参考资料

官方github