Freigeben über


Schnellstart: Erfassen von Event Hubs-Daten in Azure Storage und Lesen mithilfe von Python (azure-eventhub)

Sie können einen Event Hub so konfigurieren, dass die an einen Event Hub gesendeten Daten in einem Azure-Speicherkonto oder azure Data Lake Storage Gen 1 oder Gen 2 erfasst werden. In diesem Artikel erfahren Sie, wie Sie Python-Code schreiben, um Ereignisse an einen Event Hub zu senden und die erfassten Daten aus Azure Blob Storage zu lesen. Weitere Informationen zu diesem Feature finden Sie in der Übersicht über das Feature "Event Hubs Capture".

In dieser Schnellstartanleitung wird das Azure Python SDK verwendet, um das Aufnahmefeature zu veranschaulichen. Die sender.py-App sendet simulierte Umgebungstelemetrie an Event Hubs im JSON-Format. Der Event Hub ist für die Verwendung des Capture-Features konfiguriert, um diese Daten in Blob Storage in Batches zu speichern. Die capturereader.py-App liest diese Blobs und erstellt eine Anhangedatei für jedes Gerät. Anschließend schreibt die App die Daten in CSV-Dateien.

In dieser Schnellstartanleitung führen Sie die folgenden Schritte aus:

  • Erstellen Sie ein Azure Blob Storage-Konto und einen Container im Azure-Portal.
  • Erstellen Sie einen Event Hubs-Namespace mithilfe des Azure-Portals.
  • Erstellen Sie einen Event Hub mit aktivierter Aufnahmefunktion, und verbinden Sie ihn mit Ihrem Speicherkonto.
  • Senden Sie Daten mithilfe eines Python-Skripts an Ihren Event Hub.
  • Lesen und Verarbeiten von Dateien aus Event Hubs Capture mithilfe eines anderen Python-Skripts.

Voraussetzungen

Aktivieren des Aufnahmefeatures für den Event Hub

Aktivieren Sie das Aufnahmefeature für den Event Hub. Befolgen Sie dazu die Anweisungen unter Enable Event Hubs Capture mithilfe des Azure-Portals. Wählen Sie das Speicherkonto und den BLOB-Container aus, den Sie im vorherigen Schritt erstellt haben. Wählen Sie Avro für das Serialisierungsformat des Ausgabeereignisses aus.

Erstellen eines Python-Skripts zum Senden von Ereignissen an Ihren Event Hub

In diesem Abschnitt erstellen Sie ein Python-Skript, das 200 Ereignisse (10 Geräte * 20 Ereignisse) an einen Event Hub sendet. Bei diesen Ereignissen handelt es sich um eine Beispielumweltmessung, die im JSON-Format gesendet wird.

  1. Öffnen Sie Ihren bevorzugten Python-Editor, z. B. Visual Studio Code.

  2. Erstellen Sie ein Skript namens sender.py.

  3. Fügen Sie den folgenden Code in sender.py ein.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
        reading = {
                'id': dev, 
                'timestamp': str(datetime.datetime.utcnow()), 
                'uv': random.random(), 
                'temperature': random.randint(70, 100), 
                'humidity': random.randint(70, 100)
            }
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. Ersetzen Sie die folgenden Werte in den Skripts:

    • Ersetzen Sie EVENT HUBS NAMESPACE CONNECTION STRING durch die Verbindungszeichenfolge für den Event Hubs-Namespace.
    • Ersetzen Sie EVENT HUB NAME mit dem Namen Ihres Event Hubs.
  5. Führen Sie das Skript aus, um Ereignisse an den Event Hub zu senden.

  6. Im Azure-Portal können Sie überprüfen, ob der Event Hub die Nachrichten empfangen hat. Wechseln zur Ansicht "Nachrichten " im Abschnitt "Metriken ". Laden Sie die Seite neu, um das Diagramm zu aktualisieren. Es kann einige Sekunden dauern, bis die Seite anzeigt, dass die Nachrichten empfangen wurden.

    Überprüfen, ob der Event Hub die Nachrichten empfangen hat

Erstellen eines Python-Skripts zum Lesen Ihrer Capture-Dateien

In diesem Beispiel werden die erfassten Daten im Azure Blob Storage gespeichert. Das Skript in diesem Abschnitt liest die erfassten Datendateien aus Ihrem Azure-Speicherkonto und generiert CSV-Dateien, damit Sie sie ganz einfach öffnen und anzeigen können. Im aktuellen Arbeitsverzeichnis der Anwendung werden 10 Dateien angezeigt. Diese Dateien enthalten die Umgebungswerte für die 10 Geräte.

  1. Erstellen Sie in Ihrem Python-Editor ein Skript namens capturereader.py. Dieses Skript liest die erfassten Dateien und erstellt eine Datei für jedes Gerät, um die Daten nur für dieses Gerät zu schreiben.

  2. Fügen Sie den folgenden Code in capturereader.py ein.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. Ersetzen Sie AZURE STORAGE CONNECTION STRING durch die Verbindungszeichenfolge für Ihr Azure-Speicherkonto. Der Name des Containers, den Sie in diesem Schnellstart erstellt haben, ist capture. Wenn Sie einen anderen Namen für den Container verwendet haben, ersetzen Sie die Erfassung durch den Namen des Containers im Speicherkonto.

Ausführen der Skripts

  1. Öffnen Sie eine Eingabeaufforderung mit Python im Pfad, und führen Sie dann die folgenden Befehle aus, um die erforderlichen Python-Pakete zu installieren:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    
  2. Ändern Sie Ihr Verzeichnis in das Verzeichnis, in dem Sie sender.py und capturereader.py gespeichert haben, und führen Sie den folgenden Befehl aus:

    python sender.py
    

    Dieser Befehl startet einen neuen Python-Prozess, um den Absender auszuführen.

  3. Warten Sie einige Minuten, bis die Aufzeichnung ausgeführt wird, und geben Sie dann den folgenden Befehl in Das ursprüngliche Befehlsfenster ein:

    python capturereader.py
    

    Dieser Erfassungsprozessor verwendet das lokale Verzeichnis, um alle Blobs aus dem Speicherkonto und Container herunterzuladen. Es verarbeitet Dateien, die nicht leer sind, und schreibt die Ergebnisse als CSV-Dateien in das lokale Verzeichnis.

Nächste Schritte

Schauen Sie sich Python-Beispiele auf GitHub an.