Send an object using RabbitMQ

Question

I understand that this question duplicates the question "using rabbitmq to send a message not string but struct".

When I do this the first way suggested there, I get the following stack trace:

java.io.EOFException
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2304)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2773)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:798)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:298)
at com.mdnaRabbit.worker.data.Data.fromBytes(Data.java:78)
at com.mdnaRabbit.worker.App.main(App.java:41)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

I've checked and am sure that the message is converted to bytes correctly in the sender class, but the consumer can't receive it.

Here is my producer class:

package com.mdnaRabbit.newt;

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.apache.commons.lang.SerializationUtils;
import com.mdnaRabbit.worker.data.Data;

public class App {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main( String[] argv) throws IOException{

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        int i = 0;

        do {
            Data message = getMessage();
            byte [] byteMessage = message.getBytes();
            //System.out.println(byteMessage);
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, byteMessage);
            System.out.println(" [" + (i+1) + "] message Sent" + Data.fromBytes(byteMessage).getBody());
            i++;
        } while (i<15);

        channel.close();
        connection.close();
    }

    private static Data getMessage(){
        Data data = new Data();
        data.setHeader("header");
        data.setDomainId("abc.com");
        data.setReceiver("me");
        data.setSender("he");
        data.setBody("body");
        return data;
    }

    private static String joinStrings(String[] strings, String delimiter){
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++){
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

Here is my consumer class:

package com.mdnaRabbit.worker;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.mdnaRabbit.worker.data.Data;
import org.apache.commons.lang.SerializationUtils;

public class App {

    private static final String TASK_QUEUE_NAME = "task_queue";
    private static int i = 0;
    public static void main( String[] argv )
            throws IOException,
            InterruptedException{

        ExecutorService threader = Executors.newFixedThreadPool(20);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection(threader);
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(20);

        final QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

        try {

            while (true) {

                        try {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                            Data message = Data.fromBytes(delivery.getBody());
                            //Data message = (Data) SerializationUtils.deserialize(delivery.getBody());

                            System.out.println(" [" + (i++) +"] Received" + message.getBody());

                            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                        }catch (Exception e){
                        }
                    }
        } catch (Exception e){
            e.printStackTrace();
        }
        channel.close();
        connection.close();
    }

}

Here is my Data class:

package com.mdnaRabbit.worker.data;

import java.io.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Data implements Serializable{

    public String header;
    public String body;
    public String domainId;
    public String sender;
    public String receiver;

    public void setHeader(String head){
        this.header = head;
    }

    public String getHeader(){
        return header;
    }

    public void setBody(String body){
        this.body = body;
    }

    public String getBody(){
        return body;
    }

    public void setDomainId(String domainId){
        this.domainId = domainId;
    }

    public String getDomainId(){
        return domainId;
    }

    public void setSender(String sender){
        this.sender = sender;
    }

    public String getSender(){
        return sender;
    }

    public String getReceiver(){
        return receiver;
    }

    public void setReceiver(String receiver){
        this.receiver = receiver;
    }


    public byte[] getBytes() {
        byte[]bytes;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try{
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(this);
            oos.flush();
            oos.reset();
            bytes = baos.toByteArray();
            oos.close();
            baos.close();
        } catch(IOException e){
            bytes = new byte[] {};
            Logger.getLogger("bsdlog").log(Level.ALL, "unable to write to output stream" + e);
        }
        return bytes;
    }

    public static Data fromBytes(byte[] body) {
        Data obj = null;
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(body);
            ObjectInputStream ois = new ObjectInputStream(bis);
            obj = (Data) ois.readObject();
            ois.close();
            bis.close();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        }
        return obj;
    }
}

It seems that the consumer always receives the messages, because when I don't try to convert the body into an object and just write System.out.println(delivery.getBody()), it shows bytes.

Answer

It looks like the byte array you receive is empty. This happens because of this:

    } catch(IOException e){
        bytes = new byte[] {};
    }

When an exception is thrown, the code doesn't warn you that something is broken and just sends an empty array instead. You should at least log the error.
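
As a rough sketch of that advice (not the original poster's code), the hypothetical helper class `SerializationSketch` below lets the `IOException` propagate out of serialization instead of replacing the payload with an empty array. Its `main` method also feeds an empty array to `ObjectInputStream`, which fails with `EOFException` while reading the stream header, the same failure seen in the trace from the question. The class and method names are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper for illustration; not part of the question's code.
public class SerializationSketch {

    // Serializes the object. Any failure propagates to the caller
    // instead of being swallowed and replaced by an empty array.
    public static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(obj);
        }
        return baos.toByteArray();
    }

    // Deserializes a payload. An empty payload makes the
    // ObjectInputStream constructor throw EOFException.
    public static Object fromBytes(byte[] body)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(body))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // A valid payload survives the round trip.
        byte[] ok = toBytes("body");
        System.out.println(fromBytes(ok));

        // An empty payload fails before any object is read.
        try {
            fromBytes(new byte[] {});
        } catch (EOFException e) {
            System.out.println("empty payload rejected: " + e);
        }
    }
}
```

With a helper like this, the producer would fail loudly at the point of serialization rather than publishing a message that no consumer can decode.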

The exception is probably being thrown because you are trying to serialize a class that is not serializable. To make a class serializable, you have to add "implements Serializable" to its declaration:

public class Data implements Serializable {
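
To illustrate the difference the marker interface makes, here is a small sketch under stated assumptions: `Plain` and `Marked` are hypothetical classes invented for this example, and `canSerialize` is a hypothetical helper. Writing a `Plain` instance makes `ObjectOutputStream.writeObject` throw `java.io.NotSerializableException`, which is a subclass of `IOException` and so is exactly the kind of failure the catch block above would silently turn into an empty array.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class; names are assumptions for illustration.
public class MarkerInterfaceSketch {

    // No marker interface: Java serialization refuses to write it.
    static class Plain {
        String body = "body";
    }

    // Same field, but declared Serializable.
    static class Marked implements Serializable {
        private static final long serialVersionUID = 1L;
        String body = "body";
    }

    // Returns true if the object can be written with Java serialization,
    // false if writeObject rejects it as not serializable.
    static boolean canSerialize(Object obj) throws IOException {
        try (ObjectOutputStream oos =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(canSerialize(new Plain()));  // prints false
        System.out.println(canSerialize(new Marked())); // prints true
    }
}
```

Every non-transient field of the class must itself be serializable as well, so a check like this is most useful when run against a fully populated instance.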
