I tried to perform text similarity in batch, and my code works. However, when I try to speed up the calculation with multiprocessing, my Jupyter notebook raises the errors shown below. I am new to multiprocessing, and your help would be invaluable to me!
The original data looks like this:
# Sample data: names within each group_id are near-duplicates of each other.
grouped_df = pd.DataFrame({
    # NOTE: a comma was missing after this list in the original, which is a SyntaxError.
    'name_std': ['apple', 'apple Inc', 'Color', 'color A', 'Enfficient', 'Enfficient corp', 'apple C', 'A apple Inc', ],
    'group_id': [1, 1, 2, 2, 3, 3, 4, 4],
})
Within each group, the texts are similar. Now I want to loop through all the groups and cluster the groups with similarity higher than 90%.
The below code works.
def calculate_similarity(name1, name2):
    """Return the Jaro-Winkler similarity of two names, in [0, 1].

    Arguments are coerced to ``str`` so NaN or numeric entries in the
    DataFrame do not raise — this matches the multiprocessing variant of
    this function defined later in the file.
    """
    return jaro_winkler_similarity(str(name1), str(name2))
# Function to create cross pairs with similarity check
def create_cross_pairs(df, batch_size, threshold=0.90, output_file='D:\\Dropbox\\cross_pairs.csv'):
    """Find pairs of groups whose names are ALL mutually similar.

    Two groups form a cross pair when every cross-group name pair has
    similarity >= threshold. Qualifying (group1, group2) pairs are
    appended to ``output_file`` one batch at a time, so partial progress
    survives an interruption.

    :param df: DataFrame with 'group_id' and 'name_std' columns.
    :param batch_size: number of groups handled per outer iteration.
    :param threshold: minimum Jaro-Winkler similarity (default 0.90).
    :param output_file: CSV path receiving (group1, group2) rows.
    """
    group_ids = df['group_id'].unique()
    # Precompute group_id -> list of names ONCE. The original filtered the
    # whole DataFrame inside the innermost pair loop, which is O(len(df))
    # per group pair and dominated the runtime.
    names_by_group = df.groupby('group_id')['name_std'].apply(list).to_dict()
    # Create directory if it doesn't exist
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    # If the file doesn't exist, create it and write the header
    if not os.path.exists(output_file):
        with open(output_file, 'w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(['group1', 'group2'])
    # Process in batches
    for start in tqdm(range(0, len(group_ids), batch_size), desc="Processing group batches"):
        batch1 = group_ids[start:start + batch_size]
        batch_cross_pairs = []  # Store cross pairs for the current batch
        for i, group1 in enumerate(batch1):
            names1 = names_by_group[group1]  # invariant w.r.t. group2 — hoisted
            for group2 in group_ids[start + i + 1:]:
                names2 = names_by_group[group2]
                # all() short-circuits on the first pair below threshold,
                # replicating the original's nested break logic.
                if all(
                    calculate_similarity(name1, name2) >= threshold
                    for name1 in names1
                    for name2 in names2
                ):
                    batch_cross_pairs.append((group1, group2))
        # Append current batch results to file
        with open(output_file, 'a', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerows(batch_cross_pairs)
# Calculate cross pairs in batches
batch_size = 1  # Adjust batch size based on memory size
# NOTE(review): the sample data above defines `grouped_df`, not `grouped_df1`;
# calling with the undefined name would raise NameError.
create_cross_pairs(grouped_df, batch_size)
The code below is my attempt to use multiprocessing; it does not work:
from concurrent.futures import ProcessPoolExecutor, as_completed
def calculate_similarity(name1, name2):
    """Jaro-Winkler similarity of two names, coerced to str first."""
    first, second = str(name1), str(name2)
    return jaro_winkler_similarity(first, second)
def process_batch(start, batch_size, group_ids, df_dict, threshold, temp_dir):
    """Worker: compare one batch of groups against every later group.

    Qualifying (group1, group2) pairs — those where all cross-group name
    pairs reach ``threshold`` — are written to a per-batch CSV inside
    ``temp_dir`` so the parent process can merge them afterwards.
    """
    try:
        batch = group_ids[start:start + batch_size]
        found_pairs = []
        for offset, group1 in enumerate(batch):
            names1 = df_dict[group1]  # dict lookup hoisted out of inner loop
            for group2 in group_ids[start + offset + 1:]:
                names2 = df_dict[group2]
                # The generator short-circuits on the first pair below the
                # threshold — same effect as the original nested breaks.
                if all(
                    calculate_similarity(a, b) >= threshold
                    for a in names1
                    for b in names2
                ):
                    found_pairs.append((group1, group2))
        batch_path = os.path.join(temp_dir, f'batch_{start}.csv')
        with open(batch_path, 'w', newline='', encoding='utf-8') as handle:
            csv.writer(handle).writerows(found_pairs)
    except Exception as e:
        print(f"Batch starting at {start} failed with exception: {e}")
        raise
def create_cross_pairs(df, batch_size, threshold=0.90, output_file='D:\\Dropbox\\cross_pairs.csv'):
    """Parallel version: fan batches of groups out to worker processes.

    NOTE(review): on Windows / in a Jupyter notebook, ``process_batch`` and
    ``calculate_similarity`` must be defined in an importable .py module
    (not in a notebook cell) for ProcessPoolExecutor to pickle them for the
    spawned workers. The intermittent "OSError: handle is closed" emitted
    by the queue feeder thread is CPython bug gh-80462, fixed in 3.9+.

    :param df: DataFrame with 'group_id' and 'name_std' columns.
    :param batch_size: groups per submitted task.
    :param threshold: minimum Jaro-Winkler similarity (default 0.90).
    :param output_file: CSV path receiving (group1, group2) rows.
    """
    group_ids = df['group_id'].unique().tolist()
    df_dict = df.groupby('group_id')['name_std'].apply(list).to_dict()
    # Create directory if it doesn't exist
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    # If the file doesn't exist, create it and write the header
    if not os.path.exists(output_file):
        with open(output_file, 'w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(['group1', 'group2'])
    # Temporary directory for per-batch results. Remove stale files left by
    # a previous (possibly interrupted) run — otherwise they would be merged
    # into the output again below.
    temp_dir = 'D:\\Dropbox\\cross_temp'
    os.makedirs(temp_dir, exist_ok=True)
    for stale in os.listdir(temp_dir):
        os.remove(os.path.join(temp_dir, stale))
    # Process in batches using ProcessPoolExecutor
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(process_batch, start, batch_size, group_ids, df_dict, threshold, temp_dir)
            for start in range(0, len(group_ids), batch_size)
        ]
        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing group batches"):
            try:
                future.result()  # Surface worker exceptions here
            except Exception as e:
                print(f"Future failed with exception: {e}")
    # Combine all temporary files into the final output file, deleting each
    # temp file once merged so reruns start clean.
    with open(output_file, 'a', newline='', encoding='utf-8') as outfile:
        writer = csv.writer(outfile)
        for temp_file in sorted(os.listdir(temp_dir)):
            temp_file_path = os.path.join(temp_dir, temp_file)
            # newline='' is required when handing a file object to csv.reader
            with open(temp_file_path, 'r', newline='', encoding='utf-8') as infile:
                writer.writerows(csv.reader(infile))
            os.remove(temp_file_path)
# Calculate cross pairs in batches
batch_size = 1  # Adjust batch size based on memory size
# NOTE(review): the sample data above defines `grouped_df`, not `grouped_df1`;
# calling with the undefined name would raise NameError.
create_cross_pairs(grouped_df, batch_size)
The errors are listed below:
Processing group batches: 4%|▎ | 1/27 [00:00<00:03, 7.18it/s]
Traceback (most recent call last):
File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\queues.py", line 246, in _feed
send_bytes(obj)
File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 184, in send_bytes
self._check_closed()
File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 137, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
Traceback (most recent call last):
File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\queues.py", line 246, in _feed
send_bytes(obj)
File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 184, in send_bytes
self._check_closed()
File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 137, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
Processing group batches: 100%|██████████| 27/27 [00:00<00:00, 166.81it/s]
Traceback (most recent call last):
File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\queues.py", line 246, in _feed
send_bytes(obj)
File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 184, in send_bytes
self._check_closed()
File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 137, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
This appears to be a known `ProcessPoolExecutor` bug (github.com/python/cpython/issues/80462); it seems to have been fixed from Python 3.9 onwards.