kse-01/search-data.py
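"""Search the extracted Python code entities in data.csv for the ones most similar to a
natural-language query.

Four similarity methods are supported: plain word frequency ("freq"), TF-IDF ("tfidf"),
LSI ("lsi") and Doc2Vec ("doc2vec"); "all" runs every method in turn. Example invocation
(the query string is only illustrative):

    python search-data.py tfidf "read a csv file"
"""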

import argparse
import logging
import os
import re
import typing
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional
import coloredlogs
import nltk
import numpy as np
import pandas as pd
from gensim.corpora import Dictionary
from gensim.models import TfidfModel, LsiModel
from gensim.models.doc2vec import TaggedDocument, Doc2Vec
from gensim.similarities import SparseMatrixSimilarity
from nltk.corpus import stopwords

nltk.download('stopwords', quiet=True)

SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
IN_DATASET = os.path.join(SCRIPT_DIR, "data.csv")
DOC2VEC_MODEL = os.path.join(SCRIPT_DIR, "doc2vec_model.dat")

# using nltk stop words and example words for now
STOP_WORDS = set(stopwords.words('english')) \
    .union(['test', 'tests', 'main', 'this', 'self', 'int', 'get', 'set', 'new', 'return', 'list'])


def find_all(regex: str, word: str, lower=True) -> list[str]:
    matches = re.finditer(regex, word)
    return [m.group(0).lower() if lower else m.group(0) for m in matches]


# https://stackoverflow.com/a/29920015
def camel_case_split(word: str) -> list[str]:
    return find_all('.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)', word)


def identifier_split(identifier: str) -> list[str]:
    return [y for x in identifier.split("_") for y in camel_case_split(x)]


def comment_split(comment: Optional[float | str], is_comment=True) -> list[str]:
    if (type(comment) == float and np.isnan(comment)) or comment is None:
        return []
    # Consider only the first line of each comment; this improves performance significantly.
    if is_comment:
        comment = str(comment).split("\n", maxsplit=2)[0]
    # Camel-case splitting of the matched words takes care of referenced type names in docstring comments.
    return [s for word in find_all('[A-Za-z]+', comment, lower=False) for s in camel_case_split(word)]


def remove_stopwords(input_bow_list: list[str]) -> list[str]:
    return [word for word in input_bow_list if word not in STOP_WORDS and len(word) > 2]


def get_bow(data: Optional[float | str], split_f) -> list[str]:
    if data is None or (type(data) == float and np.isnan(data)):
        return []
    return remove_stopwords(split_f(data))


def pick_most_similar(corpus, query, dictionary) -> list[tuple[int, float]]:
    # Rank all documents by similarity to the query and keep the top matches.
    index = SparseMatrixSimilarity(corpus, num_features=len(dictionary))
    sims = index[query]
    pick_top = 5
    return sorted(enumerate(sims), key=lambda x: x[1], reverse=True)[:pick_top]


def print_results(indexes_scores: list[tuple[int, float]], df):
    print("\n===== RESULTS: =====")
    for idx, score in indexes_scores:
        row = df.loc[idx]
        comment = row["comment"]
        if type(comment) != str:
            desc = ""
        else:
            comment = re.sub(re.compile(r'[\s\n]+', re.MULTILINE), ' ', comment)
            desc = "Description: {c}\n".format(c=comment)
        desc = (desc[:75] + '...\n') if len(desc) > 75 else desc
        print("\nSimilarity: {s:2.02f}%".format(s=score * 100))
        print("Python {feat}: {name}\n{desc}File: {file}\nLine: {line}"
              .format(feat=row["type"], name=row["name"], desc=desc, file=row["file"], line=row["line"]))


def train_doc2vec(corpus_list):
    dvdocs = [TaggedDocument(text, [i]) for i, text in enumerate(corpus_list)]
    model = Doc2Vec(vector_size=300, epochs=50, sample=0)
    model.build_vocab(dvdocs)
    model.train(dvdocs, total_examples=model.corpus_count, epochs=model.epochs)
    # Persist the trained model so later runs can reuse it instead of retraining.
    model.save(DOC2VEC_MODEL)
    return model


def load_data(print_frequent=False) -> pd.DataFrame:
    df = pd.read_csv(IN_DATASET, index_col=0)
    df["name_bow"] = df["name"].apply(lambda n: get_bow(n, identifier_split))
    df["comment_bow"] = df["comment"].apply(lambda c: get_bow(c, comment_split))
    if print_frequent:
        freq = defaultdict(int)
        for bow in df["name_bow"].tolist():
            for i in bow:
                freq[i] += 1
        for bow in df["comment_bow"].tolist():
            for i in bow:
                freq[i] += 1
        for key, value in sorted(freq.items(), key=lambda k: k[1], reverse=True)[:100]:
            print(f"{value}: {key}")
    return df


SparseVector = list[tuple[int, float]]
DenseVector = np.ndarray


def to_dense(vector: SparseVector) -> DenseVector:
    # Size the dense vector by the highest index present rather than by the number of
    # entries, so dimensions omitted from the sparse vector do not shift the remaining values.
    dense = [0.0] * (max((idx for idx, _ in vector), default=-1) + 1)
    for idx, value in vector:
        dense[idx] = value
    return np.array(dense)


@dataclass
class SearchResults:
    indexes_scores: list[tuple[int, float]]
    vectors: Optional[list[DenseVector]]
    query_vector: Optional[DenseVector]


def search(query: str, method: str, df: pd.DataFrame) -> SearchResults:
    # One bag-of-words document per code entity: identifier terms plus comment terms.
    corpus_list = []
    for _, row in df.iterrows():
        document_words = row["name_bow"] + row["comment_bow"]
        corpus_list.append(document_words)
    query_w = comment_split(query, is_comment=False)

    dictionary = None
    corpus_bow = None
    query_bow = None
    if method != "doc2vec":
        dictionary = Dictionary(corpus_list)
        corpus_bow = [dictionary.doc2bow(text) for text in corpus_list]
        query_bow = dictionary.doc2bow(query_w)

    if method == "tfidf":
        tfidf = TfidfModel(corpus_bow)
        return SearchResults(pick_most_similar(tfidf[corpus_bow], tfidf[query_bow], dictionary), None, None)
    elif method == "freq":
        return SearchResults(pick_most_similar(corpus_bow, query_bow, dictionary), None, None)
    elif method == "lsi":
        lsi = LsiModel(corpus_bow, num_topics=50)
        corpus = typing.cast(list[SparseVector], lsi[corpus_bow])
        results = pick_most_similar(corpus, lsi[query_bow], dictionary)
        result_vectors: list[DenseVector] = [to_dense(corpus[idx]) for idx, _ in results]
        return SearchResults(results, result_vectors, to_dense(lsi[query_bow]))
    elif method == "doc2vec":
        # Reuse a previously saved model if one exists, otherwise train and cache it.
        if os.path.exists(DOC2VEC_MODEL):
            model = Doc2Vec.load(DOC2VEC_MODEL)
        else:
            model = train_doc2vec(corpus_list)
        dv_query = model.infer_vector(query_w)
        results = model.dv.most_similar([dv_query], topn=5)
        result_vectors = [model.infer_vector(corpus_list[idx]) for idx, _ in results]
        return SearchResults(results, result_vectors, dv_query)
    else:
        raise ValueError("method unknown")


def main():
    methods = ["tfidf", "freq", "lsi", "doc2vec"]
    parser = argparse.ArgumentParser()
    parser.add_argument("method", help="the method to compare similarities with", type=str,
                        choices=methods + ["all"])
    parser.add_argument("query", help="the query to search the corpus with", type=str)
    parser.add_argument("-v", "--verbose", help="enable verbose logging", action='store_true')
    args = parser.parse_args()
    if args.verbose:
        coloredlogs.install()
        logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

    df = load_data()
    if args.method == "all":
        for method in methods:
            print(f"Applying method {method}:")
            results = search(args.query, method, df)
            print_results(results.indexes_scores, df)
            print()
    else:
        results = search(args.query, args.method, df)
        print_results(results.indexes_scores, df)


if __name__ == "__main__":
    main()