Streaming Wikipedia edits with Spark and Clojure

Wikipedia edits and IRC

The Wikimedia project runs an IRC server with raw feeds of changes to the different Wikimedia wikis. By joining #en.wikipedia we can see all edits to the English-language Wikipedia in real time.

The Apache Flink project has a streaming source for those edits, which is great for getting started with streaming data processing. Apache Spark does not have such a source, so we'll make one. To be precise, we'll implement a Spark Streaming custom receiver, and use clj-bean from the previous post to do it.

The Receiver interface

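We only need to implement onStart and onStop, the two abstract methods of Spark's Receiver, plus a function that does the actual receiving (conventionally called receive). But first we need to create a Java class with gen-class.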

Since we want to play nice with the rest of the JVM ecosystem (Java, Scala, etc.) and gen-class has no way to express generic type parameters, we declare AbstractWikipediaEditReceiver in Java.

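import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

// Pins the generic parameter of Receiver so the Clojure class can extend a non-generic type.
public abstract class AbstractWikipediaEditReceiver extends Receiver<WikipediaEditEvent> {
    public AbstractWikipediaEditReceiver(StorageLevel storageLevel) {
        super(storageLevel);
    }
}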

From the previous post you may recall that :constructors is a map from the argument types of each of our constructors to the argument types of the superclass constructor it calls. We have three constructors: one without arguments, in which case we'll choose a random nickname when connecting, one with the nickname specified, and one with both the nickname and the storage level for the received data.

(:gen-class
 :name com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver
 :extends com.wjoel.spark.streaming.wikiedits.AbstractWikipediaEditReceiver
 :init init
 :state state
 :prefix "receiver-"
 :constructors {[] [org.apache.spark.storage.StorageLevel]
                [String] [org.apache.spark.storage.StorageLevel]
                [String org.apache.spark.storage.StorageLevel] [org.apache.spark.storage.StorageLevel]}
 :main false)
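
For readers more at home in Java, the :constructors map above corresponds roughly to the three constructors in the sketch below. It is only an illustration: StorageLevel and AbstractReceiver are minimal stand-ins so that the example compiles without Spark, and the default storage level and the random nickname scheme are assumptions, not taken from the project.

```java
import java.util.concurrent.ThreadLocalRandom;

class ConstructorSketch {
    // Stand-in for org.apache.spark.storage.StorageLevel (assumption, not the real class).
    enum StorageLevel { MEMORY_ONLY, MEMORY_AND_DISK_2 }

    // Stand-in for AbstractWikipediaEditReceiver: a single constructor taking a storage level.
    static abstract class AbstractReceiver {
        final StorageLevel storageLevel;

        AbstractReceiver(StorageLevel storageLevel) {
            this.storageLevel = storageLevel;
        }
    }

    static class Receiver extends AbstractReceiver {
        final String nick;

        // [] -> [StorageLevel]: random nickname, default storage level.
        Receiver() {
            this(randomNick());
        }

        // [String] -> [StorageLevel]: given nickname, default storage level (hypothetical default).
        Receiver(String nick) {
            this(nick, StorageLevel.MEMORY_AND_DISK_2);
        }

        // [String StorageLevel] -> [StorageLevel]: everything specified by the caller.
        Receiver(String nick, StorageLevel storageLevel) {
            super(storageLevel);
            this.nick = nick;
        }

        // Hypothetical nickname scheme, used only for this sketch.
        static String randomNick() {
            return "spark-" + ThreadLocalRandom.current().nextInt(100000);
        }
    }

    public static void main(String[] args) {
        Receiver r = new Receiver();
        System.out.println(r.nick + " " + r.storageLevel);
    }
}
```

Every constructor ends up calling the one superclass constructor, which is exactly what each entry of the map expresses.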

The IRC library we use has a default adapter for receiving events. All the messages we are interested in arrive as PRIVMSG messages, so we can use proxy to override only onPrivmsg and call a message handling function for each message received.

(defn make-irc-events-listener [message-fn]
  (proxy [IRCEventAdapter] []
    (onPrivmsg [target user msg]
      (message-fn msg))))

Connecting to the IRC server is easy but tedious. Once we have a connection, we can add this listener and use the store method of our Receiver to pass the events back to Spark.

;; conn is the IRCConnection and this is the receiver instance
(.addIRCEventListener
 conn
 (make-irc-events-listener
  (fn [msg]
    (when-let [edit-event (edit-event-message->edit-event msg)]
      (.store this edit-event)))))

edit-event-message->edit-event uses a regexp to extract the different fields from the message and create a WikipediaEditEvent JavaBean which we have created using clj-bean, as described at the end of the previous post.
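
To give an idea of what such a regexp-based parser involves, here is a minimal sketch in plain Java. It is not the project's code: the message layout in SAMPLE is an assumption about what the feed looks like, and the Edit class with its field names is a hypothetical stand-in for the WikipediaEditEvent bean. The sketch first strips IRC colour codes and other control characters, then matches the remaining text.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class EditMessageParser {
    // Assumed message layout, including mIRC colour codes (\u0003 followed by digits).
    static final String SAMPLE =
        "\u000314[[\u000307Renormalization\u000314]]\u00034 M\u000310 "
        + "\u000302https://en.wikipedia.org/w/index.php?diff=1&oldid=2\u0003 "
        + "\u00035*\u0003 \u000303ExampleUser\u0003 \u00035*\u0003 (+191) "
        + "\u000310fixed typo\u0003";

    // Colour codes first, then any other control character.
    private static final Pattern CONTROL =
        Pattern.compile("\\u0003(\\d{1,2}(,\\d{1,2})?)?|\\p{Cntrl}");

    // [[title]] flags url * user * (byteDiff) summary
    private static final Pattern EDIT = Pattern.compile(
        "^\\[\\[(.+?)\\]\\] (\\S*) (\\S+) \\* (.+?) \\* \\(([+-]?\\d+)\\) ?(.*)$");

    // Hypothetical stand-in for the WikipediaEditEvent bean.
    static final class Edit {
        final String title, flags, diffUrl, user, summary;
        final int byteDiff;

        Edit(String title, String flags, String diffUrl, String user, int byteDiff, String summary) {
            this.title = title;
            this.flags = flags;
            this.diffUrl = diffUrl;
            this.user = user;
            this.byteDiff = byteDiff;
            this.summary = summary;
        }
    }

    // Returns an empty Optional for anything that does not look like an edit.
    static Optional<Edit> parse(String raw) {
        String line = CONTROL.matcher(raw).replaceAll("");
        Matcher m = EDIT.matcher(line);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new Edit(m.group(1), m.group(2), m.group(3), m.group(4),
                                    Integer.parseInt(m.group(5)), m.group(6)));
    }

    public static void main(String[] args) {
        parse(SAMPLE).ifPresent(e ->
            System.out.println(e.title + " by " + e.user + ": " + e.byteDiff));
    }
}
```

Returning an empty result for non-matching lines plays the same role as the when-let in the listener above: messages that are not edits are simply dropped.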

Spark expects onStart to return quickly, so we start a thread there to do the connecting and receiving, and clean up in onStop.

(defn connect-as [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this nick]
  (when-let [conn (IRCConnection. wikimedia-irc-host
                                  (int-array [wikimedia-irc-port]) "" nick nick nick)]
    (.put ^java.util.HashMap (.state this)
          "connection" (init-connection this conn))))

(defn receiver-onStart [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this]
  (.start (Thread. (fn []
                     (connect-as this ^String (get-from-state this :nick))))))

(defn receiver-onStop [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this]
  (let [conn ^IRCConnection (get-from-state this :connection)]
    (when (and conn (.isConnected conn))
      (doto conn
        (.send (str "PART " wikimedia-channel))
        (.interrupt)
        (.join 3000)))))
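
The same start-a-thread, interrupt-and-join pattern can be sketched in plain Java without Spark or an IRC library. In this sketch the class and method names are hypothetical, and a sleeping loop stands in for the blocking IRC connection.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ReceiverLifecycleSketch {
    private final CountDownLatch started = new CountDownLatch(1);
    private volatile Thread worker;

    // Returns immediately: all blocking work happens on the worker thread.
    void onStart() {
        Thread t = new Thread(this::receive, "sketch-receiver");
        t.setDaemon(true);
        worker = t;
        t.start();
    }

    // Interrupts the worker and waits up to three seconds for it to finish.
    void onStop() {
        Thread t = worker;
        if (t == null) {
            return;
        }
        t.interrupt();
        try {
            t.join(3000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean awaitStarted(long millis) {
        try {
            return started.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isRunning() {
        Thread t = worker;
        return t != null && t.isAlive();
    }

    // Stand-in for reading from the IRC connection until interrupted.
    private void receive() {
        started.countDown();
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(50);
            }
        } catch (InterruptedException e) {
            // Interrupted by onStop: fall through and let the thread end.
        }
    }

    public static void main(String[] args) {
        ReceiverLifecycleSketch r = new ReceiverLifecycleSketch();
        r.onStart();
        r.awaitStarted(1000);
        System.out.println("running after onStart: " + r.isRunning());
        r.onStop();
        System.out.println("running after onStop: " + r.isRunning());
    }
}
```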

Trying it out

That's almost all there is to it. You can find the source code for spark-streaming-wikiedits on GitHub to get the few remaining details. You can run this in spark-shell by loading spark-shell-top10.scala.

If you haven't used Apache Spark before, it's a simple matter of downloading the package from the project's website, unpacking it and then (in the Spark directory) starting it as follows.

$ ./bin/spark-shell --master local[4] \
  --packages "org.clojure:clojure:1.8.0,\
org.schwering:irclib:1.10,\
com.wjoel:clj-bean:0.2.0,\
com.wjoel:spark-streaming-wikiedits:0.1.3"

scala> :load /path/to/spark-streaming-wikiedits/examples/spark-shell-top10.scala

After about 20 seconds you will start getting results, updated every five seconds.

+--------------------+-----------+
|               title|sumByteDiff|
+--------------------+-----------+
|   Patrick Boucheron|        393|
|      Sharon Pincott|        347|
|Water Education F...|       -297|
|     Renormalization|        191|
|   Amal (given name)|        182|
|            BuzzFeed|        111|
| Personalized search|        107|
|      David Pastrňák|        107|
|  Jeepers Creepers 2|        102|
|Indo-Aryan languages|        -90|
+--------------------+-----------+
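
The Scala script itself is not reproduced here. As a rough sketch of the kind of aggregation that would produce a table like this, the plain Java below sums byte differences per title and keeps the titles with the largest absolute change. It does not use Spark, the Edit class and method names are hypothetical, and ranking by absolute value is an assumption based on the ordering of the rows above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TopEditsSketch {
    // Hypothetical, minimal stand-in for an edit event.
    static final class Edit {
        private final String title;
        private final int byteDiff;

        Edit(String title, int byteDiff) {
            this.title = title;
            this.byteDiff = byteDiff;
        }

        String getTitle() { return title; }
        int getByteDiff() { return byteDiff; }
    }

    // Sums byteDiff per title, then keeps the n titles with the largest absolute sum.
    static List<Map.Entry<String, Integer>> top(List<Edit> edits, int n) {
        Map<String, Integer> sums = edits.stream()
            .collect(Collectors.groupingBy(Edit::getTitle,
                                           Collectors.summingInt(Edit::getByteDiff)));
        return sums.entrySet().stream()
            .sorted((a, b) -> {
                int byAbs = Integer.compare(Math.abs(b.getValue()), Math.abs(a.getValue()));
                // Break ties by title so the order is deterministic.
                return byAbs != 0 ? byAbs : a.getKey().compareTo(b.getKey());
            })
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Edit> window = Arrays.asList(
            new Edit("A", 100), new Edit("B", -300), new Edit("A", 50), new Edit("C", 10));
        for (Map.Entry<String, Integer> e : top(window, 2)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

In the streaming version the list of edits would be whatever arrived within the current window, recomputed on every update.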

The JavaBeans we spent so much time on in the previous post are what let Spark handle these events in a type-safe manner. All it takes is this line.

implicit val encoder = org.apache.spark.sql.Encoders.bean(classOf[WikipediaEditEvent])

Next time we'll use Apache Zeppelin with this connector to analyze and visualize Wikipedia edits in real time.