Streaming Wikipedia edits with Spark and Clojure

Wikipedia edits and IRC

The Wikimedia project has an IRC server with raw feeds of changes to the different Wikimedia wikis, and we can join #en.wikipedia to see all edits to the English-language Wikipedia in real time.

The Apache Flink project has a streaming source for those edits, which is great for getting started with streaming data processing. Apache Spark does not have such a source, so we'll make one. To be precise, we'll implement a Spark Streaming custom receiver, and use clj-bean from the previous post to do it.

The Receiver interface

The Receiver interface only requires us to implement onStart and onStop, plus whatever logic actually receives the messages, but first we need to create a Java class with gen-class.

Since we want to play nice with the rest of the JVM ecosystem (Java, Scala, etc.) and Clojure's gen-class cannot express the generic type parameter that Receiver&lt;T&gt; expects, we specify AbstractWikipediaEditReceiver in Java.

package com.wjoel.spark.streaming.wikiedits;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public abstract class AbstractWikipediaEditReceiver extends Receiver<WikipediaEditEvent> {
    public AbstractWikipediaEditReceiver(StorageLevel storageLevel) {
        super(storageLevel);
    }
}

From the previous post you may recall that :constructors is a map from the types of our constructor methods to the types of the superclass's constructor. We have three constructors, one without arguments in which case we'll choose a random nickname when connecting, one with the nickname specified, and one with both the nickname and the storage level for our RDDs.

(:gen-class
 :name com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver
 :extends com.wjoel.spark.streaming.wikiedits.AbstractWikipediaEditReceiver
 :init init
 :state state
 :prefix "receiver-"
 :constructors {[] [org.apache.spark.storage.StorageLevel]
                [String] [org.apache.spark.storage.StorageLevel]
                [String org.apache.spark.storage.StorageLevel] [org.apache.spark.storage.StorageLevel]}
 :main false)
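
Because of the :prefix, gen-class looks for an init function named receiver-init, with one arity per declared constructor; each arity must return a vector of the arguments for the superclass constructor followed by the receiver's state. A minimal sketch of how this could look; the default nickname, the MEMORY_AND_DISK storage level and the HashMap key are assumptions, not necessarily what the actual module does:

(defn receiver-init
  ([] (receiver-init (str "wikiedits-" (rand-int 100000))))
  ([nick] (receiver-init nick (org.apache.spark.storage.StorageLevel/MEMORY_AND_DISK)))
  ([nick storage-level]
   ;; first element: arguments for the AbstractWikipediaEditReceiver
   ;; constructor; second element: the state returned by (.state this)
   [[storage-level] (doto (java.util.HashMap.)
                      (.put "nick" nick))]))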

The IRC library we use has a default adapter, IRCEventAdapter, for receiving events. All the messages we are interested in will be sent as private messages, so we can use proxy to override only onPrivmsg, calling a message handling function for each message received.

(defn make-irc-events-listener [message-fn]
  (proxy [IRCEventAdapter] []
    (onPrivmsg [target user msg]
      (message-fn msg))))

It's easy but tedious to connect to the IRC server, but once we have a connection we can add this listener and use the store method of our Receiver to pass the events back to Spark.

(.addIRCEventListener
 (make-irc-events-listener
  (fn [msg]
    (when-let [edit-event (edit-event-message->edit-event msg)]
      (.store this edit-event)))))
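
For reference, the init-connection helper used by connect-as further down could be sketched roughly as follows. The connect and JOIN details are assumptions based on how the connection is torn down in receiver-onStop; the listener registration is the snippet above:

(defn init-connection [this ^IRCConnection conn]
  (doto conn
    (.connect)
    (.send (str "JOIN " wikimedia-channel))
    (.addIRCEventListener
     (make-irc-events-listener
      (fn [msg]
        (when-let [edit-event (edit-event-message->edit-event msg)]
          (.store this edit-event)))))))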

edit-event-message->edit-event uses a regexp to extract the different fields from the message and create a WikipediaEditEvent JavaBean which we have created using clj-bean, as described at the end of the previous post.
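
The exact regular expression depends on the raw feed format (the messages are wrapped in mIRC colour codes and carry more fields than we use here), but the shape of the function is roughly the following sketch, assuming the bean exposes the timestamp, title and byte-diff accessors discussed in the JavaBeans post:

(defn edit-event-message->edit-event [^String msg]
  ;; simplified pattern: "[[Title]] ... (+123) ..." - the real one also
  ;; strips colour codes and captures flags, URL, user and comment
  (when-let [[_ title byte-diff] (re-find #"\[\[(.+?)\]\].*\(([+-]\d+)\)" msg)]
    (doto (WikipediaEditEvent.)
      (.setTimestamp (System/currentTimeMillis))
      (.setTitle title)
      (.setByteDiff (Long/parseLong byte-diff)))))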

We need to start a thread in onStart and clean up in onStop.

(defn connect-as [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this nick]
  (when-let [conn (IRCConnection. wikimedia-irc-host
                                  (int-array [wikimedia-irc-port]) "" nick nick nick)]
    (.put ^java.util.HashMap (.state this)
          "connection" (init-connection this conn))))

(defn receiver-onStart [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this]
  (.start (Thread. (fn []
                     (connect-as this ^String (get-from-state this :nick))))))

(defn receiver-onStop [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this]
  (let [conn ^IRCConnection (get-from-state this :connection)]
    (when (and conn (.isConnected conn))
      (doto conn
        (.send (str "PART " wikimedia-channel))
        (.interrupt)
        (.join 3000)))))
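
Once the receiver is AOT-compiled, using it is just a matter of handing an instance to receiverStream. A hypothetical usage sketch against a local JavaStreamingContext:

(import '[org.apache.spark SparkConf]
        '[org.apache.spark.streaming Durations]
        '[org.apache.spark.streaming.api.java JavaStreamingContext]
        '[com.wjoel.spark.streaming.wikiedits WikipediaEditReceiver])

(let [conf (-> (SparkConf.)
               (.setAppName "wikipedia-edits")
               (.setMaster "local[2]"))
      ssc (JavaStreamingContext. conf (Durations/seconds 10))
      ;; the nullary constructor picks a random nickname
      edits (.receiverStream ssc (WikipediaEditReceiver.))]
  (.print edits)
  (.start ssc)
  (.awaitTermination ssc))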

Read more…

Creating JavaBeans with Clojure

Introduction

JavaBeans have been around since forever in the Java world. They're well supported, but not well designed, as you can see from the list of disadvantages on Wikipedia. Unfortunately, we're stuck with them. Frameworks like Apache Spark and others give us nice things in return for those beans. To create a library which is usable from Java and Scala and is compatible with Spark we must be able to create proper JavaBeans (perhaps - we'll get back to this later).

Hence, we may need to follow the JavaBean standard. The requirements are simple.

  • Getters and setters, i.e. accessor and mutator methods, for all fields. Those are the typically verbose Java methods like long getSomeLongField() and void setSomeLongField(long value) which we perhaps wanted to escape from by moving to Clojure. The setters imply that our instance fields must be mutable.
  • JavaBean classes need to implement java.io.Serializable but this is usually easy. We just need to specify that we implement this interface and make sure the types of our fields also implement the interface.
  • A nullary constructor, i.e. a constructor that takes zero arguments. This one is a bit difficult for Clojure.

JavaBeans through deftype

There are several ways to create Java classes from Clojure. This can be confusing, but in our case we can directly rule out defrecord since it only supports immutable fields.

Our options are deftype and gen-class. We'll start with deftype since it's easier to work with. Mutable fields can be created by specifying the :volatile-mutable true metadata, and we need to use definterface to specify our accessor methods.

We can use deftype to implement a JavaBean for edits to Wikipedia (we'll have more to say about this in the future).

(definterface IEditEvent
  (^Long getTimestamp [])
  (setTimestamp [^Long timestamp])
  (^String getTitle [])
  (setTitle [^String title])
  (^Long getByteDiff [])
  (setByteDiff [^Long byte-diff]))

(deftype DeftypeEditEvent
    [^{:volatile-mutable true
       :tag java.lang.Long} timestamp
     ^{:volatile-mutable true
       :tag java.lang.String} title
     ^{:volatile-mutable true
       :tag java.lang.Long} byteDiff]
  java.lang.Object
  (toString [_] (str "DeftypeEditEvent; title=" title ", byteDiff=" byteDiff))
  IEditEvent
  (getTimestamp [_] timestamp)
  (setTimestamp [_ v] (set! timestamp v))
  (getTitle [_] title)
  (setTitle [_ v] (set! title v))
  (getByteDiff [_] byteDiff)
  (setByteDiff [_ v] (set! byteDiff v))
  java.io.Serializable)

We can then create DeftypeEditEvent instances by using the factory method or by calling the associated Java class constructor directly.

(ns bean.deftype-bean-test
  (:require [bean.deftype-bean :as dt]))

(println "This is a bean:" (dt/->DeftypeEditEvent 1483138282 "hi" 123))
(println "This too:" (bean.deftype_bean.DeftypeEditEvent. 1483138282 "hi" 123))

Unfortunately this is not a true JavaBean, because it doesn't have a nullary constructor. deftype only creates a single constructor which takes as many arguments as there are fields. Can we create real JavaBeans in Clojure?

gen-class to the rescue

gen-class supports a lot of features, including nullary constructors. Due to its complexity gen-class is generally not recommended, but if we want nullary constructors for our JavaBeans it's the only way - at least if we want to stick to pure Clojure. We'll take care to avoid reflection by using type hints.
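
To get a feel for what gen-class gives us before looking at clj-bean itself, here is a minimal sketch of a gen-class bean with a nullary constructor. It keeps its single field in an atom rather than in real mutable fields and only exposes the title accessors, so it is a simplification and not what clj-bean ultimately generates:

(ns bean.genclass-bean
  (:gen-class
   :name bean.genclass_bean.GenclassEditEvent
   :implements [java.io.Serializable]
   :init init
   :state state
   :prefix "bean-"
   :constructors {[] []}
   :methods [[getTitle [] String]
             [setTitle [String] void]]
   :main false))

(defn bean-init []
  ;; no superclass arguments, state held in an atom
  ;; (note: an atom is not itself serializable, which is one reason
  ;; clj-bean generates real mutable fields instead)
  [[] (atom {:title nil})])

(defn bean-getTitle [this]
  (:title @(.state this)))

(defn bean-setTitle [this v]
  (swap! (.state this) assoc :title v))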

Read more…

From JSON to Parquet using Spark SQL (and HDFS sequence files)

But what does it mean?

The title of this post may sound like gibberish if you are not familiar with the Apache world of "big data" technologies. So let's start with an almost jargon free explanation of what we're going to do and a glossary.

We are going to query text files containing JSON objects as if they were a database table and show the results as charts. To do this efficiently we will then convert the text files to a more table-like file format.

  • Spark is a data processing framework.
  • Spark SQL is a library built on Spark which implements the SQL query language.
  • A sequence file is a record-oriented Hadoop file format, a sequence of binary key/value records, typically stored on HDFS, the Hadoop distributed filesystem.
  • Parquet is a column-oriented file format that supports compression. Being column-oriented means that instead of storing each row sequentially we store each column separately.
  • Zeppelin is a web-based tool for data visualization and collaboration. We will use Zeppelin to run Scala with Spark (including Spark SQL) and create charts.
  • Kafka is a platform for distributed, ordered logs, or topics. Not important for understanding the rest of this article, but a fantastic piece of technology that will likely be discussed in another post.

All the above are either full Apache projects or trying to become such by going through a stage known as "incubating." To get started with Zeppelin you just download it from the web page, unpack it, run ./bin/zeppelin.sh and visit http://localhost:8080.

HDFS sequence files

I have some HDFS sequence files in a directory, where the value of each record in the files is a JSON string. In this case they have been created by Secor which is used to back up Kafka topics. The files contain about 14 million records from the NYC taxi data set. Their combined size is 4165 MB and we want to use Spark SQL in Zeppelin to allow anyone who knows SQL to run queries on the files and easily visualize the results.

To do this we will need to convert the HDFS sequence files into a string RDD (resilient distributed dataset, an important construct in Spark) which is used to create a DataFrame. A DataFrame is a table where each column has a type, and the DataFrame can be queried from Spark SQL as a temporary view/table.

Fortunately there is support both for reading a directory of HDFS sequence files by specifying wildcards in the path, and for creating a DataFrame from JSON strings in an RDD.

A slightly tricky detail is the need to use copyBytes in order to get a serializable type. This is required because the record values have the type BytesWritable which is not serializable.

Just creating the view takes a while (slightly less than a minute on my computer) because Spark SQL looks at the JSON strings to figure out an appropriate schema.

import org.apache.hadoop.io._

val paths = "file:///home/wjoel/seq/*/*"
val seqs = sc.sequenceFile[LongWritable, BytesWritable](paths)
    .map((record: (LongWritable, BytesWritable)) =>
           new String(record._2.copyBytes(), "utf-8"))
val df = sqlContext.read.json(seqs)
df.createOrReplaceTempView("trips")

This table, or view, already has a schema that has been inferred from the contents. All columns have the type "string" but we'll change this later.

df.printSchema()

root
 |-- dropoff_datetime: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- medallion: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- vendor_id: string (nullable = true)

This is already useful, but the performance is not necessarily good enough for exploratory use cases. Just counting the rows takes a bit more than a minute.

SELECT COUNT(*) FROM trips
14,776,615

We can make another query to calculate the average trip distance based on the number of passengers. The data includes some trips with "255" passengers, which is most likely "-1" or "unknown" number of passengers, so we only consider trips with fewer than 50 passengers. Calculating this takes about a minute and a half.

SELECT passenger_count, avg(trip_distance) FROM trips WHERE passenger_count < 50 GROUP BY passenger_count ORDER BY 2

Caching and Parquet

The main reason for the performance issues is the conversion from JSON and, to a lesser extent, the fact that all columns in our DataFrame are strings. Most of the columns are actually numerical, so strings will be converted to doubles (or longs) again and again when running queries involving averages, sums and other numerical aggregates.

Read more…

Mesos container log forwarding with Filebeat

The next level

In the previous post we managed to forward container logs to journald, but we could have done the same thing, and much more, if we could have forwarded the logs to systemd-cat instead. We wouldn't have had to write any code at all if that had been possible.

So we're going to make that possible by backporting an upcoming "external" container logger, being developed for the next release of Mesos, as a module compiled for the currently stable release. That's version 1.0.1 at the time of writing.

Then we'll use this module to set up log forwarding from Mesos containers to Graylog using Filebeat 1.3 and include additional fields from the Mesos executor's environment.

Mesos and external container loggers

Apache Mesos has had support for a LogrotateContainerLogger since version 0.27, but if you look at what that container logger does, you'll find that all it does is receive the container's standard output and standard error, count the number of bytes received, and write them to the stdout and stderr files in the sandbox directory.

It calls logrotate once a configurable number of bytes has been received. This logic is implemented by mesos-logrotate-logger, a stand-alone program. So why do we need a separate module just to feed logs to this one specific external program?

MESOS-6003 was created to add a logging module for logging to any external program. Thanks to the hard work of Will Rouesnel, shepherded by Joseph Wu, this is almost completed.

What if we can't wait for the next release of Mesos? Maybe we're stuck on 1.0.1 for a while for reasons beyond our control, but we'd love to use Filebeat for log forwarding. Unfortunately we can't, because the recommended setup doesn't work.

Even if it did we'd still be missing out on important context, like what the Marathon app ID and the Mesos task ID are. Maybe there's a specific version of the Marathon app which is having issues. It would be nice if we could find all log messages from that application and version with a simple search.

We'll make that possible by backporting the external container logging module to Mesos 1.0.1 and setting it up to use a small wrapper around Filebeat to create configuration which includes the task's environment variables and forwards the log messages to Graylog. It will also work if you are using Logstash.

Read more…

A Mesos logging module for journald

A short introduction to Mesos

We're going to send logs from Mesos containers to systemd-journald by writing a container logging module in C++. You can skip this section if you already know what Mesos is.

Apache Mesos describes itself as a distributed systems kernel, which is appropriate in a lot of ways. For some it may sound a bit intimidating and complicated, and I think that's a bit unfortunate, because it can be explained very simply without losing too much.

Mesos offers resources in a cluster. Resource types include CPU cores, memory, disk space, network ports, GPUs, etc. Let's say I'm a developer and I want to run my application on the cluster. I get an offer from Mesos of 4 CPU cores, 8 GB of memory, 40 GB of disk space and a range of ports from 10000 to 20000.

I don't need all of it, so I reply that I accept 1 CPU core, 2 GB of memory, 200 MB of disk space and one port, port 10000, and that I want to fetch https://wjoel.com/foo-standalone.jar (a self-contained "fat JAR" with no external dependencies) and run it with the command java -jar foo-standalone.jar. Mesos will create a container using cgroups (if running on Linux) to enforce limits based on the resource constraints I accepted. The container is also known as a sandbox, and we get to play in it as long as we stay within the resource limits.

Developers typically don't want to bother with resource offers from Mesos. Programs that respond to resource offers from Mesos are called frameworks. One such framework is Mesosphere's Marathon, and its application specifications are essentially lists of resources and the command to run. Marathon can also ensure that applications are restarted if they die for any reason, do rolling updates, and many other useful things that developers like to have.

You may have noticed that I told Mesos to run my JAR file using Java, but didn't specify that I wanted Java to be downloaded. Hence, my application will only run if Mesos decides to run it somewhere where Java is already installed.

I could create a Docker image which includes foo-standalone.jar, Java, and any other dependencies I might need. Mesos can run Docker containers as well, either on its own or by using Docker itself for isolation. Alternatively, I could have included an additional URL in my reply, pointing to an archive with a full Java installation, and used that instead, all from within the container.

Read more…