cl-disque

2015-09-23

A Disque client for Common Lisp.

Disque is an in-memory job queue and message broker. It's distributed and fault-tolerant so it works as a middle layer among processes that want to exchange messages.

Learn more about message queues here, and more about the Disque implementation here.

cl-disque provides a client for working with Disque through sending and receiving jobs and commands.

Usage

Quickstart

First, make sure a Disque server is running.

Load cl-disque and connect to the Disque server on the given host and port:

  (ql:quickload 'cl-disque)

  ; host defaults to 127.0.0.1, port defaults to 7711
  (cl-disque:connect :host <host> :port <port>)

You can interact with the server using commands from the disque package.

(disque:hello)
;=> (1 "node-id"
;    ("node-ids" "host" "7711" "1"))

Disconnect from the Disque server with:

(cl-disque:disconnect)

Alternatively, you can wrap all interactions in the with-connection macro, which opens a new connection, executes the given body, and ensures a disconnect afterwards:

(cl-disque:with-connection ()
  (disque:addjob "queue" "job" 0)
  (disque:getjob "queue" :count 1))
;=> (("queue" "job-hash" "job"))
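A consumer usually fetches a job, processes it, and then acknowledges it. The following is a minimal sketch of that cycle using ADDJOB, GETJOB, and ACKJOB from the command list in this document. It assumes cl-disque is already loaded, that a Disque server is listening on the default host and port, and that GETJOB returns entries shaped like the ("queue" "job-hash" "job") triple shown above. The process-job function is hypothetical and stands in for real work.

```lisp
;; Hypothetical handler: replace with real work.
(defun process-job (body)
  (format t "processing ~a~%" body))

(cl-disque:with-connection ()
  ;; Enqueue one job; the third argument (0) is the timeout.
  (disque:addjob "queue" "job" 0)
  ;; Assumption: each entry is a (queue-name job-id body) list.
  (dolist (entry (disque:getjob "queue"))
    (destructuring-bind (queue-name job-id body) entry
      (declare (ignore queue-name))
      (process-job body)
      ;; Acknowledge the job once it has been handled.
      (disque:ackjob job-id))))
```

Acknowledging only after process-job returns means a job that fails mid-processing is left unacknowledged rather than silently dropped.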

The Makefile offers a couple of commands for running the test suite and loading cl-disque into an SBCL REPL:

# To run the test suite
$ make test
# To load an SBCL REPL
$ make sbcl-repl

Available commands

cl-disque supports all of the Disque client commands and their arguments. See the Disque documentation for more specifics on each command.

INFO

  • Args: ()
  • Response-type: :bulk

HELLO

  • Args: ()
  • Response-type: :multi

QLEN

  • Args: (queue)
  • Response-type: :integer

QPEEK

  • Args: (queue count)
  • Response-type: :multi

QSCAN

  • Args: (&rest args &key count busyloop minlen maxlen importrate)
  • Response-type: :multi

GETJOB

  • Args: (queues &rest args &key nohang timeout count withcounters)
  • Response-type: :multi

Note: queues can either be a single queue or a list of queues:

(disque:getjob "queue1")
;; or
(disque:getjob '("queue1" "queue2" "queue3"))

ADDJOB

  • Args: (queue job timeout &rest args &key replicate delay retry ttl maxlen async)
  • Response-type: :status
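
Going by the argument list above, the optional ADDJOB parameters are presumably passed as keyword arguments after the three positional ones. The sketch below assumes that reading and that cl-disque is already loaded with a server on the default host and port. The values are illustrative only.

```lisp
(cl-disque:with-connection ()
  ;; Positional arguments: queue, job body, timeout (0, as in the quickstart).
  ;; Keyword names are taken from the ADDJOB argument list;
  ;; how each value is interpreted is defined by the Disque server.
  (disque:addjob "queue" "job" 0 :delay 10 :retry 60 :ttl 3600))
```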

ACKJOB

  • Args: (job &rest jobs)
  • Response-type: :integer

FASTACK

  • Args: (job &rest jobs)
  • Response-type: :integer

WORKING

  • Args: (job)
  • Response-type: :integer

NACK

  • Args: (job &rest jobs)
  • Response-type: :integer

ENQUEUE

  • Args: (job &rest jobs)
  • Response-type: :integer

DEQUEUE

  • Args: (job &rest jobs)
  • Response-type: :integer

DELJOB

  • Args: (job &rest jobs)
  • Response-type: :integer

SHOW

  • Args: (job)
  • Response-type: :multi

JSCAN

  • Args: (cursor &rest args &key count blocking queue state reply)
  • Response-type: :multi

Code organization

The system provides two packages: CL-DISQUE and DISQUE.

Everything is available in the CL-DISQUE package.

The DISQUE package contains all of the commands for interacting with a Disque server. This is simply syntactic sugar, as every command is also available in the CL-DISQUE package under a disque- prefix. For example:

(disque:info)
; is the same as
(cl-disque:disque-info)

Installation

Clone this repository into your ~/quicklisp/local-projects/ directory, then evaluate (ql:quickload :cl-disque).

Dependencies

Debugging and error recovery

If *echo-p* is T, all client-server communications will be echoed to the stream *echo-stream*, which defaults to *standard-output*.
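
Because both variables are described as ordinary special variables, they can presumably be rebound for the duration of a single block. The sketch below assumes that *echo-p* and *echo-stream* are exported from the CL-DISQUE package and that the echo stream is looked up at the time each command is sent. It captures the traffic of one command in a string instead of printing it.

```lisp
;; Assumption: both variables are exported from CL-DISQUE.
(let ((cl-disque:*echo-p* t)
      (cl-disque:*echo-stream* (make-string-output-stream)))
  (cl-disque:with-connection ()
    (disque:hello))
  ;; Return everything that was echoed while the body ran.
  (get-output-stream-string cl-disque:*echo-stream*))
```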

Error handling is modeled after Postmodern. In particular, whenever an error occurs that breaks the communication stream, a condition of type disque-connection-error is signalled, offering a :reconnect restart. If that restart is selected and the reconnection attempt succeeds, the whole Disque command is resent. Furthermore, connect checks whether a connection to Disque is already established and, if so, offers two restarts (:leave and :replace).

When the server responds with an error reply, a condition of type disque-error-reply is signalled.
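
The two conditions can be handled with the standard Common Lisp condition system. The sketch below assumes that both condition types are exported from the CL-DISQUE package, that the restart is named by the keyword :reconnect as written above, and that a connection is already open. The queue name is illustrative.

```lisp
;; Select the :reconnect restart automatically when the connection breaks.
(handler-bind ((cl-disque:disque-connection-error
                 (lambda (condition)
                   (let ((restart (find-restart :reconnect condition)))
                     (when restart
                       (invoke-restart restart))))))
  (disque:qlen "queue"))

;; Turn an error reply from the server into a NIL return value.
(handler-case (disque:qlen "queue")
  (cl-disque:disque-error-reply (condition)
    (format *error-output* "Disque replied with an error: ~a~%" condition)
    nil))
```

The first form retries for as long as the restart is offered, so a real program would likely cap the number of attempts.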

There's also a high-level with-persistent-connection macro that tries to do the right thing (i.e., it automatically reopens the connection once if it is broken).
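
As a rough sketch of how this macro might be used, the example below assumes it takes the same connection options as with-connection (an empty list selects the default host and port). That signature is an assumption, not something this document confirms.

```lisp
;; Assumption: same argument shape as with-connection.
(cl-disque:with-persistent-connection ()
  ;; Poll the queue length a few times over one connection.
  (loop repeat 3
        do (print (disque:qlen "queue"))))
```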

Advanced usage

Pipelining

For better performance, Disque allows commands to be pipelined: the client sends several commands without waiting for each reply, then receives and processes all the results in one batch afterwards. The with-pipelining macro supports this.

Note that with-pipelining calls may, in theory, be nested, but the results will only be available to the highest-level pipeline; all nested pipelines will return :PIPELINED. A warning is signalled in this situation.

Note: Pipelining has not been tested since being ported from cl-redis.
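
A minimal sketch of a pipelined block follows. It assumes that with-pipelining is exported from the CL-DISQUE package and takes a plain body of commands, as in cl-redis; given the note above, treat it as untested. The queue names are illustrative.

```lisp
(cl-disque:with-connection ()
  ;; Commands in the body are sent without waiting for each reply;
  ;; the replies are collected once the body has finished.
  (cl-disque:with-pipelining
    (disque:qlen "queue1")
    (disque:qlen "queue2")))
```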

Credits

Cody Reichert <codyreichert@gmail.com> is the maintainer of CL-DISQUE.

CL-DISQUE is a port of the CL-REDIS client, which is developed and maintained by Vsevolod Dyomkin <vseloved@gmail.com>. Many thanks to him for implementing the protocol and providing most of the internals.

Alexandr Manzyuk <manzyuk@googlemail.com> also contributed to the CL-REDIS client and developed the connection handling code following the implementation in Postmodern. It has since been partially rewritten to accommodate more advanced connection handling strategies, such as persistent connections.

License

MIT (See LICENSE file for details).

Author
Cody Reichert <codyreichert@gmail.com>