I recently found myself wanting to work with Amazon’s Simple Queue Service (SQS), but I could find no reasonable Clojure library for accessing it. Of course, AWS’ own Java SDK is the canonical implementation of their APIs (at least in the JVM space), so putting together a Clojure wrapper that adds a few handy extras wasn’t particularly difficult.
You can find Bandalore hosted on GitHub, licensed under the EPL. A proper release will find its way into Maven Central within the next couple of days. The code isn’t much more than 12 hours old, so consider yourself forewarned. ;-)
I hope people find the library useful. If you’ve any questions, feel free to ping me on IRC or Twitter.
What follows is an excerpt from the README documentation for Bandalore that describes some of its more interesting functionality:
Seqs being the lingua franca of Clojure collections, it would be helpful if we could treat an SQS queue as a seq of messages. While `receive` does return a seq of messages, each `receive` call is limited to receiving a maximum of 10 messages, and there is no streaming or push counterpart in the SQS API.
The solution to this is `polling-receive`, which returns a lazy seq that reaches out to SQS as necessary:
    => (map (sqs/deleting-consumer client :body)
            (sqs/polling-receive client q :limit 10))
    ("3" "5" "7" "8" ... "81" "90" "91")
`polling-receive` accepts all of the same optional kwargs as `receive` does, but adds two more to control its usage of `receive` API calls:
`:period`: time in ms to wait after an unsuccessful `receive` request (default: 500)
`:max-wait`: maximum time in ms to wait to successfully receive messages before terminating the lazy seq (default: 5000)
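To make the role of those two options concrete, here is a minimal sketch of how a polling lazy seq along these lines could be put together. It is not Bandalore's actual implementation: `polling-seq` and `fake-receive-fn` are hypothetical names, and an in-memory stand-in replaces the real SQS `receive` call so the snippet runs without AWS credentials.

```clojure
(defn fake-receive-fn
  "Hypothetical stand-in for an SQS receive call. Returns a no-arg function
  that hands out one batch from `batches` per call, then nil forever.
  (Not thread-safe; good enough for a single-threaded demo.)"
  [batches]
  (let [remaining (atom batches)]
    (fn []
      (let [batch (first @remaining)]
        (swap! remaining rest)
        batch))))

(defn polling-seq
  "Lazily calls `receive-fn`, a no-arg function returning a (possibly empty)
  seq of messages. After an empty batch it sleeps `period` ms and retries;
  once `max-wait` ms have been spent waiting without receiving anything,
  the seq terminates."
  [receive-fn & {:keys [period max-wait] :or {period 500 max-wait 5000}}]
  (letfn [(step [waited]
            (lazy-seq
              (let [batch (seq (receive-fn))]
                (cond
                  ;; got messages: emit them and reset the wait counter
                  batch (concat batch (step 0))
                  ;; waited long enough with nothing to show for it: stop
                  (>= waited max-wait) nil
                  ;; otherwise back off for one period and poll again
                  :else (do (Thread/sleep period)
                            (step (+ waited period)))))))]
    (step 0)))

;; Two batches separated by an empty poll, then silence until :max-wait expires.
(def received
  (doall (polling-seq (fake-receive-fn [["a" "b"] [] ["c"]])
                      :period 10
                      :max-wait 30)))
```

Here `received` ends up holding `"a"`, `"b"` and `"c"`: the empty batch in the middle only costs one `:period` of sleep, while the silence at the end runs out the `:max-wait` budget and ends the seq.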
Often queues are used to direct compute resources, so you’d like to be able to saturate those boxen with as much work as your queue can offer up. The obvious solution is to `pmap` across a seq of incoming messages, which you can do trivially with the seq provided by `polling-receive`. Just make sure you tweak the `:max-wait` time so that, assuming you want to continuously process incoming messages, the seq of messages doesn’t terminate because none have been available for a while.
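As a rough illustration of the `pmap` approach, the sketch below runs a slow handler across a seq of message maps in parallel. Everything in it is a stand-in: `process-message`, `messages` and `results` are hypothetical names, and the seq is built in memory where a real consumer would presumably use the seq returned by `polling-receive`.

```clojure
(defn process-message
  "Hypothetical handler: pretends to do 50 ms of work, then squares the
  number carried in the message body."
  [message]
  (Thread/sleep 50)
  (let [n (Long/parseLong (:body message))]
    (* n n)))

;; Stand-in for a seq of received messages; bodies are strings, as in SQS.
(def messages
  (map (fn [n] {:body (str n)}) (range 8)))

;; pmap applies the handler on several threads at once, but still returns
;; the results in the same order as the input seq.
(def results
  (doall (pmap process-message messages)))
```

Note that `pmap` is backed by futures, so a standalone script should call `(shutdown-agents)` when it is finished, otherwise the JVM can linger for a while before exiting.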
Here’s an example where one thread sends a message every 100 ms, and another consumes those messages using a lazy seq provided by `polling-receive`:
    => (defn send-dummy-messages
         [client q count]
         (future (doseq [n (range count)]
                   (Thread/sleep 100)
                   (sqs/send client q (str n)))))
    #'cemerick.bandalore-test/send-dummy-messages
    => (defn consume-dummy-messages
         [client q]
         (future (dorun (map (sqs/deleting-consumer client (comp println :body))
                             (sqs/polling-receive client q :max-wait Integer/MAX_VALUE :limit 10)))))
    #'cemerick.bandalore-test/consume-dummy-messages
    => (consume-dummy-messages client q)    ;; start the consumer
    #<core$future_call$reify__5500@a6f00bc: :pending>
    => (send-dummy-messages client q 1000)  ;; start the sender
    #<core$future_call$reify__5500@18986032: :pending>
    3
    4
    1
    0
    2
    8
    5
    7
    ...
You’d presumably want to set up some ways to control your consumer. Hopefully it’s clear that it would be trivial to parallelize the processing function by using `pmap`, distribute processing among agents if that’s more appropriate, etc.
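One simple way to get that kind of control, sketched here under my own assumptions and not something Bandalore provides, is to gate the message seq on a flag held in an atom. `stoppable`, `running?` and `processed` are hypothetical names, and an infinite in-memory seq stands in for the seq of incoming messages.

```clojure
(defn stoppable
  "Returns a lazy seq that yields items from `coll` until `running?`
  (an atom holding a boolean) becomes false. The flag is checked once
  per item, just before that item is handed to the consumer."
  [running? coll]
  (take-while (fn [_] @running?) coll))

(def running? (atom true))
(def processed (atom []))

;; Consume an endless stand-in message seq, flipping the flag from inside
;; the loop once "4" has been handled. In practice another thread (or a
;; REPL session) would call (reset! running? false) instead.
(doseq [msg (stoppable running? (map str (range)))]
  (swap! processed conj msg)
  (when (= msg "4")
    (reset! running? false)))
```

The flag is only consulted between messages, so a consumer blocked waiting on an empty queue would not notice the change until the next message arrives or the seq times out.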