Thursday, June 30, 2011

Concurrency Pitfalls and More?

I read "A Dozen Concurrency Pitfalls" a while back and started searching and asking for the correct answers. Yes, some other people have posted their answers too, like here. So this is mainly a note to myself.

Here are the questions and answers

// Quiz snippet: a Runnable whose loop polls a plain flag (no @volatile, no synchronized).
val stopMe = new Runnable {
  // Written by stopTask() and read by run().
  private var stop = false
  override def run() { while (!stop) doSomething(); println("Stopped") }
  def stopTask() { stop = true }
}
// run() executes on the newly started thread.
new Thread(stopMe).start()
Answer:
  • no synchronization, no guarantee that the value changed for "stop" will ever be seen
  •  stop is not volatile, so the JVM spec can't guarantee that the changed value of stop becomes visible to other threads
// Quiz snippet: a task that can be asked to stop, with the caller waiting for it to finish.
// The flaws discussed in the answer below are deliberate and are left in place.
class MyTask extends Runnable {
  // Hand-off queue of capacity 1: run() puts one token when it finishes, stopNow() takes it.
  private val done = new ArrayBlockingQueue[String](1)
  // Stop flag polled by run(); AtomicBoolean's no-arg constructor starts at false.
  private val stop = new AtomicBoolean

  override def run() {
    while (!stop.get()) doSomething()
    done.put("DONE")
  }

  def stopNow() {
    stop.set(false) // deliberate pitfall: the flag is set to false, never to true
    done.take() // Wait until run completes
  }
}
Answer:
  •  if doSomething() call stopNow(), then...deadlock?
  • stop.set(false) invocation should be stop.set(true)
// Quiz snippet: a task that calls doSomething() `iters` times.
class BackgroundTask(iters: Int) extends Runnable {
  override def run() { for (i <- 1 to iters) doSomething() }
}
// run() is invoked directly on the Thread object; start() is never called here.
new Thread(new BackgroundTask(1000)).run()
println("Started backgroundTask")
Answer:
  • should call "start()" instead of "run()". I do that all the time
// Quiz snippet: copying the key view of a concurrent map into an array.
val items = new ConcurrentHashMap[String, Int]
 ...
// keySet() and toArray are two separate calls; nothing here makes the pair atomic.
val keys = items.keySet().toArray
Answer:
  • values when keySet() is called may not be the same when toArray is finished, therefore, result maybe unintended
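  • val keys = items.keySet().toArray only gives a meaningful snapshot if no other thread changes items while it runs. Note that ConcurrentHashMap's views are weakly consistent, so toArray will not throw ConcurrentModificationException; the array simply may or may not reflect updates made concurrently, and wrapping the call in a synchronized block does not stop other threads from writing to the map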
// Quiz snippet: a stack whose push() synchronizes on a string literal.
class Stack {
  // The lock object is the string literal "LOCK".
  private val myLock = "LOCK"  

  def push(newValue: Int) {
    myLock.synchronized {
      ...
    }
  }
}
Answer:
  • String constant is shared in Java, all Stack will use the same "LOCK"
  • the "LOCK" String points to an instance in the JVM's internal string cache (the intern pool) - this synchronized block may cause much "wider" synchronization than it seems (a global lock shared by every use of the literal "LOCK" that was not created with new)
// Quiz snippet: push() synchronizes on a String allocated inside the call.
class Stack {
  ...
  def push(newValue: Int) {
    // `new String("LOCK")` is evaluated on every invocation of push().
    new String("LOCK").synchronized {
      ...
    }
  }
}
Answer:
  • new String(...): every push call creates a new lock object, so nothing is really synchronized with it
// Quiz snippet: an array-backed stack that synchronizes on the backing array itself.
class Stack {
  private var values = new Array[Int](10)
  private var size = 0

  def push(newValue: Int) {
    // The monitor is whichever array `values` refers to at the moment push() is entered.
    values.synchronized {
      // NOTE(review): when size == values.size this test is false and the next line
      // indexes one past the end; `>=` looks like the intended comparison - confirm.
      if (size > values.size) reallocate()
      values(size) = newValue
      size += 1
    }
  }
  ...
  // Rebinds `values` to a new array twice as long: the old contents followed by zeros.
  private def reallocate() {
    values = values ++ new Array[Int](values.size)
  }
}
Answer:
  • everytime "reallocate()" is called the reference of "values" changes, therefore similar to "releasing" the lock unintentionally
// Quiz snippet: a blocking pop() built on a ReentrantLock and one of its Conditions.
class Stack {
  private val myLock = new ReentrantLock
  private val cond = myLock.newCondition()
  ...
  def pop() = {
    myLock.lock()
    try {    
      // do/while: await() runs once before `size == 0` is tested for the first time.
      do { cond.await() } while (size == 0) 
      size -= 1
      values(size)
    } finally { myLock.unlock() }
  }
}
Answer:
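  • should always use a while loop (test first, then wait) instead of do-while
  • A do/while is used for the condition variable rather than just while, so await() is called before size is ever checked: even when the stack already holds elements, pop() blocks until another thread signals cond, and a signal sent before pop() was entered is missed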
// Quiz snippet: writing the stack to a file while holding its lock.
class Stack {
  private val myLock = new ReentrantLock
  private var values = new Array[Int](10)
  private var size = 0 
  ...
  def write(fileName: String) {   
    myLock.lock()
    // The writer is opened after lock() but before the try block begins.
    val out = new PrintWriter(fileName)
    try {
      out.println(size + " " + values.mkString(" "))
    } finally {  
      // close() runs first; unlock() is reached only if close() returns normally.
      out.close()
      myLock.unlock()
    }
  }
}
Answer:
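  • out.close() might throw an exception, causing myLock.unlock() never to be called; likewise new PrintWriter(fileName) can throw (e.g. FileNotFoundException) after the lock is taken but before the try block is entered, which also leaves the lock held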
// Quiz snippet: a Swing button whose listener pushes every event onto a bounded queue.
val queue = new ArrayBlockingQueue[String](10)
val button = new JButton("Start")
button.addActionListener(new ActionListener {
  // put() blocks for as long as the queue already holds its 10 elements.
  override def actionPerformed(event: ActionEvent) {
    queue.put(event.toString)
  }
})
Answer:
  • if queue.put is blocked, as queue is full, the main UI thread will be blocked
// Quiz snippet: an observable model that guards its listener list with a single lock.
class Model {
  private val myLock = new ReentrantLock
  // Runs `block` while holding myLock; the finally clause releases it even if block throws.
  private def withMyLock(block: => Unit) { 
    myLock.lock(); try { block } finally { myLock.unlock() ; }
  }

  private val listeners = new ArrayBuffer[ChangeListener]
  
  def addListener(l: ChangeListener) { withMyLock { listeners += l } }
  def removeListener(l: ChangeListener) { withMyLock { listeners -= l } }
  def fireListeners() {
    // Listener callbacks run while myLock is held and while `listeners` is being iterated.
    withMyLock {
      for (l <- listeners) l.stateChanged(new ChangeEvent(this))
    }
  }
  ...
}
Answer:
  • since ChangeListener is implemented by other people, if they call addListener or removeListener from inside their listener then unexpected behavior might occur (in fireListeners, which is iterating over the list at that moment)
  • should use CopyOnWriteArrayList in order to allow a listener to call add/remove listener when the event is being dispatched without causing a CME and/or deadlock
// Quiz snippet: ten threads format a timestamp through one shared SimpleDateFormat.
val queue = new ArrayBlockingQueue[String](10)
// A single formatter instance, referenced by every MyTask.
val formatter = new SimpleDateFormat("MMM dd HH:mm:ss")
class MyTask extends Runnable {
  override def run() {
    val result = doSomething()
    queue.put(formatter.format(new Date) + " " + result)
  }
}
for (i <- 1 to 10) { new Thread(new MyTask).start() }
Answer:
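  • simple: SimpleDateFormat is too "simple" to be used from multiple threads (lol). Its JavaDoc states that date formats are not synchronized, so each thread needs its own instance, or access to the shared one must be synchronized externally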

// Quiz snippet: a bounded stack whose put() waits while the stack is full.
class Stack(val maxSize: Int) {
  private val array = new ArrayDeque[Int]

  def put(value: Int) {
    // The monitor held inside this block is `array`...
    array.synchronized {
      while(array.size == maxSize) {
        // ...but this unqualified wait() is invoked on the enclosing Stack instance.
        wait()
      }
      array.add(value)
    }
  }
  ...
}
Answer:
  •   synchronizes on this.array but waits on this = IllegalMonitorStateException


Some Other:

// Example: a servlet that keeps per-request data in an instance field.
public class MyServlet implements Servlet{
    // Instance field, reassigned on every call to service().
    private Object something;

    public void service(ServletRequest request, ServletResponse response)
        throws ServletException, IOException{
        this.something = request.getAttribute("something");
        doSomething();
    }

    // Reads the field that service() assigned just before calling this method.
    private void doSomething(){
        this.something ...
    }
}
Answer:
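  • A container normally creates a single instance of a Servlet and shares it between all request threads. Mutating an instance field will therefore cause unexpected behavior in other requests that are being served by this Servlet at the same time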

Double-Checked Locking idiom


// Broken multithreaded version
// "Double-Checked Locking" idiom
class Foo {
  // Lazily created by getHelper(); the field is not declared volatile.
  private Helper helper = null;
  public Helper getHelper() {
    // First check, made without holding the lock.
    if (helper == null) {
       synchronized(this) {
         // Second check, made while holding the lock on this.
         if (helper == null) {
            helper = new Helper();
         }
       }
    }
    return helper;
  }
  // other functions and members...
}

Answer:
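  • When thread A is creating a new instance of Helper, thread B may come in, see that "helper" is no longer null, and use the not-fully-instantiated instance (helper is not volatile, so the reference may become visible to B before the writes made by Helper's constructor do)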

Tuesday, June 28, 2011

Squeryl with Java Experiment

This is an attempt to use the popular Scala ORM/DSL Squeryl with plain Java code. I had been watching Squeryl grow for a while and loved the syntax and style from the beginning, but never had a chance to actually use it. My first attempt was to use it with the Squeryl-Record module in Lift. However, at the time it was just too much to handle for someone quite new to Scala, Record and Squeryl. So I took a step back and started with just Squeryl, trying to connect it with Java, which I am familiar with, to see if it's really as "straightforward" as it says on the site.

I use Squeryl 0.9.4 built against Scala 2.9.0.

First thing: it is not directly possible to make Squeryl look 100% Java, because Squeryl requires its model classes to be defined in Scala. Of course, you can always map the Scala model to a Java counterpart.

Alright let's get on it.
(I am using a real-world Java DAO interface from one of my projects, changing only the model so that it is implemented in Scala)

According to point 2 of "Using in a Java project" (oh wait, all 3 points are labeled "1"), define Schema class in Scala.
Here is my Schema:
/**
 * Common base of the model classes: a mutable Long `id` (the KeyedEntity key) and a
 * creation timestamp that defaults to the moment the object is constructed.
 * Both fields get Java-style getters and setters through @BeanProperty.
 */
abstract class BaseEntity extends KeyedEntity[Long] {
  @BeanProperty var id: Long = 0L
  @BeanProperty var createDate: Timestamp = new Timestamp(System.currentTimeMillis)
}

/** Member model: a name and an e-mail address, plus the fields inherited from BaseEntity. */
@BeanInfo
class Members(@BeanProperty var name: String, @BeanProperty var email: String) extends BaseEntity {
  /** Zero-argument constructor so Java code can create the bean first and fill it via setters. */
  def this() = this(null, null)
}

/**
 * Category model: a code, a display name, a sequence number and the id of the parent
 * category, plus the fields inherited from BaseEntity.
 */
@BeanInfo
class Categories(@BeanProperty var code: String,
    @BeanProperty var name: String,
    @BeanProperty var sequence: Int,
    @BeanProperty var parentId: Long) extends BaseEntity {
  /**
   * Zero-argument constructor so Java code can create the bean first and fill it via setters.
   * Int and Long are primitives on the Java side, so the numeric fields start at 0.
   */
  def this() = this(null, null, 0, 0L)
}

/**
 * Article model: category id, name, two sound URLs, an image URL, an estimated duration
 * and a click counter, plus the fields inherited from BaseEntity.
 */
@BeanInfo
class Articles(@BeanProperty var categoryId: Long, 
    @BeanProperty var name: String, 
    @BeanProperty var sound1Url: String, 
    @BeanProperty var sound2Url: String, 
    @BeanProperty var imageUrl: String, 
    @BeanProperty var estimatedDuration: Int, 
    @BeanProperty var clickCount: Int) extends BaseEntity {
    // Zero-argument constructor for Java callers. null.asInstanceOf[Long] and
    // null.asInstanceOf[Int] evaluate to 0, so the numeric fields start at 0, never null;
    // only the String fields are actually null.
    def this() = this(null.asInstanceOf[Long], null, null, null, 
          null, null.asInstanceOf[Int], null.asInstanceOf[Int])
}

/** The Squeryl schema: one table per model class. */
object Models extends Schema {
  val members = table[Members]
  val categories = table[Categories]
  val articles = table[Articles]
}


Here I am making all model properties var instead of val so it behaves like what Java objects usually behaves, mutable.
One thing to notice here is that, in my original Java version of the models, I have all the Int as java.lang.Integer, and Long as java.lang.Long. Therefore, I could have NULL for my number properties. However, Scala's Int and Long actually map to Java's int and long, so there will always be the default value of 0. In the Articles model's zero-argument constructor, I am simply demonstrating that setting the values to null won't set the field to NULL.
In the site it says
"@BeanInfo annotations, this will cause the compiler to generate Java style getters and setters for every property, the class will then look exactly like POJOs in the Java code"
However, this only makes the model class usable from Java; it does not actually generate proper Java getters/setters, so @BeanProperty is still necessary for each property. Here is what you get when not using @BeanProperty







Also implement a zero-argument constructor for each model class, so other Java code can instantiate the model class the way Java code usually does.

Next, define the DAO. Here I use a DAO interface taken from my project and just changed the model class to the new Scala implemented model

/** Data-access operations for {@code Articles} records. */
public interface ArticlesDao {

  /** Reports whether at least one article exists in the given category. */
  boolean existsByCategory(Long categoryId);

  /** Looks up a single article by its id. */
  Articles findById(Long id);

  /** Deletes the article with the given id; the result is the number of records affected. */
  int deleteById(Long id);

  /** Lists the articles that belong to the given category. */
  List<Articles> findByCategory(Long categoryId);

  /** Stores a new article; the result is the number of records affected. */
  int insert(Articles articles);

  /** Saves changes to an existing article; the result is the number of records affected. */
  int update(Articles articles);

  /** Lists up to {@code top} articles, chosen by click count. */
  List<Articles> findTopCountArticles(int top);

  /** Adds {@code count} to the click count of the article with the given id. */
  int addClickCount(Long id, int count);
}

And let's start the fun by implementing it with Squeryl
package com.netgents.dw.dao.scala

import java.lang.{Long => JLong}
import java.util.List
import scala.collection.JavaConversions._
import org.squeryl.PrimitiveTypeMode._
import org.squeryl.SessionFactory
import org.squeryl.Session
import com.netgents.dw.dao.ArticlesDao
import com.netgents.dw.model.Models._
import com.netgents.dw.model.Articles
import org.springframework.stereotype.Repository

/**
 * Squeryl-backed implementation of the Java ArticlesDao interface.
 *
 * No method here opens a transaction {} or inTransaction {} block; a session must be
 * supplied from outside. Results typed as java.util.List are produced from Scala lists
 * through the implicit conversions in scala.collection.JavaConversions.
 */
@Repository("scala.articlesDao")
class ArticlesDaoImpl extends ArticlesDao {

  /** True when at least one article exists in the given category. */
  def existsByCategory(categoryId: JLong): Boolean =
    byCategory(categoryId).headOption.isDefined

  /** Returns the article with the given id, or null (for the Java callers) if there is none. */
  def findById(id: JLong): Articles =
    articles.lookup(id.longValue).orNull

  /** Deletes by id; returns 1 when the table reports a deletion and 0 otherwise. */
  def deleteById(id: JLong): Int =
    if (articles.delete(id.longValue)) 1 else 0

  /** All articles of the given category, ordered by id. */
  def findByCategory(categoryId: JLong): List[Articles] =
    byCategory(categoryId).toList

  /** Shared query: articles whose categoryId matches, ordered by id. */
  private def byCategory(categoryId: JLong) =
    from(articles) (a =>
      where(a.categoryId === categoryId.longValue)
      select(a)
      orderBy(a.id))

  /**
   * Inserts the article. The table's insert result is not used as a row count,
   * so 1 is returned whenever no exception is thrown.
   */
  def insert(a: Articles): Int = {
    articles.insert(a)
    1
  }

  /**
   * Updates the article. The table's update result is not used as a row count,
   * so 1 is returned whenever no exception is thrown.
   */
  def update(a: Articles): Int = {
    articles.update(a)
    1
  }

  /**
   * The first `top` articles in descending click-count order.
   * NOTE(review): take(top) is applied to the query result rather than inside the query;
   * Squeryl's Query.page(offset, pageLength) may be able to push the limit into the
   * SQL - confirm it is available in the Squeryl version in use.
   */
  def findTopCountArticles(top: Int): List[Articles] =
    from(articles)(a =>
      select(a)
      orderBy(a.clickCount desc)).take(top).toList

  /** Adds `count` to the click count of the article with the given id; returns the update's result. */
  def addClickCount(id: JLong, count: Int): Int = {
    // The fully qualified name is required: inside this class the simple name `update`
    // binds to the update(a: Articles) method defined above, which shadows the `update`
    // brought in by the wildcard import of PrimitiveTypeMode._.
    org.squeryl.PrimitiveTypeMode.update(articles)(a =>
      where(a.id === id.longValue)
      set(a.clickCount := a.clickCount.~ + count))
  }
}

Here are some basic and small things to notice. Because we are basically working with Java, we need to give java.lang.Long an alias, so we can work with it more easily. We also add import scala.collection.JavaConversions._ so that a Scala List is converted to a Java List.

Here are some problems I encountered
  1. Due to the requirements of the original API, both insert and update must return the number of records affected. However, neither insert nor update in the Table returns the number of records affected. Insert returns the inserted instance with its id populated. Update simply returns Unit. I am not sure what will happen if something goes wrong in the database and the database doesn't give any error but just returns 0 as the number of affected records (I have encountered that with a version of MS SQL 2005), or when update does not find any record to update, so for now I can only assume 1 record is affected as long as there is no exception (the actual effect is explained later).
  2. not sure how to do top/limit SQL directly, except by using "take". (I am sure it's just because I didn't look hard enough)

Alright, you might have noticed I did not use any transaction {} or inTransaction {}. This is because I am planning to put my transaction control in my Service level.

As you may have noticed from my usage of the @Repository annotation, yes, I will be using Spring to take care of my transactions. In order to make Squeryl work with Spring or any outside transaction support, you have to use SessionFactory.externalTransactionManagementAdapter instead of the good old SessionFactory.concreteFactory. This is the way to use Squeryl without wrapping everything inside transaction/inTransaction {}. However, there is more to be done than just implementing this method. According to the documentation, Session.cleanupResources needs to be called manually when you're done with the connection. Since I use Spring's annotated transactions, I decided to write my own TransactionManager.
Here is my implementation:
/**
 * Spring DataSourceTransactionManager that also supplies Squeryl with its sessions,
 * so DAO code can run queries without transaction {} / inTransaction {} blocks.
 */
class MyDataSourceTransactionManager extends DataSourceTransactionManager {

  /**
   * Registers the adapter Squeryl calls to obtain a session. One Session is kept per
   * thread, so all queries issued in the same transaction share it.
   */
  @PostConstruct
  def init() {
    SessionFactory.externalTransactionManagementAdapter = Some(() =>
      // Reuse the session already bound to this thread; create and bind one otherwise.
      Session.currentSessionOption.getOrElse {
        // DataSourceUtils.getConnection (not getDataSource.getConnection) is used so the
        // connection is obtained through Spring's transaction-aware lookup.
        // Session.create(...) is not used because cleanup has to be overridden.
        val s = new Session(DataSourceUtils.getConnection(getDataSource),
                    new MySQLAdapter, None) {
          // Unbind here because this class has no direct access to unbindFromCurrentThread
          // at cleanup time. The finally clause removes the thread binding even when
          // super.cleanup throws, so a stale session is never left on the thread.
          override def cleanup = {
            try super.cleanup
            finally unbindFromCurrentThread
          }
        }
        s.bindToCurrentThread
        s
      })
  }

  /**
   * Runs Spring's own cleanup and then releases the Squeryl session's resources.
   * The Squeryl cleanup is in a finally clause so it still runs if Spring's cleanup throws.
   */
  override def doCleanupAfterCompletion(transaction: AnyRef) {
    try super.doCleanupAfterCompletion(transaction)
    finally Session.cleanupResources //clean up resources when done, following the doc
  }

}

The problems I faced here are
  1. Session needs to be controlled by myself so that all query made in the same transaction will get the same Session. Here I leverage the built-in Session storing mechanism, bindToCurrentThread. 
  2. Since I am using s.bindToCurrentThread, I need to unbind when I am done. However Session.cleanupResources doesn't do that and I don't have direct access to unbindFromCurrentThread, so I override the Session's implementation to call unbindFromCurrentThread when cleanup is called.
One small tip here is that since I am using Spring's transaction control, when getting Connection, I have to use DataSourceUtils.getConnection(getDataSource) instead of simply  getDataSource.getConnection.

There, that's about it, the rest can be plain Java and not to worry that the DAO is in Scala

Let's do a little test.

I will leave out the details of the Service implementation because it's mostly a 1-to-1 mapping of method calls to the DAO.

Here is my test class

/**
 * Walks one article through insert, update, lookup and delete, then checks that updating
 * a record that does not exist fails with a RuntimeException.
 *
 * Only the final update is allowed to throw: a RuntimeException from any earlier step
 * now fails the test instead of satisfying a method-wide expected-exception rule.
 */
@Test
public void test1() {

  Articles a = new Articles();
  a.setId(234); // placeholder; the assertion below expects insert to replace it
  a.setCategoryId(1L);
  a.setEstimatedDuration(100);
  a.setName("Article 1");
  a.setSound1Url("sound url 1");
  a.setSound2Url("sound url 2");
  a.setImageUrl("image url");
  a.setClickCount(6);
  articlesService.insert(a);

  assertEquals(1L, a.getId());//id should be set to 1 after insert

  a.setSound1Url(null);
  a.setImageUrl("image url updated");

  articlesService.update(a);

  a = articlesService.findById(Long.valueOf(1L));

  assertNotNull(a);//a should not be null
  assertNull(a.getSound1Url());//should be null now
  assertEquals("image url updated", a.getImageUrl());

  articlesService.deleteById(Long.valueOf(1L));

  a = articlesService.findById(Long.valueOf(1L));

  assertNull(a);//should be deleted

  //let's see what happens when no record is found to update
  a = new Articles();
  // NOTE(review): artId is not declared in this snippet - presumably a field of the
  // test class holding an id that has no matching row; confirm.
  a.setId(artId);
  a.setCategoryId(34L);
  a.setEstimatedDuration(321);
  a.setImageUrl("another image url");
  try {
    articlesService.update(a);//will throw a RuntimeException: failed to update
  } catch (RuntimeException expected) {
    return; // the update failed as anticipated
  }
  throw new AssertionError("updating a non-existent article should throw a RuntimeException");
}
  
/**
 * Checks transaction handling: a successful batch leaves 60 articles in category 1,
 * and a batch that ends in an exception leaves that count unchanged.
 */
@Test
public void transaction() {

  articlesService.goodInsertsActions();

  List<Articles> as = articlesService.findByCategory(Long.valueOf(1L));

  assertEquals(60, as.size());//inserted 60 records

  //test to see if transaction works
  try {
    articlesService.badInsertsActions();
  }
  catch(Exception e) {
    // The failure is the point of this step; it is only reported.
    System.out.println("error message: "+e.getMessage());
  }
  as = articlesService.findByCategory(Long.valueOf(1L));

  assertEquals(60, as.size());//worked, number of records stayed at 60
}
Here is what I do in goodInsertsActions and badInsertsActions
/** Runs the batch of inserts in doInserts() and returns normally. */
@Override
public void goodInsertsActions() {
  doInserts();
}

/**
 * Runs the same batch of inserts as goodInsertsActions() and then always throws,
 * so a caller can observe what happens to the inserts when the method fails.
 */
@Override
public void badInsertsActions() {
  doInserts();
  throw new DataIntegrityViolationException("hell with it");
}

/**
 * Inserts 60 articles into category 1 through the DAO. Article number n (1..60) is
 * named "Article n" and gets a click count of 6 + n.
 */
private void doInserts() {
  int n = 1;
  while (n <= 60) {
    final Articles article = new Articles();
    article.setCategoryId(1L);
    article.setEstimatedDuration(100);
    article.setName("Article " + n);
    article.setSound1Url("sound url 1");
    article.setClickCount(6 + n);
    articlesDao.insert(article);
    n++;
  }
}
The test demonstrated:
  1. id will be updated once data is inserted and new id retrieved
  2. values can be simply NULL without the use of Scala's convention of Option
  3. when update cannot find a record to update a RuntimeException is raised
  4. transaction is working good