Notebook のユーティリティ (NotebookUtils) は、Fabric Notebook で一般的なタスクを簡単に実行するのに役立つビルトイン パッケージです。 MSSparkUtils では、ファイル システムの操作、環境変数の取得、ノートブックの連結、シークレットの操作を実行できます。 NotebookUtils パッケージは、PySpark (Python)、Scala、SparkR ノートブック、Fabric パイプラインで利用できます。
Note
- MsSparkUtils の名前が NotebookUtils に正式に変更されています。 既存のコードは 下位互換性 を維持し、重大な変更は発生しません。 継続的なサポートと新機能へのアクセスを確保するために、notebookutils にアップグレード することを強くお勧めします 。 mssparkutils 名前空間は今後廃止される予定です。
- NotebookUtils は、Spark 3.4 (Runtime v1.2) 以降 で動作するように設計されています。 今後、notebookutils 名前空間では、すべての新機能と更新プログラムが排他的にサポートされます。
ファイルシステム ユーティリティ
notebookutils.fs には、Azure Data Lake Storage (ADLS) Gen2 や Azure Blob Storage など、さまざまなファイル システムを操作するためのユーティリティが用意されています。 Azure Data Lake Storage Gen2 および Azure Blob Storage へのアクセスを適切に構成するようにしてください。
次のコマンドを実行して、使用可能なメソッドの概要を取得します。
notebookutils.fs.help()
Output
notebookutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use notebookutils.fs.help("methodName") for more info about a method.
NotebookUtils でファイル システムを操作する方法は、Spark API と同じです。 notebookutils.fs.mkdirs() と Fabric lakehouse の使用例を次に示します。
| Usage | HDFS ルートからの相対パス | ABFS ファイル システムの絶対パス | ドライバー ノード内のローカル ファイル システムの絶対パス |
|---|---|---|---|
| 既定以外のレイクハウス | サポートされていません | notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") | notebookutils.fs.mkdirs("file:/<new_dir>") |
| 既定のレイクハウス | 'Files' または 'Tables' の下のディレクトリ: notebookutils.fs.mkdirs("Files/<new_dir>") | notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") | notebookutils.fs.mkdirs("file:/<new_dir>") |
既定のレイクハウスの場合、ファイル パスはお使いのノートブックにマウントされ、既定のファイル キャッシュ タイムアウトは 120 秒です。 これは、ファイルは、レイクハウスから削除された場合でも、ノートブックのローカル一時フォルダーに 120 秒間キャッシュされることを意味します。 タイムアウト規則を変更する場合は、既定の Lakehouse ファイル パスのマウントを解除し、別の fileCacheTimeout 値で再マウントできます。
既定以外の Lakehouse 構成の場合は、Lakehouse パスのマウント中に適切な fileCacheTimeout パラメーターを設定できます。 タイムアウトを 0 に設定すると、最新のファイルがレイクハウス サーバーから確実にフェッチされます。
ファイルを一覧表示する
ディレクトリの内容を一覧表示するには、notebookutils.fs.ls('ディレクトリ パス') を使用します。 例えば次が挙げられます。
notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp") # The full path of the local file system of driver node
notebookutils.fs.ls() API は、ノートブックの種類に応じて、相対パスを使用する場合の動作が異なります。
Spark ノートブック内: 相対パスは、既定のレイクハウスの ABFSS パスを基準とします。 たとえば、
notebookutils.fs.ls("Files")は、既定の Lakehouse のFilesディレクトリをポイントします。例えば次が挙げられます。
notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")Python ノートブックの: 相対パスは、ローカル ファイル システムの作業ディレクトリに対する相対パスであり、既定では /home/trusted-service-user/work です。 そのため、既定の Lakehouse の
notebookutils.fs.ls("/lakehouse/default/Files")ディレクトリにアクセスするには、相対パスFilesの代わりに完全パスを使用する必要があります。例えば次が挙げられます。
notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
ファイルのプロパティを表示します
このメソッドでは、ファイルのプロパティ (ファイル名、ファイル パス、ファイル サイズ、ディレクトリとファイルの区別) を返します。
files = notebookutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
新しいディレクトリの作成
このメソッドでは、指定されたディレクトリが存在しない場合に、そのディレクトリを作成し、必要に応じて親ディレクトリを作成します。
notebookutils.fs.mkdirs('new directory name')
notebookutils.fs.mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
notebookutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
ファイルのコピー
このメソッドでは、ファイルまたはディレクトリをコピーし、ファイル システム間のコピー アクティビティをサポートします。 すべてのファイルとディレクトリを再帰的にコピーするように recurse=True を設定します。
notebookutils.fs.cp('source file or directory', 'destination file or directory', recurse=True)
Note
OneLake ショートカットの
パフォーマンスの高いコピー ファイル
この方法では、特に大量のデータを扱う場合に、ファイルのコピーまたは移動をより効率的に行うことができます。 Fabric のパフォーマンスを向上させるには、従来の fastcp メソッドの代わりに cp を使用することをお勧めします。
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', recurse=True)
Considerations:
-
notebookutils.fs.fastcp()では、OneLake 内のファイルのリージョン間のコピーはサポートされていません。 この場合は、代わりにnotebookutils.fs.cp()を使用できます。 - OneLake ショートカットの
制限により、 を使用して S3/GCS 型のショートカットからデータをコピーする必要がある場合は、abfss パスではなくマウントされたパスを使用することをお勧めします。
ファイル コンテンツのプレビュー
このメソッドでは、指定したファイルの最初の 'maxBytes' バイトまでを、UTF-8 でエンコードされた文字列として返します。
notebookutils.fs.head('file path', maxBytes to read)
ファイルの移動
このメソッドでは、ファイルまたはディレクトリを移動し、ファイル システム間の移動をサポートします。
notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.
ファイルを書き込む
このメソッドでは、指定した文字列を UTF-8 でエンコードしてファイルに書き込みます。
notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
ファイルへのコンテンツの追加
このメソッドでは、指定した文字列を UTF-8 でエンコードしてファイルに追加します。
notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Considerations:
-
notebookutils.fs.append()とnotebookutils.fs.put()は、原子性の保証がないため、同じファイルへの同時書き込みはサポートされていません。 -
notebookutils.fs.appendループでforAPI を使って同じファイルに書き込む場合、書き込みの繰り返しの間に、0.5 から 1 秒程度のsleepステートメントを追加することをお勧めします。 この推奨事項は、notebookutils.fs.appendAPI の内部flush操作が非同期であるため、短い遅延がデータの整合性を確保するのに役立ちます。
ファイルまたはディレクトリの削除
このメソッドでは、ファイルまたはディレクトリを削除します。 すべてのファイルとディレクトリを再帰的に削除するように recurse=True を設定します。
notebookutils.fs.rm('file path', recurse=True)
ディレクトリのマウント/マウント解除
詳細な使用方法については、「ファイルのマウントとマウント解除」を参照してください。
ノートブック ユーティリティ
Notebook のユーティリティを使用して、ノートブックを実行したり、値を持つノートブックを終了したりします。 次のコマンドを実行して、使用可能なメソッドの概要を取得します。
notebookutils.notebook.help()
Output:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> This method check if the DAG is correctly defined.
Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.
Use notebookutils.notebook.help("methodName") for more info about a method.
Note
Notebook ユーティリティは、Apache Spark ジョブ定義 (SJD) には適用されません。
Notebookの参照
このメソッドでは、Notebookを参照し、その終了値を返します。 関数呼び出しの入れ子は、対話形式またはパイプラインで、Notebookで実行できます。 参照されたNotebookは、この関数を呼び出すNotebookの Spark プールで実行されます。
notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)
例えば次が挙げられます。
notebookutils.notebook.run("Sample1", 90, {"input": 20 })
Fabric ノートブックでは、 ワークスペース ID を指定することで、複数のワークスペース間でのノートブックの参照もサポートしています。
notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
セルの出力で参照の実行のスナップショット リンクを開くことができます。 このスナップショットでは、コードの実行結果をキャプチャし、参照の実行を簡単にデバッグできるようにします。
Considerations:
- ワークスペース間参照Notebookは、ランタイム バージョン 1.2 以降 でサポートされています。
-
[ノートブック リソース] でファイルを使用する場合は、参照先のノートブックの
notebookutils.nbResPathを使用して、対話型実行と同じフォルダーを指していることを確認します。 - 参照実行を使用すると、子ノートブックは、親と同じレイクハウスを使用する場合、親のレイクハウスを継承する場合、またはどちらも定義していない場合にのみ実行できます。 子が親ノートブックとは異なるレイクハウスを指定した場合、実行はブロックされます。 このチェックをバイパスするには、
useRootDefaultLakehouse: True設定します。
複数のNotebook参照を並列で実行する
notebookutils.notebook.runMultiple() メソッドを使用すると、複数のNotebookを並列で、または定義済みのトポロジ構造で実行できます。 この API は、Spark セッション内でマルチスレッド実装メカニズムを使用しています。つまり、参照ノートブックの実行でコンピューティング リソースが共有されます。
notebookutils.notebook.runMultiple() を使用すると、以下のことができます。
各Notebookが完了するのを待たずに、複数のNotebookを同時に実行します。
単純な JSON 形式を使用して、Notebookの依存関係と実行順序を指定します。
Spark コンピューティング リソースの使用を最適化し、Fabric プロジェクトのコストを削減します。
出力内の各Notebook実行レコードのスナップショットを表示し、Notebook タスクを容易にデバッグ/監視します。
各エグゼクティブ アクティビティの終了値を取得し、ダウンストリーム タスクで使用します。
notebookutils.notebook.help("runMultiple") を実行して、例や詳細な使用方法を見つけることもできます。
このメソッドを使用して一連のNotebookを並列で実行する簡単な例を次に示します。
notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
ルート Notebookからの実行結果は次のとおりです。
notebookutils.notebook.runMultiple()を使用してトポロジ構造を持つノートブックを実行する例を次に示します。 コード エクスペリエンスを使用してNotebookを簡単にオーケストレーションするには、このメソッドを使用します。
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
ルート Notebookからの実行結果は次のとおりです。
DAG が正しく定義されているかどうかを確認するメソッドも提供しています。
notebookutils.notebook.validateDAG(DAG)
Considerations:
- 複数のNotebook実行での並列処理の次数は、Spark セッションの使用可能なコンピューティング リソースの合計に制限されます。
- Spark Notebook の場合、同時実行ノートブックの既定の数は 50 ですが、Python Notebook の場合は既定で 25 です。 この値はカスタマイズできますが、過度の並列処理は、コンピューティング リソースの使用率が高いため、安定性とパフォーマンスの問題につながる可能性があります。 問題が発生した場合は、DAG パラメーターのコンカレンシー フィールドを調整して、ノートブックを複数の
runMultiple呼び出しに分離するか、 コンカレンシー を減らすことを検討してください。 - DAG 全体の既定のタイムアウトは 12 時間で、子ノートブックの各セルの既定のタイムアウトは 90 秒です。 タイムアウトを変更するには、DAG パラメーターで timeoutInSeconds フィールドと timeoutPerCellInSeconds フィールドを設定します。
Notebookを終了する
このメソッドでは、値を指定してNotebookを終了します。 関数呼び出しの入れ子は、対話形式またはパイプラインで、Notebookで実行できます。
ノートブックから exit() 関数を対話形式で呼び出すと、Fabric ノートブックは例外をスローし、後続のセルの実行をスキップし、Spark セッションを維持します。
exit() 関数を呼び出すパイプラインでノートブックのオーケストレーションを行うと、ノートブックのアクティビティは終了値で返ります。 これにより、パイプラインの実行は完了し、Spark セッションは停止します。
参照されているノートブックで exit() 関数を呼び出すと、Fabric Spark は参照先ノートブックのそれ以上の実行を停止し、 run() 関数を呼び出すメイン ノートブック内の次のセルを実行し続けます。 たとえば、Notebook1 には 3 つのセルがあり、2 番目のセルで exit() 関数を呼び出します。 Notebook2 には 5 つのセルがあり、3 番目のセルに run(notebook1) を呼び出します。 Notebook2 を実行すると、 exit() 関数にヒットすると、Notebook1 は 2 番目のセルで停止します。 その後、Notebook2 の 4 番目のセルと 5 番目のセルは引き続き実行されます。
notebookutils.notebook.exit("value string")
Note
exit() 関数は、現在のセル出力を上書きします。 他のコード ステートメントの出力が失われないようにするには、別のセルで notebookutils.notebook.exit() を呼び出します。
例えば次が挙げられます。
次の 2 つのセルを含む Sample1 ノートブック:
セル 1 は、既定値が 10 に設定された 入力 パラメーターを定義します。
セル 2 は、 入力 を終了値として使用してノートブックを終了します。
Sample1 は、既定値を使用して別のノートブックで実行できます。
exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)
Output:
Notebook is executed successfully with exit value 10
別のノートブックで Sample1 を実行し、 入力 値を 20 に設定できます。
exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Output:
Notebook is executed successfully with exit value 20
ノートブックの成果物を管理する
notebookutils.notebook には、プログラムによってノートブック項目を管理するための専用のユーティリティが用意されています。 これらの API は、ノートブック項目の作成、取得、更新、削除を簡単に行うのに役立ちます。
これらのメソッドを効果的に利用するには、次の使用例を検討してください。
ノートブックの作成
with open("/path/to/notebook.ipynb", "r") as f:
content = f.read()
artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")
ノートブックの内容の取得
artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")
ノートブックの更新
updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name", "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")
ノートブックの削除
is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")
ワークスペース内のノートブックの一覧表示
artifacts_list = notebookutils.notebook.list("optional_workspace_id")
ユーザー データ関数 (UDF) ユーティリティ
notebookutils.udf では、ノートブックのコードとユーザー データ関数 (UDF) を統合するために設計されたユーティリティが提供されています。 これらのユーティリティを使うと、同じワークスペース内または異なるワークスペース間で UDF 項目から関数にアクセスできます。 その後、必要に応じて UDF 項目内で関数を呼び出すことができます。
UDF ユーティリティの使用方法の例を次に示します。
# Get functions from a UDF item
myFunctions = notebookutils.udf.getFunctions('UDFItemName')
# Or from another workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')
# Display function and item details
display(myFunctions.functionDetails)
display(myFunctions.itemDetails)
# Invoke a function
myFunctions.functionName('value1', 'value2')
# Or with named parameters
myFunctions.functionName(parameter1='value1', parameter2='value2')
UDF から関数を取得する
myFunctions = notebookutils.udf.getFunctions('UDFItemName')
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')
var myFunctions = notebookutils.udf.getFunctions("UDFItemName")
var myFunctions = notebookutils.udf.getFunctions("UDFItemName", "workspaceId")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName", "workspaceId")
関数を呼び出す
myFunctions.functionName('value1', 'value2'...)
val res = myFunctions.functionName('value1', 'value2'...)
myFunctions$functionName('value1', 'value2'...)
UDF 項目の詳細を表示する
display([myFunctions.itemDetails])
display(Array(myFunctions.itemDetails))
myFunctions$itemDetails()
UDF の関数の詳細を表示する
display(myFunctions.functionDetails)
display(myFunctions.functionDetails)
myFunctions$functionDetails()
資格情報ユーティリティ
認証情報ユーティリティを使用すると、アクセス トークンを取得し、Azure Key Vault のシークレットを管理できます。
次のコマンドを実行して、使用可能なメソッドの概要を取得します。
notebookutils.credentials.help()
Output:
Help on module notebookutils.credentials in notebookutils:
NAME
notebookutils.credentials - Utility for credentials operations in Fabric
FUNCTIONS
getSecret(akvName, secret) -> str
Gets a secret from the given Azure Key Vault.
:param akvName: The name of the Azure Key Vault.
:param secret: The name of the secret.
:return: The secret value.
getToken(audience) -> str
Gets a token for the given audience.
:param audience: The audience for the token.
:return: The token.
help(method_name=None)
Provides help for the notebookutils.credentials module or the specified method.
Examples:
notebookutils.credentials.help()
notebookutils.credentials.help("getToken")
:param method_name: The name of the method to get help with.
DATA
creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...
FILE
/home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py
トークンを取得する
getToken は、特定の対象ユーザーと名前 (省略可能) の Microsoft Entra トークンを返します。 次の一覧に、現在利用可能な対象ユーザー キーを示します。
- ストレージ オーディエンス リソース: "storage"
- Power BI リソース: "pbi"
- Azure Key Vault リソース: "keyvault"
- Synapse RTA KQL DB リソース: "kusto"
トークンを取得するには、以下のコマンドを実行します。
notebookutils.credentials.getToken('audience Key')
Considerations:
'pbi' をオーディエンスとするトークンスコープは、時間の経過と共に変化する可能性があります。 現在、次のスコープがサポートされています。
notebookutils.credentials.getToken("pbi") を呼び出すと、ノートブックがサービス プリンシパルで実行されている場合、返されるトークンのスコープは制限されます。 トークンには、完全な Fabric サービス スコープがありません。 ノートブックがユーザー ID で実行されている場合、トークンには引き続き完全な Fabric サービス スコープがありますが、セキュリティが強化されると変更される可能性があります。 トークンが完全な Fabric サービス スコープを持っていることを確認するには、 notebookutils.credentials.getToken API の代わりに MSAL 認証を使用します。 詳細については、「 Microsoft Entra ID を使用した認証」を参照してください。
サービス プリンシパル ID の下で対象ユーザー キー pbi を使用して notebookutils.credentials.getToken を呼び出すときにトークンが持つスコープの一覧を次に示します。
- Lakehouse.ReadWrite.All
- MLExperiment.ReadWrite.All
- MLModel.ReadWrite.All
- Notebook.ReadWrite.All
- SparkJobDefinition.ReadWrite.All
- Workspace.ReadWrite.All
- Dataset.ReadWrite.All
シークレットを取得する
getSecret は、ユーザー資格情報を使用して、指定した Azure Key Vault のエンドポイントの Azure Key Vault シークレットを返します。
notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')
ファイルのマウントとマウント解除
Fabric では、Microsoft Spark Utilities パッケージの次のマウント シナリオをサポートしています。 マウント、マウント解除、getMountPath()、mounts() API を使用して、リモート ストレージ (ADLS Gen2) をすべての作業ノード (ドライバー ノードとワーカー ノード) にアタッチできます。 ストレージ マウント ポイントを確立した後は、データがローカル ファイル システムに格納されているかのように、ローカル ファイル API を使用してアクセスできます。
ADLS Gen2 アカウントをマウントする方法
次の例では、Azure Data Lake Storage Gen2 をマウントする方法を示します。 Blob Storage のマウントも同様に機能します。
この例では、storegen2 という名前の Data Lake Storage Gen2 アカウントが 1 つあり、ノートブックの Spark セッションで /test にマウントする mycontainer という名前のコンテナーがあることを前提としています。
mycontainer という名前のコンテナーをマウントするには、notebookutils が最初にコンテナーにアクセスするアクセス許可があるかどうかを確認する必要があります。 現在、Fabric では、トリガーマウント操作に対して accountKey と sastoken の 2 つの認証方法がサポートされています。
Shared Access Signature トークンまたはアカウント キーを使ってマウントする
NotebookUtils では、ターゲットをマウントする際に、アカウント キーまたは Shared Access Signature (SAS) トークンをパラメーターとして明示的に渡すことができます。
セキュリティ上の理由から、アカウント キーまたは SAS トークンを Azure Key Vault に保存することをお勧めします (次のスクリーンショットを参照)。 その後、 notebookutils.credentials.getSecret API を使用して取得できます。 Azure Key Vault の詳細については、「Azure Key Vault のマネージド ストレージ アカウント キーについて」をご覧ください。
accountKey メソッドのサンプル コード:
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
sastoken のサンプル コード:
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
マウント パラメーター:
- fileCacheTimeout: BLOB は、既定で 120 秒間ローカルの一時フォルダーにキャッシュされます。 この間、blobfuse は、ファイルが最新であるかどうかを確認しません。 このパラメーターは、既定のタイムアウト時間を変更するように設定できます。 複数のクライアントから同時にファイルの変更操作が行われる場合は、ローカル ファイルとリモート ファイルとの間に不整合が発生するのを避けるために、キャッシュ時間を短縮するか、0 に変更し、常にサーバーから最新のファイルを取得することをお勧めします。
- timeout: マウント操作のタイムアウトは、既定では 120 秒です。 このパラメーターは、既定のタイムアウト時間を変更するように設定できます。 実行者が多すぎる場合や、マウントのタイムアウトが発生する場合は、値を増やすことをお勧めします。
これらのパラメーターの使用例を示します。
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"fileCacheTimeout": 120, "timeout": 120}
)
Note
セキュリティ上の理由から、コードに認証情報を直接埋め込まないことをお勧めします。 資格情報をさらに保護するために、ノートブックの出力に表示されるすべてのシークレットが編集されます。 詳細については、「秘密の編集」を参照してください。
Lakehouseをマウントする方法
レイクハウスを /<mount_name> にマウントするサンプル コード:
notebookutils.fs.mount(
"abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse",
"/<mount_name>"
)
notebookutils fs API を使用してマウント ポイントの下のファイルにアクセスする
マウント操作の主な目的は、ユーザーがローカル ファイル システム API を使って、リモート ストレージ アカウントに格納されているデータにアクセスできるようすることです。 また、 notebookutils fs API とマウントされたパスをパラメーターとして使用して、データにアクセスすることもできます。 このパス形式は少し異なります。
マウント API を使用して、Data Lake Storage Gen2 コンテナー mycontainer を /test にマウントしたとします。 ローカル ファイル システム API を使ってそのデータにアクセスするときのパスの形式は、次のようになります。
/synfs/notebook/{sessionId}/test/{filename}
notebookutils fs API を使用してデータにアクセスする場合は、 getMountPath() を使用して正確なパスを取得することをお勧めします。
path = notebookutils.fs.getMountPath("/test")
ディレクトリを一覧表示します。
notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")ファイルの内容を読み取ります。
notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")ディレクトリを作成します。
notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
ローカル パスを使用してマウント ポイント内のファイルにアクセスする
マウント ポイント内のファイルは、標準のファイル システムを使って簡単に読み書きできます。 Python の例を次に示します。
#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
既存のマウント ポイントをチェックする方法
notebookutils.fs.mounts() API を使用して、既存のすべてのマウント ポイント情報を確認できます。
notebookutils.fs.mounts()
マウント ポイントをマウント解除する方法
マウント ポイントのマウントを解除するには、次のコードを使用します (この例では /test )。
notebookutils.fs.unmount("/test")
既知の制限事項
現在のマウントはジョブ レベルの構成です。マウント ポイントが存在するかどうかを確認するには、 mounts API を使用することをお勧めします。
マウント解除メカニズムは自動で適用されません。 アプリケーションの実行が完了したときに、マウント ポイントをマウント解除してディスク領域を解放するには、コードでマウント解除 API を明示的に呼び出す必要があります。 そうしないと、マウント ポイントは、アプリケーションの実行が完了した後もノードに存在します。
ADLS Gen1 ストレージ アカウントのマウントはサポートされていません。
Lakehouse ユーティリティ
notebookutils.lakehouse は、レイクハウス項目の管理用に調整されたユーティリティを提供します。 これらのユーティリティを使用すると、ユーザーはレイクハウス アーティファクトを簡単に作成、取得、更新、削除できます。
メソッドの概要
notebookutils.lakehouseで提供される使用可能なメソッドの概要を次に示します。
# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact
# Create Lakehouse with Schema Support
create(name: String, description: String = "", definition: {"enableSchemas": True}): Artifact
# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact
# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact
# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact
# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean
# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]
# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table]
# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table]
使用例
これらのメソッドを効果的に利用するには、次の使用例を検討してください。
レイクハウスの作成
artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
# Create Lakehouse with Schema Support
artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", {"enableSchemas": True})
レイクハウスの取得
artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")
レイクハウスの更新
updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")
レイクハウスの削除
is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")
ワークスペース内のレイクハウスの一覧表示
artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")
レイクハウス内のすべてのテーブルの一覧表示
artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")
レイクハウスでのテーブル読み込み操作の開始
notebookutils.lakehouse.loadTable(
{
"relativePath": "Files/myFile.csv",
"pathType": "File",
"mode": "Overwrite",
"recursive": False,
"formatOptions": {
"format": "Csv",
"header": True,
"delimiter": ","
}
}, "table_name", "artifact_name", "optional_workspace_id")
追加情報
各メソッドとそのパラメーターの詳細については、notebookutils.lakehouse.help("methodName") 関数を利用します。
ランタイム ユーティリティ
セッション コンテキスト情報を表示する
notebookutils.runtime.context を使用して、現在のライブ セッションのコンテキスト情報 (Notebook名、デフォルトのLakehouse、ワークスペース情報、パイプライン実行であるかどうかなど) を取得できます。
notebookutils.runtime.context
プロパティの概要を次の表に示します。
| Parameter | Explanation |
|---|---|
currentNotebookName |
現在のノートブックの名前 |
currentNotebookId |
現在のノートブックの一意の ID |
currentWorkspaceName |
現在のワークスペースの名前 |
currentWorkspaceId |
現在のワークスペースの ID |
defaultLakehouseName |
既定のレイクハウスの表示名 (定義されている場合) |
defaultLakehouseId |
既定のレイクハウスの ID (定義されている場合) |
defaultLakehouseWorkspaceName |
既定の lakehouse のワークスペース名 (定義されている場合) |
defaultLakehouseWorkspaceId |
既定の lakehouse のワークスペース ID (定義されている場合) |
currentRunId |
参照実行においては、現在の実行ID |
parentRunId |
入れ子になった実行を含む参照実行では、この ID は親実行 ID です |
rootRunId |
入れ子になった実行を含む参照実行では、この ID はルート実行 ID です |
isForPipeline |
実行がパイプライン用かどうか |
isReferenceRun |
現在の実行が参照実行であるかどうか |
referenceTreePath |
ネスト化した参照ランのツリー構造は、監視 L2 ページのスナップショット階層にのみ使用されます。 |
rootNotebookId |
(参照実行のみ)ルートノートブックの参照実行ID。 |
rootNotebookName |
(参照実行のみ)参照実行のルート ノートブックの名前。 |
rootWorkspaceId |
(参照実行の場合のみ) ルートノートブックのワークスペースID。 |
rootWorkspaceName |
(参照実行のみ)参照実行のルート ノートブックのワークスペース名。 |
activityId |
現在のアクティビティの Livy ジョブ ID |
hcRepId |
高コンカレンシー モードの REPL ID |
clusterId |
Synapse Spark クラスターの ID |
poolName |
使用されている Spark プールの名前 |
environmentId |
ジョブが実行されている環境 ID |
environmentWorkspaceId |
環境のワークスペース ID |
userId |
現在のユーザーのユーザー ID |
userName |
現在のユーザーのユーザー名 |
セッション管理
対話型セッションの停止
停止ボタンを手動でクリックする代わりに、コードで API を呼び出して対話型セッションを停止する方が便利な場合があります。 このような場合に備えて、コードによる対話型セッションの停止をサポートする API notebookutils.session.stop() を提供しています。これは、Scala と PySpark で使用できます。
notebookutils.session.stop()
notebookutils.session.stop() API は、バックグラウンドで現在の対話型セッションを非同期的に停止します。 また、Spark セッションを停止し、セッションによって占有されているリソースを解放するため、同じプール内の他のセッションで使用できます。
Python インタープリターを再起動する
notebookutils.session ユーティリティを使用すると、Python インタープリターを再起動できます。
notebookutils.session.restartPython()
Considerations:
- ノートブック参照の実行ケースでは、
restartPython()は、参照されている現在のノートブックの Python インタープリターのみを再起動します。 - まれに、Spark リフレクション メカニズムが原因でコマンドが失敗することがあります。再試行を追加すると、問題を軽減できます。
変数ライブラリ ユーティリティ
Note
ノートブックの "変数ライブラリ ユーティリティ" はプレビュー段階です。
変数ライブラリを使用すると、ノートブック コードで値をハードコーディングすることを回避できます。 コードを変更する代わりに、ライブラリ内の値を更新できます。 ノートブックは変数ライブラリを参照してそれらの値を取得します。 この方法では、一元管理されたライブラリを利用することで、チームやプロジェクト間でのコードの再利用が簡素化されます。
次のコマンドを実行して、使用可能なメソッドの概要を取得します。
notebookutils.variableLibrary.help()
Output
[Preview] notebookutils.variableLibrary is a utility to Variable Library.
Below is overview about the available methods:
get(variableReference: String): String
-> Run the variable value with type.
getLibrary(variableLibraryName: String): VariableLibrary
-> Get the variable library.
Use notebookutils.variableLibrary.help("methodName") for more info about a method.
変数ライブラリで変数を定義する
notebookutils.variableLibraryを使用する前に、最初に変数を定義します。
Notebook から変数ライブラリを取得する
samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")
samplevl.test_int
samplevl.test_str
val samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")
samplevl.test_int
samplevl.test_str
samplevl <- notebookutils.variableLibrary.getLibrary("sampleVL")
samplevl.test_int
samplevl.test_str
変数を動的に使用する例。
samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")
file_path = f"abfss://{samplevl.Workspace_name}@onelake.dfs.fabric.microsoft.com/{samplevl.Lakehouse_name}.Lakehouse/Files/<FileName>.csv"
df = spark.read.format("csv").option("header","true").load(file_path)
display(df)
参照で 1 つの変数にアクセスする
notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")
Note
-
notebookutils.variableLibraryAPI では、同じワークスペース内の変数ライブラリへのアクセスのみがサポートされます。 - 参照実行中の子ノートブックでは、ワークスペース間での変数ライブラリの取得はサポートされていません。
- ノートブック コードは、変数ライブラリのアクティブな値セットで定義されている変数を参照します。
既知の問題
1.2 より前のランタイム バージョンを使用し、
notebookutils.help()を実行する場合、一覧に示されている fabricClientPBIClient API は現在サポートされていません。今後使用できるようになります。 さらに、今のところ Credentials API は Scala ノートブックでサポートされていません。セッション管理に notebookutils.session ユーティリティを使用する場合、Python ノートブックは stop、 restartPython API をサポートしていません。
現在、SPN は変数ライブラリ ユーティリティではサポートされていません。