.NET Microservices mit RabbitMQ

Usergroup Berlin-Brandenburg

09.08.2016

Frank Pommerening

  • Senior - Softwareentwickler
  • Consultant
  • Softwarearchitekt


frank@pommerening-online.de
Follow @FPommerening @FPommerening
AXP Consulting GmbH & Co. KG in Leipzig
Gründung: Mai 2012
Anzahl Mitarbeiter: 8 feste
Branchenfokus: Energiebranche


  • Consulting (fachlich & IT)
    • Requirements Engineering / Projektmanagement
    • IT-Fachprozess-Analyse / Dokumentation
  • Software-Entwicklung
    • Microservices, SOA, REST, OOA und OOD
    • Microsoft Technologien z.B. .NET (C#), WPF, WCF
    • Datenbanken (MS SQL Server / Oracle / MongoDB)
Übersicht für Bespiel Prozessketten

RabbitMQ - ein Überblick

Logo of Rabbit MQ

Was ist RabbitMQ?

  • Message Broker / Queue Manager
    • System um Queues zu verwalten
    • Nachrichten von Systemen empfangen
    • Nachrichten zwischenspeichern
    • Nachrichten nach Regeln an Systeme zustellen
  • Erstellt in Erlang / OTP
  • Erlaubt Entkopplung von unterschiedlichen und unabhängigen Anwendungen

Nachrichtentransport

Übersicht Nachrichtentransport

12 Begriffe sollt ihr sein

confused

AMQP (Advanced Message Queuing Protocol)
Protokoll zur Kommunikation über Systemgrenzen


Produzent (Producer)
Anwendung, die eine Nachricht erstellt


Konsument (Consumer)
Anwendung, die eine Nachricht empfängt

Nachricht (Message)
Information, die zwischen Produzent und Konsument ausgetauscht wird


Warteschlange (Queue)
Speichert die Nachrichten zwischen


Verbindung (Connection)
TCP Netzwerkverbindung zwischen Anwendung und RabbitMQ broker

Channel
Virtuelle Verbindung innerhalb der TCP-Verbindung


Binding
Verknüpfung zwischen Queue und Exchange


Routing key
"Adresse", nach welcher der Exchange entscheidet, in welche Queue er die Nachricht leitet.

Benutzer (User)

  • Zugangsdaten (Benutzername / Password) für RabbitMQ
  • Zuordnung von Rechten (Lesen, Schreiben, Konfigurieren etc.)
  • Definition global oder für einen spezifischen virtuellen Host


Vhost (virtual host)
Erlaubt, Anwendung auf einer RabbitMQ Instanz zu isolieren

Exchange

  • Fanout Exchange
    • Eingehende Nachrichten werden an alle verbunden Queues gesendet
    • Keine Beachtung der Binding-Konfiguration
  • Direct Exchange
    • Producer erstellt Routing Key
    • Binding Key der Queue muss exakt passen
  • Topic
    • Ähnlich dem Direct Exchange
    • Einsatz von Platzhaltern, z.B. Wort (*) / mehrere Worte (#)
  • Header Exchange
    • Routing anhand von Headerinformationen der Nachricht

RabbitMQ - Installation

Installation Windows

Erlang OTP Download (Voraussetzung)
RabbitMQ Download
Screenshot install RabbitMQ step 2 Screenshot install erlang step 1 Screenshot install erlang step 2 Screenshot install erlang step 3 Screenshot install erlang step 4 Screenshot install erlang step 5 Screenshot install RabbitMQ step 1 Screenshot install RabbitMQ step 2 Screenshot install RabbitMQ step 2 Screenshot install RabbitMQ step 2 Screenshot install RabbitMQ step 2 Screenshot install RabbitMQ step 2

Installation Linux


                            
                                apt-get install rabbitmq-server 
                            
                        

Management Plugin

Infos
Screenshot Management Oberfläche

Docker

Standardimage auf Dockerhub Screenshot Dockerhub

Starten RabbitMQ Instanz

                            
                                $ docker run -d --hostname my-rabbit --name firstRabbit rabbitmq:3
                            
                        

Starten mit Management Plugin
                            
                                $ docker run -d --hostname myRabbit --name myRabbit 
-p 5672:5672 -p 15672:15672 rabbitmq:3-management
                            
                        

EasyNetQ

Logo EasyNetQ
  • Einfach
  • Open Source
  • Modular



Nuget: EasyNetQ
Github: https://github.com/EasyNetQ/EasyNetQ

Warum nicht direkt

Subscriber

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    var queueName = channel.QueueDeclare().QueueName;
    channel.QueueBind(queue: queueName, exchange: "logs", 
        routingKey: "");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>{
        var message = Encoding.UTF8.GetString(ea.Body);
        DoSomething(MyMessage.Deserialize(message));    };
    channel.BasicConsume(queue: queueName, noAck: true,
     consumer: consumer);
}
Publisher

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    var message = new MyMessage {Content = "Hallo Welt"};
    var body = Encoding.UTF8.GetBytes(message.Serialize());
    channel.BasicPublish(exchange: "logs", routingKey: "",
            basicProperties: null, body: body);
}
Serialisierung, Fehlerbehandlung ... fehlt alles noch

