この記事では、Azure Databricks で構造化ストリーミング ワークロードを実行するときに、Apache Kafka をソースまたはシンクとして使用する方法について説明します。
Kafka の詳細については、Kafka のドキュメントを参照してください。
Kafka からデータを読み取る
Kafka から読み取られたストリーミングの例を次に示します。
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Azure Databricks では、次の例に示すように、Kafka データ ソースのバッチ読み取りセマンティクスもサポートされています。
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
増分バッチ読み込みの場合、Databricks では、Trigger.AvailableNow
で Kafka を使用することをお勧めします。 「増分バッチ処理の構成」を参照してください。
Databricks Runtime 13.3 LTS 以降では、Azure Databricks には Kafka データを読み取るための SQL 関数が用意されています。 SQL を使用したストリーミングは、Lakeflow 宣言パイプラインまたは Databricks SQL のストリーミング テーブルでのみサポートされます。
テーブル値関数read_kafka
参照してください。
Kafka 構造化ストリーミング リーダーを構成する
Azure Databricks では、Kafka 0.10 以降への接続を構成するためのデータ形式として kafka
キーワードが提供されます。
Kafka の最も一般的な構成を次に示します。
サブスクライブするトピックを指定するには、複数の方法があります。 次のいずれかのパラメーターのみを指定する必要があります。
オプション | 値 | 説明 |
---|---|---|
購読 | トピックのコンマ区切りの一覧。 | 購読するトピックの一覧。 |
購読パターン | Java 正規表現文字列。 | トピックのサブスクライブに使用するパターン。 |
割り当てる | JSON 文字列 {"topicA":[0,1],"topic":[2,4]} 。 |
使用する特定の topicPartition。 |
その他の注目すべき構成:
オプション | 値 | Default value | 説明 |
---|---|---|---|
kafka.bootstrap.servers | host:port のコンマ区切りリスト。 | 空 | [必須] Kafka bootstrap.servers の構成。 Kafka からのデータがない場合は、まずブローカーのアドレス一覧を確認してください。 ブローカーのアドレス一覧が正しくない場合は、エラーがない可能性があります。 これは、Kafka クライアントによって、ブローカーは最終的に利用可能になると想定され、ネットワーク エラーが発生した場合は永久に再試行されるためです。 |
データ消失時に失敗させる (failOnDataLoss) |
true または false 。 |
true |
[省略可能]データが失われた可能性がある場合にクエリを失敗させるかどうか。 クエリは、トピックの削除、処理前のトピックの切り捨てなど、多くのシナリオが原因で、Kafka からのデータの読み取りに永続的に失敗する可能性があります。 データが失われた可能性があるかどうかについて控えめな推定を試みます。 これにより、誤ってアラームが発生する場合があります。 このオプションは、期待どおりに動作しない場合、またはデータ損失にもかかわらずクエリの処理を続行する場合に false に設定します。 |
最小パーティション数 (minPartitions) | >= 0 の整数、0 = 無効。 | 0 (無効) | [省略可能] Kafka から読み取るパーティションの最小数。
minPartitions オプションを使用して、Kafka から読み取るパーティションの最小数に任意の数を指定して Spark を構成することができます。 通常、Spark では、Kafka の topicPartitions と、Kafka から使用する Spark パーティションが 1 対 1 でマッピングされます。
minPartitions オプションを Kafka topicPartitions より大きい値に設定すると、Spark は大きな Kafka パーティションをより小さな部分に分割します。 ピーク時の負荷やデータの偏りがあるとき、またはストリームが遅れている際に処理速度を上げるためにこのオプションを設定できます。 トリガーごとに Kafka コンシューマーを初期化すると、Kafka に接続するときに SSL を使用する場合、パフォーマンスに影響を与える可能性があります。 |
kafka.group.id | Kafka コンシューマー グループ ID。 | 設定しない | [省略可能] Kafka から読み取り中に使用するグループ ID。 注意して使用する必要があります。 既定では、各クエリによって、データを読み取るための一意のグループ ID が生成されます。 これにより、各クエリでは、独自のコンシューマー グループが使用され、他のコンシューマーによる干渉を受けないため、サブスクライブされたトピックのすべてのパーティションを確実に読み取ることができます。 一部のシナリオ (たとえば、Kafka グループ ベースの承認) では、データを読み取るために特定の承認済みグループ ID を使用することが必要な場合があります。 必要に応じて、グループ ID を設定できます。 ただし、予期しない動作が発生する可能性があるため、細心の注意を払って設定してください。
|
スターティングオフセット | earliest、latest | 最新 | [省略可能] クエリが開始される時点の開始ポイントは、最も古いオフセットから始める場合は「最も古い」、もしくは各TopicPartitionの開始オフセットを指定するjson文字列のいずれかです。 json では、オフセットとして -2 を使用して最も古い内容を参照し、-1 を使用して最新の内容を参照できます。 注: バッチ クエリの場合、latest (暗黙的に、または JSON で -1 を使用して) は許可されません。 ストリーミング クエリの場合、これは新しいクエリが開始されたときにのみ適用され、再開は常にクエリが中断された場所から取得されます。 クエリ中に新しく検出されたパーティションは、earliest で開始されます。 |
他の省略可能な構成については、「構造化ストリーミング + Kafka 統合ガイド」を参照してください。
Kafka レコードのスキーマ
Kafka レコードのスキーマは次のとおりです。
コラム | タイプ |
---|---|
鍵 | [バイナリ] |
価値 | [バイナリ] |
トピック | ひも |
パーティション | 整数 (int) |
オフセット | 長い |
タイムスタンプ(時刻印) | 長い |
タイムスタンプタイプ | 整数 (int) |
key
と value
は、ByteArrayDeserializer
を使用して常にバイト配列として逆シリアル化されます。 DataFrame 操作 ( cast("string")
など) を使用して、キーと値を明示的に逆シリアル化します。
Kafka にデータを書き込む
Kafka へのストリーミング書き込みの例を次に示します。
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Azure Databricks では、次の例に示すように、Kafka データ シンクへのバッチ書き込みセマンティクスもサポートされています。
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Kafka 構造化ストリーミング ライターを構成する
重要
Databricks Runtime 13.3 LTS 以降には、既定でべき等の書き込みを有効にする新しいバージョンの kafka-clients
ライブラリが含まれています。 Kafka シンクでバージョン 2.8.0 以下を使用していて、ACL が構成されているが IDEMPOTENT_WRITE
が有効になっていない場合、書き込みはエラー メッセージ org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
で失敗します。
このエラーを解決するには、Kafka バージョン 2.8.0 以降にアップグレードするか、構造化ストリーミング ライターの構成時に .option(“kafka.enable.idempotence”, “false”)
を設定します。
DataStreamWriter に提供されるスキーマは、Kafka シンクと対話します。 次のフィールドを使用できます。
列名 | 必須または省略可能 | タイプ |
---|---|---|
key |
省略可能 |
STRING または BINARY |
value |
必須 |
STRING または BINARY |
headers |
省略可能 | ARRAY |
topic |
省略可能 ( topic がライター オプションとして設定されている場合は無視されます) |
STRING |
partition |
省略可能 | INT |
Kafka への書き込み中に設定される一般的なオプションを次に示します。
オプション | 値 | 既定値 | 説明 |
---|---|---|---|
kafka.boostrap.servers |
コンマ区切りのリスト <host:port> |
なし | [必須] Kafka bootstrap.servers の構成。 |
topic |
STRING |
設定しない | [省略可能] すべての行のトピックが書き込まれるように設定します。 このオプションは、データに存在するすべてのトピック列をオーバーライドします。 |
includeHeaders |
BOOLEAN |
false |
[省略可能] 行に Kafka ヘッダーを含めるかどうか。 |
他の省略可能な構成については、「構造化ストリーミング + Kafka 統合ガイド」を参照してください。
Kafka メトリックを取得する
ストリーミング クエリがサブスクライブされているすべてのトピックにおいて、利用可能な最新のオフセットに対してどれだけ遅れているかを示すオフセット数の平均、最小、最大を、avgOffsetsBehindLatest
、maxOffsetsBehindLatest
、minOffsetsBehindLatest
というメトリクスを使用して取得できます。 「対話形式によるメトリックの読み取り」を参照してください。
注意
Databricks Runtime 9.1 以降で使用できます。
estimatedTotalBytesBehindLatest
の値を調べることで、サブスクライブされたトピックからクエリ プロセスが消費していない推定合計バイト数を取得します。 この推定値は、過去 300 秒間に処理されたバッチ数に基づきます。 推定値の基になる期間は、オプション bytesEstimateWindowLength
を別の値に設定することによって変更できます。 たとえば、10 分に設定するには、次のようにします。
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
ノートブックでストリームを実行している場合、これらのメトリックは、ストリーミング クエリの進行状況ダッシュボードの [生データ] タブに表示されます。
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
SSL を使用して Azure Databricks を Kafka に接続する
Kafka への SSL 接続を有効にするには、Confluent ドキュメントの SSL による暗号化と認証の手順に従います。 オプションとして、そこで説明されている構成を、プレフィックス kafka.
を付けて指定できます。 たとえば、プロパティ kafka.ssl.truststore.___location
で、信頼ストアの場所を指定します。
Databricks では、次のことが推奨されています。
- 証明書をクラウド オブジェクト ストレージに格納します。 証明書へのアクセスは、Kafka にアクセスできるクラスターのみに制限できます。 Azure Databricks を使用したデータ ガバナンスに関するページを参照してください。
- 証明書のパスワードをシークレットとしてシークレット スコープに格納します。
次の例では、オブジェクト ストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.___location", <truststore-___location>)
.option("kafka.ssl.keystore.___location", <keystore-___location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
HDInsight 上の Kafka を Azure Databricks に接続する
HDInsight Kafka クラスターを作成します。
手順については、「Azure Virtual Network 経由で HDInsight 上の Apache Kafka に接続する」を参照してください。
正しいアドレスをアドバタイズするように Kafka ブローカーを構成します。
「IP をアドバタイズするように Kafka を構成する」の手順に従います。 Azure Virtual Machines で Kafka を自分で管理する場合は、ブローカーの
advertised.listeners
構成がホストの内部 IP に設定されていることを確認します。Azure Databricks クラスターを作成します。
Kafka クラスターを Azure Databricks クラスターにピアリングします。
「仮想ネットワークをピアリングする」の手順に従います。
Microsoft Entra ID と Azure Event Hubs を使用したサービス プリンシパル認証
Azure Databricks は、Event Hubs サービスを使用した Spark ジョブの認証をサポートしています。 認証は Microsoft Entra ID によって OAuth で経由で行われます。
Azure Databricks では、以下のコンピューティング環境で、クライアント ID とシークレットを使用した Microsoft Entra ID 認証をサポートしています。
- 専用アクセス モード (以前のシングル ユーザー アクセス モード) で構成されたコンピューティング上の Databricks Runtime 12.2 LTS 以降。
- Databricks Runtime 14.3 LTS以降では、標準アクセスモード(以前は共有アクセスモード)で構成されたコンピュートを使用します。
- Unity カタログなしで構成された Lakeflow 宣言パイプライン。
Azure Databricks では、どのコンピューティング環境でも、Unity カタログで構成された Lakeflow 宣言パイプラインでも、証明書を使用した Microsoft Entra ID 認証はサポートされていません。
この認証は、標準アクセス モードのコンピューティングや Unity Catalog Lakeflow 宣言パイプラインでは機能しません。
AWS MSK と Azure Event Hubs の Unity カタログ サービス資格情報のサポート
DBR 16.1 のリリース以降、Azure Databricks では、Aws Managed Streaming for Apache Kafka (MSK) と Azure Event Hubs へのアクセスを認証するための Unity カタログ サービス資格情報がサポートされています。 Azure Databricks では、共有クラスターで Kafka ストリーミングを実行し、サーバーレス コンピューティングを使用する場合に、このアプローチをお勧めします。
認証に Unity カタログ サービスの資格情報を使用するには、次の手順に従います。
- 新しい Unity カタログ サービス資格情報を作成します。 このプロセスに慣れていない場合は、 サービス資格情報の作成 に関する手順を参照してください。
- Kafka 構成のソース オプションとして、Unity カタログ サービスの資格情報の名前を指定します。
databricks.serviceCredential
オプションをサービス資格情報の名前に設定します。
注: Kafka に Unity カタログ サービスの資格情報を指定する場合は、不要になったこれらのオプションを指定 しないでください 。
kafka.sasl.mechanism
kafka.sasl.jaas.config
kafka.security.protocol
kafka.sasl.client.callback.handler.class
kafka.sasl.oauthbearer.token.endpoint.url
構造化ストリーミング Kafka コネクタの構成
Microsoft Entra ID で認証を実行するには、次の値が必要です。
テナント ID。 これは、Microsoft Entra ID の [サービス] タブにあります。
clientID (アプリケーション ID とも呼ばれます)。
クライアント シークレット。 これを取得したら、シークレットとして Databricks ワークスペースに追加する必要があります。 このシークレットを追加するには、「シークレットの管理」を参照してください。
EventHubs トピック。 トピックの一覧は、特定の Event Hubs 名前空間ページの [ エンティティ ] セクションの Event Hubs セクションにあります。 複数のトピックを操作するには、Event Hubs レベルで IAM ロールを設定します。
EventHubs サーバー。 これは、特定の Event Hubs 名前空間の概要ページにあります。
さらに、Entra ID を使用するには、OAuth SASL メカニズム (SASL は汎用プロトコルであり、OAuth は SASL "メカニズム" の一種です) を使用するように Kafka に指示する必要があります。
-
kafka.security.protocol
は、SASL_SSL
である必要があります。 -
kafka.sasl.mechanism
は、OAUTHBEARER
である必要があります。 -
kafka.sasl.login.callback.handler.class
は、シェーディングされた Kafka クラスのログイン コールバック ハンドラーについて値kafkashaded
を持つ Java クラスの完全修飾名にする必要があります。 正確なクラスについては、次の例を参照してください。
例
次に、実行中の例を見てみましょう。
Python(プログラミング言語)
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
スカラ (プログラミング言語)
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
潜在的なエラーの処理
ストリーミング オプションがサポートされません。
Unity カタログで構成された Lakeflow 宣言パイプラインでこの認証メカニズムを使用しようとすると、次のエラーが発生する可能性があります。
このエラーを解決するには、サポートされているコンピューティング構成を使用します。 「Microsoft Entra ID と Azure Event Hubs を使用したサービス プリンシパル認証」を参照してください。
新しい
KafkaAdminClient
の作成に失敗しました。これは、次のいずれかの認証オプションが正しくない場合に Kafka がスローする内部エラーです。
- クライアント ID (アプリケーション ID とも呼ばれます)
- テナント ID
- Event Hubs サーバー
エラーを解決するには、これらのオプションの値が正しいことを確認します。
また、この例で既定で提供されている (変更しないように求められた)
kafka.security.protocol
などの構成オプションを変更すると、このエラーが表示される場合があります。返されるレコードがありません
DataFrame を表示または処理しようとしても結果が得られない場合は、UI に次の情報が表示されます。
このメッセージは、認証が成功したが、EventHubs がデータを返さなかったことを意味します。 次のような理由が考えられます (ただし、決して網羅的ではありません)。
- 正しくない EventHubs トピックを指定しました。
-
startingOffsets
の既定の Kafka 構成オプションはlatest
であり、現在、このトピックを通じてデータを受信していません。startingOffsets
をearliest
に設定すると、Kafka の最も古いオフセットからデータの読み取りを開始できます。