-1

I tried to perform text similarity in batch, and my code works. However, when I want to speed up the calculation with multiprocessing, my Juypter notebook shows the following mistakes below. I am new to multiprocessing, and your help would be invaluable to me!

The original data looks like this:

grouped_df = pd.DataFrame({
    'name_std': ['apple', 'apple Inc', 'Color', 'color A', 'Enfficient', 'Enfficient corp', 'apple C', 'A apple Inc', ]    
    'group_id': [1, 1, 2, 2, 3, 3, 4, 4],    
})

Within each group, the texts are similar. Now I want to loop through all the groups and cluster the groups with similarity higher than 90%.

The below code works.

def calculate_similarity(name1, name2):
    return jaro_winkler_similarity(name1, name2)

# Function to create cross pairs with similarity check
def create_cross_pairs(df, batch_size, threshold=0.90, output_file='D:\\Dropbox\\cross_pairs.csv'):
    group_ids = df['group_id'].unique()
    
    # Create directory if it doesn't exist
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    
    # If the file doesn't exist, create it and write the header
    if not os.path.exists(output_file):
        with open(output_file, 'w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(['group1', 'group2'])
    
    # Process in batches
    for start in tqdm(range(0, len(group_ids), batch_size), desc="Processing group batches"):
        batch1 = group_ids[start:start + batch_size]
        batch_cross_pairs = []  # Store cross pairs for the current batch

        for i, group1 in enumerate(batch1):
            for group2 in group_ids[start + i + 1:]:
                names1 = df[df['group_id'] == group1]['name_std'].tolist()
                names2 = df[df['group_id'] == group2]['name_std'].tolist()
                
                all_pairs_similar = True
                for name1 in names1:
                    for name2 in names2:
                        score = calculate_similarity(name1, name2)
                        if score < threshold:
                            all_pairs_similar = False
                            break
                    if not all_pairs_similar:
                        break
                
                if all_pairs_similar:
                    batch_cross_pairs.append((group1, group2))
        
        # Append current batch results to file
        with open(output_file, 'a', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerows(batch_cross_pairs)

# Calculate cross pairs in batches
batch_size = 1  # Adjust batch size based on memory size
create_cross_pairs(grouped_df1, batch_size)

The below code is my attempt to use multiprocessing; it does not work:

from concurrent.futures import ProcessPoolExecutor, as_completed

def calculate_similarity(name1, name2):
    return jaro_winkler_similarity(str(name1), str(name2))

def process_batch(start, batch_size, group_ids, df_dict, threshold, temp_dir):
    try:
        batch1 = group_ids[start:start + batch_size]
        batch_cross_pairs = []

        for i, group1 in enumerate(batch1):
            for group2 in group_ids[start + i + 1:]:
                names1 = df_dict[group1]
                names2 = df_dict[group2]

                all_pairs_similar = True
                for name1 in names1:
                    for name2 in names2:
                        score = calculate_similarity(name1, name2)
                        if score < threshold:
                            all_pairs_similar = False
                            break
                    if not all_pairs_similar:
                        break

                if all_pairs_similar:
                    batch_cross_pairs.append((group1, group2))

        temp_file = os.path.join(temp_dir, f'batch_{start}.csv')
        with open(temp_file, 'w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerows(batch_cross_pairs)
    except Exception as e:
        print(f"Batch starting at {start} failed with exception: {e}")
        raise

def create_cross_pairs(df, batch_size, threshold=0.90, output_file='D:\\Dropbox\\cross_pairs.csv'):
    group_ids = df['group_id'].unique().tolist()
    df_dict = df.groupby('group_id')['name_std'].apply(list).to_dict()

    # Create directory if it doesn't exist
    os.makedirs(os.path.dirname(output_file), exist_ok=True)

    # If the file doesn't exist, create it and write the header
    if not os.path.exists(output_file):
        with open(output_file, 'w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(['group1', 'group2'])

    # Create a temporary directory for storing intermediate results
    temp_dir = 'D:\\Dropbox\\cross_temp'
    os.makedirs(temp_dir, exist_ok=True)

    # Process in batches using ProcessPoolExecutor
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(process_batch, start, batch_size, group_ids, df_dict, threshold, temp_dir)
            for start in range(0, len(group_ids), batch_size)
        ]

        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing group batches"):
            try:
                future.result()  # Ensure each future is completed
            except Exception as e:
                print(f"Future failed with exception: {e}")

    # Combine all temporary files into the final output file
    with open(output_file, 'a', newline='', encoding='utf-8') as outfile:
        writer = csv.writer(outfile)
        for temp_file in sorted(os.listdir(temp_dir)):
            temp_file_path = os.path.join(temp_dir, temp_file)
            with open(temp_file_path, 'r', encoding='utf-8') as infile:
                reader = csv.reader(infile)
                writer.writerows(reader)

# Calculate cross pairs in batches
batch_size = 1  # Adjust batch size based on memory size
create_cross_pairs(grouped_df1, batch_size)

The errors are listed below:

Processing group batches:   4%|▎         | 1/27 [00:00<00:03,  7.18it/s]
Traceback (most recent call last):
  File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\queues.py", line 246, in _feed
    send_bytes(obj)
  File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 184, in send_bytes
    self._check_closed()
  File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 137, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
Traceback (most recent call last):
  File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\queues.py", line 246, in _feed
    send_bytes(obj)
  File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 184, in send_bytes
    self._check_closed()
  File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 137, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
Processing group batches: 100%|██████████| 27/27 [00:00<00:00, 166.81it/s]
Traceback (most recent call last):
  File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\queues.py", line 246, in _feed
    send_bytes(obj)
  File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 184, in send_bytes
    self._check_closed()
  File "C:\Users\AppData\Local\anaconda3\Lib\multiprocessing\connection.py", line 137, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
7
  • There's a lot of unrelated code here. Can you please edit to provide a minimal reproducible example?
    – tripleee
    Commented Jul 2 at 5:59
  • It might be an issue related to your Python version that may ship a buggy ProcessPoolExecutor (github.com/python/cpython/issues/80462). It seems to have been fixed from Python 3.9 onwards.
    – michaeldel
    Commented Jul 2 at 6:02
  • You don't seem to have accepted any answers to your previous questions. Awarding the accept mark is one of the important rewards you can return to the people who help you here. Also, accepting an answer helps future visitors by marking your problem as resolved. See also help.
    – tripleee
    Commented Jul 2 at 6:02
  • Have you tried to eliminate Jupyter?
    – SIGHUP
    Commented Jul 2 at 6:20
  • I provide a data example. My python is 3.11.9. This code does not work.
    – leilei
    Commented Jul 2 at 6:31

0

Browse other questions tagged or ask your own question.