package rbccm.felix.objectdb.test; import java.util.ArrayList; import java.util.Random; import java.util.UUID; import javax.persistence.Entity; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.Id; import javax.persistence.LockModeType; import javax.persistence.LockTimeoutException; import javax.persistence.NoResultException; import javax.persistence.Persistence; import javax.persistence.Query; import junit.framework.Assert; import net.sourceforge.groboutils.junit.v1.MultiThreadedTestRunner; import net.sourceforge.groboutils.junit.v1.TestRunnable; import org.junit.Test; public class TestMultiLock { @Test public void testMultiThreadedEntryPipe() { System.out.println("Starting..."); EntityManagerFactory emf = Persistence.createEntityManagerFactory("$objectdb/db/testMultiThreadedEntryPipe.odb"); registerTypes(emf); try { int msgs = 1000; ArrayList runnables = new ArrayList(); ObjectDbQueue queue = new ObjectDbQueue(emf); for(int i=1; i<6; i++) runnables.add(new ObjectDbProducer("p" + i, queue, msgs)); for(int i=1; i<6; i++) runnables.add(new ObjectDbConsumer("c" + i, queue, msgs)); TestRunnable[] trs = runnables.toArray(new TestRunnable[runnables.size()]); MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(trs); long start = System.currentTimeMillis(); mttr.runTestRunnables(); long runTime = System.currentTimeMillis() - start; System.out.println(String.format("Total elapsed query time %sms", runTime)); System.out.println("done"); } catch (Throwable e) { e.printStackTrace(); Assert.fail(); } finally { emf.close(); } } private void registerTypes(EntityManagerFactory emf) { EntityManager em = emf.createEntityManager(); em.find(IdPoint.class, IdPoint.class); em.close(); } static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; static Random rnd = new Random(); String randomString( int len ) { StringBuilder sb = new StringBuilder( len ); for( int i = 0; i < len; i++ ) sb.append( AB.charAt( rnd.nextInt(AB.length()) ) ); return sb.toString(); } private class ObjectDbQueue { private EntityManagerFactory _emf; public ObjectDbQueue(EntityManagerFactory emf) { _emf = emf; } public synchronized void put(IdPoint p) { EntityManager em = _emf.createEntityManager(); em.getTransaction().begin(); try { em.persist(p); em.getTransaction().commit(); } catch(Exception e) { if(em.getTransaction().isActive()) em.getTransaction().rollback(); throw new RuntimeException("Error putting message on objectdb queue", e); } finally { em.close(); } } public synchronized IdPoint take() { EntityManager em = _emf.createEntityManager(); em.getTransaction().begin(); try { Query q = em.createQuery("SELECT p FROM IdPoint p"); IdPoint p = (IdPoint)q.setMaxResults(1).setLockMode(LockModeType.PESSIMISTIC_WRITE).getSingleResult(); em.remove(p); em.getTransaction().commit(); return p; } catch(NoResultException nre) { //the queue is currently empty System.out.println("No point"); return null; } catch(LockTimeoutException lte) { //another consumer is competing for the same object System.out.println("Locked point"); return null; } catch(Exception e) { throw new RuntimeException("Error taking message from ObjectDB queue", e); } finally { if(em.getTransaction().isActive()) em.getTransaction().rollback(); em.close(); _emf.getCache().evictAll(); } } } private class ObjectDbProducer extends TestRunnable { private String _name; private ObjectDbQueue _queue; private int _num; private int _payloadSize = 1000000; //should create 2MB-ish string private String _payload; public ObjectDbProducer(String name, ObjectDbQueue queue, int num) { _name = name; _queue = queue; _num = num; _payload = randomString(_payloadSize); } @Override public void runTest() throws Throwable { for(int i=0; i<_num; i++) { System.out.println(String.format("Producer %s putting point %s", _name, i)); IdPoint p = new IdPoint(i, i+1, _payload); _queue.put(p); } System.out.println(String.format("Producer %s completed putting points", _name)); } } private class ObjectDbConsumer extends TestRunnable { private String _name; private ObjectDbQueue _queue; private int _num; public ObjectDbConsumer(String name, ObjectDbQueue queue, int num) { _name = name; _queue = queue; _num = num; } @Override public void runTest() throws Throwable { for(int i=0; i<_num; i++) { IdPoint p = null; while(p == null) { p = _queue.take(); if(p == null) Thread.sleep(1000); else System.out.println(String.format("Consumer %s taking point %s - %s", _name, i, p.toString())); } } System.out.println(String.format("Consumer %s completed taking point", _name)); } } @Entity public static class IdPoint { @Id private String id; private int x; private int y; private Object payload; IdPoint(int x, int y) { this.id = UUID.randomUUID().toString(); this.x = x; this.y = y; } IdPoint(int x, int y, Object payload) { this.id = UUID.randomUUID().toString(); this.x = x; this.y = y; this.payload = payload; } public int getX() { return x; } public int getY() { return y; } public Object getPayload() { return payload; } @Override public String toString() { return String.format("%s - (%d, %d)", this.id, this.x, this.y); } } }