rstream

A Python asyncio-based client for RabbitMQ Streams

https://github.com/rabbitmq-community/rstream

Science Score: 26.0%

This score indicates how likely this project is to be science-related based on various indicators:

  • CITATION.cff file
  • codemeta.json file
    Found codemeta.json file
  • .zenodo.json file
    Found .zenodo.json file
  • DOI references
  • Academic publication links
  • Committers with academic emails
  • Institutional organization owner
  • JOSS paper metadata
  • Scientific vocabulary similarity
    Low similarity (10.1%) to scientific vocabulary

Keywords

asyncio python rabbitmq rabbitmq-client rabbitmq-streams
Last synced: 6 months ago

Repository

A Python asyncio-based client for RabbitMQ Streams

Basic Info
  • Host: GitHub
  • Owner: rabbitmq-community
  • License: mit
  • Language: Python
  • Default Branch: master
  • Homepage:
  • Size: 462 KB
Statistics
  • Stars: 93
  • Watchers: 7
  • Forks: 17
  • Open Issues: 16
  • Releases: 39
Topics
asyncio python rabbitmq rabbitmq-client rabbitmq-streams
Created over 4 years ago · Last pushed 6 months ago
Metadata Files
Readme License

README.md

RabbitMQ Stream Python Client

A Python asyncio-based client for RabbitMQ Streams

The RabbitMQ stream plug-in is required. See the documentation for enabling it.

Installation

The client is distributed via PIP:

```bash
pip install rstream
```

Examples

Here you can find different examples.

Client Codecs

Before starting to use the client, it is important to read this section. The client supports two codecs to store messages on the server:

  • AMQP 1.0
  • Binary

By default you should use the AMQP 1.0 codec:

```python
amqp_message = AMQPMessage(
    body=bytes("hello: {}".format(i), "utf-8"),
)
```

AMQP 1.0 codec vs Binary

You need to use the AMQP 1.0 codec to exchange messages with the other stream clients (Java, .NET, Rust, Go) or with AMQP 0.9.1 clients.

You can use the Binary version if you only need to exchange messages from Python to Python.

Note: messages stored with the Binary codec are not compatible with the other stream clients or with AMQP 0.9.1 clients.
Once messages are stored on the server, they cannot be changed.

See also the Client Performances section.

Publishing messages

You can publish messages with four different methods:

  • send: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires.
  • send_batch: synchronous, the user buffers the messages and sends them. This is the fastest publishing method.
  • send_wait: synchronous, the caller waits until the message is confirmed. This is the slowest publishing method.
  • send_sub_entry: asynchronous. See Sub-entry batching and compression.

In the examples directory you can find different ways to send messages:

  • producer using send
  • producer using send_wait
  • producer using send_batch
  • producer using sub-entry batch

Publishing with confirmation

The send method takes a handle function as a parameter; it is called asynchronously when the server notifies the client that the message has been published.

Examples:

  • producer using send and handling confirmation
  • producer using send_batch and handling confirmation

send_wait, instead, waits until the confirmation from the server is received.

Sub-Entry Batching and Compression

RabbitMQ Stream provides a special mode to publish, store, and dispatch messages: sub-entry batching. This mode increases throughput at the cost of increased latency and potential duplicated messages even when deduplication is enabled. It also allows using compression to reduce bandwidth and storage if messages are reasonably similar, at the cost of increasing CPU usage on the client side.

Sub-entry batching consists in squeezing several messages – a batch – in the slot that is usually used for one message. This means outbound messages are not only batched in publishing frames, but in sub-entries as well.

```python
# sending with compression
await producer.send_sub_entry(
    STREAM,
    compression_type=CompressionType.Gzip,
    subentry_messages=messages,
)
```

Full example: producer using sub-entry batch

The consumer side is automatic, so no configuration is needed.

The client ships with No Compression (CompressionType.No) and Gzip compression (CompressionType.Gzip); the other compression types (Snappy, Lz4, Zstd) can be used by implementing the ICompressionCodec class.

Deduplication

RabbitMQ Stream can detect and filter out duplicated messages based on two client-side elements: the producer name and the message publishing ID. All the producer methods that send messages (send, send_batch, send_wait) take a publisher_name parameter, while the message publishing ID can be set in the AMQP message.