Vorteile

  • Messaging Pattern, z.B. Publish/Subscribe
  • Routing Strategien
  • Serialisierung / Deserialisierung als JSON (lesbar)
  • Handling von Thread für Konsumenten
  • Subscriber Reconnect
  • QoS / publisher confirm
  • Fehlerbehandlung

API Design

API EasyNETQ

EasyNetQ Einstieg

Verbindungsaufbau


using System;
using EasyNetQ;

namepace FP.MsRmq.Connecting
{
    public class Programm
    {
        public static void Main(string[] args)
        {
            var myBus = RabbitHutch.CreateBus("host=myRabbitMQ");
        }
    }
}

                    

Verbindungsoptionen

Option Standardwert Bemerkung
host 5672 DNS-Name/IP[:Port]
username guest
passwort guest
prefetchcount 50 Anzahl der Nachrichten, die gleichzeitig abgerufen werden
timeout 10 timeout in Sekunden
publisherConfirms false Erzwingt eine Annahmebestätigung

Publish / Subscribe

Sendet eine Nachricht an beliebig viele Empfänger

Wenn kein Empfänger definiert wurde, geht Nachricht verloren (keine Speicherung / Verarbeitung)

Zieltypen der Bestellung müssen übereinstimmen

Polymorphy ist möglich
Subscribe

 myBus.Subscribe<MyClass>("MySub", 
        msg => DoSomething(msg));
 myBus.SubscribeAsync<MyClass>("MySubAsync",
        msg => DoSomethingAsync(msg));
     
                    
Publish

var msg = new MyClass{};                            
myBus.Publish(msg);
myBus.PublishAsync(msg);
     
                    

Topic Based Routing

Sonderform des Publish / Subscribe

Neben dem Zieltyp muss auch das Thema (Topic) passen

Im Thema sind Platzhalter möglich
Subscribe

 myBus.Subscribe<MyClass>("MyTopicSub", 
        msg => DoSomething(msg), x => x.WithTopic("MyTopic"));
     
                    
Publish

var msg = new MyClass{};                            
myBus.Publish(msg, "MyTopic");
     
                    

Request / Response (RPC)

Sendet eine Nachricht und erwartet eine Antwort

Matching über Typen der Anfrage und der Antwort

Exception, wenn Antwort nicht innerhalb Timeout
Response

                          
myBus.Respond<MyClass1,MyClass2>(DoSometingWithResult);
myBus.RespondAsync<MyClass1,MyClass2>(DoSometingAsyncWithResult);
                    
Request

var req = new MyClass1{};                            
var result = myBus.Request<MyClass1,MyClass2>(req);
var result = await myBus.RequestAsync<MyClass1,MyClass2>(req);
                    

