Replicating with Datomic
February 11, 2017
In this post I will focus on a practical application of replikativ; the theoretical underpinnings and motivation will come in the next post. In this concrete example we want to store tweets. The whole code is available here. We do not yet know exactly what we want to do with them, but we want a straightforward way to access them later. This is what replikativ was built for: an open data management layer as a baseline for distributed applications. (Local applications are in fact a subset of distributed applications, so in principle this means all computer programs, but that generality is not important here.)
We need to require a bunch of namespaces:
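Something along these lines should do; the exact namespace layout is an assumption based on the replikativ, konserve and superv.async READMEs of the time and may differ between versions:

```clojure
(ns tweet-collector.core
  (:require [superv.async :refer [<?? <? S go-loop-try]] ;; supervised core.async
            [clojure.core.async :refer [timeout]]
            [kabel.peer :refer [start stop]]             ;; network IO
            [konserve.filestore :refer [new-fs-store]]   ;; the post uses a LevelDB backend instead
            [replikativ.peer :refer [server-peer client-peer]]
            [replikativ.stage :refer [create-stage! connect!]]
            [replikativ.crdt.cdvcs.stage :as cs]
            [replikativ.crdt.cdvcs.realize :refer [stream-into-identity!]]
            [gezwitscher.core :refer [start-filter-stream]]))
```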
This will probably be unified under a single API namespace at some point, but for now I want to expose the primitives for exploration. gezwitscher is our twitter4j wrapper, kabel provides network IO, konserve storage IO, and replikativ our interface to CRDTs (confluent replicated datatypes; you can find out more about them here or in my next post). We also use superv.async, a version of core.async with Erlang-like error handling for more robustness and better stacktraces. Note that while we use the JVM here, all of these libraries also work with ClojureScript in the browser.
We then define some constants describing the universal address of our datatype in the replikativ system.
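For example (the user id and UUID here are made up; any fixed pair will do, since [user id] is the global address every peer uses to refer to the same CRDT):

```clojure
(def user "mail:alice@replikativ.io") ;; hypothetical user id
(def cdvcs-id #uuid "26558dfe-59bb-4de9-95bc-59f9ca7df71b") ;; fixed, so all peers address the same datatype
(def uri "ws://127.0.0.1:31744") ;; where our peer will listen
```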
We use a simple buffer for tweets so that we can write at our own speed. This is not strictly necessary, but the system scales better if you do not write every piece of information atomically, but in batches; the twitter API can become very bursty at times. The metadata grows with every operation and is still communicated in total in a handshake on each connection. This can be addressed by a durable persistent datastructure like a hitchhiker tree, which provides optimal deltas. For now the buffer and a fixed transaction rate for this datatype instance allow write capacity for at least a year, or more than three million write operations.
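The buffer itself can be as simple as an atom holding a vector:

```clojure
;; incoming tweets are collected here and flushed in batches
(def tweet-buffer (atom []))
```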
The store-tweets function takes the tweets out of the buffer and transforms them into a very simple form of bytecode describing the function to apply, add-tweet. You are free to put any Clojure datastructure there that you like; importantly, you can use the function's source code to communicate unambiguously which function each transaction was meant for. For now you can ignore that and just use a symbol. The crucial call is to cs/transact, which feeds the tweets into the specified datatype address.
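A sketch of store-tweets, assuming the names defined above (replikativ's README spells the call cs/transact!):

```clojure
(defn store-tweets [stage tweets]
  ;; each transaction is a pair of "bytecode" (here just the symbol
  ;; 'add-tweet) and its argument; the symbol only gets its meaning
  ;; later, at realization time
  (<?? S (cs/transact! stage [user cdvcs-id]
                       (mapv (fn [t] ['add-tweet t]) tweets))))
```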
Let’s fire up replikativ now:
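Roughly like this, using konserve's filestore in place of the LevelDB backend (signatures follow the replikativ README and may differ between versions):

```clojure
(def store (<?? S (new-fs-store "/tmp/tweet-store"))) ;; LevelDB-backed in the original
(def server (<?? S (server-peer S store uri)))
(<?? S (start server))
```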
So we have created a store (with LevelDB as the backend) and a peer, reachable on localhost (using plain websockets, no SSL).
We now set up the interface to replikativ, which we call a stage (an abuse of git terminology, because initially we staged changes there before committing them; if you have a better proposal, let me know :)):
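Creating the stage and the CDVCS on it might look like this (again following the README; the UUID-addressed datatype is created once and then replicated):

```clojure
(def stage (<?? S (create-stage! user server)))
;; create the CDVCS under our well-known id
(<?? S (cs/create-cdvcs! stage :id cdvcs-id))
```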
We have also created the datatype now, which includes global subscription.
We create a loop to flush the buffer every ten seconds:
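A minimal sketch with superv.async's supervised go-loop (ignoring, for brevity, the small race between reading and resetting the buffer):

```clojure
(go-loop-try S []
  (<? S (timeout (* 10 1000))) ;; wait ten seconds
  (let [tweets @tweet-buffer]
    (reset! tweet-buffer [])
    (when (seq tweets)
      (store-tweets stage tweets)))
  (recur))
```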
Finally we hook into twitter:
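gezwitscher's exact API may differ; assuming a start-filter-stream taking credentials, tracked terms and a status callback, the hook just fills the buffer:

```clojure
(def credentials {:consumer-key "..." ;; your twitter app credentials
                  :consumer-secret "..."
                  :access-token "..."
                  :access-token-secret "..."})

(def stop-stream
  (start-filter-stream credentials
                       ["clojure" "datomic"] ;; terms to track
                       (fn [tweet] (swap! tweet-buffer conj tweet))))
```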
Now we store our data. Perfect... wait, what do we do with it? I assume we were interested in some tweets beforehand, but so far we just cheaply store data. The nice thing about using replikativ is that data collection and data evaluation are decoupled. This is really the point of replikativ, as I will clarify in the next blog post.
We can see from the log output that things are happening, but how do we access the data?
Accessing the tweets
The following code does not in any way access the collector written above, so you can run it on another machine. I use it to analyze tweets on my laptop from time to time, for example.
So let’s setup a replikativ peer on a machine you want to do analysis on:
Note that we use the realization namespace for CDVCS, which supplies us with streaming.
First we do the same setup as above, but for the client, and connect to the server (e.g. localhost):
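For instance (a client peer with its own store, connecting to the collector's websocket endpoint; names as above):

```clojure
(def client-store (<?? S (new-fs-store "/tmp/analysis-store")))
(def client (<?? S (client-peer S client-store)))
(def client-stage (<?? S (create-stage! user client)))
(<?? S (connect! client-stage "ws://127.0.0.1:31744"))
```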
Finally we extract all tweet texts into an atom:
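A sketch of the realization; the eval map gives the 'add-tweet symbol its meaning on this peer (replikativ's README spells the function stream-into-identity!):

```clojure
(def tweets (atom []))

;; our interpretation of the "bytecode": keep only the text of each tweet
(def eval-fns
  {'add-tweet (fn [prev tweet] (conj prev (:text tweet)))})

(stream-into-identity! client-stage [user cdvcs-id] eval-fns tweets)
```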
The crucial part here is stream-into-identity, which does what you expect it to do: it realizes the transactions with the help of the supplied map. Again, think of them as bytecode, if you like. Feel free to peek at the tweet atom, or run a regex over it:
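For example:

```clojure
(count @tweets) ;; how many have arrived so far?
;; all texts mentioning Datomic, case-insensitively
(filter #(re-find #"(?i)datomic" %) @tweets)
```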
Note that we decide on the semantics of the realization process at the point where we stream into some identity. You can take the same raw tweet data and stream it into a file or, finally, into Datomic. If we had streamed it into Datomic directly, we would have thrown away a lot of data which we might be interested in later. If we are not willing to do that, we need some other storage system, too.
Streaming into Datomic
Assuming you have a Datomic instance set up, you can leverage its powerful query capabilities to build applications as usual:
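A sketch under the assumption of a simple :tweet/text, :tweet/user schema (the attribute names, the connection URI and the way :applied-log is passed are assumptions; the project contains the real schema):

```clojure
(require '[datomic.api :as d])

(def conn (d/connect "datomic:free://localhost:4334/tweets")) ;; hypothetical URI

;; a different interpretation of the same 'add-tweet bytecode:
;; transact each tweet into Datomic
(def datomic-eval-fns
  {'add-tweet (fn [_ tweet]
                @(d/transact conn
                             [{:db/id (d/tempid :db.part/user)
                               :tweet/text (:text tweet)
                               :tweet/user (get-in tweet [:user :screen-name])}]))})

(stream-into-identity! client-stage [user cdvcs-id] datomic-eval-fns (atom nil)
                       :applied-log :tweets/applied-log)
```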
So again we stream the datatype, but we use a different evaluation map. We map all tweets into Datomic to fit its schema and then transact them. Note the :applied-log key, which makes the stream persistent: it remembers applied operations across restarts and provides exactly-once semantics for d/transact. The schema is provided in the project.
Let’s have some fun:
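For example, a Datalog query over the streamed tweets (attribute names as assumed above):

```clojure
;; which users tweeted most?
(d/q '[:find ?user (count ?e)
       :where [?e :tweet/user ?user]]
     (d/db conn))
```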
So we have used replikativ to decouple the write operations from our query logic in an eventually consistent fashion, which allows us to build a distributed architecture. You can also write through replikativ into Datomic directly on the same peer, strongly consistent until Datomic has transacted. replikativ will then provide you with a simple means to replicate the streamed data elsewhere.
In this example we use CDVCS, which behaves like git if you write to it on multiple replicas: you have to resolve conflicts before writing more data. Other datatypes like replikativ's OR-Map provide conflict-free semantics, but do not guarantee the order in which operations are applied. I think such a datatype is more appropriate for this particular use case, but CDVCS allows you to safely serialize all write operations in the same strongly consistent way that Datomic expects.
You have to write some glue code, but note what we haven't done. We have not opened any connections for our datatype (the peers could easily connect automatically in a P2P fashion, but this is not implemented yet). You also do not have to care about storage IO, because replikativ manages all your data and distributes it openly as needed, with different storage backends including the filesystem. replikativ is not a database in the sense of powerful queries, though; for that, its streaming functionality leverages all the tools you already know and love. It is there to decouple distributed data management for you, or to just store some raw data for later retrieval. If you have any problems with the examples, feel free to join our chat and complain :).