Example: - producer with deduplication

Consuming messages

See consumer examples for basic consumer and consumers with different offsets.

Server-side offset tracking

RabbitMQ Streams provides server-side offset tracking for consumers. This feature allows a consuming application to restart consuming where it left off in a previous run. You can use the store_offset (to store an offset on the server) and query_offset (to query it) methods of the consumer class, as in this example:

  • server side offset tracking

Superstreams

A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes.

See the blog post for more info.

You can use the superstream_producer and superstream_consumer classes, which internally use producers and consumers to operate on the composing streams.

See the Super Stream example

Single Active Consumer

Single active consumer provides exclusive consumption and consumption continuity on a stream.
See the blog post for more info.

See the single active consumer example

Filtering

Filtering is a streaming feature available from RabbitMQ 3.13, based on a Bloom filter. RabbitMQ Stream provides server-side filtering, which avoids reading all the messages of a stream and filtering only on the client side. This helps save network bandwidth when a consuming application needs only a subset of messages.

https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#filtering

See the filtering examples

Connecting with SSL

You can enable SSL/TLS. See the example here: tls example

SASL Mechanisms

You can use the following SASL mechanisms:

  • PLAIN
  • EXTERNAL

The client uses PLAIN mechanism by default.

The EXTERNAL mechanism authenticates a user based on a certificate presented by the client. Example:

```python
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# put the root certificate of the CA
ssl_context.load_verify_locations("certs/ca_certificate.pem")
ssl_context.load_cert_chain(
    "certs/client_HOSTNAME_certificate.pem",
    "certs/client_HOSTNAME_key.pem",
)

async with Producer(
    "HOSTNAME",
    username="not_important",
    password="not_important",
    port=5551,
    ssl_context=ssl_context,
    sasl_configuration_mechanism=SlasMechanism.MechanismExternal,  # <--- here EXTERNAL configuration
) as producer:
    ...
```
The plugin `rabbitmq_auth_mechanism_ssl` needs to be enabled on the server side, and `ssl_options.fail_if_no_peer_cert` needs to be set to `true`. Config example:

```
auth_mechanisms.3 = PLAIN
auth_mechanisms.2 = AMQPLAIN
auth_mechanisms.1 = EXTERNAL

ssl_options.cacertfile = certs/ca_certificate.pem
ssl_options.certfile = certs/server_certificate.pem
ssl_options.keyfile = certs/server_key.pem
listeners.ssl.default = 5671
stream.listeners.ssl.default = 5551
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
```

Managing disconnections

The client supports auto-reconnect just for Producer and SuperstreamProducer at the moment.

When the TCP connection drops unexpectedly, the Producer and the SuperstreamProducer will try to reconnect automatically, while for the Consumer/SuperstreamConsumer the client raises an event that needs to be handled:

```python
async def on_connection_closed(disconnection_info: OnClosedErrorInfo) -> None:
    print(
        "connection has been closed from stream: "
        + str(disconnection_info.streams)
        + " for reason: "
        + str(disconnection_info.reason)
    )

consumer = Consumer(
    ..
    on_close_handler=on_connection_closed,
)
```

Reconnect

When the on_close_handler event is raised, you can close the consumers with a proper clean-up, or try to reconnect using the reconnect_stream method.

Example:

```python
async def on_connection_closed(disconnection_info: OnClosedErrorInfo) -> None:
    print(
        "connection has been closed from stream: "
        + str(disconnection_info.streams)
        + " for reason: "
        + str(disconnection_info.reason)
    )

    for stream in disconnection_info.streams:
        print("reconnecting stream: " + stream)
        await producer.reconnect_stream(stream)
```

Please take a look at the complete reliable client example here

Metadata Update

If the stream topology changes (e.g. a stream is deleted or a follower is added/removed), the server removes the producers and consumers linked to the stream and then sends the Metadata update event. The behaviour is similar to disconnections: the Producer/SuperstreamProducer will try to reconnect automatically, while the Consumer needs to handle the on_close_handler event.

Please take a look at the complete reliable client example here

Load Balancer

