Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignite semaphore does not perform well at high concurrency. The API contract of tryAcquire is also violated and the method does not return within the supplied timeout . Ignite 2.16.0 #11421

Open
viphadke opened this issue Jul 5, 2024 · 1 comment

Comments

@viphadke
Copy link

viphadke commented Jul 5, 2024

We had need to use a distributed semaphore to control access to certain resources. eg. we need to limit number of concurrent downloads / uploads etc. We recently switched to a ignite semaphore because we like the property that the permits are release if a connection is lost. Our prior solution did not have this property and permits were lost when out server went down.
However, the problem with the ignite semaphore was that it does not work well when 100 threads hit it at a time. We also tried the tryAcquire(1, 100, TimeUnit.MILLISECONDS). We noticed that it started waiting for 10, 15, 20 secs (whether permits are available or not). When the number of threads is low the API works as expected.

I have provided a sample program which simulates the problem. The P95 on my system was around 6 sec. When i am expecting the method to return in 100ms all the time. Our production setup is usually a 3 node ingite nodes. 3 zookeeper nodes and 3 platform nodes (our tomcat process) . However to simplify i am running zookeeper and ignite on the same node.

@viphadke
Copy link
Author

viphadke commented Jul 5, 2024

Here is the source

package ignitetest;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import javax.cache.Cache;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;

import com.thingworx.cache.values.Tuple2;

public class IgniteSemaphoreTest {

private static final int THREAD_POOL_SIZE = 100 ;

private static IgniteSemaphore semaphore = null ;

/**
 * This method can be used instead of semaphore.tryIgnite 
 * This improves the performance drastically. 
 * @param permits
 * @param timeout
 * @param tu
 * @return
 */
private static synchronized boolean tryAcquire(int permits, int timeout, TimeUnit tu) {
    return semaphore.tryAcquire(permits, timeout, tu);
}


public static void main(String[] args) throws Exception {
    // Preparing IgniteConfiguration using Java APIs
    IgniteConfiguration cfg = new IgniteConfiguration();

    // The node will be started as a client node.
    cfg.setClientMode(true);
    cfg.setPeerClassLoadingEnabled(true);
    cfg.setDeploymentMode(DeploymentMode.SHARED);
    // Classes of custom Java logic will be transferred over the wire from this app.
    CacheConfiguration<String, SomeFancyClass> cacheConfig = new CacheConfiguration<String, SomeFancyClass>("processortest");
    cacheConfig.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    cfg.setCacheConfiguration(cacheConfig);
    // Setting up an IP Finder to ensure the client can locate the servers.
    TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
    ipFinder.setAddresses(Collections.singletonList("127.0.0.1:47500"));
    cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));

    // Starting the node
    Ignite ignite = Ignition.start(cfg);

    semaphore = ignite.semaphore("mySema12", // Distributed semaphore name.
        100, // Number of permits.
        true, // Release acquired permits if node, that owned them, left topology.
        true // Create if it doesn't exist.
    );

    ExecutorService ser = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    int JOB_COUNT = 1000 ;
    long [] time_taken = new long[JOB_COUNT];
    for (int i = 0; i < JOB_COUNT; i++) {
        final int index= i ;
        ser.submit(() -> {
            boolean acquired = false;
            try {
                long tm1 = System.currentTimeMillis();
                acquired = semaphore.tryAcquire(1, 100, TimeUnit.MILLISECONDS);                        
                long tm2 = System.currentTimeMillis();
                long diff = tm2 - tm1;
                if (acquired) {
                    System.out.println("Acquired in " + diff);
                } else {
                    System.out.println("Not acquired in " + diff);
                }
                Thread.sleep((int)(Math.random()*1000));
                time_taken[index] = diff ;
            } catch (Exception e) {

            } finally {
                if (acquired) {
                    semaphore.release();
                }
            }

        });

    }

    ser.shutdown();

    while (!ser.awaitTermination(1, TimeUnit.SECONDS)) {
        // nothing to do
    }

    Arrays.sort(time_taken);
    System.out.println("P95 time = "+ percentile(time_taken, 95));
    System.out.println("semaphore released");

    System.out.println(">> Compute task is executed, check for output on the server nodes.");
    try {
        Thread.sleep(60000);
    } catch (Exception e) {
        e.printStackTrace();
    }
    // Disconnect from the cluster.
    ignite.close();
}

public static long percentile(long [] latencies, double percentile) {
    int index = (int) Math.ceil(percentile / 100.0 * latencies.length);
    return latencies[index];
}

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
1 participant