Async, Futures, AMQP, pick three

A few weeks ago, we set out to develop an AMQP client library in Rust, and I'm happy to release it now! We will integrate it in more and more of our tools in the future.

Design: a futures based API and a low level API

One of our goals was to leverage tokio and futures to make an API that is easy to use, while still allowing lower level implementations that drive an event loop directly, with something like mio.

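This was a bit challenging, but we ended up with two crates: lapin-futures, which provides the futures based API, and lapin-async, which provides the low level API.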

The resulting code can work with tokio-core's event reactor, or even futures-cpupool.

For the network frame format, the libraries use nom, the Rust parser combinators library, and cookie-factory, an experimental serialization library that follows the same approach as nom. The result is a good example of using nom inside a tokio transport, and of integrating a complex protocol's state machine directly with tokio-io. We will release a tutorial on how to write such a protocol soon.
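To give an idea of what parsing the frame format involves, here is a minimal hand-written sketch. It does not use nom and is not lapin's parser: the `Parsed` enum and `parse_frame` function are hypothetical names. It only assumes the general AMQP 0-9-1 frame layout (one type byte, a two byte channel number, a four byte payload size, the payload, then the 0xCE frame-end byte), and it reports incomplete input instead of failing, as a streaming parser would.

```rust
/// Outcome of trying to parse one frame from a partially filled buffer.
#[derive(Debug, PartialEq)]
enum Parsed<'a> {
    /// Not enough bytes yet: wait for more data from the network.
    Incomplete,
    /// A whole frame, plus the number of bytes it occupied in the buffer.
    Frame { kind: u8, channel: u16, payload: &'a [u8], consumed: usize },
    /// The byte after the payload was not the frame-end marker.
    Invalid,
}

const FRAME_END: u8 = 0xCE;
const HEADER_LEN: usize = 7; // type (1) + channel (2) + payload size (4)

fn parse_frame(input: &[u8]) -> Parsed<'_> {
    if input.len() < HEADER_LEN {
        return Parsed::Incomplete;
    }
    let kind = input[0];
    let channel = u16::from_be_bytes([input[1], input[2]]);
    let size = u32::from_be_bytes([input[3], input[4], input[5], input[6]]) as usize;
    let total = HEADER_LEN + size + 1; // header + payload + frame-end byte
    if input.len() < total {
        return Parsed::Incomplete;
    }
    if input[total - 1] != FRAME_END {
        return Parsed::Invalid;
    }
    Parsed::Frame {
        kind,
        channel,
        payload: &input[HEADER_LEN..HEADER_LEN + size],
        consumed: total,
    }
}

fn main() {
    // A heartbeat frame: type 8, channel 0, empty payload.
    let heartbeat = [8u8, 0, 0, 0, 0, 0, 0, FRAME_END];
    println!("{:?}", parse_frame(&heartbeat[..4]));
    println!("{:?}", parse_frame(&heartbeat));
}
```

Returning the number of consumed bytes lets the caller advance its own buffer, which is the contract the low level API relies on.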

The libraries are also designed to be completely independent from the network stream: you can use a basic TCP stream, a TLS stream or a unix socket, and you won't be blocked by rust-openssl version conflicts between many libraries (which was a big issue for us).
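The idea of stream independence can be sketched with the standard library alone. The snippet below is an illustration, not lapin's code: `send_protocol_header` is a hypothetical helper, written against the blocking `Read` and `Write` traits rather than their tokio-io counterparts. The eight bytes it writes are the AMQP 0-9-1 protocol header a client sends first on a new connection.

```rust
use std::io::{self, Cursor, Read, Write};

/// The 8 bytes a client sends first on a new AMQP 0-9-1 connection.
const PROTOCOL_HEADER: &[u8; 8] = b"AMQP\x00\x00\x09\x01";

/// Hypothetical helper: generic over the stream, so it is not tied to TCP.
fn send_protocol_header<S: Read + Write>(stream: &mut S) -> io::Result<()> {
    stream.write_all(PROTOCOL_HEADER)?;
    stream.flush()
}

fn main() -> io::Result<()> {
    // An in-memory stream stands in for a TcpStream, a UnixStream or a TLS stream.
    let mut stream = Cursor::new(Vec::<u8>::new());
    send_protocol_header(&mut stream)?;
    println!("{:?}", stream.get_ref());
    Ok(())
}
```

Because the function only asks for trait bounds, the caller picks the TLS library (and its version), not the protocol crate.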

Using the futures API: publishing a message

Every method returns a future, so you can chain them: the connect result gives you a working client once the complete AMQP handshake has been performed, the channel becomes available once the server has answered, etc. Chaining is sequential, but the nature of AMQP also makes parallel work on the same connection easy.

#[macro_use] extern crate log; // provides the info! macro used below
extern crate futures;
extern crate tokio_core;
extern crate lapin_futures as lapin;

use std::default::Default;
use futures::Stream;
use futures::future::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use lapin::client::ConnectionOptions;
use lapin::channel::{BasicProperties,BasicPublishOptions,QueueDeclareOptions};

fn main() {

  // create the reactor
  let mut core = Core::new().unwrap();
  let handle = core.handle();
  let addr = "127.0.0.1:5672".parse().unwrap();

  core.run(

    TcpStream::connect(&addr, &handle).and_then(|stream| {

      // connect() returns a future of an AMQP Client
      // that resolves once the handshake is done
      lapin::client::Client::connect(
        stream,
        &ConnectionOptions{
          username: "guest",
          password: "guest",
          ..Default::default()
        }
      )
    }).and_then(|client| {

      // create_channel returns a future that is resolved
      // once the channel is successfully created
      client.create_channel()
    }).and_then(|channel| {
      let id = channel.id;
      info!("created channel with id: {}", id);

      channel.queue_declare("hello", &QueueDeclareOptions::default()).and_then(move |_| {
        info!("channel {} declared queue {}", id, "hello");

        channel.basic_publish(
          "hello",
          b"hello from tokio",
          &BasicPublishOptions::default(),
          BasicProperties::default().with_user_id("guest".to_string()).with_reply_to("foobar".to_string())
        )
      })
    })
  ).unwrap();
}

Every struct of the API, be it a client, channel or consumer, holds a synchronized reference to the underlying transport, so you can call it from any thread.
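As a rough illustration of what a "synchronized reference" means, here is a sketch using only the standard library. `Transport`, `ChannelHandle` and `publish_from_threads` are hypothetical names, and the real structs may be organized differently; the sketch only assumes the general pattern of handles sharing one transport behind an `Arc<Mutex<...>>`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Stand-in for the connection state shared by every handle.
#[derive(Default)]
struct Transport {
    outgoing: Vec<String>,
}

/// Hypothetical channel handle: cheap to clone, safe to move across threads.
#[derive(Clone)]
struct ChannelHandle {
    id: u16,
    transport: Arc<Mutex<Transport>>,
}

impl ChannelHandle {
    fn publish(&self, body: &str) {
        // Hold the lock only for the time needed to queue the frame.
        let mut transport = self.transport.lock().unwrap();
        transport.outgoing.push(format!("channel {}: {}", self.id, body));
    }
}

/// Publishes one message per channel from separate threads
/// and returns the number of frames queued on the shared transport.
fn publish_from_threads(channels: u16) -> usize {
    let transport = Arc::new(Mutex::new(Transport::default()));
    let workers: Vec<_> = (1..=channels)
        .map(|id| {
            let handle = ChannelHandle { id, transport: Arc::clone(&transport) };
            thread::spawn(move || handle.publish("hello"))
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }
    let queued = transport.lock().unwrap().outgoing.len();
    queued
}

fn main() {
    println!("{} frames queued", publish_from_threads(4));
}
```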

Using the futures API: creating a consumer

The basic_consume method returns a future of a Consumer. The Consumer implements Stream, so you can reuse all the related combinators from the futures library.

#[macro_use] extern crate log; // provides the info! and debug! macros used below
extern crate futures;
extern crate tokio_core;
extern crate lapin_futures as lapin;

use futures::Stream;
use futures::future::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use lapin::client::ConnectionOptions;
use lapin::channel::{BasicConsumeOptions,QueueDeclareOptions};

fn main() {

  // create the reactor
  let mut core = Core::new().unwrap();
  let handle = core.handle();
  let addr = "127.0.0.1:5672".parse().unwrap();

  core.run(

    TcpStream::connect(&addr, &handle).and_then(|stream| {
      lapin::client::Client::connect(stream, &ConnectionOptions::default())
    }).and_then(|client| {

      client.create_channel()
    }).and_then(|channel| {

      let id = channel.id;
      info!("created channel with id: {}", id);

      let ch = channel.clone();
      channel.queue_declare("hello", &QueueDeclareOptions::default()).and_then(move |_| {
        info!("channel {} declared queue {}", id, "hello");

        channel.basic_consume("hello", "my_consumer", &BasicConsumeOptions::default())
      }).and_then(move |stream| {
        info!("got consumer stream");

        // `move` hands `ch` to the closure, so it lives as long as the stream
        stream.for_each(move |message| {
          debug!("got message: {:?}", message);
          info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap());

          ch.basic_ack(message.delivery_tag);
          Ok(())
        })
      })
    })
  ).unwrap();
}

Looking under the hood: lapin-async

The lapin-async library is meant for use with an event loop that tells you when you can read or write on the underlying stream. As such, it does not own the network stream, nor the buffers used to read and write. You handle your IO, then pass the buffers to the protocol's state machine. It updates its state, tells you how much data it consumed, and gives you data to send to the network. You can then query it for state changes.

There are various reasons for an architecture like this one:

  • a library that owns the IO stream usually does not play well with event loops
  • the developer might want to make their own optimizations with sockets and buffers
  • separating the IO makes the library easy to test: you can pass buffers (or even
    complete structs) to the state machine and verify the expected state easily

More generally, a protocol library should not dictate how the application handles its networking.
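To make the idea concrete, here is a toy state machine written in that style. It is not lapin-async's API and the handshake is invented for the example (`State`, `Machine`, `write_to` and `parse` are hypothetical names). It shows the contract described above: the caller owns the buffers, the machine reports how many bytes it produced or consumed, and the state can be checked without any socket.

```rust
/// Connection states of a toy handshake (not the real AMQP one).
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Initial,
    SentHello,
    Connected,
}

/// Hypothetical state machine that never touches a socket.
struct Machine {
    state: State,
}

impl Machine {
    fn new() -> Machine {
        Machine { state: State::Initial }
    }

    /// Writes pending data into `out`; returns how many bytes were produced.
    fn write_to(&mut self, out: &mut Vec<u8>) -> usize {
        const HELLO: &[u8] = b"HELLO\n";
        if self.state == State::Initial {
            out.extend_from_slice(HELLO);
            self.state = State::SentHello;
            return HELLO.len();
        }
        0
    }

    /// Parses received data; returns how many bytes of `input` were consumed.
    fn parse(&mut self, input: &[u8]) -> usize {
        const REPLY: &[u8] = b"OK\n";
        if self.state == State::SentHello && input.starts_with(REPLY) {
            self.state = State::Connected;
            return REPLY.len();
        }
        0 // incomplete or unexpected data: consume nothing and wait
    }
}

fn main() {
    let mut machine = Machine::new();
    let mut send_buffer = Vec::new();

    // The caller owns the buffers and decides when to do IO.
    let produced = machine.write_to(&mut send_buffer);
    println!("to send: {} bytes, state {:?}", produced, machine.state);

    // Feed a partial reply, then the complete one, as a socket might deliver them.
    let consumed = machine.parse(b"O");
    println!("consumed {}, state {:?}", consumed, machine.state);
    let consumed = machine.parse(b"OK\n");
    println!("consumed {}, state {:?}", consumed, machine.state);
}
```

Testing such a machine needs no network: feed it byte slices and compare the resulting state.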

As an example of how it could run:

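use std::net::TcpStream;
use std::{thread, time};
// Connection, ConnectionState, ConnectingState and Buffer come from lapin-async

let mut stream = TcpStream::connect("127.0.0.1:5672").unwrap();
// non-blocking mode: reads and writes return immediately instead of waiting
stream.set_nonblocking(true).unwrap();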

let capacity = 8192;
let mut send_buffer    = Buffer::with_capacity(capacity as usize);
let mut receive_buffer = Buffer::with_capacity(capacity as usize);

let mut conn: Connection = Connection::new();
assert_eq!(conn.connect().unwrap(), ConnectionState::Connecting(ConnectingState::SentProtocolHeader));
loop {
  match conn.run(&mut stream, &mut send_buffer, &mut receive_buffer) {
    Err(e) => panic!("could not connect: {:?}", e),
    Ok(ConnectionState::Connected) => break,
    Ok(state) => info!("now at state {:?}, continue", state),
  }
  thread::sleep(time::Duration::from_millis(100));
}
info!("CONNECTED");

The run method is a helper that reads from the network, parses frames, updates the internal state with those frames, and writes new frames to the network. We loop until the state switches to "connected". Most of the library's behaviour follows that model.

While the lapin-async library has most of the functionality, it is still a lot of manual work to manage, and you should prefer the futures based library.

A young library

This is an early release, and it is missing a lot of features, but the design makes them easy to implement.

Right now, the only authentication method is "plain". You can create and close channels, create queues (without options), and use the methods from the "basic" AMQP class. RabbitMQ's "publisher confirms" extension is also available.

It is mainly missing the "nack" extension, and the exchange and transaction handling methods.

More features will come in the following weeks, and if you want to contribute, you're very welcome 🙂
