Category Archives: Scala-Others

[repost ]Typesafe Interview: Scala + Akka Is An IaaS For Your Process Architecture

original:http://highscalability.com/blog/2013/5/8/typesafe-interview-scala-akka-is-an-iaas-for-your-process-ar.html

This is an email interview with Viktor Klang, Director of Engineering at Typesafe, on the Scala Futures model & Akka, both topics on which he is immensely passionate and knowledgeable.

How do you structure your application? That’s the question I explored in the article Beyond Threads And Callbacks. An option I did not talk about, mostly because of my own ignorance, is a powerful stack you may not be all that familiar with: Scala and Akka.

To remedy my oversight is our acting tour guide, Typesafe’s Viktor Klang, long time Scala hacker and Java enterprise systems architect. Viktor was very patient in answering my questions and was enthusiastic about sharing his knowledge. He’s a guy who definitely knows what he is talking about.

I’ve implemented several Actor systems along with the messaging infrastructure, threading, async IO, service orchestration, failover, etc, so I’m innately skeptical about frameworks that remove control from the programmer at the cost of latency.

So at the end of the interview am I ready to drink the koolaid? Not quite, but I’ll have a cup of coffee with the idea.

I came to think of Scala + Akka as a kind of IaaS for your process architecture. Toss in Play for the web framework and you have a slick stack, with far more out of the box power than Go, Node, or plaino jaino Java.

The build or buy decision is surprisingly similar to every other infrastructure decision you make. Should you use a cloud or build your own? It’s the same sort of calculation you need to go through when deciding on your process architecture. At the extremes you lose some functionality and flexibility, but since they’ve already thought of most everything you would need to think about, with examples and support, you gain a tremendous amount too. Traditionally, however, process architecture has been entirely ad hoc. That may be changing.

Now, let’s start the interview with Viktor…

 

HS:  What is an Actor?

So let’s start from the very beginning! An Actor in the Actor Model is composed of 3 distinct pieces:

  • A behavior
  • An address
  • A mailbox

The Address is the thing you send messages to; they are then put into the Mailbox, and the Behavior is applied to the messages in the mailbox—one at a time. Since only one message is processed at a time, you can view an Actor as an island of consistency, connected to other actors via their Addresses and by sending and receiving messages from them.

There are 3 core operations that an Actor needs to support in order for it to qualify as an Actor.

  1. CREATE—an Actor has to be able to create new Actors
  2. SEND—an Actor needs to be able to send messages to Actors
  3. BECOME—an Actor needs to be able to change its behavior for the next message

Since what you send messages to is an Address, there is an indirection which allows the Mailbox and Behavior to live essentially anywhere, as long as the message can get routed there. This is also referred to as Location Transparency.
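
To make the three operations concrete, here is a minimal Akka sketch (my own illustration, not from the interview; the actor and message names are invented):

import akka.actor.{ Actor, ActorSystem, Props }

class Greeter extends Actor {
  def receive = {
    case "start" =>
      // CREATE: an actor can create new actors
      val child = context.actorOf(Props[Greeter], "child")
      // SEND: an actor can send messages to (the Address of) other actors
      child ! "hello"
      // BECOME: an actor can change its behavior for the next message
      context.become(greeted)
    case "hello" => println(s"${self.path} was greeted")
  }

  def greeted: Receive = {
    case msg => println(s"already greeted, ignoring: $msg")
  }
}

object ActorModelDemo extends App {
  val system = ActorSystem("demo")
  system.actorOf(Props[Greeter], "root") ! "start"
}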

HS: How does Akka implement the Actor model?

Like the Actor Model, but messages are processed by a designated thread pool configured on a per-actor basis. This allows for fine-grained control over execution provisioning and a means of bulkheading parts of your application from other parts. Akka also allows you to configure the mailbox implementation on a per-actor basis, which means that some actors might need a bounded one, some might want a priority-based one, some might want a deduplicating one, or fine-tuning of things like overflow protection with head-dropping vs. tail-dropping etc.

Compared with Threads, Akka Actors are extremely lightweight, clocking in at around 500 bytes per instance, allowing many millions of actors to run on a commodity machine. Like Erlang Processes, Akka Actors are location transparent, which means that it is possible to scale out to multiple machines without changing the way the code is written.

Akka Actors do not block a thread when they have nothing to process, which allows for high throughput at low latency, as wake-up lag for threads can be avoided. It is also possible to configure the number of messages to process before handing the thread back to the pool, or to specify a time slice that allows the actor to keep processing new messages until its time slice runs out, before handing the thread back to the pool.

This allows tuning for fairness or for throughput. Akka Actors will not be preempted when a higher-priority message arrives, but it is possible to have multiple actors sharing the same mailbox, which can mitigate this if required.
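
As a hedged sketch of how that tuning looks in practice (the dispatcher and actor names here are made up; throughput and throughput-deadline-time are standard Akka dispatcher settings):

import akka.actor.{ Actor, ActorSystem, Props }
import com.typesafe.config.ConfigFactory

class FairWorker extends Actor {
  def receive = { case msg => println(s"processed: $msg") }
}

object ThroughputTuning extends App {
  val config = ConfigFactory.parseString("""
    fair-dispatcher {
      type = Dispatcher
      executor = "fork-join-executor"
      throughput = 1                  # hand the thread back after every message (favors fairness)
      throughput-deadline-time = 2ms  # or after this time slice, whichever comes first
    }
  """).withFallback(ConfigFactory.load())

  val system = ActorSystem("tuning", config)
  val worker = system.actorOf(Props[FairWorker].withDispatcher("fair-dispatcher"), "worker")
  worker ! "ping"
}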

Inspired by Process Linking from Erlang, Akka Actors form a strict hierarchy, where actors created by an actor form a child-parent relationship in which the parent is responsible for handling the failure of the children, by issuing directives on how to deal with the different types of failure that can occur or by choosing to escalate the problem to its own parent. This has the benefit of creating the same kind of self-healing capabilities exhibited by Erlang. It is also possible for an Akka Actor to observe when another Actor will not be available anymore, and handle that accordingly.

 

HS:  Can you give an example of how Process Linking works in practice?

Actor A receives message B, which entails a potentially risky operation C (it could be contacting an external server or doing a computation that might blow up). Instead of doing that work itself, it may spawn a new actor and let that actor do the risky operation. If that operation fails, the exception is propagated to A (being the “parent”), who can decide to restart the failed actor to retry, or perhaps log that it failed. Whether it fails or not, A has not been at risk, as the dangerous operation was delegated and managed. In the case of a more serious error that A cannot manage, A would escalate that error to its own parent, who might then act upon it instead.
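
A hedged sketch of what that delegation might look like with Akka's supervision API (the messages, exceptions and actor names are invented for illustration):

import akka.actor.{ Actor, ActorRef, OneForOneStrategy, Props }
import akka.actor.SupervisorStrategy.{ Escalate, Restart }

case class Risky(input: String)
class TransientFailure extends RuntimeException
class FatalFailure extends RuntimeException

// The child performs the risky operation C and may blow up.
class RiskyWorker extends Actor {
  def receive = {
    case Risky("bad")   => throw new TransientFailure
    case Risky("fatal") => throw new FatalFailure
    case Risky(other)   => println(s"processed $other")
  }
}

// The parent (actor A) delegates the risky work and decides, per failure type,
// whether to restart the child (retry) or escalate to its own parent.
class ParentA extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: TransientFailure => Restart
    case _: FatalFailure     => Escalate
  }

  val worker: ActorRef = context.actorOf(Props[RiskyWorker], "worker")

  def receive = {
    case msg: Risky => worker ! msg // A itself is never put at risk
  }
}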

 

HS: Can you go into some more detail about bulkheading, why it’s important and how it’s accomplished in Akka?

The Bulkhead stability pattern comes from Michael Nygard’s Release It!. It’s about gaining stability by compartmentalization, just like bulkheads in a boat.

Bulkheading of Threads in Akka is accomplished by assigning different thread pools to different segments of your actor hierarchy, which means that if one thread pool is overloaded by either high load, DoS attempt or a logic error creating an infinite loop for instance, other parts of the application can proceed since their Threads cannot be “infected” by the failing thread pool.
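
As a hedged illustration of the mechanism (dispatcher and actor names invented; the configuration keys are standard Akka dispatcher settings), two parts of the application get their own pools, so saturating one cannot starve the other:

import akka.actor.{ Actor, ActorSystem, Props }
import com.typesafe.config.ConfigFactory

class FrontendActor extends Actor { def receive = { case m => println(s"frontend: $m") } }
class BackendActor  extends Actor { def receive = { case m => println(s"backend: $m") } }

object BulkheadDemo extends App {
  val config = ConfigFactory.parseString("""
    frontend-dispatcher {
      type = Dispatcher
      executor = "fork-join-executor"
      fork-join-executor { parallelism-min = 2, parallelism-max = 4 }
    }
    backend-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor { fixed-pool-size = 8 }
    }
  """).withFallback(ConfigFactory.load())

  val system   = ActorSystem("bulkheads", config)
  val frontend = system.actorOf(Props[FrontendActor].withDispatcher("frontend-dispatcher"), "frontend")
  val backend  = system.actorOf(Props[BackendActor].withDispatcher("backend-dispatcher"), "backend")

  frontend ! "render page"
  backend  ! "crunch numbers"
}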

HS: Tail-dropping?

When it comes to dealing with asynchronous message passing systems, one needs to decide which contention management policies to use. Back-pressure is one policy, dropping messages is another, and if you decide to drop messages, you must choose which ones to drop. Usually this is something that needs to be decided on a “per service” basis: either you drop the oldest (the one at the front of the queue, i.e. front-dropping) or the newest (tail-dropping). Sometimes one wants to have a priority queue so that the important messages end up at the front of the queue.

 

HS: What about these abilities helps programmers develop better/faster/robuster systems?

In any system, when load grows to surpass the processing capability, one must decide how to deal with the situation. With configurable mailbox implementations you as the developer can decide how to deal with this problem on a case-by-case basis, exploiting business knowledge and constraints to make sure that performance and scalability are not compromised to get the robustness (which is more than likely the case for a one-size-fits-all solution like backpressure).
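
As a hedged sketch of the mechanism (the mailbox and actor names are made up; BoundedMailbox and its keys are part of Akka, while policies such as head-/tail-dropping or deduplication would need a custom MailboxType):

import akka.actor.{ Actor, ActorSystem, Props }
import com.typesafe.config.ConfigFactory

class OrderHandler extends Actor { def receive = { case m => println(s"handled: $m") } }

object MailboxDemo extends App {
  val config = ConfigFactory.parseString("""
    bounded-mailbox {
      mailbox-type = "akka.dispatch.BoundedMailbox"
      mailbox-capacity = 1000
      mailbox-push-timeout-time = 0s   # don't block senders when the mailbox is full
    }
  """).withFallback(ConfigFactory.load())

  val system  = ActorSystem("mailboxes", config)
  val handler = system.actorOf(Props[OrderHandler].withMailbox("bounded-mailbox"), "orders")
  handler ! "order-1"
}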

 

HS: How does the location transparency work?

Each Akka Actor is identified by an ActorRef which is similar to Erlang PIDs, a level of indirection between the instance of the Actor and the senders. So senders only ever interact with ActorRefs which allows the underlying Actor instance to live anywhere (in the world potentially).

 

HS: Is there latency involved in scheduling an Akka thread to execute?

When an Actor doesn’t have any messages it is not scheduled for execution, and when it gets a message it will attempt to schedule itself with the thread pool if it hasn’t already done so. The latency is completely up to the implementation of the Thread Pool used, and this is also configurable and extensible/user replaceable. By default Akka uses a state-of-the-art implementation of a thread pool without any single point of contention.

 

HS: Given you can configure the number of messages to process before handing back the thread to the pool, that makes it a sort of run-to-completion model and the CPU time isn’t bounded?

Exactly.

 

HS:  Can it be interrupted?

No, but as soon as one message is done, it will check if it still has time left, and if so it will pick the next message.

 

HS: Can you ensure some sort of fair scheduling so some work items can make some progress?

That is up to the ThreadPool implementation and the OS Scheduler, fortunately the user can affect both.

 

HS: When multiple Actors share the same mailbox, if some actor has the CPU, it won’t give up the CPU for the higher priority message to be executed? How does this work on multiple CPUs?

If you have 10 Actors sharing a single priority mailbox and a thread pool of 10 Threads, there is more opportunity for an actor to become free and pick up the high-priority work than if a single actor is currently processing a slow, low-priority message. So it’s not a watertight solution, but it improves the processing of high-prio messages under that circumstance.

Placing priority requirements on messages increases lock contention and sacrifices throughput for latency.

 

HS: How do Actors know where to start in a distributed fabric?

That is done by configuration so that one can change the production infrastructure without having to rebuild the application, or run the same application on multiple, different infrastructures without building customized distributions.

 

HS: How do Actors know how to replicate and handle failover?

Also in configuration.

HS: How do you name Actors?

When you create an Akka Actor you specify its name, and the address of the actor is a URI of its place in the hierarchy.

Example: “akka.tcp://applicationName@host:port/user/yourActorsParentsName/yourActorsName”

 

HS: How do you find Actors?

There are a couple of different ways depending on the use-case/situation: either you get the ActorRef (every Akka Actor is referred to by its ActorRef, which is equivalent to the Address in the Actor Model) injected via the constructor of the Actor, or you get it in a message or as the sender of a message. If you need to look up Actors there are 2 different ways: one is to create an ActorSelection, which can be described as a query over the hierarchy, to which you can send messages, and all actors matching the query will get them. Or you can use “actorFor”, which lets you look up a specific actor using its full URI.
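
A hedged sketch of the two lookup styles (the paths reuse the illustrative names from the naming example above; the API follows the Akka 2.x of the time, and actorFor in particular was deprecated in later versions):

import akka.actor.{ ActorRef, ActorSelection, ActorSystem }

object Lookups {
  def examples(system: ActorSystem): Unit = {
    // ActorSelection: a query over the hierarchy; every matching actor gets the message
    val selection: ActorSelection = system.actorSelection("/user/yourActorsParentsName/*")
    selection ! "hello to all matching actors"

    // actorFor: look up one specific actor by its full URI
    val ref: ActorRef = system.actorFor("akka://applicationName/user/yourActorsParentsName/yourActorsName")
    ref ! "hello to that one actor"
  }
}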

 

HS: How do you know what an Actor can do?

You don’t. Well, unless you define such a protocol, which is trivial.

 

HS: Why is indirection an important capability?

The indirection is important because it clearly separates the location of the behavior from the location of the sender. An indirection that can even be rebound at runtime, migrating actors from one physical node to another without impacting the Address itself.

 

HS: How do you not have contention on your thread pools?

Every Thread in that pool has its own task-queue, and there is no shared queue. Tasks are randomly distributed to the work-queues and when a Thread doesn’t have any tasks it will randomly work steal from other Threads. Having no single point of contention allows for much greater scalability.

 

HS: Could you please give a brief intro into Scala and why it’s so wonderful?

Sure!

I come from a C then C++ then Java background and discovered Scala back in 2007.

For me Scala is about focusing on the business-end of the programming and removing repetition & “ritual” code.

Scala is a unifier of object orientation and functional programming; it also tries to minimize specialized constructs in the language and instead gives powerful & flexible constructs for library authors to add functionality with.

I personally enjoy that Scala is expression oriented rather than statement oriented, which simplifies code by avoiding a lot of mutable state, which tends to easily turn into an Italian pasta dish.

A statement doesn’t “return”/”produce” a result (you could say that it returns void), but instead it “side-effects” by writing to memory locations that it knows about, whereas an expression is a piece of code that “returns”/”produces” a value.

So all in all Scala lets me write less code with fewer moving parts, making it cheaper to maintain and a joy to write. A great combination in my book!

And not to forget that it allows me to use all the good Java libraries out there, and even be consumed from Java (Akka can be used from both Scala and Java, as an example).

 

HS: How do Scala futures fit into the scheme of things?

Alright. So I was a co-author of the SIP-14 proposal that was included in Scala 2.10. So the following explanations and discussions will center around that.

A Future is a read-handle for a single value that may be available at some point in time. Once the value is available it cannot and will not be changed.

A Promise is a write-handle for a single value that should be set at some point in time. Once the value is available it cannot and will not be changed.

The value of a Future/Promise may either be a result or an exception.

(You can get the corresponding Future from a Promise (by calling the future()-method on Promise) but not vice versa)

The strength of this model is that it allows you to program as if you already have the result, and the logic is applied when the result is available, effectively creating a data-flow style of programming, a model which easily can take advantage of concurrent evaluation.

When you program with Futures you need to have an ExecutionContext which will be responsible for executing the logic asynchronously; for all intents and purposes this is equivalent to a thread pool.

As an example in Scala:

import scala.concurrent.{ Future, ExecutionContext }

import ExecutionContext.Implicits.global // imports into scope the global default execution context

// let's first define a method that adds two Future[Int]s

// This method uses a Scala for-expression, but it is only sugar for:

// f1.flatMap(left => f2.map(right => left + right))

// it asynchronously and non-blockingly adds the result of future1 to the result of future2

def add(f1: Future[Int], f2: Future[Int]): Future[Int] = for(result1 <- f1; result2 <- f2) yield result1 + result2

 

// Then let's define a method that produces random integers

def randomInteger() = 4 // Determined by fair dice roll

val future1 = Future(randomInteger()) //Internally creates a Promise[Int] and returns its Future[Int] immediately and calls “randomInteger()” asynchronously and completes the promise with the result which is then accessible from its Future.

val future2 = Future(randomInteger()) // same as above

 val future3 = add(future1, future2)

None of the code above is blocking any thread, and the code is declarative and doesn’t prescribe _how_ the code will be executed. The ExecutionContext can be switched without changing any of the logic.

So what happens if the value is exceptional?

val future3 = add(Future(throw new BadThingsHappenedException), Future(randomInteger()))

Then the exceptional completion of future1 will be propagated to future3.

So let's say we know a way to recover from BadThingsHappenedExceptions; let's use the recover method:

val future1a = Future(throw new BadThingsHappenedException)

val future1b = future1a recover { case e: BadThingsHappenedException => randomInteger() }

val future2 = Future(randomInteger())

val future3 = add(future1b, future2)

So here we first create future1a, which will be completed exceptionally with a BadThingsHappenedException. Then we call the “recover” method on future1a and provide a (partial) function literal that can convert BadThingsHappenedExceptions to an Int by calling our amazing randomInteger() method; the result of “recover” is a new future, which we call future1b.

So here we can observe that futures are only completed once, and the way to transform the results or exceptions of a future is to create a new Future which will hold the result of the transformation.

So from a less contrived example standpoint, we can do things like:

val future1 = Future(callSomeWebService) recover { case _: ConnectException => callSomeBackupWebService() }

val future2 = Future(callSomeOtherWebService) recover { case _: ConnectException => callSomeOtherBackupWebService() }

val future3 = for(firstResult <- future1; secondResult <- future2) yield combineResults(firstResult, secondResult)

future3.map(result => convertToHttpResponse(result))
       .recover { case _ => HttpResponse(400) } // underscore means "anything"
       .foreach(response => sendResponseToClient(response))

So what we do here is asynchronously call a couple of web services, and if any of them fails with a ConnectException we try to call some backup web service. Then we combine the results of those web-service responses into some intermediate result, and convert that result into an HttpResponse. If anything exceptional has happened this far, we recover to an HttpResponse with a 400 status, and as the very last step we send our HttpResponse to the client that requested it.

So in our code we never wait for anything, what we do is to declare what we want to happen when/if we have a result, and there is a clear flow of data.

 

HS: Is a Future a scalar or can it have structure (arrays, maps, structs, etc.)?

It is a single memory slot that can only be written once. So what you write to it should be a value (i.e. immutable) but can be a struct, a Map or what have you.
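
A small illustrative sketch (the types and values are invented): the write-once slot can hold a whole immutable structure, for instance a case class containing a Map.

import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

case class UserProfile(id: Int, tags: Map[String, String])

object StructuredFutureDemo extends App {
  val profile: Future[UserProfile] =
    Future(UserProfile(42, Map("plan" -> "free", "region" -> "eu")))

  // Blocking here only to print from a tiny demo; real code would map/foreach instead.
  println(Await.result(profile, 1.second).tags("plan"))
}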

HS: How do you implement more interesting state machines where results from one state are used in another? I think that’s what I have a problem with a lot of the time. I would prefer to go to a clear error state where errors are handled, for example. In the LinkedIn example they parallelize three separate calls and have a bit of error handling code somewhere that doesn’t seem to know where the error came from or why, which makes crafting specific error responses difficult.

I understand what you mean, but I view it differently. With Futures you deal with the failure where you can, just as you deal with exceptions in Java where you can. This may or may not be in the method that produces the exception, or in the caller, or in the caller’s caller, or otherwise.

You could view Futures (with exceptional results) as an on-heap version of exception handling (in contrast to plain exception handling, which is on-stack), meaning that any thread can choose to deal with the exception and not only the thread that caused it.

 

HS: A lot of the never wait for anything seems normal to me in C++. Send a message. All IO is async. Replies come back and get dropped into the right actor queue.

I hear you! A lot of the good things we learned from C/C++ still applies, i.e. async IO is more resource efficient than blocking IO etc.

 

HS: The actor state machine makes sense of what to do. Thread contexts are correct. In your example there’s no shared state, which is the simplest situation, but when shared state is involved it’s not so clean, especially when many of these bits of code are executing simultaneously.

Of course, but it depends on what one means by shared state. Something that I find useful is “what would I do if the actors were people and they’d be in different locations?”

Sharing state (immutable values) via message-passing is perfectly natural and in reality mimics how we as humans share knowledge (we don’t flip each other’s neurons directly :) )


[repost ]Neo4j and Scala hacking notes

original:http://digifesto.com/2011/09/15/neo4j-and-scala-hacking-notes/

This week at FOSS4G, though it has nothing in particular to do with geospatial (…yet), I’ve started hacking around with the graph database Neo4j in Scala, because I’m convinced both are the future. I’ve had almost no experience with either.

Dwins kindly held my hand through this process. He knows a hell of a lot about Scala and guided me through how some of the language features could help me work with the Neo4j API. In this post, I will try to describe the process and problems we ran into and parrot his explanations.

I wrote some graphical hello world code to test things out in a file called Krow.scala (don’t ask). I’ll walk you through it:

import org.neo4j.kernel.EmbeddedGraphDatabase
import org.neo4j.graphdb._
import collection.JavaConversions._

I wanted to code against an embedded database, rather than code against the Neo4j server, because I have big dreams of combining Neo4j with some other web framework and don’t like having to start and stop databases. So I needed EmbeddedGraphDatabase, which implements the GraphDatabaseService interface, and persists its data to a directory of files.

I’ll talk about the JavaConversions bit later.

object Krow extends Application {

I am a lazy programmer who only bothers to encapsulate things into software architecture at the last minute. I’m also spoiled by Python and JavaScript and intimidated by the idea of code compilation. So initially I wanted to write this as an interpreted script so I wouldn’t have to think about it. But I’ve heard great things about sbt (simple-build-tool) so I figured I’d try it out.

Using sbt was definitely worth it, if only because it is really well documented and starting up my project with it got me back into the mindset of Java development enough to get Dwins to explain Maven repositories to me again. Adding dependencies to an sbt project involves writing Scala itself, which is a nice way to ease into the language.

But running my project in sbt meant I needed a main method on my lame-o script. Ugh. That sounded like too much work for me, and args: Array[String] looks ugly and typing it ruins my day.

Dwins recommended I try using Scala’s Application trait. He explained that this would take code from an object’s body and do some magic to turn it into a main method. Rock on!

Of course, I didn’t bother to check the documentation or anything. Otherwise, I would have seen this:

The Application trait can be used to quickly turn objects into executable programs, but is not recommended.

For a language that is so emphatically Correct in its design, I give a lot of credit to whoever it was that had the balls to include this language feature so that newbs could hang themselves on it. If they hadn’t, I wouldn’t have had to confront hard truths about threading. (That’s foreshadowing)

  println("start")

  val neo: GraphDatabaseService = new EmbeddedGraphDatabase("var/graphdb")

Sweet, a database I don’t have to start and stop on the command line! This var/graphdb directory is made in the directory in which I run the program (for me, using sbt run).

Next:

  var tx: Transaction = neo.beginTx()

  var first : Node = null
  var second : Node = null

  try {
    first = neo.createNode()
    first.setProperty("name","first")

    second = neo.createNode()
    second.setProperty("name","second")

    first.createRelationshipTo(second, "isRelatedTo")

    tx.success()
  } finally {
    println("finished transaction 1")
  }

What I’m trying to do with this code is make two nodes and a relationship between them. Should be simple.

But it turns out that with Neo4j, all modifications to the database have to be done in a transaction context, and for that you have to do this business of creating a new Transaction:

A programmatically handled transaction. All modifying operations that work with the node space must be wrapped in a transaction. Transactions are thread confined. Transactions can either be handled programmatically, through this interface, or by a container through the Java Transaction API (JTA). The Transaction interface makes handling programmatic transactions easier than using JTA programmatically. Here’s the idiomatic use of programmatic transactions in Neo4j:

 Transaction tx = graphDb.beginTx();
 try
 {
        ... // any operation that works with the node space
     tx.success();
 }
 finally
 {
     tx.finish();
 }

No big deal.

This bit of code was a chance for Dwins to show me a Scala feature that makes the language shine. Check out this line:

first.createRelationshipTo(second, "isRelatedTo")

If you check the documentation for this method, you can see that I’m not using this method as expected. The Java type signature is:

Relationship createRelationshipTo(Node otherNode, RelationshipType type)

where RelationshipType is a Neo4j concept that is what it sounds like. I suppose it is important to set it apart from mere Properties for performance on traversals or something. RelationshipTypes can be created dynamically and seem to more or less exist in the ether, but you need to provide them when you create a relationship. All relationships are of a type.

In terms of their data content, though, RelationshipTypes are just wrappers around strings. Rather than doing this wrapping in the same line where I call createRelationshipTo, Scala lets me establish a conversion from strings to RelationshipTypes in an elegant way.

You see, I lied. The above code would not have compiled had I not also included this earlier in the object’s definition:

  implicit def string2relationshipType(x: String) = DynamicRelationshipType.withName(x)

This code uses Scala’s implicit conversions to define a conversion between Strings and RelationshipTypes.

DynamicRelationshipType.withName(x) is one of Neo4j’s ways of making a new RelationshipType. Scala’s type inference means that the compiler knows that string2relationshipType returns a RelationshipType.

Since I used the implicit keyword, Scala knows that when a String is used in a method that expects a RelationshipType, it can use this function to convert it on the fly.

Check out all that majesty. Thanks, Scala!
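
For readers who want to try the mechanism in isolation, here is a minimal, self-contained sketch (hypothetical types, no Neo4j involved): the implicit def lets a plain String be passed where a Name is expected, and the compiler inserts the conversion.

import scala.language.implicitConversions

object ImplicitDemo extends App {
  case class Name(value: String)

  def greet(n: Name): String = s"hello, ${n.value}"

  implicit def string2name(s: String): Name = Name(s)

  println(greet("world")) // the compiler rewrites this to greet(string2name("world"))
}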

Ok, so now I want to show that I was actually able to get something into the database. So here’s my node traversal and printing code.

  tx = neo.beginTx()

  try{

    val trav : Traverser = first.traverse(Traverser.Order.BREADTH_FIRST,
                                          StopEvaluator.END_OF_GRAPH,
                                          ReturnableEvaluator.ALL,
                                          "isRelatedTo",
                                          Direction.BOTH)

    for(node <- trav){
      println(node.getProperty("name"))
    }
    tx.success()
  } finally {
    tx.finish()
    println("finished transaction 2")
  }

  neo.shutdown()

  println("done")

}

Two observations:

  • traverse takes a lot of arguments, most of which seem to be these awkwardly specified static variables. I bet there’s a way to use Scala features to wrap that and make it more elegant.
  • Check out that for loop. Concise syntax that takes an iterator. There’s one catch: Traverser is a java.lang.Iterable, whereas the loop syntax requires a scala.collection.Iterable. Remember that import scala.collection.JavaConversions._ line? That imported an implicit conversion from Java to Scala iterables.

All in all, pretty straightforward stuff, I thought. Here’s what I got when I used sbt to run this project:

> run
[info] Compiling 1 Scala source to /home/sb/dev/krow/target/scala-2.9.1.final/classes...
[warn] there were 1 deprecation warnings; re-run with -deprecation for details
[warn] one warning found
[info] Running Krow 
start
finished transaction 1
finished transaction 2

That’s not what I wanted! Not only did I not get any printed acknowledgement of the nodes that I had made in the database, but the program hangs and doesn’t finish.

What the hell?!

Asking Dwins about it, he tells me sagely about threads. Transactions need to be run in a single thread. The Application trait does a lot of bad stuff with threads. To be technically specific about it, it does…some really bad stuff with threads. I thought I had a handle on it when I started writing this blog post but instead I’m just going to copy/paste from the Application trait docs, which I should have read in the first place.

In practice the Application trait has a number of serious pitfalls:

* Threaded code that references the object will block until static initialization is complete. However, because the entire execution of an object extending Application takes place during static initialization, concurrent code will always deadlock if it must synchronize with the enclosing object.

Oh. Huh. That’s interesting.

It is recommended to use the App trait instead.

Now you’re talking. Let me just change that line to object Krow extends App { and I’ll be cooking in no…

> run
[info] Compiling 1 Scala source to /home/sb/dev/krow/target/scala-2.9.1.final/classes...
[warn] there were 1 deprecation warnings; re-run with -deprecation for details
[warn] one warning found
[info] Running Krow 
start
finished transaction 1
finished transaction 2

…time.

God dammit. There’s something else about App, which runs all the object code at initialization, which is causing a problem I guess. I asked Dwins what he thought.

Too much magic.

I guess I’m going to have to write a main method after all.


After some further messing around with the code, I have something that runs and prints the desired lines.

While the code would compile, I wound up having to explicitly name the RelationshipType type in the calls where I was trying to implicitly convert the strings; otherwise I got exceptions like this:

java.lang.IllegalArgumentException: Expected RelationshipType at var args pos 0, found isRelatedTo

Does that make it an explicit conversion?

Overall, hacking around with this makes me excited about both Scala and Neo4j despite the setbacks and wrangling.

Complete working code appended below.


import org.neo4j.kernel.EmbeddedGraphDatabase
import org.neo4j.graphdb._
import collection.JavaConversions._

object Krow {

  println("start")

  def main(args: Array[String]){

    var first : Node = null
    var second : Node = null

    val neo: GraphDatabaseService = new EmbeddedGraphDatabase("var/graphdb")
    var tx: Transaction = neo.beginTx()

    implicit def string2relationshipType(x: String) = DynamicRelationshipType.withName(x)

    try {
      first = neo.createNode()
      first.setProperty("name","first")

      second = neo.createNode()
      second.setProperty("name","second")

      first.createRelationshipTo(second, "isRelatedTo" : RelationshipType)

      tx.success()
      println("added nodes")
    } catch {
      case e: Exception => println(e)
    } finally {
      tx.finish() // wrap in try, finally   

      println("finished transaction 1")
    }

    tx = neo.beginTx()

    try{

      val trav : Traverser = first.traverse(Traverser.Order.BREADTH_FIRST,
                                            StopEvaluator.END_OF_GRAPH,
                                            ReturnableEvaluator.ALL,
                                            "isRelatedTo" : RelationshipType,
                                            Direction.BOTH)

      for(node <- trav){
        println(node.getProperty("name"))
      }
      tx.success()
    } finally {
      tx.finish()
      println("finished transaction 2")
    }

    neo.shutdown()

    println("done")
  }
}

[repost ]Neo4j’s Cypher internals – Part 2: All clauses, more Scala’s Parser Combinators and query entry point

original:http://ahalmeida.com/2011/09/22/neo4js-cypher-internals-part-2-all-clauses-more-scalas-parser-combinators-and-query-entry-point/

Recalling the first post

In the previous post, I explained what Neo4j is and then how graph traversal can be done on Neo4j using the Java API. Next, I introduced Cypher and how it helps write queries to retrieve data from the graph. After introducing Cypher’s syntax, we dissected the Start Clause, which is the start point (duh) for any query written in Cypher. If you haven’t read it, go there, and then come back to read this one.

In this second part, I’ll show the other clauses existing in Cypher: Match, Where, Return, OrderBy, and Skip and Limit. Some will be simple, some not, and I’ll go into more detail on those clauses that aren’t so trivial. After that, we will take a look at the Cypher query entry point, and how the query parsing is unleashed.

Nuff said, let’s get down to business.

The other clauses

At the moment of this writing, Cypher is composed of 6 clauses: Start (seen in the first part of this series), Match, Where, Return, OrderBy and Skip Limit. Recalling the first post, we used a sample Cypher query to show its syntax, and it also showed all 6 clauses in action. Let’s see it again:

start programmer=(3) match (programmer)-[:PAIRED]->(pair) where pair.age > 30 return pair, count(*) order by age skip 5 limit 10

Since the start clause was already explained, let’s go clause by clause, from left to right, starting with the match clause.

The Match Clause

Probably the most complicated clause. Its implementation may be somewhat confusing at first sight; however, looking at it calmly, it is not that scary. So, let’s go step by step.

First of all, the Match Clause is where we define patterns of paths we want to query information for. These paths can be directed, we can identify them, it is possible to define the relationship types of these paths and so on. Taking a look at our sample query, we can highlight only the match part of the query:

match (programmer)-[:PAIRED]->(pair)

This is a simple case, where we are defining the path from the programmer with the PAIRED relationship arriving at a pair. So far so good; however, this can easily get complex. For instance, if we don’t care about one side of the relationship, we can omit its identifier:

match (programmer)-[:PAIRED]->()

And if we care about the relationship, as in the case of returning it (more about returns later), we can identify it:

match (programmer)-[relationship:PAIRED]->()

It is possible to define several patterns and multiple relationships, such as:

match (programmer)-[:PAIRED]->(pair)<-[:FRIEND]-(rose)

Which in this case matches the programmer that paired with someone who is Rose’s friend.

To see it in a simple way, let’s check the definition of the methods in the MatchClause trait, so we can see how this grammar is defined:

 

01 trait MatchClause extends JavaTokenParsers with Tokens {
02
03   def matching: Parser[(Match, NamedPaths)] = ignoreCase("match") ~> rep1sep(path,",")
04
05   def path: Parser[Any] = pathSegment | parenPath | noParenPath
06
07   def parenPath: Parser[NamedPath] = identity ~ "=" ~ "(" ~ pathSegment ~ ")"
08
09   def noParenPath: Parser[NamedPath] = identity ~ "=" ~ pathSegment
10
11   def pathSegment: Parser[List[Pattern]] = node ~ rep1(relatedTail)
12
13   def node: Parser[Option[String]] = parensNode | relatedNode
14
15   def parensNode: Parser[Option[String]] = "(" ~> opt(identity)
16   def relatedNode: Parser[Option[String]] = opt(identity)
17
18   def relatedTail = opt("<") ~ "-" ~ opt("[" ~> relationshipInfo <~ "]") ~ "-" ~ opt(">") ~ node
19   def relationshipInfo = opt(identity) ~ opt("?") ~ opt(":" ~> identity) ~ opt("^" ~ wholeNumber ~ ".." ~ wholeNumber)
20 }

 

We can see the entry point of the match clause, defined by the matching method at line 3, which expects a match keyword followed by a repetition of path, whose definition is at line 5 and which is defined by either a pathSegment, a parenPath or a noParenPath.

For those that don’t know Scala very well, the Option means that the value is optional, and if it does not exist, we have the possibility to return an alternative object (None).

The parenPath and noParenPath are easy: they are paths defined either with or without parentheses, while the pathSegment is defined by a node followed by a repetition of relatedTail. The node is defined by optional parentheses and an identifier.

The relatedTail parser is where the relationship itself is defined: an optional incoming (<) direction, a mandatory – (dash), an optional relationshipInfo, which must appear inside brackets and is followed by another dash, an optional outgoing (>) direction, also followed by a node definition.

Finishing the grammar definition, the relationshipInfo is defined by an optional relationship identifier (in case you want to refer to it later, for instance, returning it), followed by an also optional question mark, which identifies an optional relationship. Following that, you can define the relationship name you want to match, which is followed by an optional ^ (circumflex) sign with two numbers separated by “..”, which is used to identify the hops that can be matched, for instance ^1..4 (from 1 to 4 hops).
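
Putting those optional pieces together, an illustrative pattern (my own example, not from the original post) that exercises the whole relationshipInfo grammar (an identified, optional relationship of type PAIRED, matched across 1 to 3 hops) would look like:

match (programmer)-[r?:PAIRED^1..3]->(pair)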

Phew, huge syntax, means… huge transformations into an AST. Let’s get to it, starting from the bottom to the top.

First, let’s take a look at the relationshipInfo method’s definition:

 

01 def relationshipInfo: Parser[(Option[String], Option[String], Option[(Int, Int)], Boolean)] = opt(identity) ~ opt("?") ~ opt(":" ~> identity) ~ opt("^" ~ wholeNumber ~ ".." ~ wholeNumber) ^^ {
02
03   case relName ~ optional ~ Some(relType) ~ None => (relName, Some(relType), None, optional.isDefined)
04
05   case relName ~ optional ~ Some(relType) ~ Some("^" ~ minHops ~ ".." ~ maxHops) => (relName, Some(relType), Some((minHops.toInt, maxHops.toInt)), optional.isDefined)
06
07   case relName ~ optional ~ None ~ Some("^" ~ minHops ~ ".." ~ maxHops)  => (relName, None, Some((minHops.toInt, maxHops.toInt)), optional.isDefined)
08
09   case relName ~ optional ~ None ~ None => (relName, None, None, optional.isDefined)
10 }

 

The method returns a parser of a tuple containing 4 pieces of information, which will be retrieved through the transformation applied to the input. The pattern in the first case checks whether the relationship name is passed, the question mark and the relationship type have been informed, and the hops part of the match was omitted. If so, it builds a tuple containing all the information retrieved in the case. The other 3 cases are variations of the first one, checking the presence and absence of the other pieces of information and transforming them into the same Scala tuple.

The next one is the relatedTail method, which defines the relationship direction and the right-side node. This parser returns a tuple with information regarding the direction and also everything from the previous relationshipInfo parser.

 

1 def relatedTail = opt("<") ~ "-" ~ opt("[" ~> relationshipInfo <~ "]") ~ "-" ~ opt(">") ~ node ^^ {
2   case back ~ "-" ~ relInfo ~ "-" ~ forward ~ end => relInfo match {
3     case Some((relName, relType, varLength, optional)) => (back, relName, relType, forward, end, varLength, optional)
4     case None => (back, None, None, forward, end, None, false)
5   }
6 }

 

The node itself is defined by the following parsers:

 

1 def node: Parser[Option[String]] = parensNode | relatedNode
2
3 def parensNode: Parser[Option[String]] = "(" ~> opt(identity) <~ ")"
4 def relatedNode: Parser[Option[String]] = opt(identity) ^^ {
5   case None => throw new SyntaxException("Matching nodes without identifiers have to have parenthesis: ()")
6   case x => x
7 }

 

In the code above, we can see that a node can be defined with or without parentheses. However, when not caring about the node, and thus not identifying it, you must at least use the parentheses; otherwise, a SyntaxException will be thrown.

Now, it is possible to take a look at how the paths are transformed. In the path definition, we saw that it can be composed of either a path with or without parentheses, or a pathSegment.

 

1 def path: Parser[Any] = pathSegment | parenPath | noParenPath
2
3 def parenPath: Parser[NamedPath] = identity ~ "=" ~ "(" ~ pathSegment ~ ")" ^^ {
4   case p ~ "=" ~ "(" ~ pathSegment ~ ")" => NamedPath(p, pathSegment: _*)
5 }
6
7 def noParenPath: Parser[NamedPath] = identity ~ "=" ~ pathSegment ^^ {
8   case p ~ "=" ~ pathSegment => NamedPath(p, pathSegment: _*)
9 }

 

Looking at the code, both the parenPath and noParenPath parsers are doing simple transformations to create NamedPath objects with the identity (the p variable) and the pathSegment, which contains all the information of the path.

The pathSegment parser is a little more complicated, and it parses the node followed by the repetition of relatedTail (already defined above).

 

01 def pathSegment: Parser[List[Pattern]] = node ~ rep1(relatedTail) ^^ {
02   case head ~ tails => {
03     var fromNode = namer.name(head)
04
05     val list = tails.map(_ match {
06       case (back, rel, relType, forward, end, varLength, optional) => {
07         val toNode = namer.name(end)
08           val dir = getDirection(back, forward)
09
10         val result: Pattern = varLength match {
11           case None => RelatedTo(fromNode, toNode, namer.name(rel), relType, dir, optional)
12           case Some((minHops, maxHops)) => VarLengthRelatedTo(namer.name(None), fromNode, toNode, minHops, maxHops, relType, dir, optional)
13         }
14
15         fromNode = toNode
16
17         result
18       }
19     })
20
21     list
22   }
23 }

 

The first thing is to match the node part and the relatedTail, interestingly identified as head and tails, respectively. Then it finds the name of the node. However, remember that it can be unnamed, in which case a little trick is done inside the name method called at line 3.

 

01 class NodeNamer {
02   var lastNodeNumber = 0
03
04   def name(s: Option[String]): String = s match {
05     case None => {
06       lastNodeNumber += 1
07       "  UNNAMED" + lastNodeNumber
08     }
09     case Some(x) => x
10   }
11 }

 

The NodeNamer class keeps a counter for the unnamed nodes, so it can uniquely name them, and if the node is already named, it will return the given name itself. A small curiosity here is that one might identify a node as UNNAMED1 and also not identify another node in the query, which will result in two nodes being called UNNAMED1 ;).

Continuing on the pathSegment method, between lines 5 and 19 the list of related tails is transformed into a list of Pattern (the result val created at line 10). These results are aggregated by the map method and, in the end, assigned to the list val at line 5, which is returned at line 21.

All of these operations are unleashed by the matching clause, which transforms all the previously parsed pieces into a tuple of Match and NamedPaths:

 

1 def matching: Parser[(Match, NamedPaths)] = ignoreCase("match") ~> rep1sep(path,",") ^^ {
2   case matching => {
3     val unamedPaths: List[Pattern] = matching.filter(_.isInstanceOf[List[Pattern]]).map(_.asInstanceOf[List[Pattern]]).flatten
4     val namedPaths: List[NamedPath] = matching.filter(_.isInstanceOf[NamedPath]).map(_.asInstanceOf[NamedPath])
5
6     (Match(unamedPaths: _*), NamedPaths(namedPaths: _*))
7   }
8 }

 

Phew… long one! Tired? C’mon, there are more clauses to tackle, but thankfully, the most extensive one is over.

The Where Clause

Ahh, the where clause. With its mere 3 lines of code, it seems very simple, and that can fool us. Take a look:

 

1 trait WhereClause extends JavaTokenParsers with Tokens with Clauses {
2   def where: Parser[Clause] = ignoreCase("where") ~> clause
3 }

 

That’s it. But digging deeper, what the heck is clause? In fact, it is a definition derived from the Clauses trait, which is almost self-explanatory, so let’s check just the main clause:

 

1 def clause: Parser[Clause] = (orderedComparison | not | notEquals | equals | regexp | hasProperty | parens | sequenceClause) * (
2   ignoreCase("and") ^^^ { (a: Clause, b: Clause) => And(a, b)  } |
3   ignoreCase("or") ^^^  { (a: Clause, b: Clause) => Or(a, b) }
4   )

 

It takes the possibilities for the where clause, such as >, >=, not equals (which can be both <> or !=) and so on. The * combinator means that the left side will be interleaved with the right side (the and/or), and the right side determines how the transformation will be made, in this case into And or Or objects holding both the left- and right-side clauses. By the way, the ^^^ transformation operator means that it replaces the successful result with what follows it, in this case the And or Or objects.
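
To see those two combinators in isolation, here is a tiny, self-contained sketch (my own example, not Cypher code) that uses * and ^^^ in the same way to fold and/or operands into And/Or nodes:

import scala.util.parsing.combinator.JavaTokenParsers

object BoolExprParser extends JavaTokenParsers {
  sealed trait Expr
  case class Lit(name: String) extends Expr
  case class And(l: Expr, r: Expr) extends Expr
  case class Or(l: Expr, r: Expr) extends Expr

  def literal: Parser[Expr] = ident ^^ Lit
  def expr: Parser[Expr] = literal * (
    "and" ^^^ { (a: Expr, b: Expr) => And(a, b) } |
    "or"  ^^^ { (a: Expr, b: Expr) => Or(a, b) }
  )

  def main(args: Array[String]): Unit =
    println(parseAll(expr, "a and b or c")) // parses to Or(And(Lit(a),Lit(b)),Lit(c))
}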

You can take a look at the Clauses trait in the repository for more details on the syntax. There are no other new things in the trait, so, by now, it might be a good exercise to explore it.

The Return Clause

How does Cypher parse what we want our query to return? The answer to this question is in the ReturnClause trait. It is a simple parser, without tricks, so let’s get into the trait:

 

1 trait ReturnClause extends JavaTokenParsers with Tokens with ReturnItems {
2
3   def returns: Parser[(Return, Option[Aggregation])] = ignoreCase("return") ~> opt("distinct") ~ rep1sep((aggregate | returnItem), ",")

 

The returns method is defined by a return keyword, followed by an optional distinct definition and a repetition of an aggregate or a returnItem. These are defined in the ReturnItems trait, which defines the aggregate functions, such as min(), max(), avg() and so on, and the possible returnValues, such as an entity, a value or a nullableProperty.

 

01 trait ReturnItems extends JavaTokenParsers with Tokens with Values {
02   def returnItem: Parser[ReturnItem] = returnValues ^^ {
03     case value => ValueReturnItem(value)
04   }
05
06   def returnValues: Parser[Value] = nullableProperty | value | entityValue
07
08   def countFunc: Parser[AggregationItem] = ignoreCase("count") ~> parens(returnValues) ^^ { case inner => ValueAggregationItem(Count(inner)) }
09   def sumFunc: Parser[AggregationItem] = ignoreCase("sum") ~> parens(returnValues) ^^ { case inner => ValueAggregationItem(Sum(inner)) }
10   def minFunc: Parser[AggregationItem] = ignoreCase("min") ~> parens(returnValues) ^^ { case inner => ValueAggregationItem(Min(inner)) }
11   def maxFunc: Parser[AggregationItem] = ignoreCase("max") ~> parens(returnValues) ^^ { case inner => ValueAggregationItem(Max(inner)) }
12   def avgFunc: Parser[AggregationItem] = ignoreCase("avg") ~> parens(returnValues) ^^ { case inner => ValueAggregationItem(Avg(inner)) }
13   def countStar: Parser[AggregationItem] = ignoreCase("count") ~> parens("*") ^^ {case "*" => CountStar()  }
14
15   def aggregate:Parser[AggregationItem] = (countStar | countFunc | sumFunc | minFunc | maxFunc | avgFunc)
16 }

 

Getting back to the ReturnClause trait, let’s take a look at the complete code and examine the transformation:

 

01 def returns: Parser[(Return, Option[Aggregation])] = ignoreCase("return") ~> opt("distinct") ~ rep1sep((aggregate | returnItem), ",") ^^ {
02   case distinct ~ items => {
03     val list = items.filter(_.isInstanceOf[AggregationItem]).map(_.asInstanceOf[AggregationItem])
04
05     val none: Option[Aggregation] = distinct match {
06       case Some(x) => Some(Aggregation())
07       case None => None
08     }
09
10     (
11       Return(items.filter(!_.isInstanceOf[AggregationItem]): _*),
12       list match {
13         case List() => none
14         case _ => Some(Aggregation(list : _*))
15       }
16     )
17   }}

 

The transformation creates a list of AggregationItems and assigns it to the list val at line 3. Then it matches against the distinct part and, if informed, assigns an aggregation to the none val; otherwise, it will assign a None, which is the object that represents no value in Scala.

At line 11, it prepares the result produced by the parser, which must be a tuple of a Return object and the Option[Aggregation] indicating whether distinct was set.

In regard to the Return, it takes all items defined to be returned and keeps only those that aren’t aggregation items, while all the aggregation items are now stored in the list val, which is matched at line 12. If it is an empty list, the result of the earlier distinct check is returned; otherwise, all the AggregationItems are stored inside an Aggregation object to be returned within the tuple. The result will be a (Return, Option[Aggregation]) tuple.

The OrderBy Clause

We can always define how the results will be ordered and by which values. Parsing the order by part of the query is the job of the OrderByClause trait, which defines the order method.

 

1 def order: Parser[Sort] = ignoreCase("order by")  ~> rep1sep(sortItem, ",") ^^
2     {
3       case items => Sort(items:_*)
4     }

 

Nothing new here, therefore, let’s check the sortItem definition.

 

1 def sortItem :Parser[SortItem] = (aggregate | returnItem) ~ ascOrDesc ^^ {
2   case returnItem ~ reverse => {
3     returnItem match {
4       case ValueReturnItem(EntityValue(_)) => throw new SyntaxException("Cannot ORDER BY on nodes or relationships")
5       case _ => SortItem(returnItem, reverse)
6     }
7   }
8 }

 

The aggregate is where we can use a sum() or max() function to return something, while a returnItem can be a nullableProperty or an identifier, and after it, it is verified whether the ordering must be asc or desc.

In the case of the returnItem being a node or a relationship, a SyntaxException is thrown, since it doesn’t make sense to order by anything that is not an aggregate or a property.

In the end, the Sort object with the items on which the sorting will be performed is added to the AST.

The Skip and Limit Clause

A simple one to finish this combo. The SkipLimitClause is the simplest clause amongst all the others. It is composed of two definitions, the skip and the limit:

 

1 trait SkipLimitClause extends JavaTokenParsers with Tokens {
2   def skip: Parser[Int] = ignoreCase("skip") ~> positiveNumber ^^ { case startAt => startAt.toInt }
3
4   def limit: Parser[Int] = ignoreCase("limit") ~> positiveNumber ^^ { case count => count.toInt }
5 }

 

Taking a look at the code, we can see in both definitions the keywords (skip and limit) being matched and discarded, and we can also see that a positive number is demanded. The transformation only converts the result, which is a String, to an Int.

The entry point, where everything starts. Also, where everything ends

Now that we know how each clause is defined, it is possible to put them in sequence. So, in the query, first comes the Start clause, then the Match clause and so on. This is one of the responsibilities of the CypherParser class, which is defined as:

 

01 class CypherParser extends JavaTokenParsers
02 with StartClause
03 with MatchClause
04 with WhereClause
05 with ReturnClause
06 with SkipLimitClause
07 with OrderByClause
08 with StringExtras {
09    // code will come here
10 }

 

Note that the CypherParser class is composed of all of the previously discussed clause traits, therefore making it possible to use the definitions made in each clause trait. Continuing on the CypherParser class, we have the query method, which uses the pieces defined before:

 

1 def query: Parser[Query] = start ~ opt(matching) ~ opt(where) ~ returns ~ opt(order) ~ opt(skip) ~ opt(limit)

 

Taking a look at the query method, we can see that the whole query itself is defined by a mandatory start clause, optional match and where clauses, followed by a mandatory return clause and optional order, skip and limit clauses. All of these small methods were explained earlier in this post (again, the start clause was explained in the first post).

Now that the query is defined, we should transform it into an AST, and the way this is done in Cypher is the following:

 

01 def query: Parser[Query] = start ~ opt(matching) ~ opt(where) ~ returns ~ opt(order) ~ opt(skip) ~ opt(limit) ^^ {
02
03   case start ~ matching ~ where ~ returns ~ order ~ skip ~ limit => {
04     val slice = (skip,limit) match {
05       case (None,None) => None
06       case (s,l) => Some(Slice(s,l))
07     }
08
09     val (pattern:Option[Match], namedPaths:Option[NamedPaths]) = matching match {
10       case Some((p,NamedPaths())) => (Some(p),None)
11       case Some((Match(),nP)) => (None,Some(nP))
12       case Some((p,nP)) => (Some(p),Some(nP))
13       case None => (None,None)
14     }
15
16     Query(returns._1, start, pattern, where, returns._2, order, slice, namedPaths)
17   }
18 }

 

One step at a time. The first thing that is done is the pattern matching against the result, therefore checking that everything is in place. Then, some specific matches are done. The first one is against a (skip, limit) tuple at line 4. If neither of them is passed, a None object is returned and assigned to the slice val at line 4. Otherwise, a Some object containing a Slice with the skip and limit values is assigned to slice. The Slice class definition is as simple as:

 

1 case class Slice(from: Option[Int], limit: Option[Int])

 

The second operation in the code, starting at line 9, deals with the match clause. It checks whether the match clause was written with named paths, with patterns, or with both. If both have been used, it creates Some objects containing both the pattern and the named paths used; if just one of them is defined, the unused one will become a None object. In the end, both will be assigned to a tuple representing the pattern used and the named paths used.

Then, a Query object is constructed with all the results of the previously applied parsers. We now have a complete AST, with the Query as its starting point. We can see the Query class definition:

 

1 case class Query(returns: Return, start: Start, matching: Option[Match], where:Option[Clause], aggregation: Option[Aggregation],
2                  sort: Option[Sort], slice: Option[Slice], namedPaths:Option[NamedPaths])

 

The only piece missing from the CypherParser class is the parse(queryText: String) method, whose job is simply to delegate the queryText processing to Scala’s Parser Combinators library. In case of success, the result (the Query object) is returned. If it is not successful, the appropriate error message is used to indicate that the parsing process failed. Below is the complete code for the parse method:

 

01 @throws(classOf[SyntaxException])
02 def parse(queryText: String): Query = {
03   val MissingQuoteError = """`\.' expected but `.' found""".r
04   val MissingStartError = """string matching regex `\(\?i\)\\Qstart\\E' expected.*""".r
05   val WholeNumberExpected = """string matching regex `\\d\+' expected.*""".r
06   val StringExpected = """string matching regex `'\(\[\^'\\p\{Cntrl\}\\\\\]\|\\\\\[\\\\\/bfnrt\]\|\\\\u\[a-fA-F0-9\]\{4\}\)\*'' .*""".r
07
08   parseAll(query, queryText) match {
09     case Success(r, q) => r
10     case NoSuccess(message, input) => message match {
11       case MissingQuoteError() => fail(input, "Probably missing quotes around a string")
12       case MissingStartError() => fail(input, "Missing START clause")
13       case WholeNumberExpected() => fail(input, "Whole number expected")
14       case StringExpected() => fail(input, "String literal expected")
15       case "string matching regex `-?\\d+' expected but `)' found" => fail(input,"Last element of list must be a value")
16       case "string matching regex `(?i)\\Qreturn\\E' expected but end of source found" => throw new SyntaxException("Missing RETURN clause")
17       case _ => throw new SyntaxException(message)
18     }
19   }
20 }

 

Now we have a complete parser, which parses the whole query and generates an AST, or an error message if something went wrong during the parsing process.

Coming up next

After the parsing process has happened successfully, we have built the whole AST, which means it is now possible to take some action over it. However, we are not sure that the generated AST will be what we were expecting. We are not sure that the parser works correctly (it works, but just pretend for now that we are not sure ;) ). In the next part, I’ll cover how the unit testing of the AST was done and how the Neo4j guys did an amazing job generating documentation through the tests.

[repost ]How Neo4j uses Scala’s Parser Combinator: Cypher’s internals – Part 1

original:http://ahalmeida.com/2011/09/06/how-neo4j-uses-scalas-parser-combinator-cyphers-internals-part-1/

Introduction

I think that most of us software developers, as kids, always wanted to know how things worked inside. Since I was a child, I always wanted to understand how my toys worked. And then, what did I do? Opened ’em, sure. And of course, later, I wasn’t able to re-join the pieces properly, but that is not this post’s subject ;) . Well, understanding how things work behind the scenes can teach us several things, and in software this is no different: we can study how a specific piece of code was created and mixed together with other code.

In this series of posts I'll share what I've found inside the Neo4J implementation, specifically in the code of Cypher (its query language).

In this first part, I'll briefly introduce Neo4J and Cypher and then start explaining the internals of its parser and how it works. Since it is a long subject (a very, very long one, in fact), part 2 and the subsequent parts are coming very soon.

And BTW, this is a very long post, so, if you prefer, go grab a nice cup of coffee or tea, a jar of juice, something to eat, and then, come back to read.

First of all, what is Neo4J?

We are all very used to the relational paradigm; however, every day non-relational databases receive more and more attention from the community. That happens for several reasons, including better data modeling, improved scalability, skyrocketing performance enhancements and so on. Non-relational databases became popular under the name NOSQL, and several implementations exist, such as Riak, Redis, CouchDB, MongoDB and lots of others.

Each of these implementations has its particularities, and each is better suited to a different kind of problem. Among them we can highlight Neo4J, a fully ACID-compliant graph database, which means your data is stored as a graph (with nodes, relationships and properties on them), giving high speed through graph traversals (and we all know that storing graph-like data inside a relational database is not a good idea).

For a more interesting introduction to Neo4J, you can check Emil Eifrem’s presentation and this nice InfoQ post.

How to find data on the graph?

So, let's consider the following problem: "I want to know all the employees that have pair programmed with a given programmer".

In order to retrieve information from the graph, Neo4J provides an API which allows us to traverse the graph and fetch this information. With this traversal API, we can write Java code describing how we want to traverse the graph:

 

TraversalDescription description = Traversal.description()
    .breadthFirst()
    .relationships(Relationships.PAIRED, Direction.OUTGOING)
    .evaluator(Evaluators.excludeStartPosition());

description.traverse( startNode ); // Retrieves the traverser

 

Although the traversal described above works pretty well, people may get confused by all this Java code describing the traversal, and those who want to use Neo4J from JRuby, for instance, might have to adapt their code in order to achieve the same result. Another issue may be a sysadmin wanting to perform ad-hoc queries on the database. How can we make this easy, without having to write Java code? Well, why not have a single query language in which we can define traversals? Enter Cypher, the Neo4J query language.

With Cypher, above’s traversal would be described as:

start programmer=(3) match (programmer)-[:PAIRED]->(pair) return pair

In this example, 3 is the id of our query's starting node. We are also matching the relationships called PAIRED outgoing from the programmer and labeling the node at the other end as pair. In the end, we're saying that we want to return all the retrieved pairs. That's it.

We can do even more complex things in our traversal; for instance, we can apply filters on values, ordering, aggregation, pagination and so on. Thus, we can retrieve only the pairs whose age is more than 30 years (assuming we have an age property).

start programmer=(3) match (programmer)-[:PAIRED]->(pair) where pair.age > 30 return pair, count(*) order by age skip 5 limit 10

Imagine doing all of this by writing the Java code. Sure, it is possible, but not as straightforward as Cypher. And believe me, there are many more combinations and complex queries you can write.

Defining Cypher

Cypher is an external DSL, just like HQL (Hibernate Query Language) and the AspectJ syntax, for instance. Therefore, somewhere it needs to be parsed, its correctness must be checked and then it should be executed, in Cypher’s case, retrieving the results we asked for.

But in order to write an external DSL, we should define its grammar, which gives the guidelines for how the language is supposed to be structured and what is and isn't valid. To express this definition, we can use some variation of EBNF syntax, which provides us with a clear way to expose the language definition.

First of all, let's choose a small subset of Cypher to describe, which will be the focus of this first post: the start clause, where we defined the id we wanted to retrieve and named our starting point programmer. Looking at the EBNF definition, we would have something like the following:

start ::= "start" {"," nodeByIds | nodeByIndex | nodeByIndexQuery | relsByIds | relsByIndex }

nodeByIds ::= identity "=" "(" {"," number} ")"

nodeByIndex ::= identity "=" "(" identity "," identity "," string ")"

nodeByIndexQuery ::= identity "=" "(" identity "," string ")"

relsByIds ::= identity "=" "<" { "," number } ">"

relsByIndex ::= identity "=" "<" identity "," identity "," string ">"

Above, identity is an identifier (for instance, the “programmer” identifier), “{}” means that whatever is inside can appear repeatedly with a separator, and | denotes an alternative production.

With that information, we now know that in order to write a proper start clause we can't use a semicolon as a separator, we must use parentheses to group the ids, and we can have as many ids as we want. Easy, right?

Now that we have a grammar defined for the start clause, we need a way to put it into action, which means we must parse a given instruction, check its validity and then take some action based on what was supplied.

Parsing, Cypher and Parser Combinators

One possibility for parsing the query is to use a parser generator tool like Antlr or YACC. However, some functional programming languages, such as Scala and Haskell, provide an embedded tool that makes it possible to parse such languages. That tool is known as a Parser Combinator, and Cypher uses Scala's Parser Combinator library. If you are not familiar with parsers and parser combinators, you can check this excellent article by Odersky and Moors.

Remember the start clause previously described? We can describe it in a very similar way in Scala. In fact, Neo4J's Cypher codebase has a trait (which is very similar to a Java interface) called StartClause, containing the definition we wrote above, with just a few changes in syntax.

 

def start: Parser[Start] = ignoreCase("start") ~> rep1sep(nodeByIds | nodeByIndex | nodeByIndexQuery | relsByIds | relsByIndex, ",")

def nodeByIds = identity ~ "=" ~ "(" ~ rep1sep(wholeNumber, ",") ~ ")"

def nodeByIndex = identity ~ "=" ~ "(" ~ identity ~ "," ~ identity ~ "," ~ string ~ ")"

def nodeByIndexQuery = identity ~ "=" ~ "(" ~ identity ~ "," ~ string ~ ")"

def relsByIds = identity ~ "=" ~ "<" ~ rep1sep(wholeNumber, ",") ~ ">"

def relsByIndex = identity ~ "=" ~ "<" ~ identity ~ "," ~ identity ~ "," ~ string ~ ">"

 

The code above looks very similar to the EBNF we wrote before; we just translated the syntax to Scala, with a few nuances. For instance, here we use the ~ method, which represents a sequence, whereas before we represented a sequence simply by placing elements from left to right. We also used the rep1sep method, which means a repetition with a separator (the same effect as “{}” in the EBNF example).

However, where are these ~ and rep1sep methods defined? In fact, looking at the StartClause trait definition, we have:

 

trait StartClause extends JavaTokenParsers with Tokens {
  // code here
}

 

The JavaTokenParsers trait itself comes from the scala.util.parsing.combinator.Parsers trait (keep that in mind, it will be very important later), which defines ~ and rep1sep, among other methods we'll talk about later.
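
As a standalone illustration of these combinators (a toy parser of my own, not Cypher code), the snippet below uses the built-in ident and wholeNumber parsers from JavaTokenParsers to recognize a comma-separated list of ids in parentheses, much like the nodeByIds rule above:

import scala.util.parsing.combinator.JavaTokenParsers

object IdsParserDemo extends JavaTokenParsers {
  // ~ sequences parsers, rep1sep handles a comma-separated repetition
  def ids = ident ~ "=" ~ "(" ~ rep1sep(wholeNumber, ",") ~ ")"

  def main(args: Array[String]) {
    // prints the successful ParseResult for this input
    println(parseAll(ids, "programmer = (1, 2, 3)"))
  }
}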

So far, so good, but I've cheated a little bit: the story (and the code) is not yet complete.

While parsing the given input (a sample query) against the defined grammar, we can get back the results and process them later; for instance, we can grab the identifier used in the start clause, the node ids and so on, and with this information it is possible to perform the desired traversal.

Scala's Parser Combinator library allows us to capture all this information in the form of a Semantic Model, which describes it much better: instead of grabbing bare Strings and a List of node ids, it is possible to transform them into an object representing that information. For instance, in the case of a query by ids, we could get an instance of a NodeById class, or in the case of querying by index, a NodeByIndex instance. And hell yeah, we can do this by using the ^^ operator, which allows us to apply a function to the parser result, letting us build the Semantic Model we want as the AST (Abstract Syntax Tree). In this case, the portion of the tree that represents querying by a node id is an object of the NodeById class.

We can use the ^^ operator in a pattern matching way, matching the parts of the query we want, in order to form any other result, which in the case of the nodeByIds method in Cypher’s StartClause, is the following:

 

def nodeByIds = identity ~ "=" ~ "(" ~ rep1sep(wholeNumber, ",") ~ ")" ^^ {
  case varName ~ "=" ~ "(" ~ id ~ ")" => NodeById(varName, id.map(_.toLong).toSeq: _*)
}

 

What happens in the above code is that it matches against what was captured during parsing: varName will be the variable name used in the query, while id is a List of ids, which later gets transformed into a splat (varargs) of Longs (via the somewhat strange-looking _* if you are not used to Scala) and passed to the NodeById constructor. The NodeById instance is then returned from the nodeByIds method.

However, since the StartClause class mixes in the scala.util.parsing.combinator.Parsers trait, an implicit conversion defined there promotes the returned NodeById object into a Parser[NodeById], so parsing can continue with the rest of the input (the match clause, the where clause and so on, recursively; those parts will be covered in the next post).

Taking a look at the NodeById class, we can see its definition, inside the StartItem.scala file:

 

case class NodeById(varName:String, id: Long*) extends NodeStartItem(varName)

abstract class NodeStartItem(varName:String) extends StartItem(varName)

abstract sealed class StartItem(val variable:String)

 

Notice 2 important things here. First, these classes have no behavior; they just hold the data for the Semantic Model. Looking at the same file, it is possible to find all the other classes used to form the Semantic Model of the Start clause, such as NodeByIndex, RelationshipById and so on. Second, there is a hierarchy between them: the NodeById class extends NodeStartItem, which in turn extends StartItem. Looking further into the StartItem.scala file, we see that all classes that form this Semantic Model follow this hierarchy. The complete code of the StartItem.scala file looks like:

 

abstract sealed class StartItem(val variable:String)

abstract class RelationshipStartItem(varName:String) extends StartItem(varName)

abstract class NodeStartItem(varName:String) extends StartItem(varName)

case class RelationshipById(varName:String, id: Long*) extends RelationshipStartItem(varName)

case class NodeByIndex(varName:String, idxName: String, key:String, value: Any) extends NodeStartItem(varName)

case class NodeByIndexQuery(varName:String, idxName: String, query: Any) extends NodeStartItem(varName)

case class RelationshipByIndex(varName:String, idxName: String, key:String, value: Any) extends RelationshipStartItem(varName)

case class NodeById(varName:String, id: Long*) extends NodeStartItem(varName)

 

This complete code makes it easier to see the hierarchy between the classes involved in the AST definition.

Back to the StartClause trait

Taking a further look at the other methods inside the StartClause trait, such as nodeByIndex and nodeByIndexQuery, the rules are the same (except for the start method, which I'll talk about next).

The start method follows the same rules as the others; however, some so-far unexplained things appear in it, namely the ~> and | combinators.

 

def start: Parser[Start] = ignoreCase("start") ~> rep1sep(nodeByIds | nodeByIndex | nodeByIndexQuery | relsByIds | relsByIndex, ",") ^^ (Start(_: _*))

 

The ~> is the selective sequence combinator, which here means "ignore what is immediately to the left", in this case the "start" keyword, which doesn't mean anything in our Semantic Model. After this, we have a repetition of nodeByIds or nodeByIndex or nodeByIndexQuery and so on, all using a comma separator. Notice that we are combining the five parsers right now (Parser Combinators, anyone?) in order to create the whole start clause.

The last thing the method does is transform the list of StartItem instances (remember the hierarchy defined in the StartItem.scala file?) into a splat (varargs) and pass them to the Start constructor.

A small clarification for those not used to Scala is the Start(_: _*). What this code does is call the Start class constructor, passing it a splat (did you see the _*?) of _, which in this case represents the list of StartItems.
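
If you have never seen that splat syntax before, here is a tiny self-contained example (the names are made up, they are not the Cypher classes):

object SplatDemo extends App {
  case class Start(items: String*)       // a varargs constructor, like Cypher's Start

  val parsedItems = List("node(1)", "node(2)", "node(3)")
  val start = Start(parsedItems: _*)     // : _* expands the List into the varargs
  println(start.items.size)              // prints 3
}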

And that's it: the start clause of Cypher is demystified, and we even understand how it is composed and how it works. Now we are halfway to understanding how the other parts of Cypher are put together. Phew!!!

1st part’s conclusion

In this first part, I hope it was possible to understand how a small subset of Cypher is defined, and how it uses Scala's parser combinators. But there are a lot of other things to see, such as how the other clauses of Cypher are tied together, how error reporting is done, how the query is executed after the parsing happens and so on.

Special thanks to @porcelli, for some kind suggestions on this post.

[repost ]Scalding for the Impatient

original:http://sujitpal.blogspot.com/2012/08/scalding-for-impatient.html

A few weeks ago, I wrote about Pig, a DSL that allows you to specify a data processing flow in terms of PigLatin operations, and results in a sequence of Map-Reduce jobs on the backend. Cascading is similar to Pig, except that it provides a (functional) Java API to specify a data processing flow. One obvious advantage is that everything can now be in a single language (no more having to worry about UDF integration issues). But there are others as well, as detailed here and here.

Cascading is well documented, and there is also a very entertaining series of articles titled Cascading for the Impatient that builds up a Cascading application to calculate TF-IDF of terms in a (small) corpus. The objective is to showcase the features one would need to get up and running quickly with Cascading.

Scalding is a Scala DSL built on top of Cascading. As you would expect, Cascading code is an order of magnitude shorter than equivalent Map-Reduce code. But because Java is not a functional language, implementing functional constructs leads to some verbosity in Cascading that is eliminated in Scalding, leading to even shorter and more readable code.

I was looking for something to try my newly acquired Scala skills on, so I hit upon the idea of building up a similar application to calculate TF-IDF for terms in a corpus. The table below summarizes the progression of the Cascading for the Impatient series. I've provided links to the original articles for the theory (which is very nicely explained there) and links to the source code for both the Cascading and Scalding versions.

Article[1] Description #-mappers #-reducers Code[2]
Part 1 Distributed File Copy 1 0
Part 2 Word Count 1 1
Part 3 Word Count with Scrub 1 1
Part 4 Word Count with Scrub and Stop Words 1 1
Part 5 TF-IDF 11 9

[1] – points to “Cascading for the Impatient Articles”
[2] – Cascading version links point to Paco Nathan’s github repo, Scalding versions point to mine.

The code for the Scalding version is fairly easy to read if you know Scala (somewhat harder, but still possible, if you don't). The first thing to note is the relative sizes – Scalding code is shorter and more succinct than the Cascading version. The second thing to note is that the Scalding based code uses method calls that are not Cascading methods. You can read about the Scalding methods in the API Reference (I used the Fields-based reference exclusively). The tutorial and example code in the Scalding distribution is also helpful.
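
To give a feel for the Fields-based API, here is a minimal word count job along the lines of Part 2 (a quick sketch of my own rather than a copy of either repo):

import com.twitter.scalding._

// Reads lines from the input file, splits them into words,
// counts occurrences per word and writes (word, count) pairs as TSV.
class WordCountJob(args: Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { line: String => line.toLowerCase.split("\\s+") }
    .groupBy('word) { _.size }
    .write(Tsv(args("output")))
}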

Project Setup

As you can see, I created my own Scala project and used Scalding as a dependency. I describe the steps here so you can do the same if you are so inclined.

Assuming you are going to be using Scalding in your applications, you need to download and build the Scalding JAR, then publish it to your local (or corporate) code repository (sbt uses ivy2). To do this, run the following sequence of commands:

sujit@cyclone:scalding$ git clone https://github.com/twitter/scalding.git
sujit@cyclone:scalding$ cd scalding
sujit@cyclone:scalding$ sbt assembly # build scalding jar
sujit@cyclone:scalding$ sbt publish-local # to add to local ivy2 repo.

Scalding also comes with a ruby script scald.rb that you use to run Scalding jobs. It is quite convenient to use – it forces all arguments to be named (resulting in cleaner/explicit argument handling code) and allows switching from local development to hadoop mode using a single switch. It is available in the scripts subdirectory of your scalding download. To use it outside Scalding (ie, in your own project), you will need to soft link it to a directory in your PATH. Copying it does not work because it has dependencies to other parts of the Scalding download.

The next step is to generate and set up your Scalding application project. Follow these steps:

  1. Generate your project using giter8 - type g8 typesafehub/scala-sbt at the command line, and answer the prompts. Your project is created as a directory named after your Scala Project Name.
  2. Move to the project directory - type cd scalding-impatient (in my case).
  3. Build a basic build.sbt - create a file build.sbt in the project base directory and populate it with the key value pairs for name, version and scalaVersion (blank lines between pairs are mandatory).
  4. Copy over scalding libraryDependencies - copy over the libraryDependencies lines from the scalding build.sbt file and drop them into your project’s build.sbt. I am not sure if this is really necessary, or whether scalding declares its transitive dependencies, which would be picked up with a single dependency declaration on scalding (see below). You may want to try omitting this step and see – if you succeed please let me know and I will update accordingly.
  5. Add the scalding libraryDependency - define the libraryDependency on the scalding JAR you just built and published. The line to add is libraryDependencies += "com.twitter" % "scalding_2.9.2" % "0.7.3" (see the build.sbt sketch right after this list).
  6. Rebuild your Eclipse files - Check my previous post for details about SBT-Eclipse setup. If you are all set up, then type sbt eclipse to generate these. Your project is now ready for development using Eclipse.
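
For reference, a minimal build.sbt along the lines of steps 3 and 5 might look like the sketch below (the project name matches the one used above, the version number is just a placeholder, and you should adjust the scalding version to whatever you actually built):

name := "scalding-impatient"

version := "0.1"

scalaVersion := "2.9.2"

libraryDependencies += "com.twitter" % "scalding_2.9.2" % "0.7.3"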

And that's pretty much it. Hope you have as much fun coding in Scala/Scalding as I did.

[repost ]Indexing into ElasticSearch with Akka and Scala

original:http://sujitpal.blogspot.com/2012/11/indexing-into-elasticsearch-with-akka.html
I just completed the Functional Programming Principles with Scala course on Coursera, taught by Dr Martin Odersky, the creator of Scala. Lately I’ve been trying to use Scala (instead of Java) for my personal projects, so I jumped at the opportunity to learn the language from its creator. While I did learn things about Scala I didn’t know before, the greatest gain for me was what I learned about Functional Programming principles. Before the course, I had thought of Scala as a better, leaner Java, but after the course, I am beginning to appreciate also the emphasis on immutability and recursive constructs that one can see in Scala code examples on the Internet.

Some time after I finished the course, I heard about the Typesafe Developer Contest. Developers are invited to submit applications that use one or more of the components in the Typesafe Stack (Scala, Akka and Play). While I have no illusion of winning any prizes here, I figured it would be a good opportunity for me to practice what I had learnt, and also to expand my knowledge of Scala to include two of its most popular libraries.

The application I came up with was a Scala front end to ElasticSearch. ElasticSearch is a distributed, RESTful search engine built on top of Apache Lucene, and has been on my list of things to check out for a while now. Communication with ElasticSearch is via its very comprehensive JSON over HTTP REST interface.

This post will describe the indexing portion of the application. Indexing is by nature embarrassingly parallel, so it's a perfect candidate for Akka Actor based concurrency. The system consists of a single Master actor which spawns a fixed number of Worker actors, and a Reaper actor which shuts down the system once all documents are processed.

Data is supplied to the indexing system as an arbitrarily nested tree of files on the local filesystem. For each dataset, one must provide a parser to convert each file into a Map of name-value pairs, a FileFilter that decides which files to pick for indexing, and the schema to use for the dataset. I used the Enron Email Dataset for my development and testing.

The Master Actor is responsible for crawling the local filesystem and distributing the list of files among the Worker Actors. Each Worker Actor processes one file at a time and POSTs the results as a JSON string to ElasticSearch, then sends back the result to the Master, which updates its count of successes and failures. Once the number of successes and failures equal the number of original files sent for processing to the workers, the Master sends a signal to the Reaper and shuts itself down. The Reaper then sends a signal to shut down the entire system. The structure is based heavily on the Akka tutorial and the Shutdown Patterns in Akka2 blog post.

The diagram below shows the different actors and the message sequence. Actors are represented by ovals and the solid colored lines represent the messages (and their sequence) being passed to them. The dotted green lines with the diamond heads show how the components are instantiated.

Additionally, I use Play’s WebServices API to do the HTTP POST and PUT requests to ElasticSearch, and Play’s JSON API to parse and create JSON requests out of native Scala data structures. Here is the code for the Actor system.

package esc.index

import java.io.{FileFilter, File}

import scala.Array.canBuildFrom
import scala.collection.immutable.Stream.consWrapper
import scala.io.Source

import akka.actor.actorRef2Scala
import akka.actor.{Props, ActorSystem, ActorRef, Actor}
import akka.routing.RoundRobinRouter
import play.api.libs.json.Json
import play.libs.WS

object Indexer extends App {

  /////////////// start main /////////////////

  val props = properties(new File("conf/indexer.properties"))
  val server0 = List(props("serverName"), 
      props("indexName")).foldRight("")(_ + "/" + _)
  val server1 = List(props("serverName"), 
      props("indexName"), 
      props("mappingName")).foldRight("")(_ + "/" + _)

  indexFiles(props)

  /////////////// end main /////////////////

  def indexFiles(props: Map[String,String]): Unit = {
    val system = ActorSystem("ElasticSearchIndexer")
    val reaper = system.actorOf(Props[Reaper], name="reaper")
    val master = system.actorOf(Props(new IndexMaster(props, reaper)), 
      name="master")
    master ! StartMsg
  }

  //////////////// actor and message definitions //////////////////

  sealed trait EscMsg
  case class StartMsg extends EscMsg
  case class IndexMsg(file: File) extends EscMsg
  case class IndexRspMsg(status: Int) extends EscMsg

  class IndexMaster(props: Map[String,String], reaper: ActorRef) 
      extends Actor {
    val numIndexers = props("numIndexers").toInt
    val schema = Class.forName(props("schemaClass")).
      newInstance.asInstanceOf[Schema]
    val router = context.actorOf(Props(new IndexWorker(props)).
      withRouter(RoundRobinRouter(numIndexers)))

    var nreqs = 0
    var succs = 0
    var fails = 0

    def createIndex(): Int = sendToServer(server0, """
      {"settings": 
        {"index": 
          {"number_of_shards": %s,
           "number_of_replicas": %s}
      }}""".format(props("numShards"), props("numReplicas")), 
      false)

    def createSchema(): Int = sendToServer(server1 + "_mapping", 
      """{ "%s" : { "properties" : %s } }""".
      format(props("indexName"), schema.mappings), false)

    def receive = {
      case StartMsg => {
        val filefilter = Class.forName(props("filterClass")).
          newInstance.asInstanceOf[FileFilter]
        val files = walk(new File(props("rootDir"))).
          filter(f => filefilter.accept(f))
        createIndex()
        createSchema()
        for (file <- files) {
          nreqs = nreqs + 1
          router ! IndexMsg(file) 
        }
      }
      case IndexRspMsg(status) => {
        if (status == 0) succs = succs + 1 else fails = fails + 1
        val processed = succs + fails
        if (processed % 100 == 0)
          println("Processed %d/%d (success=%d, failures=%d)".
            format(processed, nreqs, succs, fails))
        if (nreqs == processed) {
          println("Processed %d/%d (success=%d, failures=%d)".
            format(processed, nreqs, succs, fails))
          reaper ! IndexRspMsg(-1)
          context.stop(self)
        }
      }
    }
  }

  class IndexWorker(props: Map[String,String]) extends Actor {

    val parser = Class.forName(props("parserClass")).
      newInstance.asInstanceOf[Parser]
    val schema = Class.forName(props("schemaClass")).
      newInstance.asInstanceOf[Schema]

    def addDocument(doc: Map[String,String]): Int = {
      val json = doc.filter(kv => schema.isValid(kv._1)).
        map(kv => if (schema.isMultiValued(kv._1)) 
          Json.toJson(kv._1) -> Json.toJson(kv._2.split(",").
            map(e => e.trim).toSeq)
          else Json.toJson(kv._1) -> Json.toJson(kv._2)).
        foldLeft("")((s, e) => s + e._1 + " : " + e._2 + ",")
      sendToServer(server1, "{" + json.substring(0, json.length - 1) + "}", true)
    }

    def receive = {
      case IndexMsg(file) => {
        val doc = parser.parse(Source.fromFile(file))
        sender ! IndexRspMsg(addDocument(doc))
      }
    }
  }

  class Reaper extends Actor {
    def receive = {
      case IndexRspMsg(-1) => {
        println("Shutting down ElasticSearchIndexer")
        context.system.shutdown 
      }
    }  
  }

  ///////////////// global functions ////////////////////

  def properties(conf: File): Map[String,String] = {
    Map() ++ Source.fromFile(conf).getLines().toList.
      filter(line => (! (line.isEmpty || line.startsWith("#")))).
      map(line => (line.split("=")(0) -> line.split("=")(1)))
  }  

  def walk(root: File): Stream[File] = {
    if (root.isDirectory) 
      root #:: root.listFiles.toStream.flatMap(walk(_))
    else root #:: Stream.empty
  }

  def sendToServer(server: String, payload: String, 
      usePost: Boolean): Int = {
    val rsp = if (usePost) WS.url(server).post(payload).get
              else WS.url(server).put(payload).get
    val rspBody = Json.parse(rsp.getBody)
    (rspBody \ "ok").asOpt[Boolean] match {
      case Some(true) => 0
      case _ => -1
    }
  }
}

Of course, the indexing application is intended to be useful beyond the Enron dataset. To that end, I define a set of extension points which can be implemented by someone intending to index some new data with my code above. It's modeled as a set of traits for which concrete implementations need to be provided.

package esc.index

import scala.io.Source
import play.api.libs.json.Json

/////////////// Parser /////////////////

/**
 * An implementation of the Parser trait must be supplied
 * by the user for each new data source. The parse() method
 * defines the parsing logic for the new content.
 */
trait Parser {

  /**
   * @param s a Source representing a file on local filesystem.
   * @return a Map of field name and value.
   */
  def parse(s: Source): Map[String,String]
}

/////////////// Schema /////////////////

/**
 * An implementation of the Schema trait must be supplied 
 * by the user for each new data source. The mappings() 
 * method is a JSON string containing the fields and their
 * properties. It can be used to directly do a put_mapping
 * call on elastic search. The base trait defines some 
 * convenience methods on the mapping string.
 */
trait Schema {

  /**
   * @return a JSON string representing the field names
   * and properties for the content source.
   */
  def mappings(): String

  /**
   * @param fieldname the name of the field.
   * @return true if field exists in mapping, else false.
   */
  def isValid(fieldname: String): Boolean = {
    lazy val schemaMap = Json.parse(mappings)
    (schemaMap \ fieldname \ "type").asOpt[String] match {
      case Some(_) => true
      case None => false
    }
  }

  /**
   * @param fieldname the name of the field.
   * @return true if field is declared as multivalued, else false.
   */
  def isMultiValued(fieldname: String): Boolean = {
    lazy val schemaMap = Json.parse(mappings)
    (schemaMap \ fieldname \ "multi_field").asOpt[String] match {
      case Some("yes") => true
      case Some("no") => false
      case None => false
    }
  }
}

And finally, here are concrete implementations of these traits for the Enron dataset.

package esc.index

import scala.io.Source
import scala.collection.immutable.HashMap
import java.io.FileFilter
import java.io.File
import java.util.Date
import java.text.SimpleDateFormat

/**
 * User-configurable classes for the Enron data. These are
 * the classes that will be required to be supplied by a user
 * for indexing a new data source. 
 */

class EnronParser extends Parser {

  override def parse(source: Source): Map[String,String] = {
    parse0(source.getLines(), HashMap[String,String](), false)
  }

  def parse0(lines: Iterator[String], map: Map[String,String], 
      startBody: Boolean): Map[String,String] = {
    if (lines.isEmpty) map
    else {
      val head = lines.next()
      if (head.trim.length == 0) parse0(lines, map, true)
      else if (startBody) {
        val body = map.getOrElse("body", "") + "\n" + head
        parse0(lines, map + ("body" -> body), startBody)
      } else {
        val split = head.indexOf(':')
        if (split > 0) {
          val kv = (head.substring(0, split), head.substring(split + 1))
          val key = kv._1.map(c => if (c == '-') '_' else c).trim.toLowerCase
          val value = kv._1 match {
            case "Date" => formatDate(kv._2.trim)
            case _ => kv._2.trim
          }
          parse0(lines, map + (key -> value), startBody)
        } else parse0(lines, map, startBody)
      }
    }
  }

  def formatDate(date: String): String = {
    lazy val parser = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss")
    lazy val formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
    formatter.format(parser.parse(date.substring(0, date.lastIndexOf('-') - 1)))
  }
}

class EnronFileFilter extends FileFilter {
  override def accept(file: File): Boolean = {
    file.getAbsolutePath().contains("/sent/")
  }
}

class EnronSchema extends Schema {
  override def mappings(): String = """{
    "message_id": {"type": "string", "index": "not_analyzed", "store": "yes"},
    "from": {"type": "string", "index": "not_analyzed", "store": "yes"},
    "to": {"type": "string", "index": "not_analyzed", "store": "yes", 
           "multi_field": "yes"},
    "x_cc": {"type": "string", "index": "not_analyzed", "store": "yes", 
           "multi_field": "yes"},
    "x_bcc": {"type": "string", "index": "not_analyzed", "store": "yes", 
           "multi_field": "yes"},
    "date": {"type": "date", "index": "not_analyzed", "store": "yes"},
    "subject": {"type": "string", "index": "analyzed", "store": "yes"},
    "body": {"type": "string", "index": "analyzed", "store": "yes"}
  }"""
}

I did a very basic installation of ElasticSearch, just unzipping the distribution (my version is 0.19.11), setting cluster.name to “sujits_first_es_server” and network.bind_host to 127.0.0.1, and then starting it in a separate terminal window with “bin/elasticsearch -f -Des.config.file=config/elasticsearch.yml”. You can verify that it's up at http://localhost:9200.

This is most likely misconfiguration on my part, but whenever I had the network up (wired or wireless), ElasticSearch would try to find clusters to replicate with. Ultimately I had to turn off networking on my computer to get all the data to load.

One more thing: while the code runs to completion, it does not exit; I have to terminate it manually with CTRL+C. I suspect it is some Future waiting inside the sendToServer method, since if I replace that with a NOOP returning 0, the program does terminate normally. I need to investigate this further.

I will describe the search subsystem in a future post (once I finish it).

Update 2012-11-20 – I made some changes to the indexer to use the current GA version of Play2 (2.9.1/2.0.4) instead of a later yet to be released version that I was using previously. I also changed the code to use the Scala WebService libs instead of the Java one that I was using previously. I had hoped that this would allow the application to terminate, but no luck there. I ended up having to start a Play embedded server because the WS calls kept complaining about no running application being found. Somehow the Play WS API seems to be overkill for this application, I am considering switching to spray instead. The latest code can be found on GitHub.

[repost ]An ElasticSearch Web Client with Scala and Play2

original:http://sujitpal.blogspot.com/2012/11/an-elasticsearch-web-client-with-scala.html

In this post, I describe the second part of my submission for the Typesafe Developer Contest. This part is a rudimentary web based search client to query an ElasticSearch (ES) server. It is a Play2/Scala web application that communicates with the ES server via its JSON query DSL.

The webapp has a single form that allows you to specify a Lucene query and various parameters, and returns an HTML or JSON response. It will probably remind Solr developers of the admin form. I find the Solr admin form very useful for trying out queries before baking them into code, and I envision a similar use for this webapp for ES search developers.

Since ES provides a rich JSON based Query DSL, the form here has a few more features than the Solr admin form, such as allowing for faceting and sorting. Although, in the interests of full disclosure, it provides only a subset of the variations possible via direct use of JSON and curl on the command line, it's good for quick and dirty verification of search ideas. In order to quickly get started with ES's query DSL, I found this DZone article by Peter Kar and this blog post by Pulkit Singhal very useful (apart from the ES docs themselves, of course).

Since Play2 was completely new to me a week ago and now I am the proud author of a working webapp, I would like to share with you some of my insights into this framework. I typically learn new things by making analogies to stuff I already know, so I will explain Play2 by making analogies to Spring. If you know Spring, it may be helpful, and if you don’t, well, maybe it was not that terribly helpful anyway…

Routing in Play2 is done using the conf/routes file, which maps URL patterns and HTTP methods to Play2 controller actions. Actions can be thought of as @RequestMapping methods in a multi-action Spring controller, and are basically functions that transform a Request into a Response. A response can be a String wrapped in an Ok() method, or it can be a method call into a view with some data, which returns a templated string to Ok(). There, that's it – about everything you need to know about Play2 to get started using it.
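
For instance, a trivial controller action (not part of this app, just an illustration of the Request-to-Response idea) could look like this:

package controllers

import play.api.mvc.{Action, Controller}

object Hello extends Controller {
  // A minimal Play2 action: wraps a plain String in an Ok() result
  def index = Action {
    Ok("Hello from Play2")
  }
}

It would be wired up with an entry such as GET /hello controllers.Hello.index in conf/routes.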

Unlike the last time (with Akka), this time around I did not use the Typesafe Play tutorial. Instead I downloaded Play2 and used the play command to build a new web application template (play new search), then to compile and run it. The best tutorial I found was this one on flurdy.com, which covers everything from choice of IDE to deployment on Heroku and everything in between. Other useful sources are Play’s documentation (available with the Play2 download) and this example Play2 app on GitHub.

Here is my conf/routes file. I added the two entries under Search pages. They both respond to HTTP GET requests and call the form() and search() Actions respectively. The other two entries come with the generated project and are needed (so don’t delete them).

# conf/routes
# Home page
GET     /                           controllers.Application.index

# Search pages
GET     /form                       controllers.Application.form
GET     /search                     controllers.Application.search

# Map static resources from the /public folder to the /assets URL path
GET     /assets/*file               controllers.Assets.at(path="/public", file)

There is another file in the conf directory, called conf/application.conf. It contains properties required by the default application. I added a new property for the URL for the ES server in this file.

# conf/application.conf
...
es.server="http://localhost:9200/"

The Play2 “new” command also generates a skeleton controller app/controllers/Application.scala, into which we add the two new form and search Actions. Here is the completed Application.scala file.

// app/controllers/Application.scala
package controllers

import models.{Searcher, SearchParams}
import play.api.data.Forms.{text, number, mapping}
import play.api.data.Form
import play.api.libs.json.{Json, JsValue}
import play.api.libs.ws.WS
import play.api.mvc.{Controller, Action}
import play.api.Play

object Application extends Controller {

  // define the search form
  val searchForm = Form(
    mapping (
      "index" -> text,
      "query" -> text,
      "filter" -> text,
      "start" -> number,
      "rows" -> number,
      "sort" -> text,
      "writertype" -> text,
      "fieldlist" -> text,
      "highlightfields" -> text,
      "facetfields" -> text
    ) (SearchParams.apply)(SearchParams.unapply)
  )

  // configuration parameters from conf/application.conf
  val conf = Play.current.configuration
  val server = conf.getString("es.server").get

  // home page - redirects to search form
  def index = Action {
    Redirect(routes.Application.form)
  }

  // form page
  def form = Action {
    val rsp = Json.parse(WS.url(server + "_status").
      get.value.get.body)
    val indices = ((rsp \\ "indices")).
      map(_.as[Map[String,JsValue]].keySet.head)
    Ok(views.html.index(indices, searchForm))
  } 

  // search results action - can send view to one of
  // three different pages (xmlSearch, jsonSearch or htmlSearch)
  // depending on value of writertype
  def search = Action {request =>
    val params = request.queryString.
      map(elem => elem._1 -> elem._2.headOption.getOrElse(""))
    val searchParams = searchForm.bind(params).get
    val result = Searcher.search(server, searchParams)
    searchParams.writertype match {
      case "json" => Ok(result.raw).as("text/javascript")
      case "html" => Ok(views.html.search(result)).as("text/html")
    }
  }
}

We first define a Search form and map it to the SearchParams class (defined in the model, below). The index Action has been changed to redirect to the form Action. The form method makes a call to the ES server to get a list of indexes (ES can support multiple indexes with different schemas within the same server), and then delegates to the index view with this list and an empty searchForm.

The search Action binds the request to the searchParams bean, then sends this bean to the Searcher.search() method, which returns a SearchResult object containing the results of the search. Two different views are supported – the HTML view (delegating to the search view template) and the raw JSON view that just dumps the JSON response from ES.

The respective views for the form and search are shown below. Not much to explain here, except that it's another templating language that you have to learn. It's set up like a function – you pass in parameters that you use in the template. I followed the lead of the flurdy.com tutorial referenced above and kept it as HTML-ish as possible, but Play2 has an extensive templating language of its own that you may prefer.

@** app/views/index.scala.html **@
@(indices: Seq[String], searchForm: Form[SearchParams])

@import helper._

@main("Search with ElasticSearch") {

  <h2>Search with ElasticSearch</h2>
  @form(action = routes.Application.search) {  
    <fieldset>
      <legend>Index Name</legend>
      <select name="index">
      @for(index <- indices) {
        <option value="@index">@index</option>
      }
      </select>
    </fieldset>
    <fieldset>
      <legend>Lucene Query</legend>
      <input type="textarea" name="query" value="*:*" maxlength="1024" rows="10" cols="80"/>
    </fieldset>
    <fieldset>
      <legend>Filter Query</legend>
      <input type="textarea" name="filter" value="" maxlength="512" rows="5" cols="80"/>
    </fieldset>  
    <fieldset>
      <legend>Start Row</legend>
      <input type="text" name="start" value="0" maxlength="5"/>
    </fieldset>
    <fieldset>
      <legend>Maximum Rows Returned</legend>
      <input type="text" name="rows" value="10" maxlength="5"/>
    </fieldset>
    <fieldset>
      <legend>Sort Fields</legend>
      <input type="text" name="sort" value="" maxlength="80" size="40"/>
    </fieldset>
    <fieldset>
      <legend>Output Type</legend>
      <select name="writertype">
        <option value="html" selected="true">HTML</option>
        <option value="json">JSON</option>
      </select>
    </fieldset>
    <fieldset>
      <legend>Fields To Return</legend>
      <input type="text" name="fieldlist" value="" maxlength="80" size="40"/>
    </fieldset>
    <fieldset>
      <legend>Fields to Highlight</legend>
      <input type="text" name="highlightfields" value="" maxlength="80" size="40"/>
    </fieldset>
    <fieldset>
      <legend>Fields to Facet</legend>
      <input type="text" name="facetfields" value="" maxlength="80" size="40"/>
    </fieldset>
    <input type="submit" value="Search"/>
  }
}

The resulting input form looks like this:

 

@** app/views/search.scala.html **@
@(result: SearchResult)

@import helper._

@main("Search with ElasticSearch - HTML results") {
  <h2>Search Results</h2>
  <p><b>@result.meta("start") to @result.meta("end") results of @result.meta("numFound") in @result.meta("QTime") ms</b></p>
  <hr/>
  <p><b>JSON Query: </b>@result.meta("query_json")</p>
  <hr/>
  @for(doc <- result.docs) {
    <fieldset>
      <table cellspacing="0" cellpadding="0" border="1" width="100%">
      @for((fieldname, fieldvalue) <- doc) {
        <tr valign="top">
          <td width="20%"><b>@fieldname</b></td>
          <td width="80%">@fieldvalue</td>
        </tr>
      }
      </table>
    </fieldset>
  }
  <hr/>
}

Finally, we come to the part of the application that is not autogenerated by Play2 and which contains all the business logic of the application – the model. Here is the code.

// app/models/Searcher.scala
package models

import scala.Array.canBuildFrom

import play.api.libs.json.{Json, JsValue}
import play.api.libs.ws.WS

case class SearchResult(
  meta: Map[String,Any], 
  docs: Seq[Seq[(String,JsValue)]],
  raw: String
)

case class SearchParams(
  index: String,
  query: String,
  filter: String,
  start: Int,
  rows: Int,
  sort: String,
  writertype: String, 
  fieldlist: String,
  highlightfields: String,
  facetfields: String
)

object Searcher {

  def search(server: String, params: SearchParams): SearchResult = {
    val payload = Searcher.buildQuery(params)
    val rawResponse = WS.url(server + params.index + 
      "/_search?pretty=true").post(payload).value.get.body
    println("response=" + rawResponse)
    val rsp = Json.parse(rawResponse)
    val meta = (rsp \ "error").asOpt[String] match {
      case Some(x) => Map(
        "error" -> x,
        "status" -> (rsp \ "status").asOpt[Int].get
      )
      case None => Map(
        "QTime" -> (rsp \ "took").asOpt[Int].get,
        "start" -> params.start,
        "end" -> (params.start + params.rows),
        "query_json" -> payload,
        "numFound" -> (rsp \ "hits" \ "total").asOpt[Int].get,
        "maxScore" -> (rsp \ "hits" \ "max_score").asOpt[Float].get
      )
    }
    val docs = if (meta.contains("error")) Seq()
    else {
      val hits = (rsp \ "hits" \ "hits").asOpt[List[JsValue]].get
      val idscores = hits.map(hit => Map(
        "_id" -> (hit \ "_id"),
        "_score" -> (hit \ "_score")))
      val fields = hits.map(hit => 
        (hit \ "_source").asOpt[Map[String,JsValue]].get)
      idscores.zip(fields).
        map(tuple => tuple._1 ++ tuple._2).
        map(doc => doc.toSeq.sortWith((doc1, doc2) => doc1._1 < doc2._1))
    }
    new SearchResult(meta, docs, rawResponse)
  }

  def buildQuery(params: SearchParams): String = {
    val queryQuery = Json.toJson(
      if (params.query.isEmpty || "*:*".equals(params.query))
        Map("match_all" -> Map.empty[String,String])
      else Map("query_string" -> Map("query" -> params.query)))
    val queryFilter = if (params.filter.isEmpty) null
      else Json.toJson(Map("query_string" -> Json.toJson(params.filter)))
    val queryFacets = if (params.facetfields.isEmpty) null
      else {
        val fields = params.facetfields.split(",").map(_.trim)
        Json.toJson(fields.zip(fields.
          map(field => Map("terms" -> Map("field" -> field)))).toMap)
      }
    val querySort = if (params.sort.isEmpty) null
      else Json.toJson(params.sort.split(",").map(_.trim).map(field => 
        if (field.toLowerCase.endsWith(" asc") || 
            field.toLowerCase.endsWith(" desc")) 
          (field.split(" ")(0), field.split(" ")(1)) 
        else (field, "")).map(tuple => 
          if (tuple._2.isEmpty) Json.toJson(tuple._1)
          else Json.toJson(Map(tuple._1 -> tuple._2))))  
    val queryFields = if (params.fieldlist.isEmpty) null
      else Json.toJson(params.fieldlist.split(",").map(_.trim))
    val queryHighlight = if (params.highlightfields.isEmpty) null
      else {
        val fields = params.highlightfields.split(",").map(_.trim)
        Json.toJson(Map("fields" -> fields.zip(fields.
          map(field => Map.empty[String,String])).toMap))
      }
    Json.stringify(Json.toJson(Map(
      "from" -> Json.toJson(params.start),
      "size" -> Json.toJson(params.rows),
      "query" -> queryQuery,
      "filter" -> queryFilter,
      "facets" -> queryFacets,
      "sort" -> querySort,
      "fields" -> queryFields,
      "highlight" -> queryHighlight).
      filter(tuple => tuple._2 != null)))
  }
}

The first two are simple case classes: SearchParams and SearchResult are, respectively, an FBO (Form Backing Object) and a DTO (Data Transfer Object) in Spring terms. The search() method takes the ES server URL and the filled-in SearchParams object, calls buildQuery() to build the ES query JSON, then hits the ES server. It then parses the JSON response from ES to create the SearchResult bean, which it passes back to the search Action. The SearchResult object contains a Map of response metadata, a List of Lists of key-value pairs containing the documents, and the raw JSON response from ES.
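
As a quick usage sketch (the parameter values below are purely illustrative), this is roughly how a bound SearchParams turns into the JSON document that gets POSTed to ES:

// Illustrative values only; in the webapp these come from the bound search form
val params = SearchParams(
  index = "enron", query = "body:\"hedge fund\"", filter = "",
  start = 0, rows = 10, sort = "", writertype = "html",
  fieldlist = "", highlightfields = "", facetfields = "from")

// Builds the "from"/"size"/"query"/"facets" JSON document that
// search() POSTs to <server>/<index>/_search
val json = Searcher.buildQuery(params)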

Here are some screenshots of the results for “hedge fund” from our Enron index that we built using the code from the previous post.

The one on the left shows the HTML results (and also shows the JSON query that one would need to use to get those results). The one on the right shows the raw JSON results from the ES server.

That's all I have for this week. Hope you found it interesting.

Update 2012-11-20 – There were some minor bugs caused by the fields parameter being blank. If the fields parameter is blank, ES returns the _source JSON field instead of an array of field objects. The fix is to pass in a “*” (all fields) as the default for the fields parameter. The updated code can be found on my GitHub page.