package rbccm.felix.objectdb.test;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

import javax.jdo.annotations.Index;
import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Id;
import javax.persistence.LockModeType;
import javax.persistence.LockTimeoutException;
import javax.persistence.NoResultException;
import javax.persistence.Persistence;
import javax.persistence.Query;

import junit.framework.Assert;
import net.sourceforge.groboutils.junit.v1.MultiThreadedTestRunner;
import net.sourceforge.groboutils.junit.v1.TestRunnable;

import org.junit.Test;

import rbccm.felix.framework.ApplicationException;
import rbccm.felix.objectdb.messaging.ObjectDbMessage;
import rbccm.felix.objectdb.messaging.ObjectDbMessagePipe;

/**
 * Multi-threaded test that uses two ObjectDB databases as a queue and a
 * "work in progress" (WIP) store: producers persist {@link IdPoint} entities
 * into the queue database, consumers take them out under a pessimistic write
 * lock, park them in the WIP database and then remove them from WIP.
 */
public class TestMultiLock {

    /**
     * Timestamp format for log lines. {@link SimpleDateFormat} is not
     * thread-safe, so every use must be guarded by a lock on this instance.
     */
    private static final SimpleDateFormat TIMESTAMP_FORMAT =
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

    /**
     * Prints a timestamped message to standard output.
     *
     * @param msg the text to print after the timestamp
     */
    private static void log(String msg) {
        String timestamp;
        // log() is called concurrently by all producer and consumer threads;
        // formatting must be serialized because SimpleDateFormat keeps
        // mutable internal state.
        synchronized (TIMESTAMP_FORMAT) {
            timestamp = TIMESTAMP_FORMAT.format(new Date());
        }
        System.out.println(String.format("%s %s", timestamp, msg));
    }

    /**
     * Runs five producers and five consumers concurrently against a single
     * queue, each handling 1000 messages, and logs the total elapsed time.
     * Any throwable raised while running fails the test.
     */
    @Test
    public void testMultiThreadedEntryPipe() {
        log("Starting...");
        ObjectDbQueue queue = new ObjectDbQueue("$objectdb/db", "testMultiThreadedEntryPipe", true);
        try {
            int msgs = 1000;
            int workers = 5;
            int payloadSize = 1000000; // should create 2MB-ish string

            // The element type is required: with a raw ArrayList,
            // toArray(T[]) erases to Object[] and cannot be assigned to
            // TestRunnable[].
            ArrayList<TestRunnable> runnables = new ArrayList<TestRunnable>();
            for (int i = 1; i <= workers; i++) {
                runnables.add(new ObjectDbProducer("p" + i, queue, msgs, payloadSize));
            }
            for (int i = 1; i <= workers; i++) {
                runnables.add(new ObjectDbConsumer("c" + i, queue, msgs));
            }

            TestRunnable[] trs = runnables.toArray(new TestRunnable[runnables.size()]);
            MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(trs);

            long start = System.currentTimeMillis();
            mttr.runTestRunnables();
            long runTime = System.currentTimeMillis() - start;

            log(String.format("Total elapsed query time %sms", runTime));
            log("done");
        } catch (Throwable e) {
            e.printStackTrace();
            // Carry the cause into the failure so the test report shows it.
            Assert.fail("Multi-threaded queue test failed: " + e);
        } finally {
            queue.close();
        }
    }

    /** Alphabet used by {@link #randomString(int)}. */
    static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";

    /** Random source used by {@link #randomString(int)}. */
    static Random rnd = new Random();

    /**
     * Builds a random string of digits and upper-case letters.
     *
     * @param len number of characters to generate
     * @return a string of exactly {@code len} characters drawn from {@link #AB}
     */
    String randomString(int len) {
        StringBuilder sb = new StringBuilder(len);
        for (int i = 0; i < len; i++) {
            sb.append(AB.charAt(rnd.nextInt(AB.length())));
        }
        return sb.toString();
    }

    /**
     * Queue backed by two entity manager factories: one for queued entries
     * and one for entries that have been taken but not yet completed (WIP).
     * All operations on a factory are serialized by synchronizing on it.
     */
    private class ObjectDbQueue {
        private EntityManagerFactory _queueEmf;
        private EntityManagerFactory _wipEmf;
        private boolean _useWip;

