Protocolo MQTT

mayo 03, 2021

 Protocolo MQTT

Que es MQTT?

MQTT son las siglas MQ Telemetry Transport, aunque en primer lugar fue conocido como Message Queing Telemetry Transport. Es un protocolo de comunicación M2M (machine-to-machine) de tipo message queue.

Está basado en la pila TCP/IP como base para la comunicación. En el caso de MQTT cada conexión se mantiene abierta y se "reutiliza" en cada comunicación. Es una diferencia, por ejemplo, a una petición HTTP 1.0 donde cada transmisión se realiza a través de conexión.

MQTT fue creado por el Dr. Andy Stanford-Clark de IBM y Arlen Nipper de Arcom (ahora Eurotech) en 1999 como un mecanismo para conectar dispositivos empleados en la industria petrolera.

Aunque inicialmente era un formato propietario, en 2010 fue liberado y pasó a ser un estándar en 2014 según la OASIS (Organization for the Advancement of Structured Information Standards)

FUNCIONAMIENTO

El funcionamiento del MQTT es un servicio de mensajería push con patrón publicador/suscriptor (pub-sub). Como vimos en la entrada anterior, en este tipo de infraestructuras los clientes se conectan con un servidor central denominado broker.

Para filtrar los mensajes que son enviados a cada cliente los mensajes se disponen en topics organizados jerárquicamente. Un cliente puede publicar un mensaje en un determinado topic. Otros clientes pueden suscribirse a este topic, y el broker le hará llegar los mensajes suscritos.



Los clientes inician una conexión TCP/IP con el broker, el cual mantiene un registro de los clientes conectados. Esta conexión se mantiene abierta hasta que el cliente la finaliza. Por defecto, MQTT emplea el puerto 1883 y el 8883 cuando funciona sobre TLS.

Para ello el cliente envía un mensaje CONNECT que contiene información necesaria (nombre de usuario, contraseña, client-id…). El broker responde con un mensaje CONNACK, que contiene el resultado de la conexión (aceptada, rechazada, etc).


Para enviar los mensajes el cliente emplea mensajes PUBLISH, que contienen el topic y el payload.

Para suscribirse y desuscribirse se emplean mensajes SUBSCRIBE y UNSUSCRIBE, que el servidor responde con SUBACK y UNSUBACK.

Por otro lado, para asegurar que la conexión está activa los clientes mandan periódicamente un mensaje PINGREQ que es respondido por el servidor con un PINGRESP. Finalmente, el cliente se desconecta enviando un mensaje de DISCONNECT.

ESTRUCTURA DE UN MENSAJE MQTT

Uno de los componentes más importantes del protocolo MQTT es la definición y tipología de los mensajes, ya que son una de las bases de la agilidad en la que radica su fortaleza. Cada mensaje consta de 3 partes:

Cabecera fija. Ocupa 2 a 5 bytes, obligatorio. Consta de un código de control, que identifica el tipo de mensaje enviado, y de la longitud del mensaje. La longitud se codifica en 1 a 4 bytes, de los cuales se emplean los 7 primeros bits, y el último es un bit de continuidad.

Cabecera variable. Opcional, contiene información adicional que es necesaria en ciertos mensajes o situaciones.

Contenido(payload). Es el contenido real del mensaje. Puede tener un máximo de 256 Mb aunque en implementaciones reales el máximo es de 2 a 4 kB.

CALIDAD DEL SERVICIO (QOS)

MQTT dispone de un mecanismo de calidad del servicio o QoS, entendido como la forma de gestionar la robustez del envío de mensajes al cliente ante fallos (por ejemplo, de conectividad).

MQTT tiene tres niveles QoS posibles.

QoS 0 unacknowledged (at most one): El mensaje se envía una única vez. En caso de fallo por lo que puede que alguno no se entregue.

QoS 1 acknowledged (at least one): El mensaje se envía hasta que se garantiza la entrega. En caso de fallo, el suscriptor puede recibir algún mensaje duplicados.

