次の方法で共有


Spark Submit タスクの廃止通知&移行ガイド

Warnung

Spark 送信タスクは非推奨となり、削除が保留中です。 このタスクの種類の使用は、新しいユース ケースでは禁止され、既存の顧客には強く推奨されません。 このタスクの種類の元のドキュメントについては、Spark Submit(レガシー)を参照してください。 移行の手順については、お読みください。

Spark Submit が非推奨になる理由

Spark Submit タスクの種類は、JARNotebook、または Python スクリプト タスクに含まれていない技術的な制限と機能のギャップにより非推奨になっています。 これらのタスクにより、Databricks の機能との統合が向上し、パフォーマンスが向上し、信頼性が向上します。

非推奨の措置

Databricks は、廃止に関連して次の措置を実施しています。

  • 制限付き作成: 2025 年 11 月以降、前月に Spark Submit タスクを使用したユーザーのみが、新しい Spark 送信タスクを 作成できます。 例外が必要な場合は、アカウント サポートにお問い合わせください。
  • DBR バージョンの制限: Spark Submit の使用は、既存の DBR バージョンとメンテナンス リリースに制限されます。 Spark Submit を使用する既存の DBR バージョンは、機能が完全にシャットダウンされるまで、セキュリティとバグ修正のメンテナンス リリースを引き続き受け取ります。 DBR 17.3 以降と 18.x 以降では、このタスクの種類はサポートされません。
  • UI の警告: 警告は、Spark Submit タスクが使用されている Databricks UI 全体に表示され、通信は既存のユーザーのアカウントのワークスペース管理者に送信されます。

JVM ワークロードを JAR タスクに移行する

JVM ワークロードの場合は、 Spark Submit タスクを JAR タスクに移行します。 JAR タスクは、Databricks とのより優れた機能サポートと統合を提供します。

移行するには、次の手順に従います。

  1. ジョブに新しい JAR タスクを作成します。
  2. Spark Submit タスク パラメーターから、最初の 3 つの引数を特定します。 通常、次のパターンに従います。 ["--class", "org.apache.spark.mainClassName", "dbfs:/path/to/jar_file.jar"]
  3. --class パラメーターを削除します。
  4. JAR タスクの Main クラスとしてメイン クラス名 (たとえば、org.apache.spark.mainClassName) を設定します。
  5. JAR タスク構成で JAR ファイルへのパス (たとえば、 dbfs:/path/to/jar_file.jar) を指定します。
  6. Spark Submit タスクの残りの引数を JAR タスク パラメーターにコピーします。
  7. JAR タスクを実行し、期待どおりに動作することを確認します。

JAR タスクの構成の詳細については、 JAR タスクを参照してください。

R ワークロードの移行

Spark 送信タスクから直接 R スクリプトを起動する場合は、複数の移行パスを使用できます。

オプション A: ノートブック タスクを使用する

R スクリプトを Databricks ノートブックに移行します。 ノートブック タスクでは、クラスターの自動スケーリングを含む一連の機能がサポートされ、Databricks プラットフォームとの統合が向上します。

オプション B: ノートブック タスクから R スクリプトをブートストラップする

Notebook タスクを使用して R スクリプトをブートストラップします。 次のコードを使用してノートブックを作成し、ジョブ パラメーターとして R ファイルを参照します。 必要に応じて、R スクリプトで使用されるパラメーターを追加するように変更します。

dbutils.widgets.text("script_path", "", "Path to script")
script_path <- dbutils.widgets.get("script_path")
source(script_path)

Spark Submit タスクを使用するジョブを検索する

次の Python スクリプトを使用して、Spark Submit タスクを含むワークスペース内のジョブを識別できます。 有効な 個人用アクセスまたはその他のトークン が必要であり、 ワークスペースの URL を 使用する必要があります。

オプション A: 高速スキャン (この最初の永続的ジョブのみを実行)

このスクリプトは、永続的なジョブ ( /jobs/create または Web インターフェイスを使用して作成) のみをスキャンし、 /runs/submitを介して作成されたエフェメラル ジョブは含まれません。 これは、はるかに高速であるため、Spark Submit の使用状況を識別するための推奨される第 1 行の方法です。

#!/usr/bin/env python3
"""
Requirements:
    databricks-sdk>=0.20.0

Usage:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
    export DATABRICKS_TOKEN="your-token"
    python3 list_spark_submit_jobs.py

Output:
    CSV format with columns: Job ID, Owner ID/Email, Job Name

Incorrect:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com/?o=12345678910"
"""

import csv
import os
import sys
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import PermissionDenied


