webentwicklung-frage-antwort-db.com.de

So erstellen Sie eine verzögerte Warteschlange in RabbitMQ

Was ist der einfachste Weg, um eine Verzögerungswarteschlange mit Python, Pika und RabbitMQ zu erstellen? Ich habe ähnliche Fragen gesehen, aber keine für Python. 

Ich halte dies für eine hilfreiche Idee beim Entwerfen von Anwendungen, da wir damit Nachrichten drosseln können, die erneut in die Warteschlange gestellt werden müssen.

Es besteht immer die Möglichkeit, dass Sie mehr Nachrichten erhalten, als Sie verarbeiten können. Möglicherweise ist der HTTP-Server langsam oder die Datenbank ist zu stark beansprucht.

Ich fand es auch sehr nützlich, wenn in Szenarien, in denen eine Null-Toleranz für den Verlust von Nachrichten besteht, ein Fehler aufgetreten ist. Es kann auch Probleme verursachen, wenn die Nachricht immer wieder in die Warteschlange gestellt wird. Dies kann zu Leistungsproblemen führen und Spam protokollieren. 

41
eandersson

Ich fand das sehr nützlich bei der Entwicklung meiner Anwendungen. Da es Ihnen eine Alternative gibt, können Sie Ihre Nachrichten einfach neu in die Warteschlange stellen. Dies kann die Komplexität Ihres Codes leicht reduzieren und ist eine von vielen leistungsstarken, verborgenen Funktionen in RabbitMQ.

Schritte

Zuerst müssen wir zwei Basiskanäle einrichten, einen für die Hauptwarteschlange und einen für die Verzögerungswarteschlange. In meinem Beispiel am Ende füge ich ein paar zusätzliche Flags ein, die nicht erforderlich sind, aber den Code zuverlässiger machen. wie confirm delivery, delivery_mode und durable. Weitere Informationen hierzu finden Sie im RabbitMQ manual .

Nachdem wir die Kanäle eingerichtet haben, fügen wir dem Hauptkanal eine Bindung hinzu, mit der wir Nachrichten vom Verzögerungskanal an unsere Hauptwarteschlange senden können.

channel.queue_bind(exchange='amq.direct',
                   queue='hello')

Als Nächstes müssen wir unseren Verzögerungskanal so konfigurieren, dass Nachrichten nach Ablauf an die Hauptwarteschlange weitergeleitet werden.

delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000,
  'x-dead-letter-exchange' : 'amq.direct',
  'x-dead-letter-routing-key' : 'hello'
})
  • x-message-ttl(Nachricht - Zeit zum Leben)

    Normalerweise werden alte Nachrichten nach einer bestimmten Dauer automatisch in der Warteschlange .__ entfernt. Durch Hinzufügen zweier optionaler Argumente können wir dieses Verhalten jedoch ändern. Stattdessen wird dieser Parameter in Millisekunden festgelegt in der Verzögerungsschlange bleiben.

  • x-dead-letter-routing-key

    Diese Variable ermöglicht es uns, die Nachricht in eine andere Warteschlange zu übertragen, nachdem sie abgelaufen ist, anstatt das Standardverhalten des Entfernens von vollständigem.

  • x-dead-letter-exchange

    Diese Variable bestimmt, mit welcher Exchange die Nachricht von hello_delay in die Hello-Warteschlange übertragen wurde.

Veröffentlichen in der Verzögerungswarteschlange

Wenn Sie alle grundlegenden Pika-Parameter eingerichtet haben, senden Sie einfach eine Nachricht in die Verzögerungswarteschlange.

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))

Nachdem Sie das Skript ausgeführt haben, sollten die folgenden Warteschlangen in Ihrem RabbitMQ-Verwaltungsmodul erstellt werden. enter image description here

Beispiel.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer 
# messages from our delay queue.
channel.queue_bind(exchange='amq.direct',
                   queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_delivery()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000, # Delay until the message is transferred in milliseconds.
  'x-dead-letter-exchange' : 'amq.direct', # Exchange used to transfer the message from A to B.
  'x-dead-letter-routing-key' : 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))

print " [x] Sent"
85
eandersson

Sie können das offizielle RabbitMQ-Plugin verwenden: x-delayed-message .

Laden Sie zunächst die ez-Datei in Your_rabbitmq_root_path/plugins herunter und kopieren Sie sie

Zweitens aktivieren Sie das Plugin (Sie müssen den Server nicht neu starten):

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Veröffentlichen Sie Ihre Nachricht abschließend mit "x-delay" -Headern wie:

headers.put("x-delay", 5000);

Hinweis:

Dies gewährleistet nicht die Sicherheit Ihrer Nachricht. Wenn Ihre Nachricht nur während der Ausfallzeit Ihres rabbitmq-Servers abläuft, geht die Nachricht leider verloren. Also seien Sie vorsichtig , wenn Sie dieses Schema verwenden.

