次の方法で共有


クイック スタート: Azure Event Hubs との間でイベントを送受信する

このクイック スタートでは、 azure-messaging-eventhubs Java パッケージを使用して、Azure イベント ハブとの間でイベントを送受信する方法について説明します。

ヒント

Spring アプリケーションで Azure Event Hubs リソースを使用している場合は、 Spring Cloud Azure を別の方法として検討することをお勧めします。 Spring Cloud Azure は、Spring と Azure サービスのシームレスな統合を実現するオープンソース プロジェクトです。 Spring Cloud Azure の詳細と Event Hubs の使用例については、Azure Event Hubs での Spring Cloud Stream に関するページを参照してください。

[前提条件]

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に 、Event Hubs の概要 を参照してください。

このクイックスタートを完了するには、次の前提条件が必要です。

  • Microsoft Azure サブスクリプション。 Azure Event Hubs を含む Azure サービスを使用するには、サブスクリプションが必要です。 既存の Microsoft Azure アカウントをお持ちでない場合は、アカウントを作成する際に、無料試用版にサインアップするか、MSDN サブスクライバー特典を利用できます。
  • Java 開発環境。 このクイック スタートでは 、Eclipse を使用します。 バージョン 8 以上の Java Development Kit (JDK) が必要です。
  • Event Hubs 名前空間とイベント ハブを作成する。 最初の手順では、 Azure portal を使用して Event Hubs 型の名前空間を作成し、アプリケーションがイベント ハブと通信するために必要な管理資格情報を取得します。 名前空間とイベント ハブを作成するには、こちらの記事の手順に従います。 次に、 Event Hubs 名前空間の接続文字列を 取得します。接続文字列の 取得に関する記事の手順に従います。 接続文字列は、このクイック スタートの後半で使用します。

イベントを送信する

このセクションでは、イベント ハブにイベントを送信する Java アプリケーションを作成する方法について説明します。

Azure Event Hubs ライブラリへの参照を追加する

まず、お気に入りの Java 開発環境でコンソール/シェル アプリケーション用の新しい Maven プロジェクトを作成します。 pom.xml ファイルを次のように更新します。 Event Hubs 用の Java クライアント ライブラリは、 Maven Central リポジトリで入手できます。

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.20.2</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.16.1</version>
		    <scope>compile</scope>
		</dependency>

Maven リポジトリに発行された最新バージョンにバージョンを更新します。

Azure に対してアプリを認証する

このクイック スタートでは、Azure Event Hubs に接続する 2 つの方法について説明します。

  • パスワードレス。 Microsoft Entra ID とロールベースのアクセス制御 (RBAC) のセキュリティ プリンシパルを使用して、Event Hubs 名前空間に接続します。 コード、構成ファイル、または Azure Key Vault などのセキュリティで保護されたストレージに、ハードコーディングされた接続文字列を含める心配はありません。
  • 接続文字列。 接続文字列を使用して Event Hubs 名前空間に接続します。 Azure を初めて使用する場合は、接続文字列オプションの方が理解しやすいかもしれません。

実際のアプリケーションと運用環境では、パスワードレス オプションを使用することをお勧めします。 詳細については、Azure サービスの Service Bus の認証と承認とパスワードレス接続に関するページを参照してください。

Microsoft Entra ユーザーにロールを割り当てる

ローカルで開発するときは、Azure Event Hubs に接続するユーザー アカウントに適切なアクセス許可があることを確認します。 メッセージを送受信するには、 Azure Event Hubs データ所有者 ロールが必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールが必要です。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 詳細については、「 Azure RBAC のスコープについて」ページを 参照してください。

次の例では、ユーザー アカウントに Azure Event Hubs Data Owner ロールを割り当てます。これにより、Azure Event Hubs リソースにフル アクセスできます。 実際のシナリオでは、より安全な運用環境を実現するため、最小限の特権の原則に従って、必要な最小限のアクセス許可のみをユーザーに付与します。

Azure Event Hubs の Azure の組み込みロール

Azure Event Hubs の場合、Azure portal および Azure リソース管理 API による名前空間とそれに関連するすべてのリソースの管理は、Azure RBAC モデルを使用して既に保護されています。 Azure には、Event Hubs 名前空間へのアクセスを承認するための次の組み込みロールが用意されています。

  • Azure Event Hubs データ所有者: Event Hubs 名前空間とそのエンティティ (キュー、トピック、サブスクリプション、フィルター) へのデータ アクセスを有効にします。
  • Azure Event Hubs データ送信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を送信者に付与します。
  • Azure Event Hubs データ受信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を受信者に付与します。

カスタム ロールを作成する場合は、Event Hubs 操作に必要な権限に関するページを参照してください。

Von Bedeutung

ほとんどの場合、ロールの割り当てが Azure に伝達されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。

  1. Azure portal で、メインの検索バーまたは左側のナビゲーションを使用して Event Hubs 名前空間を見つけます。

  2. 概要ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [ + 追加] を選択します。 次に、[ ロールの割り当ての追加] を選択します。

    ロールを割り当てる方法を示すスクリーンショット。

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、Azure Event Hubs Data Owner を検索して一致する結果を選択します。 [次へ] を選びます。

  6. [ アクセスの割り当て] で、[ ユーザー、グループ、またはサービス プリンシパル] を選択します。 [ + メンバーの選択] を選択します

  7. ダイアログで、Microsoft Entra ユーザー名 (通常は user@___domain のメール アドレス) を検索します。 ダイアログの下部にある [選択] を 選択 します。

  8. [ 確認と割り当て] を選択して、最後のページに移動します。 [ 確認と割り当て ] をもう一度選択してプロセスを完了します。