QoS 2 assured (exactly one). Se garantiza que cada mensaje se entrega al suscriptor, y únicamente una vez.

Usar un nivel u otro depende de las características y necesidades de fiabilidad de nuestro sistema. Lógicamente, un nivel de QoS superior requiere un mayor intercambio mayor de mensajes de verificación con el cliente y, por tanto, mayor carga al sistema.

VENTAJAS DEL MQTT

Son varias las ventajas del protocolo MQTT como sistema de comunicación M2M. Por un lado, tenemos todas las ventajas del patrón pub/sub que vimos en la entrada anterior, como son escalabilidad, asincronismo, desacoplamiento entre clientes.

Además, MQTT aporta una serie de características que le han hecho sobre salir sobre otros competidores. La principal, como hemos mencionado, es su sencillez y ligereza. Esto lo hace adecuado para aplicaciones IoT, donde frecuentemente se emplean dispositivos de escasa potencia.

Además, esto menor necesidad de recursos se traduce en un menor consumo de energía, lo cual es interesante en dispositivos que funcionan 24/7 y muy especialmente en dispositivos alimentados por batería.

Otra consecuencia de la ligereza del protocolo MQTT es que requiere un ancho de banda mínimo, lo cual es importante en redes inalámbricas, o conexiones con posibles problemas de calidad.

Por último, MQTT dispone de medidas adicionales importantes, como la seguridad y calidad del servicio (QoS). Por último, es una solución largamente testada y consolidad, que aporta robustez y fiabilidad.

Código Fuente en Java de un Cliente MQTT


package MQTTListener;

