次の方法で共有


Azure Stream Analytics での異常検出

クラウドと Azure IoT Edge の両方で使用できる Azure Stream Analytics には、機械学習ベースの異常検出機能が組み込まれており、最も一般的に発生する 2 つの異常 (一時的と永続的) を監視するために使用できます。 AnomalyDetection_SpikeAndDip 関数と AnomalyDetection_ChangePoint 関数を使用すると、Stream Analytics ジョブで直接異常検出を実行できます。

機械学習モデルは、一様にサンプリングされた時系列を前提としています。 時系列が均一でない場合は、異常検出を呼び出す前に、タンブリング ウィンドウを使用して集計ステップを挿入できます。

現時点では、機械学習の操作では、季節性の傾向や多変量相関はサポートされていません。

Azure Stream Analytics での機械学習を使用した異常検出

次のビデオでは、Azure Stream Analytics の機械学習関数を使用して、リアルタイムで異常を検出する方法を示します。

モデルの動作

一般に、モデルの精度は、スライディング ウィンドウ内のデータが多いほど向上します。 指定されたスライディング ウィンドウ内のデータは、その時間枠の通常の値範囲の一部として扱われます。 このモデルは、現在のイベントが異常であるかどうかを確認するために、スライディング ウィンドウ上のイベント履歴のみを考慮します。 スライディング ウィンドウが移動すると、古い値はモデルのトレーニングから削除されます。

関数は、これまでに見たものに基づいて特定の法線を確立することによって動作します。 外れ値は、信頼水準内で確立された正規値と比較することによって識別されます。 ウィンドウ サイズは、異常が発生したときにそれを認識できるように、モデルを正常な動作にトレーニングするために必要な最小イベントに基づいている必要があります。

モデルの応答時間は、過去の多数のイベントと比較する必要があるため、履歴サイズとともに長くなります。 パフォーマンスを向上させるために、必要な数のイベントのみを含めることをお勧めします。

時系列のギャップは、モデルが特定の時点でイベントを受け取らなかったことが原因である可能性があります。 この状況は、代入ロジックを使用して Stream Analytics によって処理されます。 同じスライディング ウィンドウの履歴サイズと期間を使用して、イベントが到着すると予想される平均レートが計算されます。

ここで使用可能な異常ジェネレーターを使用して、さまざまな異常パターンのデータを IoT Hub にフィードできます。 Azure Stream Analytics ジョブは、これらの異常検出関数を使用して、この IoT Hub から読み取り、異常を検出するように設定できます。

スパイクとディップ

時系列イベント ストリームの一時的な異常は、スパイクとディップと呼ばれます。 スパイクとディップは、機械学習ベースのオペレーター AnomalyDetection_SpikeAndDipを使用して監視できます。

スパイクとディップの異常の例

同じスライディング ウィンドウで、2 番目のスパイクが最初のスパイクよりも小さい場合、小さいスパイクの計算されたスコアは、指定された信頼水準内で最初のスパイクのスコアと比較して十分に有意ではない可能性があります。 モデルの信頼度を下げてみて、このような異常を検出できます。 ただし、アラートが多すぎる場合は、より高い信頼区間を使用できます。

次のクエリ例では、120 イベントの履歴を持つ 2 分間のスライディング ウィンドウで 1 秒あたり 1 イベントの均一な入力レートを想定しています。 最後の SELECT ステートメントは、スコアと異常ステータスを 95%の信頼度で抽出して出力します。

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')
            OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS
    SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS
    IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep

変更点

時系列イベント ストリームの永続的な異常は、レベルの変化や傾向など、イベント ストリーム内の値の分布の変化です。 Stream Analytics では、このような異常は Machine Learning ベースの AnomalyDetection_ChangePoint 演算子を使用して検出されます。

持続的な変化は、急上昇や急降下よりもはるかに長く続き、壊滅的なイベントを示している可能性があります。 永続的な変化は通常、肉眼では見えませんが、 AnomalyDetection_ChangePoint 演算子で検出できます。

次の図は、レベル変更の例です。

レベル変更異常の例

次の図は、トレンドの変化の例です。

トレンド変化の異常の例

次のクエリ例では、履歴サイズが 1,200 イベントの 20 分間のスライディング ウィンドウで 1 秒あたり 1 イベントの均一な入力レートを想定しています。 最後の SELECT ステートメントは、スコアと異常ステータスを 80%の信頼度で抽出して出力します。

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_ChangePoint(CAST(temperature AS float), 80, 1200) 
        OVER(LIMIT DURATION(minute, 20)) AS ChangePointScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(ChangePointScores, 'Score') AS float) AS
    ChangePointScore,
    CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') AS bigint) AS
    IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep

