Replicating with Datomic

February 11, 2017

Motivation

In this post I will focus on a practical application of replikativ; the theoretical underpinnings and motivation will follow in the next post. In this concrete example we want to store tweets. The whole code is available here. We do not yet know exactly what we want to do with them, but we want a straightforward way to access them later. This is what replikativ was built for: an open data management layer as a baseline for distributed applications. (Local applications are in fact a subset of distributed applications, so this covers all computer programs, though that generality does not matter here.)

Setup

We need to require a bunch of namespaces:

(ns twitter-collector.core
  (:require [gezwitscher.core :refer [stream]]
            [clojure.core.async :refer [chan timeout]]
            [kabel.peer :refer [start stop]]
            [konserve
             [filestore :refer [new-fs-store delete-store]]
             [memory :refer [new-mem-store]]]
            [konserve-leveldb.core :refer [new-leveldb-store]]
            [replikativ
             [peer :refer [server-peer client-peer]]
             [stage :refer [connect! create-stage!]]]
            [replikativ.crdt.cdvcs.stage :as cs]
            [replikativ.stage :as s]
            [superv.async :refer [go-try <? <?? go-loop-try S]]
            [konserve.core :as k]))

This will probably be unified under an API namespace at some point, but right now I want to expose the primitives for exploration. gezwitscher is our twitter4j wrapper, kabel provides network IO, konserve provides storage IO, and replikativ provides our interface to CRDTs (confluent replicated datatypes; you can find out more about them here or in my next post). We also use superv.async, a version of core.async with Erlang-like error handling for more robustness and better stacktraces. Note also that while we use the JVM here, all these libraries also work with ClojureScript in the browser.

We then define some constants describing the universal address of our datatype in the replikativ system.

(def user "mail:twitter@crawler.com") ;; will be used to authenticate you (not yet)

(def cdvcs-id #uuid "12d49511-e733-4007-937b-460c3794fae9")

Storing tweets

We use a simple buffer for tweets, so we can write at our own speed. This is not strictly necessary, but the system scales better if you write information in batches instead of committing every piece atomically, and the Twitter API can become very bursty at times. The metadata grows with every operation and is still transmitted in full during the handshake on each connection. This can be addressed by a durable persistent datastructure like a hitchhiker tree, providing optimal deltas. For now the buffer and the fixed transaction rate for this datatype instance provide write capacity for at least a year: one commit every ten seconds amounts to a bit more than three million write operations per year.

;; buffering function: conj each incoming status onto the current batch;
;; the first slot keeps the batch that was last flushed
(defn new-tweet [pending status]
  (swap! pending (fn [[prev cur] status] [prev (conj cur status)]) status))


(defn store-tweets [stage pending]
  (go-try S
   ;; atomically swap out the current batch and start a fresh one
   (let [tweets (vec (first (swap! pending (fn [[prev cur]] [cur '()]))))
         tweet-txs (mapv (fn [t] ['add-tweet t]) tweets)]
     (when-not (empty? tweets)
       ;; commit the whole batch as one transaction to the CDVCS
       (<? S (cs/transact! stage [user cdvcs-id] tweet-txs))))))

The store-tweets function takes the tweets out of the buffer and transforms them into a very simple form of bytecode: each transaction names the function to call with the symbol add-tweet. You are free to put any Clojure datastructure there; importantly, you can even use the function's source code to communicate unambiguously which function each transaction was meant for. For now you can ignore this and just use a symbol. The crucial call is cs/transact!, which feeds the tweets into the specified datatype address.
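
To make the transaction format concrete, here is a small sketch (the status map is invented purely for illustration) of what store-tweets hands to cs/transact! for a single buffered status:

;; a made-up status map, just for illustration
(def example-status {:id 1234
                     :text "hello replikativ"
                     :user {:screen_name "someone"}})

(mapv (fn [t] ['add-tweet t]) [example-status])
;; => [['add-tweet {:id 1234, :text "hello replikativ", :user {:screen_name "someone"}}]]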

Let’s fire up replikativ now:

(def store-path "/tmp/twitter-store")

(def store (<?? S (new-leveldb-store store-path)))

(def peer (<?? S (server-peer S store "ws://127.0.0.1:9095")))

(def pending (atom ['() '()]))

(start peer)

So we have created a store (with LevelDB as the backend) and a peer reachable on localhost (using plain websockets, no SSL).
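
For quick experiments you could swap the LevelDB store for konserve's in-memory store (already required above). A minimal sketch, assuming the same peer API; the second port is arbitrary:

;; minimal sketch: same setup with an in-memory store for experiments
;; (port 9096 is arbitrary; all data is lost when the process exits)
(def mem-store (<?? S (new-mem-store)))

(def mem-peer (<?? S (server-peer S mem-store "ws://127.0.0.1:9096")))

(start mem-peer)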

We now set up the interface to replikativ, which we call a stage (an abuse of git terminology, because initially changes were staged there before being committed; if you have better proposals, let me know :)):

(def stage (<?? S (create-stage! user peer)))

(<?? S (cs/create-cdvcs! stage :id cdvcs-id))

We have now also created the datatype, which includes subscribing to it globally.

We create a loop to flush the buffer every ten seconds:

(go-loop-try S []
  (<? S (store-tweets stage pending))
  (<? S (timeout (* 10 1000)))
  (recur))

Finally we hook into twitter:

;; topics is assumed to be defined elsewhere in the namespace,
;; e.g. a collection of terms to track
(defn start-twitter-stream []
  (stream
   (read-string (slurp "credentials.edn"))
   []
   (vec topics)
   (partial new-tweet pending)
   (fn [e]
     (println "Stream failed due to:" e)
     (println "Waiting 15 minutes for rate limit.")
     ;; back off before reconnecting to respect Twitter's rate limit
     (go-try S
             (<? S (timeout (* 15 60 1000)))
             (start-twitter-stream)))))

(def twitter-stream (start-twitter-stream))

Now we store our data. Perfect. Wait, what do we do with it? Presumably we were interested in these tweets for some reason, but so far we just store them cheaply. The nice thing about using replikativ is that data collection and data evaluation are decoupled. This is really the point of replikativ, as I will clarify in the next blog post.

We can see from the log output that things are happening, but how do we access the data?

Accessing the tweets

The following code does not access the collector written above in any way, so you can run it on another machine. I use it to analyze tweets on my laptop from time to time, for example.

So let's set up a replikativ peer on the machine you want to do the analysis on:

(ns twitter-collector.client
  (:require [twitter-collector.core :refer [user cdvcs-id]]
            [replikativ.crdt.cdvcs.realize :as r]
            [replikativ.peer :refer [client-peer]]
            [replikativ.p2p.fetch :refer [fetch]]
            [replikativ.stage :refer [create-stage! connect!]]
            [replikativ.crdt.cdvcs.stage :as cs]
            [konserve.filestore :refer [new-fs-store]]
            [clojure.core.async :as async]
            [superv.async :refer [<?? S]]
            [replikativ.stage :as s]
            [konserve.core :as k]
            [datomic.api :as d]))

Note that we require the CDVCS realization namespace, which supplies us with streaming.

First we do the same setup as above, but for the client, and connect to the server (e.g. localhost):

(def client-store (<?? S (new-fs-store "/home/christian/twitter")))

(def client (<?? S (client-peer S client-store)))

(def client-stage (<?? S (create-stage! user client)))

(<?? S (cs/create-cdvcs! client-stage :id cdvcs-id))

(<?? S (connect! client-stage "ws://localhost:9095"))

Finally we extract all tweet texts into an atom:

(def tweets (atom []))

;; the eval fn receives the identity (here the tweets atom) and each tweet
(def atom-stream (r/stream-into-identity! client-stage [user cdvcs-id]
                                          {'add-tweet (fn [old t]
                                                        (swap! old into [(:text t)])
                                                        old)}
                                          tweets))

The crucial part here is stream-into-identity!, which does what you expect it to do: it realizes the transactions with the help of the supplied map. Again, think of them as bytecode if you like. Feel free to peek at the tweets atom, or run a regex over it:

(take-last 10 @tweets)

(count (filter (fn [s] (re-find #"ethereum" s)) @tweets))

Note that we decide on the semantics of the realization process at the point where we stream into some identity. You can take the same raw tweet data and stream it into a file or, finally, into Datomic. Had we streamed directly into Datomic, we would have thrown away a lot of data we might be interested in later, or we would have needed yet another storage system for the raw data.
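
To illustrate that flexibility, here is a hedged sketch that appends every raw tweet to an EDN file instead; the path and eval function are made up, and I assume stream-into-identity! threads the supplied value through the eval functions, just like the connection in the Datomic example below:

;; hedged sketch: thread a file path through the stream and append each raw
;; tweet as one EDN line; the path is arbitrary
(def file-stream
  (r/stream-into-identity! client-stage [user cdvcs-id]
                           {'add-tweet (fn [path t]
                                         (spit path (str (pr-str t) "\n") :append true)
                                         path)}
                           "/tmp/tweets.edn"))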

Streaming into Datomic

Assuming you have a Datomic instance set up, you can leverage its powerful query capabilities to build applications as usual:

(def db-uri "datomic:free://localhost:4334/tweets")

(def conn (d/connect db-uri))

(defn tweet-tx [{:keys [id text timestamp_ms user
                        retweet_count favorite_count] :as tw}]
  {:db/id (d/tempid :db.part/user)
   :tweet/id id
   :tweet/text text
   :tweet/ts (java.util.Date. (Long/parseLong timestamp_ms))
   :tweet/screenname (:screen_name user)
   :tweet/favourite-count favorite_count
   :tweet/retweet-count retweet_count})
   
(def datomic-stream
  (r/stream-into-identity! client-stage
                           [user cdvcs-id]
                           {'add-tweet (fn [conn twt]
                                         @(d/transact conn [(tweet-tx twt)])
                                         conn)}
                           conn
                           :applied-log :datomic-analysis))

So again we stream the datatype, but with a different evaluation map: we map each tweet onto Datomic's schema and transact it. Note that we supply an :applied-log key, which makes the stream durable; it remembers applied operations across restarts and provides exactly-once semantics for d/transact. The schema is provided in the project; a sketch of what it might look like follows.
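
Purely as an illustration, a schema matching the attributes used by tweet-tx above could look roughly like this (note the fulltext index on :tweet/text, which the query below relies on); the authoritative version is the one in the project:

;; hedged sketch of a matching schema; the project ships the real one
(def tweet-schema
  [{:db/id (d/tempid :db.part/db)
    :db/ident :tweet/id
    :db/valueType :db.type/long
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id (d/tempid :db.part/db)
    :db/ident :tweet/text
    :db/valueType :db.type/string
    :db/fulltext true ;; enables the fulltext query below
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id (d/tempid :db.part/db)
    :db/ident :tweet/ts
    :db/valueType :db.type/instant
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id (d/tempid :db.part/db)
    :db/ident :tweet/screenname
    :db/valueType :db.type/string
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id (d/tempid :db.part/db)
    :db/ident :tweet/favourite-count
    :db/valueType :db.type/long
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id (d/tempid :db.part/db)
    :db/ident :tweet/retweet-count
    :db/valueType :db.type/long
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}])

@(d/transact conn tweet-schema)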

Let’s have some fun:

(d/q '[:find (count ?t)
       :where
       [?tw :tweet/text ?t]]
     (d/db conn))
       
(->> (d/q '[:find ?txt
            :in $
            :where
            [(fulltext $ :tweet/text "ethereum")
             [[?entity ?name ?tx ?score]]]
            [?entity :tweet/text ?txt]]
          (d/db conn))
     (take 10))
    
(require '[incanter.core :refer :all]
         '[incanter.charts :refer :all])

;; most active accounts
(let [res (->> (d/q '[:find ?u (count ?t)
                      :where
                      [?t :tweet/screenname ?u]]
                    (d/db conn))
               (sort-by second)
               reverse
               (take 5))
      screenname (map first res)
      tweet-count (map second res)]
  (view (bar-chart screenname tweet-count)))

Conclusion

So we have used replikativ to decouple the write operations, in an eventually consistent fashion, from our query logic, which allows us to build a distributed architecture. You can also write through replikativ into Datomic on the same peer in a strongly consistent fashion by waiting until Datomic has transacted; replikativ then provides a simple means to replicate the streamed data elsewhere.

In this example we use CDVCS, which behaves like git if you write to it on multiple replicas: you have to resolve conflicts before writing more data. Other datatypes like replikativ's OR-Map provide conflict-free semantics, but do not guarantee the order in which operations are applied. The OR-Map would arguably be more appropriate for this particular use case, but CDVCS allows you to safely serialize all write operations in the same strongly consistent way that Datomic expects.

You have to write some glue code, but note what we haven't done. We have not opened any connections for our datatype (the peers could in fact connect automatically in a P2P fashion, but this is not implemented yet). You also do not have to care about storage IO, because replikativ manages all your data and distributes it openly as needed across different storage backends, including the filesystem. replikativ is not a database in the sense of powerful queries, though; for that, its streaming functionality lets you leverage all the tools you already know and love. It is there to decouple distributed data management for you, or just to store some raw data for later retrieval. If you have any problems with the examples, feel free to join our chat and complain :).