In order to handle load balancers, you can use the load_balancer_mode parameter for producers and consumers. This will always attempt to create a connection via the load balancer, discarding connections that are inappropriate for the client type.

Producers must connect to the leader node, while consumers can connect to any node, preferring replicas when available.

Client Performances

The RabbitMQ Stream queues can handle high throughput. Currently, the client cannot reach the maximum throughput the server can handle.

We found some bottlenecks; one of them is the current AMQP 1.0 marshal and unmarshal message format.

This one:

```python
for i in range(1_000_000):
    amqp_message = AMQPMessage(
        body=bytes("hello: {}".format(i), "utf-8"),
    )
    # send is asynchronous
    await producer.send(stream=STREAM, message=amqp_message)
```

is roughly 55% slower than:

```python
for i in range(1_000_000):
    # send is asynchronous
    await producer.send(stream=STREAM, message=b"hello")
```

You can use send_batch to test the performance.

We are evaluating rewriting the AMQP 1.0 codec optimized for the stream use case.

Test case

  • Linux Ubuntu, 4 cores and 8 GB of RAM
  • RabbitMQ installed on the server

  • Send batch with AMQP 1.0 codec:

```
$ python3 docs/examples/basic_producers/producer_send_batch.py
Sent 1.000.000 messages in 9.3218 seconds. 107.275,5970 messages per second
```

  • Send batch with binary codec:

```
$ python3 docs/examples/basic_producers/producer_send_batch_binary.py
Sent 1.000.000 messages in 2.9930 seconds. 334.116,5639 messages per second
```

Build and Test

To run the tests, you need to have a running RabbitMQ Stream server. You can use the docker official image.

Run the server with the following command:

```bash
docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
    -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
    rabbitmq:3.13.1-management
```

enable the plugins:

```bash
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0
```

and run the tests:

```bash
poetry run pytest
```

Project Notes

The project is in development and stabilization phase. Features and API are subject to change, but breaking changes will be kept to a minimum.
Any feedback or contribution is welcome.

Owner

  • Name: rabbitmq-community
  • Login: rabbitmq-community
  • Kind: organization

GitHub Events

Total
  • Create event: 13
  • Issues event: 19
  • Release event: 10
  • Watch event: 8
  • Delete event: 11
  • Issue comment event: 37
  • Push event: 19
  • Pull request review comment event: 1
  • Pull request review event: 3
  • Pull request event: 13
  • Fork event: 2
Last Year
  • Create event: 13
  • Issues event: 19
  • Release event: 10
  • Watch event: 8
  • Delete event: 11
  • Issue comment event: 37
  • Push event: 19
  • Pull request review comment event: 1
  • Pull request review event: 3
  • Pull request event: 13
  • Fork event: 2

Committers

Last synced: 9 months ago

All Time
  • Total Commits: 155
  • Total Committers: 7
  • Avg Commits per committer: 22.143
  • Development Distribution Score (DDS): 0.471
Past Year
  • Commits: 27
  • Committers: 4
  • Avg Commits per committer: 6.75
  • Development Distribution Score (DDS): 0.556
Top Committers
Name Email Commits
Daniele d****a@v****m 82
Gabriele Santomaggio G****o@g****m 33
qweeze q****e@g****m 22
Cameron Jones c****s@v****m 9
Bulygin Evgeny 4****1 6
Cameron Jones c****s@g****m 2
aborigeth 1****h 1
Committer Domains (Top 20 + Academic)

Issues and Pull Requests

Last synced: 6 months ago

All Time
  • Total issues: 62
  • Total pull requests: 113
  • Average time to close issues: 3 months
  • Average time to close pull requests: 3 days
  • Total issue authors: 22
  • Total pull request authors: 9
  • Average comments per issue: 1.89
  • Average comments per pull request: 0.97
  • Merged pull requests: 103
  • Bot issues: 0
  • Bot pull requests: 0
Past Year
  • Issues: 13
  • Pull requests: 23
  • Average time to close issues: 9 days
  • Average time to close pull requests: about 21 hours
  • Issue authors: 13
  • Pull request authors: 7
  • Average comments per issue: 1.38
  • Average comments per pull request: 0.96
  • Merged pull requests: 20
  • Bot issues: 0
  • Bot pull requests: 0