Viel Spaß und mehr Infos in rabbitmq-delayed-message-exchange

15
flycee

Zu Ihrer Information, wie Sie dies in Spring 3.2.x tun.

<rabbit:queue name="delayQueue" durable="true" queue-arguments="delayQueueArguments"/>

<rabbit:queue-arguments id="delayQueueArguments">
  <entry key="x-message-ttl">
    <value type="Java.lang.Long">10000</value>
  </entry>
  <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
  <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
</rabbit:queue-arguments>


<rabbit:fanout-exchange name="finalDestinationTopic">
  <rabbit:bindings>
    <rabbit:binding queue="finalDestinationQueue"/>
  </rabbit:bindings>
</rabbit:fanout-exchange>
8
Ryan Walls

NodeJS-Implementierung.

Aus dem Code ist alles ziemlich klar.

var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});

// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
      deadLetterExchange: "my_final_delayed_exchange",
      messageTtl: 5000, // 5sec
}, function (err, q) {
      ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
      ch.bindQueue(q.queue, "my_final_delayed_exchange", '');

      ch.consume(q.queue, function (msg) {
          console.log("delayed - [x] %s", msg.content.toString());
      }, {noAck: true});
});
2
walv

Abhängig von Ihrem Szenario und Ihren Bedürfnissen würde ich die folgenden Ansätze empfehlen:

0
henrylilei

Nachrichten in der Kaninchen-Warteschlange können auf zwei Arten verzögert werden - mit QUEUE TTL - using Message TTL Wenn alle Nachrichten in der Warteschlange für eine feste Zeitdauer verzögert werden sollen, verwenden Sie die Warteschlange TTL . Wenn jede Nachricht um unterschiedliche Zeiten verzögert werden soll, verwenden Sie die Message TTL . Ich habe sie mit python3 und erläutert pika module . pika BasicProperties Das Argument 'expiration' in Millisekunden muss so eingestellt werden, dass die Nachricht in der Verzögerungswarteschlange verzögert wird .. _. Veröffentlichen Sie die Nachricht in einem delayed_queue ("keine tatsächliche Warteschlange, in der die Verbraucher auf den Verbrauch warten"). ), sobald die Nachricht in delayed_queue abläuft, wird die Nachricht mit exchange 'amq.direct' in eine tatsächliche Warteschlange geleitet.

def delay_publish(self, messages, queue, headers=None, expiration=0):
    """
    Connect to RabbitMQ and publish messages to the queue
    Args:
        queue (string): queue name
        messages (list or single item): messages to publish to rabbit queue
        expiration(int): TTL in milliseconds for message
    """
    delay_queue = "".join([queue, "_delay"])
    logging.info('Publishing To Queue: {queue}'.format(queue=delay_queue))
    logging.info('Connecting to RabbitMQ: {Host}'.format(
        Host=self.rabbit_Host))
    credentials = pika.PlainCredentials(
       RABBIT_MQ_USER, RABBIT_MQ_PASS)
    parameters = pika.ConnectionParameters(
       rabbit_Host, RABBIT_MQ_PORT,
        RABBIT_MQ_VHOST, credentials, heartbeat_interval=0)
    connection = pika.BlockingConnection(parameters)

    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=True)

    channel.queue_bind(exchange='amq.direct',
                       queue=queue)
    delay_channel = connection.channel()
    delay_channel.queue_declare(queue=delay_queue, durable=True,
                                arguments={
                                    'x-dead-letter-exchange': 'amq.direct',
                                    'x-dead-letter-routing-key': queue
                                })

    properties = pika.BasicProperties(
        delivery_mode=2, headers=headers, expiration=str(expiration))

    if type(messages) not in (list, Tuple):
        messages = [messages]

    try:
        for message in messages:
            try:
                json_data = json.dumps(message)
            except Exception as err:
                logging.error(
                    'Error Jsonify Payload: {err}, {payload}'.format(
                        err=err, payload=repr(message)), exc_info=True
                )
                if (type(message) is dict) and ('data' in message):
                    message['data'] = {}
                    message['error'] = 'Payload Invalid For JSON'
                    json_data = json.dumps(message)
                else:
                    raise

            try:
                delay_channel.basic_publish(
                    exchange='', routing_key=delay_queue,
                    body=json_data, properties=properties)
            except Exception as err:
                logging.error(
                    'Error Publishing Data: {err}, {payload}'.format(
                        err=err, payload=json_data), exc_info=True
                )
                raise

    except Exception:
        raise

    finally:
        logging.info(
            'Done Publishing. Closing Connection to {queue}'.format(
                queue=delay_queue
            )
        )
        connection.close()
0