パフォーマンス特性

これらのモデルのパフォーマンスは、履歴サイズ、ウィンドウ期間、イベント負荷、および関数レベルのパーティション分割が使用されているかどうかによって異なります。 このセクションでは、これらの構成について説明し、1 秒あたり 1 K、5 K、10K のイベント取り込みレートを維持する方法のサンプルを提供します。

  • 履歴サイズ - これらのモデルは 、履歴サイズと直線的に実行されます。 履歴サイズが長いほど、モデルが新しいイベントをスコアリングするのにかかる時間が長くなります。 これは、モデルが新しいイベントを履歴バッファー内の過去の各イベントと比較するためです。
  • ウィンドウ期間 - ウィンドウ期間は 、履歴サイズで指定された数のイベントを受信するのにかかる時間を反映する必要があります。 ウィンドウにそれほど多くのイベントがない場合、Azure Stream Analytics は欠落している値を補完します。 したがって、CPU 消費量は履歴サイズの関数です。
  • イベント負荷 - イベント負荷が大きいほど、モデルによって実行される作業が多くなり、CPU 消費量に影響します。 ジョブは、ビジネスロジックがより多くの入力パーティションを使用するのが理にかなっていると仮定して、恥ずかしいほど並列にすることでスケールアウトできます。
  • 関数レベルのパーティション分割 - 関数レベルのパーティション分割 は、異常検出関数呼び出し内で PARTITION BY を使用して行われます。 このタイプのパーティション分割では、複数のモデルの状態を同時に維持する必要があるため、オーバーヘッドが増加します。 機能レベルのパーティション分割は、デバイス レベルのパーティション分割などのシナリオで使用されます。

リレーションシップ

履歴サイズ、ウィンドウ期間、および合計イベント負荷は、次のように関連しています。

windowDuration (ミリ秒単位) = 1000 * historySize / (1 秒あたりの合計入力イベント / 入力パーティション数)

deviceId で関数を分割する場合は、異常検出関数呼び出しに "PARTITION BY deviceId" を追加します。

所見

次の表に、非分割ケースの単一ノード (6 SU) のスループット観測値を示します。

履歴サイズ (イベント) ウィンドウ期間 (ミリ秒) 1 秒あたりの合計入力イベント
六十 55 2,200
600 728 1,650
6,000 10,910 1,100

次の表に、パーティション分割ケースの 1 つのノード (6 SU) のスループット観測値を示します。

履歴サイズ (イベント) ウィンドウ期間 (ミリ秒) 1 秒あたりの合計入力イベント デバイス数
六十 1,091 1,100 10
600 10,910 1,100 10
6,000 218,182 <550 10
六十 21,819 550 100
600 218,182 550 100
6,000 2,181,819 <550 100

上記のパーティション分割されていない構成を実行するためのサンプル コードは、Azure サンプルの Streaming At Scale リポジトリ にあります。 このコードでは、関数レベルのパーティション分割を行わずに Stream Analytics ジョブを作成し、Event Hubs を入力と出力として使用します。 入力負荷は、テスト クライアントを使用して生成されます。 各入力イベントは 1 KB の json ドキュメントです。 イベントは、JSON データを送信する IoT デバイスをシミュレートします (最大 1 K デバイス)。 履歴サイズ、ウィンドウ期間、および合計イベント負荷は、2 つの入力パーティションで異なります。

見積もりの精度を高めるには、ご使用のシナリオに合わせてサンプルをカスタマイズしてください。

ボトルネックの特定

パイプラインのボトルネックを特定するには、Azure Stream Analytics ジョブの [メトリック] ウィンドウを使用します。 スループットについての [Input/Output Events](入出力イベント) および [透かしの遅延] または [Backlogged Events](バックログされたイベント) を確認して、ジョブが入力速度に対応しているかどうかを確認します。 Event Hubs のメトリックスについては、スロットルされた要求 を検索し、その結果に基づいてしきい値ユニットを調整します。 Azure Cosmos DB メトリックスについては、スループットの下の [パーティション キーの範囲ごとの使用された最大 RU/秒] を確認して、パーティション キーの範囲が均一に消費されていることを確認します。 Azure SQL DB については、 [ログ IO] および [CPU] を監視します。

デモ ビデオ

次のステップ