Freigeben über


Schnellstart: Senden von Ereignissen an oder Empfangen von Ereignissen von Event Hubs mithilfe von Go

Azure Event Hubs ist eine Big Data-Streamingplattform und ein Ereigniserfassungsdienst, der pro Sekunde Millionen von Ereignissen empfangen und verarbeiten kann. Event Hubs kann Ereignisse, Daten oder Telemetriedaten, die von verteilter Software und verteilten Geräten erzeugt wurden, verarbeiten und speichern. An einen Event Hub gesendete Daten können transformiert und mit einem beliebigen Echtzeitanalyse-Anbieter oder Batchverarbeitungs-/Speicheradapter gespeichert werden. Eine ausführliche Übersicht über Event Hubs finden Sie unter Was ist Azure Event Hubs? und Event Hubs-Features im Überblick.

In dieser Schnellstartanleitung wird beschrieben, wie Sie Go-Anwendungen schreiben, um Ereignisse an einen Event Hub zu senden oder ereignisse von einem Event Hub zu empfangen.

Hinweis

Diese Schnellstartanleitung basiert auf Beispielen auf GitHub unter https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Der Abschnitt 'Sendereignisse' basiert auf dem Beispiel 'example_producing_events_test.go', und der Abschnitt 'Empfang' basiert auf dem Beispiel 'example_processor_test.go'. Der Code ist für die Schnellstartanleitung vereinfacht, und alle detaillierten Kommentare werden entfernt. Weitere Details und Erläuterungen finden Sie in den Beispielen.

Voraussetzungen

Zum Durchführen dieser Schnellstartanleitung benötigen Sie Folgendes:

  • Eine lokale Go-Installation. Befolgen Sie bei Bedarf diese Anweisungen .
  • Ein aktives Azure-Konto. Wenn Sie noch kein Azure-Abonnement haben, erstellen Sie ein kostenloses Konto, bevor Sie beginnen.
  • Erstellen Sie einen Event Hubs-Namespace und einen Event Hub. Verwenden Sie das Azure-Portal , um einen Namespace vom Typ Event Hubs zu erstellen und die Verwaltungsanmeldeinformationen abzurufen, die Ihre Anwendung für die Kommunikation mit dem Event Hub benötigt. Erstellen Sie anhand der Anleitung in diesem Artikel einen Namespace und einen Event Hub.

Senden von Ereignissen

In diesem Abschnitt wird gezeigt, wie Sie eine Go-Anwendung erstellen, um Ereignisse an einen Event Hub zu senden.

Go-Paket installieren

Rufen Sie das Go-Paket für Event Hubs ab, wie im folgenden Beispiel gezeigt.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Code zum Senden von Ereignissen an einen Event Hub

Hier sehen Sie den Code zum Senden von Ereignissen an einen Event Hub. Die wichtigsten Schritte im Code sind:

  1. Erstellen Sie einen Event Hubs-Produzentenclient mithilfe einer Verbindungszeichenfolge mit dem Event Hubs-Namespace und dem Event Hub-Namen.
  2. Erstellen Sie ein Batchobjekt, und fügen Sie dem Batch Beispielereignisse hinzu.
  3. Senden des Ereignisbatchs an die Ereignisse.

Von Bedeutung

Ersetzen Sie NAMESPACE CONNECTION STRING durch die Verbindungszeichenfolge zu Ihrem Event Hubs-Namespace und EVENT HUB NAME durch den Ereignishubnamen im Beispielcode.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

Führen Sie die Anwendung noch nicht aus. Sie müssen zuerst die Empfänger-App und dann die Absender-App ausführen.

Empfangen von Ereignissen

Erstellen eines Speicherkontos und eines Containers

Angaben zum Status, z.B. Leases auf Partitionen und Prüfpunkte in den Ereignissen, werden über einen Azure Storage-Container zwischen den Empfängern freigegeben. Sie können ein Speicherkonto und einen Container mit dem Go SDK erstellen, aber Sie können auch eins erstellen, indem Sie die Anweisungen unter "Informationen zu Azure-Speicherkonten" befolgen.

Befolgen Sie diese Empfehlungen, wenn Sie Azure Blob Storage als Prüfpunktspeicher verwenden:

  • Verwenden Sie einen separaten Container für jede Consumergruppe. Sie können dasselbe Speicherkonto verwenden, aber verwenden Sie für jede Gruppe einen eigenen Container.
  • Verwenden Sie das Speicherkonto nicht für andere Elemente.
  • Verwenden Sie den Container nicht für andere Elemente.
  • Erstellen Sie das Speicherkonto in derselben Region wie die bereitgestellte Anwendung. Wenn die Anwendung lokal ist, versuchen Sie, die nächstgelegene Region auszuwählen.

Stellen Sie auf der Seite Speicherkonto im Azure-Portal im Abschnitt Blobdienst sicher, dass die folgenden Einstellungen deaktiviert sind.

  • Hierarchischer Namespace
  • Vorläufiges Löschen von Blobs
  • Versionsverwaltung

Go-Pakete

Um die Nachrichten zu empfangen, rufen Sie die Go-Pakete für Event Hubs ab, wie im folgenden Beispiel gezeigt.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

Code zum Empfangen von Ereignissen von einem Ereignishub

Hier sehen Sie den Code zum Empfangen von Ereignissen von einem Event Hub. Die wichtigsten Schritte im Code sind:

  1. Überprüfen Sie ein Prüfpunktspeicherobjekt, das den Azure Blob Storage darstellt, der vom Event Hub für die Prüfpunkterstellung verwendet wird.
  2. Erstellen Sie einen Consumerclient für Event Hubs mithilfe einer Verbindungszeichenfolge mit dem Event Hubs-Namespace und dem Event Hub-Namen.
  3. Erstellen Sie einen Ereignisprozessor mithilfe des Clientobjekts und des Prüfpunktspeicherobjekts. Der Prozessor empfängt und verarbeitet Ereignisse.
  4. Erstellen Sie für jede Partition im Event Hub einen Partitionsclient mit processEvents als Funktion zum Verarbeiten von Ereignissen.
  5. Führen Sie alle Partitionsclients aus, um Ereignisse zu empfangen und zu verarbeiten.

Von Bedeutung

Ersetzen Sie die folgenden Platzhalterwerte durch tatsächliche Werte:

  • AZURE STORAGE CONNECTION STRING durch die Verbindungszeichenfolge für Ihr Azure-Speicherkonto
  • BLOB CONTAINER NAME mit dem Namen des BLOB-Containers, den Sie im Speicherkonto erstellt haben
  • NAMESPACE CONNECTION STRING durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace
  • EVENT HUB NAME mit dem Ereignishubnamen im Beispielcode.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

Ausführen von Empfänger- und Absender-Apps

  1. Führen Sie zuerst die Empfänger-App aus.

  2. Führen Sie die Absender-App aus.

  3. Warten Sie eine Minute, um die folgende Ausgabe im Empfangsfenster zu sehen.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Nächste Schritte

Beispiele auf GitHub unter https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.