Apache Avro はデータ シリアル化システムです。 Avro には次の機能があります。
- 豊富なデータ構造。
- コンパクトで高速なバイナリ データ形式。
- 永続データを格納するコンテナー ファイル。
- リモート プロシージャ呼び出し (RPC)。
- 動的言語との簡単な統合。 データ ファイルの読み取りまたは書き込みや、RPC プロトコルの使用や実装には、コード生成は必要ありません。 オプションの最適化としてのコード生成。静的型指定の言語にのみ実装する価値があります。
Avro データ ソースでは、次の機能がサポートされます。
- スキーマ変換: Apache Spark SQL と Avro レコード間の自動変換。
- パーティション分割: 追加の構成を行わない、パーティション分割されたデータの簡単な読み書き。
- 圧縮: Avro をディスクに書き込むときに使用する圧縮。 サポートされる種類は
uncompressed、snappy、deflateです。 圧縮レベルを指定することもできます。 - レコード名:
recordNameおよびrecordNamespaceとともにパラメーターのマップを渡すことによる、レコード名と名前空間。
「ストリーミング Avro データの読み取りと書き込み」も参照してください。
構成
Avro データ ソースの動作は、さまざまな構成パラメーターを使用して変更できます。
読み取り時に .avro 拡張子のないファイルを無視するには、Hadoop 構成で avro.mapred.ignore.inputs.without.extension パラメーターを設定します。 既定では、 falseです。
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
書き込み時に圧縮を構成するには、次の Spark プロパティを設定します。
- 圧縮コーデック:
spark.sql.avro.compression.codec。 サポートされているコーデックはsnappyとdeflateです。 既定のコーデックはsnappyです。 - 圧縮コーデックが
deflateの場合は、spark.sql.avro.deflate.levelで圧縮レベルを設定できます。 既定のレベルは-1です。
これらのプロパティは、クラスター Spark 構成で、または実行時に、spark.conf.set() を使用して設定できます。 次に例を示します。
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Databricks Runtime 9.1 LTS 以降では、ファイルを読み取る際に mergeSchema オプションを指定することで、Avro での既定のスキーマ推論動作を変更できます。
mergeSchema を true に設定すると、1 つのファイルから読み取りスキーマを推論するのではなく、ターゲット ディレクトリ内の一連の Avro ファイルからスキーマが推論され、マージされます。
Avro -> Spark SQL 変換でサポートされる型
このライブラリではすべての Avro 型の読み取りをサポートしています。 Avro 型と Spark SQL 型の次のマッピングが使用されます。
| Avro 型 | Spark SQL 型 |
|---|---|
| Boolean | BooleanType |
| 整数 (int) | インテジャータイプ |
| 長い | ロングタイプ (LongType) |
| フロート | フロート型 |
| ダブル | DoubleType |
| バイト | バイナリタイプ |
| ひも | 文字列型 |
| レコード (record) | StructType |
| イーナム | 文字列型 |
| 配列 | アレイタイプ |
| 地図 | マップタイプ |
| 修正済み | バイナリタイプ |
| 組合 | 「共用体型」を参照してください。 |
共用体型
Avro データ ソースでは、union 型の読み取りがサポートされています。 Avro では、次の 3 つの型が union 型と見なされます。
-
union(int, long)はLongTypeにマップされます。 -
union(float, double)はDoubleTypeにマップされます。 -
union(something, null)。ただし、somethingはサポートされている Avro 型です。 これは、somethingがnullableに設定されたtrueと同じ Spark SQL 型にマップされます。
その他の union 型はすべて複合型です。 これらは、StructType のメンバーに従って、フィールド名が member0、member1 などの union にマップされます。 これは、Avro と Parquet の間で変換する場合の動作と一致します。
論理型
Avro データ ソースでは、次の Avro 論理型の読み取りがサポートされています。
| Avro 論理型 | Avro 型 | Spark SQL 型 |
|---|---|---|
| 日付 | 整数 (int) | デートタイプ |
| timestamp-millは | 長い | タイムスタンプタイプ |
| タイムスタンプ-マイクロ | 長い | タイムスタンプタイプ |
| 小数点 | 修正済み | デシマルタイプ |
| 小数点 | バイト | デシマルタイプ |
注意
Avro データ ソースは、Avro ファイルに存在するドキュメント、エイリアス、その他のプロパティを無視します。
Spark SQL -> Avro 変換でサポートされる型
このライブラリでは、すべての Spark SQL 型の Avro への書き込みをサポートしています。 ほとんどの型で、Spark 型から Avro 型へのマッピングは簡単です (たとえば、IntegerType は int に変換されます)。いくつかの特殊な場合の一覧を次に示します。
| Spark SQL 型 | Avro 型 | Avro 論理型 |
|---|---|---|
| バイトタイプ | 整数 (int) | |
| ShortType | 整数 (int) | |
| バイナリタイプ | バイト | |
| デシマルタイプ | 修正済み | 小数点 |
| タイムスタンプタイプ | 長い | タイムスタンプ-マイクロ |
| デートタイプ | 整数 (int) | 日付 |
また、オプション avroSchema で出力 Avro スキーマ全体を指定し、Spark SQL 型を他の Avro 型に変換することもできます。
次の変換は既定では適用されず、ユーザー指定の Avro スキーマが必要になります。
| Spark SQL 型 | Avro 型 | Avro 論理型 |
|---|---|---|
| バイトタイプ | 修正済み | |
| 文字列型 | イーナム | |
| デシマルタイプ | バイト | 小数点 |
| タイムスタンプタイプ | 長い | timestamp-millは |
例
これらの例では、episodes.avro ファイルを使用します。
スカラ (プログラミング言語)
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
この例では、カスタム Avro スキーマを示します。
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
この例では、Avro 圧縮オプションを示します。
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
この例では、パーティション分割された Avro レコードを示します。
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
この例では、レコード名と名前空間を示します。
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python(プログラミング言語)
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
SQL 内の Avro データをクエリするには、データ ファイルをテーブルまたは一時ビューとして登録します。
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
ノートブックの例: Avro ファイルの読みl込みと書き込み
次のノートブックは、Avro ファイルを読み書きする方法を示しています。