適用対象:
Azure Data Factory
Azure Synapse Analytics
ヒント
Microsoft Fabric の Data Factory (企業向けのオールインワン分析ソリューション) を試してみてください。 Microsoft Fabric では、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法については、こちらを参照してください。
この記事では、Azure Data Factory および Azure Data Factory Synapse Analytics パイプラインで Copy アクティビティを使用して、MongoDB データベースとの間で双方向にデータをコピーする方法について説明します。 この記事は、コピー アクティビティの概要を示しているコピー アクティビティの概要に関する記事に基づいています。
重要
新しい MongoDB コネクタでは、ネイティブ MongoDB サポートが強化されています。 下位互換性のためにのみ現状のままサポートされているレガシ MongoDB コネクタをソリューションで使用している場合は、MongoDB コネクタ (レガシ)に関するページを参照してください。
サポートされる機能
この MongoDB コネクタでは、次の機能がサポートされます。
| サポートされる機能 | IR |
|---|---|
| Copy アクティビティ (ソース/シンク) | (1) (2) |
① Azure 統合ランタイム ② セルフホステッド統合ランタイム
ソースおよびシンクとしてサポートされているデータ ストアの一覧については、「サポートされているデータ ストア」の表を参照してください。
具体的には、この MongoDB コネクタでは 4.2 までのバージョンがサポートされます。 4.2 より新しいバージョンが必要な作業の場合は、さらに包括的なサポートと機能を提供する MongoDB Atlas と MongoDB Atlas コネクタの使用を検討してください。
前提条件
データ ストアがオンプレミス ネットワーク、Azure 仮想ネットワーク、または Amazon Virtual Private Cloud 内にある場合は、それに接続するようセルフホステッド統合ランタイムを構成する必要があります。
データ ストアがマネージド クラウド データ サービスである場合は、Azure Integration Runtime を使用できます。 ファイアウォール規則で承認されている IP にアクセスが制限されている場合は、Azure Integration Runtime の IP を許可リストに追加できます。
また、Azure Data Factory のマネージド仮想ネットワーク統合ランタイム機能を使用すれば、セルフホステッド統合ランタイムをインストールして構成しなくても、オンプレミス ネットワークにアクセスすることができます。
Data Factory によってサポートされるネットワーク セキュリティ メカニズムやオプションの詳細については、「データ アクセス戦略」を参照してください。
作業の開始
パイプラインでコピー アクティビティを実行するには、次のいずれかのツールまたは SDK を使用します。
- データのコピー ツール
- Azure Portal
- .NET SDK
- Python SDK
- Azure PowerShell
- REST API
- Azure Resource Manager テンプレート
UI を使用して MongoDB のリンク サービスを作成する
次の手順を使用して、Azure portal UI で MongoDB のリンク サービスを作成します。
Azure Data Factory または Synapse ワークスペースの [管理] タブに移動し、[リンク サービス] を選択して、[新規] をクリックします。
- Azureデータファクトリー
- Azure Synapse
MongoDB を検索し、MongoDB コネクタを選択します。
サービスの詳細を構成し、接続をテストして、新しいリンク サービスを作成します。
コネクタの構成の詳細
次のセクションでは、MongoDB コネクタに固有の Data Factory エンティティを定義するために使用されるプロパティについて詳しく説明します。
リンクされたサービスのプロパティ
MongoDB のリンクされたサービスでは、次のプロパティがサポートされます。
| プロパティ | 説明 | 必須 |
|---|---|---|
| 型 | type プロパティは、次のように設定する必要があります: MongoDbV2 | はい |
| connectionString | MongoDB 接続文字列 (例: mongodb://[username:password@]host[:port][/[database][?options]]) を指定します。 詳細については、MongoDB のマニュアルの接続文字列に関するページを参照してください。 接続文字列を Azure Key Vault に格納することもできます。 詳細については、「Azure Key Vault への資格情報の格納」を参照してください。 |
はい |
| データベース | アクセスするデータベースの名前。 | はい |
| connectVia | データ ストアに接続するために使用される統合ランタイム。 詳細については、「前提条件」セクションを参照してください。 指定されていない場合は、既定の Azure Integration Runtime が使用されます。 | いいえ |
例:
{
"name": "MongoDBLinkedService",
"properties": {
"type": "MongoDbV2",
"typeProperties": {
"connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
"database": "myDatabase"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
データセットのプロパティ
データセットの定義に使用できるセクションとプロパティの完全な一覧については、「データセットとリンクされたサービス」を参照してください。 MongoDB データセットでは、次のプロパティがサポートされます。
| プロパティ | 説明 | 必須 |
|---|---|---|
| 型 | データセットの type プロパティは、次のように設定する必要があります: MongoDbV2Collection | はい |
| collectionName | MongoDB データベースのコレクション名前。 | はい |
例:
{
"name": "MongoDbDataset",
"properties": {
"type": "MongoDbV2Collection",
"typeProperties": {
"collectionName": "<Collection name>"
},
"schema": [],
"linkedServiceName": {
"referenceName": "<MongoDB linked service name>",
"type": "LinkedServiceReference"
}
}
}
コピー アクティビティのプロパティ
アクティビティの定義に利用できるセクションとプロパティの完全な一覧については、パイプラインに関する記事を参照してください。 このセクションでは、MongoDB ソースとシンクでサポートされるプロパティの一覧を示します。
ソースとしての MongoDB
コピー アクティビティの source セクションでは、次のプロパティがサポートされます。
| プロパティ | 説明 | 必須 |
|---|---|---|
| 型 | コピー アクティビティのソースの type プロパティは、次のように設定する必要があります: MongoDbV2Source | はい |
| フィルタ | クエリ演算子を使用して選択フィルターを指定します。 コレクション内のすべてのドキュメントを返すには、このパラメーターを省略するか、空のドキュメント ({}) を渡します。 | いいえ |
| cursorMethods.project | プロジェクションのドキュメントで返されるフィールドを指定します。 一致するドキュメント内のすべてのフィールドを返すには、このパラメーターを省略します。 | いいえ |
| cursorMethods.sort | 一致するドキュメントがクエリによって返される順序を指定します。 「cursor.sort()」を参照してください。 | いいえ |
| cursorMethods.limit | サーバーが返すドキュメントの最大数を指定します。 「cursor.limit()」を参照してください。 | いいえ |
| cursorMethods.skip | スキップするドキュメントの数と、MongoDB が結果を返すときの開始位置を指定します。 「cursor.skip()」を参照してください。 | いいえ |
| batchSize | MongoDB インスタンスからの応答の各バッチで返されるドキュメントの数を指定します。 ほとんどの場合、バッチ サイズを変更しても、ユーザーまたはアプリケーションへの影響はありません。 Azure Cosmos DB では各バッチのサイズが 40 MB を超過しないように制限されていますが、これはドキュメントが batchSize の数だけ存在するときの合計サイズなので、ドキュメントのサイズが大きくなる場合はこの値を減らしてください。 | いいえ (既定値は 100) |
ヒント
このサービスは、厳格モードでの BSON ドキュメントの利用をサポートしています。 フィルター クエリがシェル モードではなく厳格モードであることを確認してください。 詳細については、MongoDB のマニュアルを参照してください。
例:
"activities":[
{
"name": "CopyFromMongoDB",
"type": "Copy",
"inputs": [
{
"referenceName": "<MongoDB input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "MongoDbV2Source",
"filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
"cursorMethods": {
"project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
"sort": "{ age : 1 }",
"skip": 3,
"limit": 3
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
シンクとしての MongoDB
コピー アクティビティの sink セクションでは、次のプロパティがサポートされます。
| プロパティ | 説明 | 必須 |
|---|---|---|
| 型 | Copy アクティビティのシンクの type プロパティには MongoDbV2Sink を設定する必要があります。 | はい |
| writeBehavior | MongoDB データを書き込む方法について説明します。 使用可能な値は、Insert、Upsert です。 upsert の動作は、同じ _id を持つドキュメントが既に存在する場合、そのドキュメントを置き換えます。それ以外の場合は、ドキュメントを挿入します。注: 元のドキュメントまたは列マッピングで _id が指定されていない場合は、サービスによってドキュメントの _id が自動的に生成されます。 つまり、upsert が期待どおりに動作するには、ドキュメントに ID があることを確認する必要があります。 |
いいえ (既定値は insert です) |
| writeBatchSize | writeBatchSize プロパティにより、各バッチで書き込むドキュメントのサイズが制御されます。 パフォーマンスを向上させるには writeBatchSize の値を大きくしてみて、ドキュメントのサイズが大きい場合は値を小さくしてみます。 | いいえ (既定値は 10,000) |
| writeBatchTimeout | タイムアウトするまでに一括挿入操作の完了を待つ時間です。許容される値は期間です。 | いいえ (既定値は 00:30:00 - 30 分) |
ヒント
JSON ドキュメントをそのままインポートするには、「JSON ドキュメントをインポートまたはエクスポートする」セクションを参照してください。表形式のデータからコピーするには、「スキーマ マッピング」を参照してください。
例
"activities":[
{
"name": "CopyToMongoDB",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Document DB output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "MongoDbV2Sink",
"writeBehavior": "upsert"
}
}
}
]
JSON ドキュメントのインポートとエクスポート
この MongoDB コネクタを使用すると、簡単に次を実行できます。
- 2 つの MongoDB コレクション間でドキュメントをそのままコピーします。
- Azure Cosmos DB、Azure Blob Storage、Azure Data Lake Store、その他のサポートされているファイルベースのストアなど、さまざまなソースから MongoDB に JSON ドキュメントをインポートします。
- JSON ドキュメントを MongoDB コレクションからさまざまなファイル ベースのストアにエクスポートします。
このようなスキーマに依存しないコピーを実現するには、データセットの "構造" ("スキーマ" とも呼ばれる) のセクションと、コピー アクティビティでのスキーマ マッピングをスキップします。
MongoDB のデータ型マッピング
MongoDB からデータをコピーする場合、MongoDB データ型からサービスによって内部的に使用される中間データ型への次のマッピングが使用されます。 コピー アクティビティでソースのスキーマとデータ型がシンクにマッピングされるしくみについては、スキーマとデータ型のマッピングに関する記事を参照してください。
| MongoDB のデータ型 | 中間サービス データ型 |
|---|---|
| 日付 | Int64 |
| オブジェクト識別子 | String |
| Decimal128 | String |
| タイムスタンプ | 最も重要な 32 ビット -> Int64 最下位の 32 ビット -> Int64 |
| String | String |
| Double | String |
| Int32 | Int64 |
| Int64 | Int64 |
| ブール値 | ブール値 |
| Null | Null |
| JavaScript | String |
| 正規表現 | String |
| 最小キー | Int64 |
| 最大キー | Int64 |
| Binary | String |
MongoDB コネクタのライフサイクルとアップグレード
次の表は、MongoDB コネクタのさまざまなバージョンのリリース ステージと変更ログを示しています。
| Version | リリース段階 | 変更ログ |
|---|---|---|
| MongoDB (レガシ) | サポート終了 | / |
| MongoDB | GA バージョンあり | • 同等の MongoDB クエリのみをサポートします。 • Double は文字列データ型として読み取られます。 |
MongoDB リンク サービスをアップグレードする
リンク サービスと関連するクエリをアップグレードするのに役立つ手順を次に示します。
新しい MongoDB リンク サービスを作成し、リンク サービス プロパティを参照してそれを構成します。
古い MongoDB リンク サービスを参照する SQL クエリをパイプラインで使用する場合は、それらを同等の MongoDB クエリに置き換えます。 置き換えの例については、次の表を参照してください。
SQL クエリ (SQL query) 同等の MongoDB クエリ SELECT * FROM usersdb.users.find({})SELECT username, age FROM usersdb.users.find({}, {username: 1, age: 1})SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM usersdb.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id;db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])
関連コンテンツ
Copy アクティビティでソースおよびシンクとしてサポートされるデータ ストアの一覧については、サポートされるデータ ストアに関するセクションを参照してください。