Send / Receive (Command Pipelines)

Command Pipelines

Queue enthält Nachrichten verschiedener Typen

Bestellung wird auf den Nachrichtentyp eingegrenzt

Nicht zustellbare Nachrichten werden in Error-Queue übertragen
Receive

myBus.Receive("MyMessageQueue", x => x
    .Add<MyClassA>(DoSomethingWithA)
    .Add<MyClassB>(DoSomethingWithB);
                    
Request

myBus.Send("MyMessageQueue", new MyClassA {});
myBus.Send("MyMessageQueue", new MyClassB {});

                    

Fehlerbehandlung

Queue: EasyNetQ_Default_Error_Queue
Tool: EasyNetQ.Hosepipe utility (Re-Publish)

Weblogger

Übersicht für Bespiel Weblogger

Prozessketten mit Webbroker

Übersicht für Bespiel Prozessketten

Webbroker / Authorization

  • NancyFx empfängt HTTP-Post auf URL http://*:80/service/
  • Zugangsdaten werden per Authorization-Service geprüft
    • HTTP 200 (OK)
    • HTTP 401 (Unauthorized)
    • HTTP 500 (JSON gegen Schema nicht valide)
  • Weitergabe JSON an Message Router Service

Routing

  • Message Router
    • Umwandlung JSON in DTO
    • Weiterleitung an Section Router Services (Customer / Invoic)
  • Customer Router
    • Prüfen des Nachrichtentyps
    • Weiterleitung der Registrierungen an Registration Service
  • Invoic Router
    • Prüfen des Nachrichtentyps
    • Weiterleitung der Rechnungen an Bill Service
    • Weiterleitung der Stornierungen an Reversal Service

Customer Services

Registration Service
  • Prüfen, ob Kunde bereits vorhanden
    • Ja
      • Ablehnung an Absender per Dispatcher senden
    • Nein
      • Kunden in der Datenbank erstellen
      • Bestätigung an Absender per Dispatcher senden

INVOIC Services

Prüfschritt Bill Service Reversal Service
Kunde ist bekannt Ja Nein Ja Nein
Quellrechnung vorhanden     Ja Nein  
Daten speichern Ja Nein Ja Nein Nein
Antwort per Dispatcher senden Bestätigung Ablehnung (Kunde unbekannt) Bestätigung Ablehnung (Quellrechnung unbekannt) Ablehnung (Kunde unbekannt)

Message Dispatcher

  • DTO in JSON umgewandelt
  • Nachricht per HTTP Post an Empfänger versenden

Microservices mit RabbitMQ

Die ultimative Lösung aller Probleme!?

Vorteile

  • Einfachere Verknüpfung / Routing
  • Kompensierung von Lastspitzen
  • Kompensierung von Nichtverfügbarkeit
  • Skalierung ohne Architekturänderung

Nachteile

  • Erhöhte Komplexität (bei kleinem System)
    • Implementierung
    • Betrieb
  • RabbitMQ als SPOF
  • Technologiemix schwieriger

EasyNetQ - Erweiterung

Logging mit Log4Net

Prozessketten mit Webbroker für Fortgeschrittene

Problem - große Nachrichten

Trennung von Befehlen und Daten
Übersicht für Beispiel Prozessketten mit MongoDB
Bildquellen:

https://commons.wikimedia.org/wiki/File%3AVisual_Studio_2013_Logo.svg
https://www.docker.com/sites/default/files/legal/docker_media_kit_jan_2016.zip
http://easynetq.com/design/logo_design_150.png
https://assets.wp.nginx.com/wp-content/themes/nginx-theme/assets/img//logo.png
https://www.rabbitmq.com/img/rabbitmq_logo_strap.png
https://avatars1.githubusercontent.com/u/706659?v=3&s=400
https://www.flickr.com/photos/kristiand/3223044657/
http://www.postgresql.org/media/img/about/press/elephant.png