Streaming Wikipedia edits with Spark and Clojure

Wikipedia edits and IRC

The Wikimedia project has an IRC server with raw feeds of changes to the different Wikimedia wikis, and we can join #en.wikipedia to see all edits to the English-language Wikipedia in real time.

The Apache Flink project has a streaming source for those edits, which is great for getting started with streaming data processing. Apache Spark does not have such a source, so we'll make one. To be precise, we'll implement a Spark Streaming custom receiver, and we'll use clj-bean from the previous post to do it.

The Receiver interface

We only need to implement two methods, onStart and onStop (the actual receiving happens in an IRC event listener), but first we need to create a Java class with gen-class.

Since we want to play nice with the rest of the JVM ecosystem (Java, Scala, etc.) and gen-class can't express the generic type parameter of Receiver<WikipediaEditEvent>, we define AbstractWikipediaEditReceiver in Java.

package com.wjoel.spark.streaming.wikiedits;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public abstract class AbstractWikipediaEditReceiver extends Receiver<WikipediaEditEvent> {
    public AbstractWikipediaEditReceiver(StorageLevel storageLevel) {
        super(storageLevel);
    }
}

From the previous post you may recall that :constructors is a map from the parameter types of our constructors to the parameter types of the superclass constructor they delegate to. We have three constructors: one with no arguments, in which case we'll choose a random nickname when connecting; one with the nickname specified; and one with both the nickname and the storage level for our RDDs.

(:gen-class
 :name com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver
 :extends com.wjoel.spark.streaming.wikiedits.AbstractWikipediaEditReceiver
 :init init
 :state state
 :prefix "receiver-"
 :constructors {[] [org.apache.spark.storage.StorageLevel]
                [String] [org.apache.spark.storage.StorageLevel]
                [String org.apache.spark.storage.StorageLevel] [org.apache.spark.storage.StorageLevel]}
 :main false)

The IRC library we use provides a default adapter for receiving events. All the messages we are interested in arrive as private messages, so we can use proxy to override only onPrivmsg, calling a message handling function for each message received.

(defn make-irc-events-listener [message-fn]
  (proxy [IRCEventAdapter] []
    (onPrivmsg [target user msg]
      (message-fn msg))))

Connecting to the IRC server is easy but tedious; once we have a connection we can add this listener and use the store method of our Receiver to pass the events back to Spark.

;; conn is the IRCConnection we created, `this` is our receiver instance
(.addIRCEventListener conn
 (make-irc-events-listener
  (fn [msg]
    (when-let [edit-event (edit-event-message->edit-event msg)]
      (.store this edit-event)))))

edit-event-message->edit-event uses a regexp to extract the different fields from the message and create a WikipediaEditEvent JavaBean which we have created using clj-bean, as described at the end of the previous post.
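
The receiver itself is Clojure, but the technique is plain regex extraction and looks much the same in any JVM language. Here is a rough sketch in Scala against an assumed message shape (title, flags, diff URL, user, byte diff, summary); the real feed wraps these fields in IRC color codes and the library's actual regex may differ:

// Assumed, simplified shape of a Wikimedia recent-changes IRC message.
val EditMessage =
  """\[\[(.*)\]\] (\S*) (\S*) \* (.*) \* \(([+-]\d+)\) (.*)""".r

val sample =
  "[[Renormalization]] M https://en.wikipedia.org/w/index.php?diff=1&oldid=2 * SomeUser * (+191) copyedit"

sample match {
  case EditMessage(title, flags, diffUrl, user, byteDiff, summary) =>
    // In the real receiver these values would populate a WikipediaEditEvent bean.
    println(s"$title edited by $user ($byteDiff bytes): $summary")
  case _ =>
    println("not an edit message")
}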

We need to start a thread in onStart and clean up in onStop.

(defn connect-as [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this nick]
  (when-let [conn (IRCConnection. wikimedia-irc-host
                                  (int-array [wikimedia-irc-port]) "" nick nick nick)]
    (.put ^java.util.HashMap (.state this)
          "connection" (init-connection this conn))))

(defn receiver-onStart [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this]
  (.start (Thread. (fn []
                     (connect-as this ^String (get-from-state this :nick))))))

(defn receiver-onStop [^com.wjoel.spark.streaming.wikiedits.WikipediaEditReceiver this]
  (let [conn ^IRCConnection (get-from-state this :connection)]
    (when (and conn (.isConnected conn))
      (doto conn
        (.send (str "PART " wikimedia-channel))
        (.interrupt)
        (.join 3000)))))

Trying it out

That's almost all there is to it. You can find the source code for spark-streaming-wikiedits on GitHub for the few remaining details. You can run it in spark-shell by loading spark-shell-top10.scala.

If you haven't used Apache Spark before, it's a simple matter of downloading the package from the Spark website, unpacking it, and then (in the Spark directory) starting it as follows.

$ ./bin/spark-shell --master local[4] \
  --packages "org.clojure:clojure:1.8.0,\
org.schwering:irclib:1.10,\
com.wjoel:clj-bean:0.2.0,\
com.wjoel:spark-streaming-wikiedits:0.1.3"

scala> :load /path/to/spark-streaming-wikiedits/examples/spark-shell-top10.scala

Within 20 seconds you will start getting results, with updates every five seconds.

+--------------------+-----------+
|               title|sumByteDiff|
+--------------------+-----------+
|   Patrick Boucheron|        393|
|      Sharon Pincott|        347|
|Water Education F...|       -297|
|     Renormalization|        191|
|   Amal (given name)|        182|
|            BuzzFeed|        111|
| Personalized search|        107|
|      David Pastrňák|        107|
|  Jeepers Creepers 2|        102|
|Indo-Aryan languages|        -90|
+--------------------+-----------+

The JavaBeans we spent so much time making last time are what make this possible in a type-safe manner, thanks to this one line:

implicit val encoder = org.apache.spark.sql.Encoders.bean(classOf[WikipediaEditEvent])
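
For reference, here is a rough sketch of the kind of top-10 aggregation the example script performs, runnable in spark-shell where sc and spark are provided. The script on GitHub is the authoritative version; the window length, slide interval and ordering below are assumptions:

import org.apache.spark.sql.functions.{col, sum}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.wjoel.spark.streaming.wikiedits.{WikipediaEditEvent, WikipediaEditReceiver}

implicit val encoder = org.apache.spark.sql.Encoders.bean(classOf[WikipediaEditEvent])

val ssc = new StreamingContext(sc, Seconds(5))
ssc.receiverStream(new WikipediaEditReceiver())
  .window(Seconds(20), Seconds(5))  // 20 second window, updated every 5 seconds
  .foreachRDD { rdd =>
    spark.createDataset(rdd)        // uses the implicit bean encoder
      .groupBy(col("title"))
      .agg(sum(col("byteDiff")).as("sumByteDiff"))
      .orderBy(col("sumByteDiff").desc)
      .show(10)
  }
ssc.start()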

Next time we'll use Apache Zeppelin with this connector to analyze and visualize Wikipedia edits in real time.

Creating JavaBeans with Clojure

Introduction

JavaBeans have been around since forever in the Java world. They're well supported, but not well designed, as you can see from the list of disadvantages on Wikipedia. Unfortunately, we're stuck with them. Frameworks like Apache Spark and others give us nice things in return for those beans. To create a library which is usable from Java and Scala and is compatible with Spark we must be able to create proper JavaBeans (perhaps; we'll get back to this later).

Hence, we may need to follow the JavaBean standard. The requirements are simple.

  • Getters and setters, also known as accessors, for all fields. Those are the typically verbose Java methods like long getSomeLongField() and void setSomeLongField(long value) which we perhaps wanted to escape from by moving to Clojure. The setters imply that our instance fields must be mutable.
  • JavaBean classes need to implement java.io.Serializable but this is usually easy. We just need to specify that we implement this interface and make sure the types of our fields also implement the interface.
  • A nullary constructor, i.e. a constructor that takes zero arguments. This one is a bit difficult for Clojure. The sketch right after this list shows roughly what these requirements look like in ordinary JVM code.
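
For comparison, a minimal sketch of a class meeting those requirements, written in Scala with @BeanProperty rather than hand-written Java; the class and fields are purely illustrative:

import scala.beans.BeanProperty

// Mutable fields, generated getters/setters, Serializable, and a no-argument constructor.
class MinimalEditEvent extends java.io.Serializable {
  @BeanProperty var timestamp: java.lang.Long = 0L
  @BeanProperty var title: String = null
  @BeanProperty var byteDiff: java.lang.Long = 0L
}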

JavaBeans through deftype

There are several ways to create Java classes from Clojure. This can be confusing, but in our case we can directly rule out defrecord since it only supports immutable fields.

Our options are deftype and gen-class. We'll start with deftype since it's easier to work with. Mutable fields can be created by specifying the :volatile-mutable true metadata, and we need to use definterface to specify our accessor methods.

We can use deftype to implement a JavaBean for edits to Wikipedia (we'll have more to say about this in the future).

(definterface IEditEvent
  (^Long getTimestamp [])
  (setTimestamp [^Long timestamp])
  (^String getTitle [])
  (setTitle [^String title])
  (^Long getByteDiff [])
  (setByteDiff [^Long byte-diff]))

(deftype DeftypeEditEvent
    [^{:volatile-mutable true
       :tag java.lang.Long} timestamp
     ^{:volatile-mutable true
       :tag java.lang.String} title
     ^{:volatile-mutable true
       :tag java.lang.Long} byteDiff]
  java.lang.Object
  (toString [_] (str "DeftypeEditEvent; title=" title ", byteDiff=" byteDiff))
  IEditEvent
  (getTimestamp [_] timestamp)
  (setTimestamp [_ v] (set! timestamp v))
  (getTitle [_] title)
  (setTitle [_ v] (set! title v))
  (getByteDiff [_] byteDiff)
  (setByteDiff [_ v] (set! byteDiff v))
  java.io.Serializable)

We can then create DeftypeEditEvent instances by using the factory method or by calling the associated Java class constructor directly.

(ns bean.deftype-bean-test
  (:require [bean.deftype-bean :as dt]))

(println "This is a bean:" (dt/->DeftypeEditEvent 1483138282 "hi" 123))
(println "This too:" (bean.deftype_bean.DeftypeEditEvent. 1483138282 "hi" 123))

Unfortunately this is not a true JavaBean, because it doesn't have a nullary constructor. deftype only creates a single constructor which takes as many arguments as there are fields. Can we create real JavaBeans in Clojure?

gen-class to the rescue

gen-class supports a lot of features, including nullary constructors. Due to its complexity gen-class is generally not recommended, but if we want nullary constructors for our JavaBeans it's the only way, at least if we want to stick to pure Clojure. We'll take care to avoid reflection by using type hints.

(gen-class
 :name bean.gen_class_bean.EditGenClass
 :implements [java.io.Serializable]
 :init init
 :state state
 :prefix "edit-"
 :constructors {[] []
                [Long String Long] []}
 :methods [[getTimestamp [] Long]
           [setTimestamp [Long] void]
           [getTitle [] String]
           [setTitle [String] void]
           [getByteDiff [] Long]
           [setByteDiff [Long] void]])

(defn edit-init
  ([] (edit-init 0 nil 0))
  ([timestamp title byteDiff]
   [[] (object-array [timestamp title byteDiff])]))

(defn edit-getTimestamp [this]
  (aget ^objects (.state ^bean.gen_class_bean.EditGenClass this) 0))
(defn edit-setTimestamp [this v]
  (aset ^objects (.state ^bean.gen_class_bean.EditGenClass this) 0 ^Long v))
(defn edit-getTitle [this]
  (aget ^objects (.state ^bean.gen_class_bean.EditGenClass this) 1))
(defn edit-setTitle [this v]
  (aset ^objects (.state ^bean.gen_class_bean.EditGenClass this) 1 ^String v))
(defn edit-getByteDiff [this]
  (aget ^objects (.state ^bean.gen_class_bean.EditGenClass this) 2))
(defn edit-setByteDiff [this v]
  (aset ^objects (.state ^bean.gen_class_bean.EditGenClass this) 2 ^Long v))

In Java, instance methods live inside the body of the class definition; with gen-class we define them as functions in the namespace of the gen-class statement, named with the prefix given by :prefix. The constructor function is named by :init, which together with the prefix of our choice means that the constructor will invoke our edit-init function.

We must support two constructors. In addition to the nullary constructor we also need one that, as in the deftype case, accepts one argument for each field. To tell Clojure how to call the superclass constructor, we use :constructors.

 :constructors {[] []
                [Long String Long] []}

This tells Clojure to call the superclass constructor with zero arguments both when our constructor is called with zero arguments and when it is called with three arguments of the given types, one for each field.

Our constructor must return a vector of two elements. The first element contains the arguments for the super class constructor (empty in this case) and the second element is the initial value for :state. We use a Java array of objects as state since we must support mutable state.

Benchmarks

Performance is important since we may create a very large number of JavaBeans when using data processing frameworks. We'll benchmark our implementations and compare them with their Java counterparts by creating 500000 JavaBeans, saving the beans to disk, reading them back and summing their byteDiff values. We'll also look at the size of the JavaBeans persisted on disk.

You may have noticed that we have been using Long and Boolean instead of the primitive types long and boolean in our JavaBeans so far. To be fair to Java we should also consider an implementation taking advantage of primitive types. You can find the benchmark implementation on Github if you want to run the benchmarks yourself. Let's look at the results.

Benchmark results

Implementation      Size on disk   Mean execution time   Standard deviation
Java                87554 KB       50.5 s                0.410 s
Java (primitives)   70536 KB       26.0 s                0.156 s
deftype             87554 KB       50.5 s                0.204 s
gen-class           90067 KB       39.6 s                0.106 s
clj-bean            85185 KB       36.2 s                0.480 s

Each benchmark was executed 10 times to calculate the mean and standard deviation. deftype performs exactly like Java without primitive types, and Java with primitive types beats everything else, but clj-bean is a good second.

What's clj-bean you ask? Good question.

So what's the point? Macros.

If our Clojure implementation looked like the above and performed this much worse than the optimal Java implementation, there wouldn't be much point to using Clojure for this specific use case. We could just implement the JavaBeans as Java classes and include them in our Clojure project.

Macros are perfect for eliminating the verbosity of gen-class. With an appropriate macro we only need to specify the name of the JavaBean, its fields, and their types. The macro will take care of generating all the required constructors, state, setters and getters.

We can also reduce the size and improve the performance of our JavaBeans by changing the way we deal with state. Instead of a single array of objects as the state, we'll use an object array where each element is an array of a primitive type, or an object. Non-primitive fields will be stored in the object array while primitive fields are stored in arrays of their primitive types.

The full WikipediaEditEvent has 13 fields, so it would be very tedious to write out all the required accessors. Using a macro it's easy, which brings us back to clj-bean and its (currently) sole purpose, the defbean macro. This is what the full definition of a Wikipedia edit event looks like:

(defbean WikipediaEditEvent
  [[long timestamp]
   [String channel]
   [String title]
   [String diffUrl]
   [String user]
   [long byteDiff]
   [String summary]
   [boolean minor]
   [boolean new]
   [boolean unpatrolled]
   [boolean botEdit]
   [boolean special]
   [boolean talk]])
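
Since interop is the whole point, here is a quick sketch of how the generated bean can be used from Scala; the package name is taken from the receiver post above, and the snippet just shows that it behaves like any other JavaBean:

import com.wjoel.spark.streaming.wikiedits.WikipediaEditEvent

val event = new WikipediaEditEvent()  // the nullary constructor gen-class gives us
event.setTitle("Renormalization")
event.setByteDiff(191L)
println(s"${event.getTitle}: ${event.getByteDiff} bytes")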

You can find clj-bean on GitHub. There are good tutorials out there for learning how to write macros. If you are new to macros and want to understand clj-bean you may want to go through a macro tutorial and look at the tests before moving on to the implementation.

The beauty of macros is that, as a user, you only need to understand what they do, as explained in this post, and not necessarily how they do it, with all the strange quoting rules and code generation under the hood. As long as they work without any issues, anyway.

Conclusion

The optimal Java implementation still has better performance, but the final Clojure implementation isn't far off and wins big time in the number of lines required to specify the JavaBean.

This pattern, writing a naive implementation by hand and refining it until it does what you want (however verbose it ends up), is a useful tool for creating macros. By writing down the desired end result first (the constructors, the state, and all the accessors) we get some useful tests. Once our macro can replicate this end result we can think about optimizations, using the tests to ensure we keep the same behavior.

In the end we have a great API for generating JavaBeans with good performance. Ideally we'd get all the way to Java's performance with primitive types, but that will be difficult without generating bytecode. Perhaps we'll do that one day, but the API would remain the same, since it contains only the essentials for defining a JavaBean.

From JSON to Parquet using Spark SQL (and HDFS sequence files)

But what does it mean?

The title of this post may sound like gibberish if you are not familiar with the Apache world of "big data" technologies. So let's start with an almost jargon free explanation of what we're going to do and a glossary.

We are going to query text files containing JSON objects as if they were a database table and show the results as charts. To do this efficiently we will then convert the text files to a more table-like file format.

  • Spark is a data processing framework.
  • Spark SQL is a library built on Spark which implements the SQL query language.
  • The HDFS sequence file format, from the Hadoop filesystem, stores a sequence of key-value records.
  • Parquet is a column-oriented file format that supports compression. Being column-oriented means that instead of storing each row sequentially we store each column separately.
  • Zeppelin is a web-based tool for data visualization and collaboration. We will use Zeppelin to run Scala with Spark (including Spark SQL) and create charts.
  • Kafka is a platform for distributed, ordered logs, or topics. Not important for understanding the rest of this article, but a fantastic piece of technology that will likely be discussed in another post.

All the above are either full Apache projects or trying to become such by going through a stage known as "incubating." To get started with Zeppelin you just download it from the web page, unpack it, run ./bin/zeppelin.sh and visit http://localhost:8080.

HDFS sequence files

I have some HDFS sequence files in a directory, where the value of each record in the files is a JSON string. In this case they have been created by Secor which is used to back up Kafka topics. The files contain about 14 million records from the NYC taxi data set. Their combined size is 4165 MB and we want to use Spark SQL in Zeppelin to allow anyone who knows SQL to run queries on the files and easily visualize the results.

To do this we will need to convert the HDFS sequence files into a string RDD (resilient distributed dataset, an important construct in Spark) which is used to create a DataFrame. A DataFrame is a table where each column has a type, and the DataFrame can be queried from Spark SQL as a temporary view/table.

Fortunately there is support both for reading a directory of HDFS sequence files by specifying wildcards in the path, and for creating a DataFrame from JSON strings in an RDD.

A slightly tricky detail is the need to use copyBytes in order to get a serializable type. This is required because the record values have the type BytesWritable which is not serializable.

Just creating the view takes a while (slightly less than a minute on my computer) because Spark SQL looks at the JSON strings to figure out an appropriate schema.

import org.apache.hadoop.io._

val paths = "file:///home/wjoel/seq/*/*"
val seqs = sc.sequenceFile[LongWritable, BytesWritable](paths)
    .map((record: (LongWritable, BytesWritable)) =>
           new String(record._2.copyBytes(), "utf-8"))
val df = sqlContext.read.json(seqs)
df.createOrReplaceTempView("trips")

This table, or view, already has a schema that has been inferred from the contents. All columns have the type "string" but we'll change this later.

df.printSchema()

root
 |-- dropoff_datetime: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- medallion: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- vendor_id: string (nullable = true)

This is already useful, but the performance is not necessarily good enough for exploratory use cases. Just counting the rows takes a bit more than a minute.

SELECT COUNT(*) FROM trips
14,776,615

We can make another query to calculate the average trip distance based on the number of passengers. The data includes some trips with "255" passengers, which is most likely "-1" or "unknown" number of passengers, so we only consider trips with fewer than 50 passengers. Calculating this takes about a minute and a half.

SELECT passenger_count, avg(trip_distance) FROM trips WHERE passenger_count < 50 GROUP BY passenger_count ORDER BY 2

Caching and Parquet

The main reason for the performance issues is the conversion from JSON and, to a lesser extent, the fact that all columns in our DataFrame are strings. Most of the columns are actually numerical, so strings will be converted to doubles (or longs) again and again when running queries involving sums, averages, and so on.

If we have enough memory available we can use df.persist(StorageLevel.MEMORY_ONLY) (cache() is just an alias for persist(StorageLevel.MEMORY_ONLY)) to avoid reading the files and converting from JSON for each query.
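
As a minimal sketch, using the df DataFrame created from the sequence files above:

import org.apache.spark.storage.StorageLevel

df.persist(StorageLevel.MEMORY_ONLY)  // equivalent to df.cache()
df.count()  // first action: reads the files, parses the JSON, fills the cache
df.count()  // served from memory, returns almost immediately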

The first time we do the row count it actually takes longer, since there's extra work that needs to be done in order to do the caching in memory, but if we make the same query for the count again it takes less than a second. Calculating the average trip distance only takes 3 seconds.

In short, caching has improved the performance dramatically at the cost of some memory. In fact, we need to configure Zeppelin to actually have enough memory available for Spark to cache the whole RDD, since it needs 1670 MB and by default we only have 400 MB (spark.driver.memory * spark.storage.memoryFraction) available for storage. We need to add the following to conf/zeppelin-env.sh to have 2.1 GB available for storage.

export ZEPPELIN_INTP_MEM="-Xms4g -Xmx4g"

Visit http://localhost:4040/executors/ after running some code block in Zeppelin to see how many executors you have running (there should be one, with executor ID "driver") and the amount of storage memory per executor. Then go to http://localhost:4040/storage/ to see the RDDs and what percentage of each is cached.

Storage tab of the Spark web UI

You may be wondering why there's an RDD at all if we're using DataFrames. The reason is that DataFrames are built on top of RDDs, and our table (or view) is "backed" by this RDD.

As if the configuration hassles weren't bad enough, the performance can become unpredictable if there's a shortage of memory, since Spark can and will evict persisted RDDs from memory if necessary. If that happens we may end up reading all the files and parsing all the JSON again. As a compromise we could use StorageLevel.MEMORY_AND_DISK, but then we would not get the same performance improvement, and all columns would still be of type "string."

It would be better if we could use a columnar data format with types. Parquet is one such format. We will create a Parquet file from our DataFrame. We get several benefits from using Parquet:

  • There will be no need to parse JSON strings since it's a binary format. We can still use caching, but if and when it's necessary to read the data from disk it will be much faster.
  • The data can be queried more efficiently because the format is based on columns, so there's no need to look around in a JSON structure to find a field, and columns that are irrelevant to the query can be skipped entirely.
  • Aggregations which need to scan all values in a column can be done much more efficiently because there is no need for disk seeks and the column data is read sequentially, which is very cache friendly for CPUs.
  • The data will be much smaller because Parquet supports compression. Compression is very helpful since some of our data consists of strings with a lot of repetition, like the names of the two taxi companies.

Writing a Parquet file

Writing a DataFrame to a Parquet file is trivial. Using SaveMode.Overwrite is optional, but was helpful as I was trying different things.

import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Overwrite).parquet("/home/wjoel/z/trips.parquet")

You can probably guess how to create a DataFrame backed by (an RDD backed by) the Parquet file, which is 462 MB.

val parquetDf = sqlContext.read.parquet("file:///home/wjoel/z/trips.parquet")

parquetDf.createOrReplaceTempView("trips_parquet")

Counting the rows only takes an instant, but it takes 40 seconds to calculate the average trip distance. This is still not good enough for data exploration, at least not if we value the explorer's time. Caching can of course still be used but it requires about as much memory as before, since the data Spark needs to store is still a whole RDD of strings, but at least reading it back from disk after being evicted from memory will be faster.

Can we do better? Yes, of course we can. That was a rhetorical question.

Creating a typed schema

We will end this post by creating a proper schema for our DataFrame, because so far all columns are still strings even with Parquet. The performance is already good if we just use Parquet files, but it will be even better after we convert the columns into proper types before saving the Parquet file. Using proper types also saves us some additional disk space. To do this we will invariably have to know something about our data.

Ideally the types would be included in the messages themselves, for example by using Avro. Since we only have JSON we will create the schema explicitly and supply type information for all 14 columns.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schemaDf = parquetDf
  .withColumn("dropoff_datetime", parquetDf("dropoff_datetime").cast(TimestampType))
  .withColumn("dropoff_latitude", parquetDf("dropoff_latitude").cast(DoubleType))
  .withColumn("dropoff_longitude", parquetDf("dropoff_longitude").cast(DoubleType))
  .withColumn("hack_license", parquetDf("hack_license").cast(StringType))
  .withColumn("medallion", parquetDf("medallion").cast(StringType))
  .withColumn("passenger_count", parquetDf("passenger_count").cast(LongType))
  .withColumn("pickup_datetime", parquetDf("pickup_datetime").cast(TimestampType))
  .withColumn("pickup_latitude", parquetDf("pickup_latitude").cast(DoubleType))
  .withColumn("pickup_longitude", parquetDf("pickup_longitude").cast(DoubleType))
  .withColumn("rate_code", parquetDf("rate_code").cast(LongType))
  .withColumn("store_and_fwd_flag", parquetDf("store_and_fwd_flag").cast(StringType))
  .withColumn("trip_distance", parquetDf("trip_distance").cast(DoubleType))
  .withColumn("trip_time_in_secs", parquetDf("trip_time_in_secs").cast(LongType))
  .withColumn("vendor_id", parquetDf("vendor_id").cast(StringType))
schemaDf.createOrReplaceTempView("trips_parquet_schema_added")

We can print the schema to verify that it's using the types we specified and save this as another Parquet file.

schemaDf.printSchema()
schemaDf.write.parquet("/home/wjoel/z/trips_schema.parquet")
root
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- medallion: string (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- rate_code: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- trip_time_in_secs: long (nullable = true)
 |-- vendor_id: string (nullable = true)

The schema makes a difference. Our original data set was 4165 MB, the Parquet file with an all-strings "schema" is 462 MB, and the Parquet file with a proper schema is 398 MB. Note that both of those files are smaller than the cached RDD, which was 1670 MB. Now let's read this back and make a third view.

val schemaParquetDf = sqlContext.read.parquet("/home/wjoel/z/trips_schema.parquet")
schemaParquetDf.createOrReplaceTempView("trips_schema_parquet")

It only takes 1 second to calculate the average trip distance, and we're not even caching this. It's fast because the trip distance column is stored separately on disk, as compressed doubles. Reading that back and calculating the average is trivial compared to parsing JSON, and much faster than reading back strings and parsing them as doubles, which is what has to happen with the "all columns are strings" Parquet file.

Average trip distance with Parquet and schema

Summary and getting the data

We started out with queries taking about 45 seconds on 4165 MB of data. With some memory tricks and parameter tweaking we reduced the query time to 3 seconds, but only as long as our memory isn't allocated to caching something else. By using Parquet with a schema we ended up with queries taking 1 second on 398 MB of data.

Types are important and Parquet is fantastic. JSON is used all over the real world, but converting it to Parquet is easy with Spark SQL. Creating a schema takes some extra effort, though it doesn't have to if you use a format like Avro instead of JSON, and even with plain JSON it may not be so bad.

Keep in mind that you can choose to specify types only for the specific fields you care about. For the examples in this post only specifying the types of vendor_id, passenger_count, and trip_distance would have given us most of the benefits.
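
For instance, here is a sketch that only casts the columns used by the queries above (vendor_id is already a string, so two casts suffice); the names are just for illustration:

import org.apache.spark.sql.types.{DoubleType, LongType}

val partialDf = parquetDf
  .withColumn("passenger_count", parquetDf("passenger_count").cast(LongType))
  .withColumn("trip_distance", parquetDf("trip_distance").cast(DoubleType))
partialDf.createOrReplaceTempView("trips_partial_types")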

To try this out on a smaller scale you can download a 10 MB or 200 MB HDFS sequence file and use this Zeppelin notebook at home.

If you want more you need to download some files from the NYC Taxi Trips data set, download Kafka and start a local Kafka broker (with ZooKeeper), and download and configure Secor. To use Secor you need access to Amazon S3, Azure Blob Storage, Google Cloud Storage or OpenStack Swift. You might be able to make it work with Minio if you don't have a cloud computing account.

After configuring Secor to use S3 you can use csv-to-kafka-json to post a CSV file from the taxi trips data set to Kafka, and after a short while you can find the HDFS sequence files created by Secor in your S3 bucket.

I'll have more to say about the visualizations in Zeppelin in the next post. It will be more advanced than this one, so stay tuned if this was too basic for you.

Mesos container log forwarding with Filebeat

The next level

In the previous post we managed to forward container logs to journald, but we could have done the same thing and much more if we could have forwarded the logs to systemd-cat instead. We wouldn't have had to write any code at all if that was possible.

So we're going to make that possible by backporting an upcoming "external" container logger, being developed for the next release of Mesos, as a module compiled for the currently stable release. That's version 1.0.1 at the time of writing.

Then we'll use this module to set up log forwarding from Mesos containers to Graylog using Filebeat 1.3 and include additional fields from the Mesos executor's environment.

Mesos and external container loggers

Apache Mesos has had support for a LogrotateContainerLogger since version 0.27, but if you look at what that container logger does you'll find that all it does is receive input from standard input and standard error, count the number of bytes received, and write to the stdout and stderr files in the sandbox directory.

It calls logrotate once a configurable number of bytes has been received. This logic is implemented by mesos-logrotate-logger which is a stand-alone program. Why do we need a separate module to feed logs to this specific external program?

MESOS-6003 was created to add a logging module for logging to any external program. Thanks to the hard work of Will Rouesnel, shepherded by Joseph Wu, this is almost completed.

What if we can't wait for the next release of Mesos? Maybe we're stuck on 1.0.1 for a while for reasons beyond our control, but we'd love to use Filebeat for log forwarding. Unfortunately we can't, because the recommended setup doesn't work.

Even if it did we'd still be missing out on important context, like what the Marathon app ID and the Mesos task ID are. Maybe there's a specific version of the Marathon app which is having issues. It would be nice if we could find all log messages from that application and version with a simple search.

We'll make that possible by backporting the external container logging module to Mesos 1.0.1 and setting it up to use a small wrapper around Filebeat to create configuration which includes the task's environment variables and forwards the log messages to Graylog. It will also work if you are using Logstash.

Creating and using the backported module

By reusing much of what we did in the last post we can create a module for Mesos 1.0.1 using the code available in the review request for the external logger module. It doesn't compile using Mesos 1.0.1 as is, but the necessary changes are trivial.

The module has been compiled for Ubuntu 14.04 (Trusty Tahr) and Debian 8 and can be downloaded from the mesos-external-container-logger repository on GitHub, but using the provided scripts you can compile it for your system.

It is enabled by starting mesos-slave with a modules config similar to this one and using --container-logger=org_apache_mesos_ExternalContainerLogger.

{
  "libraries": [{
    "file": "/usr/lib/libexternal_container_logger-0.1.0.so",
    "modules": [{
      "name": "org_apache_mesos_ExternalContainerLogger",
      "parameters": [{"value": "/usr/bin/filebeat-container-logger.sh",
                      "key": "external_logger_cmd"}]
    }]
  }]
}

The ExternalContainerLogger module creates an environment variable called MESOS_LOG_STREAM which will have the value "STDOUT" or "STDERR" depending on where the log is coming from. To emulate the default sandbox container logger you could use this.

#!/bin/sh
cat > $(echo "$MESOS_LOG_STREAM" | tr '[:upper:]' '[:lower:]')

We can also emulate the LogrotateContainerLogger.

#!/bin/sh
/usr/libexec/mesos/mesos-logrotate-logger \
  --log_filename=$(echo "$MESOS_LOG_STREAM" | tr '[:upper:]' '[:lower:]')

Filebeat as the external logger process

We can use Filebeat to forward our logs to a central log server like Graylog if we're not happy with local files. We'll create a small bootstrap script to create the Filebeat configuration file and start it.

We extract all environment variables for the task from MESOS_EXECUTORINFO_JSON, which is set by the module, using jq to parse the JSON and a bit of awk to convert it to the proper format.

The script is simple enough to include in full here, but you can also find it on GitHub as examples/filebeat-container-logger.sh.

#!/bin/bash

config_path="$MESOS_LOG_SANDBOX_DIRECTORY/filebeat-$MESOS_LOG_STREAM.yml"

mesos_fields=$(echo "$MESOS_EXECUTORINFO_JSON" | \
                jq -r ".command.environment.variables
                         |map(\"\(.name):\(.value|tostring)\")|.[]" | \
                # Skip empty variables, use mesos_ prefix, convert to lowercase
                awk -F: 'length($2) > 0 {
                           $1=tolower($1);
                           if (!match($1, "^mesos_.*")) {
                             $1="mesos_" $1;
                           }
                           printf("%s: \"%s\"\n        ", $1, $2);
                         }')

cat <<EOF > $config_path
filebeat:
  prospectors:
    -
      paths:
        - "-"
      input_type: stdin
      close_eof: true
      fields:
        mesos_log_stream: $MESOS_LOG_STREAM
        mesos_log_sandbox_directory: $MESOS_LOG_SANDBOX_DIRECTORY
        $mesos_fields

output:
  logstash:
    hosts: ["graylog.aws.wjoel.com:5044"]
EOF

/usr/bin/filebeat -c $config_path

Change the hosts variable to your Graylog or Logstash server. More output options are available if you need to send the logs to something else.

All environment variables in the ExecutorInfo JSON description, prefixed with mesos_ and in lowercase, are included as extra fields. This makes it easy to find all log messages for any executor, Marathon application, or Mesos agent.

This is what we get in Graylog when we run three instances of the "hello world" Marathon app from the previous post with all of the above in place.

Logs from Marathon in Graylog

Making the previous post irrelevant with systemd-cat

At the end of the previous post we noted that we had implemented a worse systemd-cat. We can use the external logger module to provide logging to journald with systemd-cat. Note that this example requires that jq is installed on the Mesos agent, just like the example above.

#!/bin/bash

log_level="info"
if [ "$MESOS_LOG_STREAM" = "STDERR" ]; then
  log_level="err"
fi

task_id=$(echo "$MESOS_EXECUTORINFO_JSON" | \
  jq -r '.command.environment.variables[]|select(.name=="MESOS_TASK_ID").value')

if [ "$task_id" ]; then
    task_identifier="-t $task_id"
fi

systemd-cat $task_identifier -p $log_level

Finally, we can combine Filebeat with mesos-logrotate-logger by changing the last line of filebeat-container-logger.sh to the following.

log_stream=$(echo $MESOS_LOG_STREAM | tr '[:upper:]' '[:lower:]')
tee >(/usr/libexec/mesos/mesos-logrotate-logger \
        --log_filename="$MESOS_LOG_SANDBOX_DIRECTORY/$log_stream" \
        --logrotate_options="rotate 5" \
        --max_size="10M") | \
  /usr/bin/filebeat -c $config_path

This way the logs are available both in Graylog and the sandbox, so we can view logs in the Mesos web interface as usual. In addition, log rotation will ensure that we don't run out of disk space due to unbounded container logs.

Caveats and possible improvements

The cost of this setup is two Filebeat processes per container, one for each output stream. That's acceptable for most setups, since Filebeat was created to be lightweight, but in some cases it could be worth exploring other options.

For example, you could run just one Filebeat process on each Mesos agent and create config for each container in an external logger script which is then loaded by Filebeat. You would then have to find a way to clean up configs after containers are terminated, but this should be possible (although ugly) with a cron job.

Filebeat unfortunately does not terminate when standard input is closed, even with close_eof. The newly added -once flag might help, but it's so new that you would currently have to compile Filebeat from source to enable it.

This should not be much of an issue if you have long-running services, but otherwise you should find a way to solve it, again perhaps with a cron job to kill off old Filebeat processes.

Summary

By backporting the upcoming external container logger module to Mesos 1.0.1 we can centralize container logs by using Filebeat and Graylog today.

The external container logger is flexible enough to replace all existing container logger modules and the one for journald which we created in the previous post. It can even be used to combine the functionality of several other container loggers.

By centralizing the container logs with Filebeat we can include environment variables provided by Mesos, Marathon and other frameworks to provide more context.

The future of logging is bright for Mesos, and we can have much of it today thanks to modules. You can get this module on GitHub in the mesos-external-container-logger repository and follow the development by Will Rouesnel in MESOS-6003.

It's not perfect yet, but I hope you find this useful.

A Mesos logging module for journald

A short introduction to Mesos

We're going to send logs from Mesos containers to systemd-journald by writing a container logging module in C++. You can skip this section if you already know what Mesos is.

Apache Mesos describes itself as a distributed systems kernel, which is appropriate in a lot of ways. For some it may sound a bit intimidating and complicated, and I think that's a bit unfortunate, because it can be explained very simply without losing too much.

Mesos offers resources in a cluster. Resource types include CPU cores, memory, disk space, network ports, GPUs, etc. Let's say I'm a developer and I want to run my application on the cluster. I get an offer from Mesos of 4 CPU cores, 8 GB of memory, 40 GB of disk space and a range of ports from 10000 to 20000.

I don't need all of it, so I reply that I accept 1 CPU core, 2 GB of memory, 200 MB of disk space and one port, port 10000, and that I want to fetch https://wjoel.com/foo-standalone.jar (a self-contained "fat JAR" with no external dependencies) and run it with the command java -jar foo-standalone.jar. Mesos will create a container using cgroups (if running on Linux) to enforce limits based on the resource constraints I accepted. The container is also known as a sandbox, and we get to play in it as long as we stay within the resource limits.

Developers typically don't want to bother with resource offers from Mesos. Programs that respond to resource offers from Mesos are called frameworks. One such framework is Mesosphere's Marathon, and its application specifications are essentially lists of resources and the command to run. Marathon can also ensure that applications are restarted if they die for any reason, do rolling updates, and many other useful things that developers like to have.

You may have noticed that I told Mesos to run my JAR file using Java, but didn't specify that I wanted Java to be downloaded. Hence, my application will only run if Mesos decides to run it somewhere where Java is already installed.

I could create a Docker image which includes foo-standalone.jar, Java, and any other dependencies I might need. Mesos can run Docker containers as well, either on its own or by using Docker for isolation. Alternatively, I could have included an additional URL in my reply, pointing to an archive with a full Java installation, and used that instead, all from within the container.

Container logging in Mesos

The output from my program will end up in the directory of the sandbox Mesos created, in the files stdout and stderr. That's fine in a lot of cases, since the Mesos UI has an interface to view the contents of those files and even updates the view when the files are changed.

Some people prefer to have all their logs go through systemd-journald, or journald for short, perhaps because they have already solved the problems of log forwarding and archiving for journald. We can get this today, instead of having to wait for a release that has it, because there is support for many types of modules in Mesos. There is a module type for container loggers, so let's make one for journald.

The default behavior of logging to stdout and stderr is implemented by the sandbox logger, which can be found in src/slave/container_logger/sandbox.cpp. It's alright if you don't know (or dislike) C++, because the important lines are simple enough.

  process::Future<ContainerLogger::SubprocessInfo> prepare(
      const ExecutorInfo& executorInfo,
      const std::string& sandboxDirectory)
  {
    ContainerLogger::SubprocessInfo info;

    info.out = SubprocessInfo::IO::PATH(path::join(sandboxDirectory, "stdout"));
    info.err = SubprocessInfo::IO::PATH(path::join(sandboxDirectory, "stderr"));

    return info;
  }

In other words, direct all standard output into stdout in the sandbox directory, and direct all standard errors into stderr in the sandbox directory.

A Mesos module for container logging to systemd-journald

At first I thought I'd have to intercept everything written to info.out and info.err, split it on newlines, and then send the lines to journald using sd_journal_print. I was sufficiently disgusted by the idea of going back to the ancient C world of reading bytes from file descriptors to go looking for prior art. There is already a command for sending lines of text to journald, systemd-cat, and its approach is straightforward: it gets a file descriptor connected to the journal from sd_journal_stream_fd and writes to that.

sd_journal_stream_fd doesn't quite work with the example above, since the example assigns (file) paths, but it's possible to assign a file descriptor to info.out and info.err instead.

  // find out how to set this to the Mesos task identifier on github
  std::string identifier;

  journal_out = sd_journal_stream_fd(identifier.c_str(), LOG_INFO, 1);
  journal_err = sd_journal_stream_fd(identifier.c_str(), LOG_ERR, 1);

  info.out = SubprocessInfo::IO::FD(journal_out);
  info.err = SubprocessInfo::IO::FD(journal_err);

The documentation on Mesos modules is good, but some extra steps are needed for the compilation to succeed. First compile Mesos, but use ../configure --enable-install-module-dependencies. You might be forgiven for missing that step, as I did, since it's not (yet?) included in the documentation and it's a new flag. I also had to install libz-dev on a clean Debian 8 installation, but once Mesos has been compiled and installed we can compile the module.

g++ -I/usr/local/lib/mesos/3rdparty/include -c -fpic \
 -o journald_container_logger.o journald_container_logger.cpp
gcc -lsystemd -shared -o mesos_journald_container_logger.so journald_container_logger.o

The GitHub repository for this code uses CMake, but the above is the essence of what it does and it's where I started before going down the CMake route.

A quick demo

First we start mesos-master as usual. I use an internal IP address here. Then we start mesos-agent with two additional flags, perhaps using sudo. We'll use Marathon to run a simple shell script.

$ mesos-master --ip=10.0.1.3
$ mesos-agent --master=10.0.1.3:5050 \
  --modules='{"libraries":[{"file":"/path/to/mesos_journald_container_logger.so", \
              "modules":[{"name":"com_wjoel_JournaldLogger"}]}]}' \
  --container_logger=com_wjoel_JournaldLogger
$ /path/to/marathon-1.1.1/bin/start --master 10.0.1.3:5050

Once everything is up and running we can create an application in Marathon which echoes "hello world" every 5 seconds using a simple shell loop, with a journalctl -f running in the background.

Marathon app creation

In the terminal running journalctl -f we can see the output from our application (along with a failed ssh login from Colombia).

journald log

What we win and what we lose

The output from our container is now forwarded to journald. That means the output is no longer written to files in the sandbox, so we can't view it in the Mesos web interface.

This isn't an issue if log forwarding has been set up on all machines where a container might run, but wouldn't it be nice if we could have both? And while our change to make this happen was most delightfully simple, isn't it just a worse systemd-cat?

Of course it would, and sure it is. We'll take care of all that and more next time. Until then, you can find the mesos-journald-container-logger on GitHub.