        /**
         * Opens the queue database {@code <url>/<name>.odb} and the WIP
         * database {@code <url>/<name>-WIP.odb}.
         *
         * @param url    base location of the databases
         * @param name   base file name of the databases
         * @param useWip whether taken entries are tracked in the WIP database
         */
        public ObjectDbQueue(String url, String name, boolean useWip) {
            String queue = String.format("%s/%s.odb", url, name);
            String wip = String.format("%s/%s-WIP.odb", url, name);
            _queueEmf = Persistence.createEntityManagerFactory(queue);
            _wipEmf = Persistence.createEntityManagerFactory(wip);
            registerTypes(_queueEmf);
            registerTypes(_wipEmf);
            _useWip = useWip;
        }

        /**
         * Performs a throw-away lookup of {@link IdPoint} on a fresh entity
         * manager.
         * NOTE(review): presumably this makes ObjectDB register the entity
         * type before the first query runs - confirm against ObjectDB docs.
         */
        private void registerTypes(EntityManagerFactory emf) {
            EntityManager em = emf.createEntityManager();
            try {
                em.find(IdPoint.class, IdPoint.class);
            } finally {
                em.close();
            }
        }

        /** Closes both entity manager factories if they are still open. */
        public void close() {
            if (_queueEmf.isOpen()) {
                _queueEmf.close();
            }
            if (_wipEmf.isOpen()) {
                _wipEmf.close();
            }
        }

        /**
         * Persists an entry into the queue database in its own transaction.
         *
         * @param p the entry to enqueue
         * @throws RuntimeException if the entry cannot be persisted; the
         *         transaction is rolled back first
         */
        public void put(IdPoint p) {
            synchronized (_queueEmf) {
                EntityManager em = _queueEmf.createEntityManager();
                try {
                    // begin() lives inside the try so the entity manager is
                    // closed even if the transaction cannot be started.
                    em.getTransaction().begin();
                    em.persist(p);
                    em.getTransaction().commit();
                } catch (Exception e) {
                    if (em.getTransaction().isActive()) {
                        em.getTransaction().rollback();
                    }
                    throw new RuntimeException("Error putting message on objectdb queue", e);
                } finally {
                    em.close();
                }
            }
        }

        /**
         * Removes the entry with the highest priority (oldest first within a
         * priority) from the queue database and, when WIP tracking is
         * enabled, stores it in the WIP database.
         *
         * @return the entry that was taken, or {@code null} if the queue was
         *         empty or the entry could not be locked
         * @throws RuntimeException on any other failure
         */
        public IdPoint take() {
            IdPoint taken = null;
            synchronized (_queueEmf) {
                EntityManager em = _queueEmf.createEntityManager();
                try {
                    em.getTransaction().begin();
                    Query q = em.createQuery("SELECT p FROM IdPoint p ORDER by p.priority DESC, p.createTime");
                    IdPoint candidate = (IdPoint) q.setMaxResults(1)
                            .setLockMode(LockModeType.PESSIMISTIC_WRITE)
                            .getSingleResult();
                    em.remove(candidate);
                    em.getTransaction().commit();
                    // Publish the entry only after the removal is committed,
                    // so a failed or rolled-back take never hands out an
                    // entry that is still in the queue.
                    taken = candidate;
                } catch (NoResultException nre) {
                    // the queue is currently empty
                    log("No point");
                } catch (LockTimeoutException lte) {
                    // another consumer is competing for the same object
                    log("Locked point");
                } catch (Exception e) {
                    throw new RuntimeException("Error taking message from ObjectDB queue", e);
                } finally {
                    if (em.getTransaction().isActive()) {
                        em.getTransaction().rollback();
                    }
                    em.close();
                }
            }
            if (taken != null) {
                putOnWip(taken);
            }
            return taken;
        }

        /**
         * Persists a taken entry into the WIP database; does nothing when
         * WIP tracking is disabled.
         *
         * @throws RuntimeException if the entry cannot be persisted
         */
        private void putOnWip(IdPoint p) {
            if (!_useWip) {
                return;
            }
            synchronized (_wipEmf) {
                EntityManager em = _wipEmf.createEntityManager();
                try {
                    em.getTransaction().begin();
                    em.persist(p);
                    em.getTransaction().commit();
                } catch (Exception e) {
                    if (em.getTransaction().isActive()) {
                        em.getTransaction().rollback();
                    }
                    throw new RuntimeException("Error putting message on objectdb wip", e);
                } finally {
                    em.close();
                }
            }
        }