def main():
    # Get credentials from environment
    workspace_url = os.environ.get("DATABRICKS_HOST")
    token = os.environ.get("DATABRICKS_TOKEN")

    if not workspace_url or not token:
        print(
            "Error: Set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables",
            file=sys.stderr,
        )
        sys.exit(1)

    # Initialize client
    client = WorkspaceClient(host=workspace_url, token=token)

    # Scan workspace for persistent jobs with Spark Submit tasks
    # Using list() to scan only persistent jobs (faster than list_runs())
    print(
        "Scanning workspace for persistent jobs with Spark Submit tasks...",
        file=sys.stderr,
    )
    jobs_with_spark_submit = []
    total_jobs = 0

    # Iterate through all jobs (pagination is handled automatically by the SDK)
    skipped_jobs = 0
    for job in client.jobs.list(expand_tasks=True, limit=25):
        try:
            total_jobs += 1
            if total_jobs % 1000 == 0:
                print(f"Scanned {total_jobs} jobs total", file=sys.stderr)

            # Check if job has any Spark Submit tasks
            if job.settings and job.settings.tasks:
                has_spark_submit = any(
                    task.spark_submit_task is not None for task in job.settings.tasks
                )

                if has_spark_submit:
                    # Extract job information
                    job_id = job.job_id
                    owner_email = job.creator_user_name or "Unknown"
                    job_name = job.settings.name or f"Job {job_id}"

                    jobs_with_spark_submit.append(
                        {"job_id": job_id, "owner_email": owner_email, "job_name": job_name}
                    )
        except PermissionDenied:
            # Skip jobs that the user doesn't have permission to access
            skipped_jobs += 1
            continue

    # Print summary to stderr
    print(f"Scanned {total_jobs} jobs total", file=sys.stderr)
    if skipped_jobs > 0:
        print(
            f"Skipped {skipped_jobs} jobs due to insufficient permissions",
            file=sys.stderr,
        )
    print(
        f"Found {len(jobs_with_spark_submit)} jobs with Spark Submit tasks",
        file=sys.stderr,
    )
    print("", file=sys.stderr)

    # Output CSV to stdout
    if jobs_with_spark_submit:
        writer = csv.DictWriter(
            sys.stdout,
            fieldnames=["job_id", "owner_email", "job_name"],
            quoting=csv.QUOTE_MINIMAL,
        )
        writer.writeheader()
        writer.writerows(jobs_with_spark_submit)
    else:
        print("No jobs with Spark Submit tasks found.", file=sys.stderr)


if __name__ == "__main__":
    main()

オプション B: 包括的なスキャン (低速、過去 30 日間の一時的なジョブを含む)

/runs/submitを使用して作成されたエフェメラル ジョブを識別する必要がある場合は、このより包括的なスクリプトを使用します。 このスクリプトは、( /jobs/create を介して作成された) 永続的なジョブと一時ジョブの両方を含む、ワークスペース内の過去 30 日間のすべてのジョブ実行をスキャンします。 このスクリプトは、大規模なワークスペースで実行するのに数時間かかる場合があります。

#!/usr/bin/env python3
"""
Requirements:
    databricks-sdk>=0.20.0

Usage:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
    export DATABRICKS_TOKEN="your-token"
    python3 list_spark_submit_runs.py

Output:
    CSV format with columns: Job ID, Run ID, Owner ID/Email, Job/Run Name

Incorrect:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com/?o=12345678910"
"""

import csv
import os
import sys
import time
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import PermissionDenied


def main():
    # Get credentials from environment
    workspace_url = os.environ.get("DATABRICKS_HOST")
    token = os.environ.get("DATABRICKS_TOKEN")

    if not workspace_url or not token:
        print(
            "Error: Set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables",
            file=sys.stderr,
        )
        sys.exit(1)

    # Initialize client
    client = WorkspaceClient(host=workspace_url, token=token)

    thirty_days_ago_ms = int((time.time() - 30 * 24 * 60 * 60) * 1000)

    # Scan workspace for runs with Spark Submit tasks
    # Using list_runs() instead of list() to include ephemeral jobs created via /runs/submit
    print(
        "Scanning workspace for runs with Spark Submit tasks from the last 30 days... (this will take more than an hour in large workspaces)",
        file=sys.stderr,
    )
    runs_with_spark_submit = []
    total_runs = 0
    seen_job_ids = set()

    # Iterate through all runs (pagination is handled automatically by the SDK)
    skipped_runs = 0
    for run in client.jobs.list_runs(
        expand_tasks=True,
        limit=25,
        completed_only=True,
        start_time_from=thirty_days_ago_ms,
    ):
        try:
            total_runs += 1
            if total_runs % 1000 == 0:
                print(f"Scanned {total_runs} runs total", file=sys.stderr)

            # Check if run has any Spark Submit tasks
            if run.tasks:
                has_spark_submit = any(
                    task.spark_submit_task is not None for task in run.tasks
                )

                if has_spark_submit:
                    # Extract job information from the run
                    job_id = run.job_id if run.job_id else "N/A"
                    run_id = run.run_id if run.run_id else "N/A"
                    owner_email = run.creator_user_name or "Unknown"
                    # Use run name if available, otherwise try to construct a name
                    run_name = run.run_name or (
                        f"Run {run_id}" if run_id != "N/A" else "Unnamed Run"
                    )

                    # Track unique job IDs to avoid duplicates for persistent jobs
                    # (ephemeral jobs may have the same job_id across multiple runs)
                    key = (job_id, run_id)
                    if key not in seen_job_ids:
                        seen_job_ids.add(key)
                        runs_with_spark_submit.append(
                            {
                                "job_id": job_id,
                                "run_id": run_id,
                                "owner_email": owner_email,
                                "job_name": run_name,
                            }
                        )
        except PermissionDenied:
            # Skip runs that the user doesn't have permission to access
            skipped_runs += 1
            continue

    # Print summary to stderr
    print(f"Scanned {total_runs} runs total", file=sys.stderr)
    if skipped_runs > 0:
        print(
            f"Skipped {skipped_runs} runs due to insufficient permissions",
            file=sys.stderr,
        )
    print(
        f"Found {len(runs_with_spark_submit)} runs with Spark Submit tasks",
        file=sys.stderr,
    )
    print("", file=sys.stderr)

    # Output CSV to stdout
    if runs_with_spark_submit:
        writer = csv.DictWriter(
            sys.stdout,
            fieldnames=["job_id", "run_id", "owner_email", "job_name"],
            quoting=csv.QUOTE_MINIMAL,
        )
        writer.writeheader()
        writer.writerows(runs_with_spark_submit)
    else:
        print("No runs with Spark Submit tasks found.", file=sys.stderr)


if __name__ == "__main__":
    main()

ヘルプが必要ですか?

追加のヘルプが必要な場合は、アカウント サポートにお問い合わせください。