Top Authors
Issue Authors
  • DanielePalaia (24)
  • Gsantomaggio (14)
  • wrobell (3)
  • xuqinghan (2)
  • wonesy (2)
  • garvenlee (1)
  • jbackofe (1)
  • chun0826 (1)
  • jason19970210 (1)
  • akrherz (1)
  • CedricCabessa (1)
  • twinnedAI (1)
  • arnab-mukherjee (1)
  • rajeev1982 (1)
  • dbotwinick (1)
Pull Request Authors
  • DanielePalaia (69)
  • Gsantomaggio (26)
  • nesb1 (7)
  • wonesy (4)
  • qweeze (3)
  • dbotwinick (2)
  • mattalexanderhill (2)
  • CedricCabessa (1)
  • aborigeth (1)
Top Labels
Issue Labels
enhancement (5) duplicate (3) performance (2) low-priority (2) bug (1)
Pull Request Labels

Packages

  • Total packages: 1
  • Total downloads:
    • pypi 9,717 last-month
  • Total docker downloads: 162
  • Total dependent packages: 1
  • Total dependent repositories: 1
  • Total versions: 43
  • Total maintainers: 2
pypi.org: rstream

A python client for RabbitMQ Streams

  • Versions: 43
  • Dependent Packages: 1
  • Dependent Repositories: 1
  • Downloads: 9,717 Last month
  • Docker Downloads: 162
Rankings
Docker downloads count: 3.1%
Downloads: 7.6%
Dependent packages count: 10.1%
Stargazers count: 10.2%
Average: 10.7%
Forks count: 11.4%
Dependent repos count: 21.6%
Maintainers (2)
Last synced: 6 months ago

Dependencies

.github/workflows/test.yaml actions
  • actions/checkout v2 composite
  • actions/setup-python v2 composite
  • snok/install-poetry v1 composite
  • pivotalrabbitmq/rabbitmq-stream * docker
poetry.lock pypi
  • appnope 0.1.2 develop
  • atomicwrites 1.4.0 develop
  • attrs 21.2.0 develop
  • backcall 0.2.0 develop
  • black 21.9b0 develop
  • click 8.0.3 develop
  • colorama 0.4.4 develop
  • decorator 5.0.9 develop
  • flake8 3.9.2 develop
  • iniconfig 1.1.1 develop
  • ipython 7.26.0 develop
  • ipython-genutils 0.2.0 develop
  • isort 5.9.3 develop
  • jedi 0.18.0 develop
  • matplotlib-inline 0.1.2 develop
  • mccabe 0.6.1 develop
  • mypy 0.910 develop
  • mypy-extensions 0.4.3 develop
  • packaging 21.0 develop
  • parso 0.8.2 develop
  • pathspec 0.9.0 develop
  • pexpect 4.8.0 develop
  • pickleshare 0.7.5 develop
  • platformdirs 2.4.0 develop
  • pluggy 0.13.1 develop
  • prompt-toolkit 3.0.19 develop
  • ptyprocess 0.7.0 develop
  • py 1.10.0 develop
  • pycodestyle 2.7.0 develop
  • pyflakes 2.3.1 develop
  • pygments 2.9.0 develop
  • pyparsing 2.4.7 develop
  • pytest 6.2.4 develop
  • pytest-asyncio 0.15.1 develop
  • regex 2021.10.21 develop
  • toml 0.10.2 develop
  • tomli 1.2.1 develop
  • traitlets 5.0.5 develop
  • typing-extensions 3.10.0.2 develop
  • wcwidth 0.2.5 develop
  • certifi 2021.10.8
  • six 1.16.0
  • uamqp 1.5.2
pyproject.toml pypi
  • black ^21.9b0 develop
  • flake8 ^3.9.2 develop
  • ipython ^7.13.0 develop
  • isort ^5.9.3 develop
  • mypy ^0.910 develop
  • pytest ^6.2.4 develop
  • pytest-asyncio ^0.15.1 develop
  • python ^3.9
  • uamqp ^1.5.2
.github/workflows/publish-prerelease.yaml actions
.github/workflows/publish-pypi.yaml actions
  • actions/checkout v3 composite
.github/workflows/publish-release.yaml actions