イベント ハブにメッセージを送信するコードを記述する

Senderという名前のクラスを追加し、次のコードをクラスに追加します。

Von Bedeutung

  • <NAMESPACE NAME>を Event Hubs 名前空間の名前で更新します。
  • イベント ハブの名前で <EVENT HUB NAME> を更新します。
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

プログラムをビルドし、エラーがないことを確認します。 このプログラムは、受信側プログラムを実行した後に実行します。

受信イベント

このチュートリアルのコードは、 GitHub の EventProcessorClient サンプルに基づいており、完全に動作するアプリケーションを確認できます。

チェックポイント ストアとして Azure Blob Storage を使用する場合は、次の推奨事項に従います。

  • コンシューマー グループごとに個別のコンテナーを使用します。 同じストレージ アカウントを使用できますが、各グループごとに 1 つのコンテナーを使用します。
  • ストレージ アカウントを他の目的で使用しないでください。
  • コンテナーを他の目的で使用しないでください。
  • デプロイされたアプリケーションと同じリージョンにストレージ アカウントを作成します。 アプリケーションがオンプレミスの場合は、可能な中で最も近いリージョンを選択することを試みてください。

Azure portal の [ストレージ アカウント] ページの [Blob service] セクションで、次の設定が無効になっていることを確認してください。

  • 階層型名前空間
  • BLOB の論理的な削除
  • バージョン管理

Azure Storage と BLOB コンテナーを作成する

このクイック スタートでは、チェックポイント ストアとして Azure Storage (具体的には Blob Storage) を使用します。 チェックポイント処理は、イベント プロセッサがパーティション内で最後に正常に処理されたイベントの位置をマークまたはコミットするプロセスです。 チェックポイントのマーキングは、通常、イベントを処理する関数内で行われます。 チェックポイント処理の詳細については、「 イベント プロセッサ」を参照してください。

Azure ストレージ アカウントを作成するには、次の手順に従います。

  1. Azure Storage アカウントを作成する
  2. BLOB コンテナーを作成する
  3. BLOB コンテナーへのアクセスを認証する

ローカルで開発する場合は、BLOB データにアクセスするユーザー アカウントに適切なアクセス許可があることを確認します。 BLOB データの読み取りと書き込みを行うには、 ストレージ BLOB データ共同作成者 が必要です。 このロールを自分に割り当てるには、 ユーザー アクセス管理者 ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールを割り当てる必要があります。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 詳しくは、「Azure RBAC のスコープについて」を参照してください。

このシナリオでは、ストレージ アカウントをスコープとするアクセス許可をユーザー アカウントに割り当てて、 最小限の特権の原則に従います。 この方法を使って、ユーザーに必要最小限のアクセス許可のみを与え、より安全な運用環境を作成します。

次の例では、 ストレージ BLOB データ共同作成者 ロールをユーザー アカウントに割り当てます。これによって、ストレージ アカウント内の BLOB データへの読み取りと書き込みの両方のアクセスが提供されます。

Von Bedeutung

ほとんどの場合、ロールの割り当てが Azure に伝達されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。

  1. Azure portal で、メインの検索バーまたは左側のナビゲーションを使ってストレージ アカウントを見つけます。

  2. ストレージ アカウント ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [ + 追加] を選択します。 次に、[ ロールの割り当ての追加] を選択します。

    ストレージ アカウント ロールを割り当てる方法を示すスクリーンショット。

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、 ストレージ BLOB データ共同作成者を検索します。 一致する結果を選択し、次に [次へ]を押します。

  6. [アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。

  7. ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@___domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。

  8. [ 確認と割り当て] を選択して、最後のページに移動します。 [ 確認と割り当て ] をもう一度選択してプロセスを完了します。

Event Hubs ライブラリを Java プロジェクトに追加する

pom.xml ファイルに次の依存関係を追加します。

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.20.2</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.20.6</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.16.1</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Java ファイルの先頭に次の import ステートメントを追加します。

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Receiverという名前のクラスを作成し、次の文字列変数をクラスに追加します。 プレースホルダーを正しい値に置き換えます。

    Von Bedeutung

    プレースホルダーを正しい値に置き換えます。

    • <NAMESPACE NAME> をご利用の Event Hubs 名前空間の名前に置換します。
    • <EVENT HUB NAME> 名前空間内のイベント ハブの名前を指定します。
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. 次の main メソッドをクラスに追加します。

    Von Bedeutung

    プレースホルダーを正しい値に置き換えます。

    • <STORAGE ACCOUNT NAME> を Azure Storage アカウントの名前で指定します。
    • <CONTAINER NAME> を、ストレージ アカウントの BLOB コンテナーの名前にします
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. イベントとエラーを処理する 2 つのヘルパー メソッド (PARTITION_PROCESSORERROR_HANDLER) を Receiver クラスに追加します。

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. プログラムをビルドし、エラーがないことを確認します。

アプリケーションの実行

  1. 最初に Receiver アプリケーションを実行します。

  2. 次に、 Sender アプリケーションを実行します。

  3. [受信側アプリケーション] ウィンドウで、Sender アプリケーションによって発行されたイベントが表示されることを確認します。

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. 受信側アプリケーション ウィンドウで Enter キー を押して、アプリケーションを停止します。

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

GitHub の次のサンプルを参照してください。