import java.util.HashMap;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class MQTTListenerimplements MqttCallback {

    MqttClient myClient;
    MqttConnectOptions connOpt;

    static final String BROKER_URL = "tcp://127.0.0.1:1883";
    static final String M2MIO_DOMAIN = "test";
    static final String M2MIO_STUFF = "things";
    static final String M2MIO_THING = "alohalistener";
    static final String M2MIO_USERNAME = "admin";
    static final String M2MIO_PASSWORD_MD5 = "12345";

    // the following two flags control whether this example is a publisher, a subscriber or both
    static final Boolean subscriber = true;
    static final Boolean publisher = true;

    /**
     *
     * connectionLost This callback is invoked upon losing the MQTT connection.
     *
     */
    @Override
    public void connectionLost(Throwable t) {
        System.out.println("Connection lost!");
        // code to reconnect to the broker would go here if desired
    }

    /**
     *
     * deliveryComplete This callback is invoked when a message published by
     * this client is successfully received by the broker.
     *
     */
    //@Override
    public void deliveryComplete(MqttDeliveryToken token) {
        //System.out.println("Pub complete" + new String(token.getMessage().getPayload()));
    }

    /**
     *
     * messageArrived This callback is invoked when a message is received on a
     * subscribed topic.
     *
     */
    //@Override
    public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception {
        System.out.println("-------------------------------------------------");
        System.out.println("| Topic:" + topic.getName());
        System.out.println("| Message: " + new String(message.getPayload()));
        System.out.println("-------------------------------------------------");
    }

    /**
     *
     * MAIN
     *
     */
    public static void main(String[] args) {
        MQTTListenersmc = new MQTTListener();
        smc.runClient();
    }

    /**
     *
     * runClient The main functionality of this simple example. Create a MQTT
     * client, connect to broker, pub/sub, disconnect.
     *
     */
    public void runClient() {
        // setup MQTT Client
        String clientID = M2MIO_THING;
        connOpt = new MqttConnectOptions();

        connOpt.setCleanSession(true);
        connOpt.setKeepAliveInterval(30);
        connOpt.setUserName(M2MIO_USERNAME);
        connOpt.setPassword(M2MIO_PASSWORD_MD5.toCharArray());

        // Connect to Broker
        try {
            myClient = new MqttClient(BROKER_URL, clientID);
            myClient.setCallback(this);
            myClient.connect(connOpt);
        } catch (MqttException e) {
            e.printStackTrace();
            System.exit(-1);
        }

        System.out.println("Connected to " + BROKER_URL);

        // setup topic
        // topics on m2m.io are in the form <domain>/<stuff>/<thing>
        //String myTopic = M2MIO_DOMAIN + "/" + M2MIO_STUFF + "/" + M2MIO_THING;
        String myTopic = M2MIO_DOMAIN;
        MqttTopic topic = myClient.getTopic(myTopic);

        // subscribe to topic if subscriber
        if (subscriber) {
            try {
                int subQoS = 0;
                myClient.subscribe(myTopic, subQoS);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

//         publish messages if publisher
        if (publisher) {
            for (int i = 1; i <= 10; i++) {
                String pubMsg = "{\"pubmsg\":" + i + "}";
                int pubQoS = 0;
                MqttMessage message = new MqttMessage(pubMsg.getBytes());
                message.setQos(pubQoS);
                message.setRetained(false);

                // Publish the message
                System.out.println("Publishing to topic \"" + topic + "\" qos " + pubQoS);
                MqttDeliveryToken token = null;
                try {
                    // publish message to broker
                    token = topic.publish(message);
                    // Wait until the message has been delivered to the broker
                    token.waitForCompletion();
                    Thread.sleep(100);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
//         disconnect
        try {
            // wait to ensure subscribed messages are delivered
            if (subscriber) {
                Thread.sleep(5000);
            }
            myClient.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void messageArrived(String string, MqttMessage mm) throws Exception {
//        System.out.println("-------------------------------------------------");
//        System.out.println("| Topic:" + string);
//        System.out.println("| Message: " + new String(mm.getPayload()));
//        System.out.println("-------------------------------------------------");
        //Miramos que tipo de mensaje esta llegando 
        String sms = new String(mm.getPayload());
        if (sms.contains("mqtt param erro!")) {
//            System.out.println("MQTTListener.MQTTListener.messageArrived() Estoy en el mesnajes mqtt param erro!");
        } else {
            sms = sms.replace("{", "").replace("}", "").replace("\"", "").replace("\n", "").replace("\t", "");
            System.out.println(sms);
            String[] datos = sms.split(",");
            HashMap<String, String> hashMapSMS = new HashMap<>();
            for (String dato : datos) {
                if (hashMapSMS.containsKey("datas")) {
                    if (dato.contains("datas")) {
                        dato = dato.replace("datas:", "");
                        String smsdeparado[] = dato.split(":");
                        hashMapSMS.put(smsdeparado[0], smsdeparado[1]);
                    } else {
                        String smsdeparado[] = dato.split(":");
                        hashMapSMS.put(smsdeparado[0], smsdeparado[1]);
                    }
                }

            }
            PorcesaMSM(hashMapSMS);
        }

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken imdt) {
        //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    }

    private void PorcesaMSM(HashMap<String, String> hashMapSMS) {
        if (hashMapSMS.containsKey("code")) {
            if (!hashMapSMS.get("code").equals("-1")) {
                switch (hashMapSMS.get("msg")) {
                    case "mqtt bind ctrl success":
                        //Respuesta de vinculacion del dispositvo 
                        ModeloDispositivoSegurex modeloDispositivoSegurex = new ModeloDispositivoSegurex();
                        ControladorDispositivoSegurex controladorDispositivoSegurex = new ControladorDispositivoSegurex();
                        modeloDispositivoSegurex = controladorDispositivoSegurex.SearchSerie(hashMapSMS.get("device_id"));
                        modeloDispositivoSegurex.setToken(hashMapSMS.get("device_token"));
                        modeloDispositivoSegurex.setSession(hashMapSMS.get("session_id"));
                        controladorDispositivoSegurex.Update(modeloDispositivoSegurex);
                        System.out.println("MQTTListener.MQTTListener.PorcesaMSM() Emparejado con exito ");
                        break;
                }
            }
        }
    }
}

You Might Also Like

0 Comments

Like us on Facebook