提问者:小点点

如何在现有nodejs服务器上使用sqs消息


我想在SQS收到新消息时接收并触发电子邮件,现在我已经有nodejs服务器在运行,我如何使它工作? 我真的不想触发这个函数。 但我希望当SQS中有新消息时,这个消费者将使用并执行发送电子邮件的业务逻辑。

但在我的活动中没有任何触发。 注意:我不是在调用这个函数,我希望它在SQS有新消息可用时自动触发。

const AWS = require('aws-sdk');
const mongoose = require('mongoose');

//
// Configure the aws details
//
AWS.config.update({
    region: process.env['AWS_REGION'],
    accessKeyId: process.env['AWS_ACCESS_KEY_ID'],
    secretAccessKey: process.env['AWS_SECRET_ACCESS_KEY']
  });


const sqs = new AWS.SQS({apiVersion: '2012-11-05'});

var queueURL = "https://sqs.us-east-1.amazonaws.com/xxxxx/demo-lambda-to-email-sqs"



var params = {
    AttributeNames: [
        "SentTimestamp"
     ],
    MaxNumberOfMessages: 1,
    MessageAttributeNames: [
       "All"
    ],
    QueueUrl: queueURL,
    VisibilityTimeout: 20,
    WaitTimeSeconds: 0
   };

   sqs.receiveMessage(params, function(err, data) {
    if (err) {
      console.log("Receive Error", err);
    } else if (data.Messages) {
      console.log('--------------------------- MESSAGE RECEIVED -------------')
      var deleteParams = {
        QueueUrl: queueURL,
        ReceiptHandle: data.Messages[0].ReceiptHandle
      };
      sqs.deleteMessage(deleteParams, function(err, data) {
        if (err) {
          console.log("Delete Error", err);
        } else {
          console.log("Message Deleted", data);
        }
      });
    }
  });

共3个答案

匿名用户

SQS是一种排队服务,因此需要通过基于拉的机制而不是基于推的机制来使用它。

只有当您具有轮询SQS队列的功能,然后在消息进入时触发该函数时,才能调用该函数。

如果您不想维护消费者脚本,那么应该考虑将此脚本迁移到Lambda函数中。 当使用此选项时,Lambda服务将充当队列的使用者,并仅在添加消息时触发Lambda函数。

有关将AWS Lambda与SQS队列一起使用的更多信息,请参见文档。

匿名用户

在不使用长轮询的情况下,您只能调用它一次。 因此它启动并得到一个空响应。 所以您需要一个基于pull的机制,一个基本的实现可以在SetInterval内运行receiveMessage。 类似于:

setInterval(function() {
    sqs.receiveMessage(params, function(err, data){
      // your logic here!
    });
}, 10000);
// Run every 10s

大于1s的WaitTimeSeconds启用长轮询,并通过消除空响应的数量帮助降低使用Amazon SQS的成本。

匿名用户

下面的代码每30秒调用一次SQS,请求消息。 每个呼叫最多等待20秒来接收消息。

const AWS = require('aws-sdk')
AWS.config.update({
    region: 'us-east-1',
    accessKeyId: '...',
    secretAccessKey: '...'
})
const sqs = new AWS.SQS()

receiveMessage = () => sqs.receiveMessage({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/.../...',
    WaitTimeSeconds: 20
}, (error, data) => {
    if (error) console.error("ERROR:", error)
    if (data.Messages) data.Messages.forEach(m => console.info(m.Body))
})

setInterval(receiveMessage, 30000)