Streaming Wikipedia edits with Spark and Clojure
Wikipedia edits and IRC
The Wikimedia project has an IRC server with raw feeds of changes to the
different Wikimedia wikis, and we can join #en.wikipedia to see all edits to
the English-language Wikipedia in real time.
The Apache Flink project has a streaming source for those edits, which is great
for getting started with streaming data processing. Apache Spark does not have
such a source, so we'll make one. To be precise, we'll implement a Spark
Streaming custom receiver, and use clj-bean from the previous post to do it.
The Receiver interface
We only need to implement onStart and onStop, the two abstract methods of
Spark's Receiver class, plus a function that does the actual receiving, but
first we need to create a Java class with gen-class. Since we want to play
nice with the rest of the JVM ecosystem (Java, Scala, etc.) and gen-class
cannot express the generic type parameter of Receiver<T>, we declare
AbstractWikipediaEditReceiver in Java.
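package com.wjoel.spark.streaming.wikiedits;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
// Pins down the generic type parameter, which gen-class cannot express.
public abstract class AbstractWikipediaEditReceiver extends Receiver<WikipediaEditEvent> {
    public AbstractWikipediaEditReceiver(StorageLevel storageLevel) {
        super(storageLevel);
    }
}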
From the previous post you may recall that :constructors maps the argument
types of each of our constructors to the argument types of the superclass
constructor it calls. We have three constructors: one without arguments, in
which case we'll choose a random nickname when connecting; one with the
nickname specified; and one with both the nickname and the storage level for
our RDDs.
(:gen-class
 :name com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver
 :extends com.wjoel.spark.streaming.wikiedits.AbstractWikipediaEditReceiver
 :init init
 :state state
 :prefix "receiver-"
 :constructors {[] [org.apache.spark.storage.StorageLevel]
                [String] [org.apache.spark.storage.StorageLevel]
                [String org.apache.spark.storage.StorageLevel] [org.apache.spark.storage.StorageLevel]}
 :main false)
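With :init init and :prefix "receiver-", gen-class calls a function named receiver-init with the constructor arguments and expects it to return a vector of the superclass constructor arguments and the initial state. The post doesn't show that function, so here is a minimal sketch of what it could look like. The random-nick helper, the "nick" state key and the default storage level are assumptions made for illustration, not necessarily what the project uses.

```clojure
(import '(org.apache.spark.storage StorageLevel)
        '(java.util HashMap))

;; Hypothetical helper: any reasonably unique nickname would do.
(defn random-nick []
  (str "spark-" (rand-int 1000000)))

;; One arity per entry in :constructors. Each arity ends up returning
;; [[superclass-constructor-args] state], as gen-class requires.
(defn receiver-init
  ([] (receiver-init (random-nick)))
  ;; Assumed default: MEMORY_AND_DISK_2, the level used in Spark's own examples.
  ([nick] (receiver-init nick (StorageLevel/MEMORY_AND_DISK_2)))
  ([nick storage-level]
   [[storage-level]
    (doto (HashMap.) (.put "nick" nick))]))
```

The state is a mutable java.util.HashMap here because the state field of a gen-class instance is final, so anything we want to add after construction (such as the connection) has to go inside a mutable container.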
The IRC library we use has a default adapter for receiving events. All the
messages we are interested in arrive as PRIVMSG events, so we can use proxy
to override only onPrivmsg and have it call a message handling function for
each message received.
(defn make-irc-events-listener [message-fn]
  (proxy [IRCEventAdapter] []
    (onPrivmsg [target user msg]
      (message-fn msg))))
Connecting to the IRC server is easy but tedious. Once we have a connection,
we can add this listener and use the store method of our Receiver to pass the
events back to Spark.
;; conn is the IRCConnection and this is our receiver instance
(.addIRCEventListener
 conn
 (make-irc-events-listener
  (fn [msg]
    (when-let [edit-event (edit-event-message->edit-event msg)]
      (.store this edit-event)))))
edit-event-message->edit-event uses a regexp to extract the different fields
from the message and creates a WikipediaEditEvent, a JavaBean we defined
using clj-bean, as described at the end of the previous post.
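To give an idea of what the regexp-based extraction involves, here is a minimal sketch. It returns a plain map instead of a WikipediaEditEvent, and it assumes that IRC color codes have already been stripped and that a message looks roughly like "[[Title]] flags url * user * (+bytes) summary". The pattern, the function name parse-edit-message and the map keys are all made up for this illustration; the real feed and the real function may differ.

```clojure
(def edit-pattern
  ;; Assumed layout: [[title]] flags url * user * (+/-bytes) summary
  #"\[\[(.+?)\]\]\s(\S*)\s(\S+)\s\*\s(.+?)\s\*\s\(([+-]?\d+)\)\s?(.*)")

(defn parse-edit-message
  "Returns a map of the fields in msg, or nil if msg does not match."
  [msg]
  (when-let [[_ title flags diff-url user byte-diff summary]
             (re-matches edit-pattern msg)]
    {:title     title
     :flags     flags
     :diff-url  diff-url
     :user      user
     :byte-diff (Long/parseLong byte-diff)
     :summary   summary}))

;; A made-up sample line in the assumed layout.
(parse-edit-message
 "[[Clojure]] M https://en.wikipedia.org/w/index.php?diff=2&oldid=1 * ExampleUser * (+42) fixed typo")
```

Returning nil for lines that don't match is what lets the listener wrap the call in when-let and silently skip anything that isn't an edit.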
We need to start a thread in onStart and clean up in onStop.
(defn connect-as [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this nick]
  ;; nick doubles as the IRC username and real name; the server password is empty
  (let [conn (IRCConnection. wikimedia-irc-host
                             (int-array [wikimedia-irc-port]) "" nick nick nick)]
    ;; store the result of init-connection in the receiver's state for onStop
    (.put ^java.util.HashMap (.state this)
          "connection" (init-connection this conn))))

(defn receiver-onStart [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this]
  ;; onStart must return quickly, so we connect on a separate thread
  (.start (Thread. (fn []
                     (connect-as this ^String (get-from-state this :nick))))))

(defn receiver-onStop [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this]
  (let [conn ^IRCConnection (get-from-state this :connection)]
    (when (and conn (.isConnected conn))
      ;; leave the channel, then interrupt the connection's thread
      ;; and wait up to 3 seconds for it to finish
      (doto conn
        (.send (str "PART " wikimedia-channel))
        (.interrupt)
        (.join 3000)))))
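The get-from-state helper isn't shown in this post. Note that connect-as stores the connection under the string key "connection" while receiver-onStop looks it up with the keyword :connection, so the helper presumably normalizes keys. A minimal sketch of how that could work, assuming the state is the java.util.HashMap seen above; state-get and state-put! are hypothetical names, and they take the map directly rather than the receiver to keep the example self-contained.

```clojure
(import '(java.util HashMap Map))

(defn state-get
  "Looks up k in a string-keyed state map; k may be a keyword or a string."
  [^Map state k]
  ;; name turns :connection into \"connection\" and leaves strings unchanged
  (.get state (name k)))

(defn state-put!
  "Stores v under the string form of k and returns the state map."
  [^Map state k v]
  (.put state (name k) v)
  state)

;; Usage: keywords and strings address the same entry.
(def example-state (HashMap.))
(state-put! example-state :nick "spark-123")
(state-get example-state :nick)
(state-get example-state "nick")
```

A lookup for a key that was never stored returns nil, which is why receiver-onStop checks conn before calling methods on it.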