        /**
         * Looks up the entry by id in the WIP database under a pessimistic
         * write lock and removes it; does nothing when WIP tracking is
         * disabled.
         *
         * @param p the entry whose WIP record should be removed
         * @throws ApplicationException if the lookup or removal fails
         */
        public void removeFromWip(IdPoint p) {
            if (!_useWip) {
                return;
            }
            synchronized (_wipEmf) {
                EntityManager em = _wipEmf.createEntityManager();
                try {
                    em.getTransaction().begin();
                    IdPoint point = em.find(IdPoint.class, p.getId(), LockModeType.PESSIMISTIC_WRITE);
                    em.remove(point);
                    em.getTransaction().commit();
                } catch (Exception e) {
                    if (em.getTransaction().isActive()) {
                        em.getTransaction().rollback();
                    }
                    throw new ApplicationException("Error taking message from ObjectDB wip", e);
                } finally {
                    em.close();
                }
            }
        }
    }

    /**
     * Test runnable that puts a fixed number of entries on the queue, all
     * sharing one random payload generated at construction time.
     */
    private class ObjectDbProducer extends TestRunnable {
        private String _name;
        private ObjectDbQueue _queue;
        private int _num;
        private String _payload;

        /**
         * @param name  label used in log output
         * @param queue queue to put entries on
         * @param num   number of entries to produce
         * @param size  length of the random payload string
         */
        public ObjectDbProducer(String name, ObjectDbQueue queue, int num, int size) {
            _name = name;
            _queue = queue;
            _num = num;
            _payload = randomString(size);
        }

        @Override
        public void runTest() throws Throwable {
            for (int i = 0; i < _num; i++) {
                log(String.format("Producer %s putting point %s", _name, i));
                IdPoint p = new IdPoint(i, i + 1, _payload);
                _queue.put(p);
            }
            log(String.format("Producer %s completed putting points", _name));
        }
    }

    /**
     * Test runnable that takes a fixed number of entries from the queue,
     * polling once per second while nothing can be taken.
     */
    private class ObjectDbConsumer extends TestRunnable {
        private String _name;
        private ObjectDbQueue _queue;
        private int _num;

        /**
         * @param name  label used in log output
         * @param queue queue to take entries from
         * @param num   number of entries to consume
         */
        public ObjectDbConsumer(String name, ObjectDbQueue queue, int num) {
            _name = name;
            _queue = queue;
            _num = num;
        }

        @Override
        public void runTest() throws Throwable {
            for (int i = 0; i < _num; i++) {
                IdPoint p = null;
                while (p == null) {
                    p = _queue.take();
                    if (p == null) {
                        // Nothing available (empty queue or lock contention):
                        // back off before polling again.
                        Thread.sleep(1000);
                    } else {
                        log(String.format("Consumer %s taking point %s - %s", _name, i, p.toString()));
                        _queue.removeFromWip(p);
                    }
                }
            }
            log(String.format("Consumer %s completed taking point", _name));
        }
    }

    /**
     * Queue entry: a point with a random UUID id, a creation timestamp, a
     * priority and an arbitrary payload. {@code createTime} and
     * {@code priority} are indexed and are the sort keys used when taking
     * entries from the queue.
     */
    @Entity
    public static class IdPoint {
        @Id
        private String id;
        @Index
        private long createTime;
        @Index
        private int priority;
        private int x;
        private int y;
        private Object payload;

        /**
         * Creates an entry with a fresh random id, the current time as
         * creation time and priority 5.
         */
        IdPoint(int x, int y, Object payload) {
            this.id = UUID.randomUUID().toString();
            this.createTime = System.currentTimeMillis();
            this.priority = 5;
            this.x = x;
            this.y = y;
            this.payload = payload;
        }

        public String getId() {
            return id;
        }

        public int getX() {
            return x;
        }

        public int getY() {
            return y;
        }

        public Object getPayload() {
            return payload;
        }

        @Override
        public String toString() {
            return String.format("%s - (%d, %d)", this.id, this.x, this.y);
        }
    }
}