Python3でAvroの読み書き

概要

週末にPythonでAvroの読み書きをする用事ができたので、やり方を確認した記録。

Python3.5を利用。

@CretedDate 2016/08/12
@Versions python3.5, python3-avro1.8.1

インストール

pipからインストール。Python3の場合はavro-python3を入れる。avroを入れるとSyntax Errroで動かない。

pip install avro-python3

データの保存

今回の用事はHTTPリクエストした結果を保存しておくことだったので、URL、TOP URL(そのサイトの一番上のURL)、ヘッダから取ったencoding、contentsのbyte配列で入れるスキーマを用意する。

{
  "namespace": "jp.mwsoft.example.response",
  "type": "record",
  "name": "page",
  "fields": [
      {"name": "url", "type": "string"},
      {"name": "top_url", "type": "string"},
      {"name": "header_encoding", "type": "string"},
      {"name": "contents", "type": "bytes"}
  ]
}

上記をexample.avscという名前で保存しておく。

import avro.schema, requests
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

url = "http://www.mwsoft.jp/"
resp = requests.get(url)

with open("example.avsc", "rt") as avsc:
    schema = avro.schema.Parse(avsc.read())

with DataFileWriter(open("contents.avro", "wb"), DatumWriter(), schema) as writer:
    writer.append({
        "url": url,
        "top_url": url,
        "header_encoding": requests.utils.get_encoding_from_headers(resp.headers),
        "contents": resp.content
    })

データの読み込み

読み込みの方はschemaを定義する必要がないので、書き込みよりも楽。

from avro.datafile import DataFileReader
from avro.io import DatumReader

with DataFileReader(open("contents.avro", "rb"), DatumReader()) as reader:
    for resp in reader:
        print(resp)

データの追記

Avroは追記も可能。openでwb(binaryで書き込み)なのを、ab+(binaryで追記)にしてappendするだけでいい。

注意点として、追記する際はDataFileWriterにschemaは設定しない。設定するとschema - record - schema - recordみたいなデータができてしまって読み取れなくなる。

import avro.schema, requests
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

top_url = "http://www.mwsoft.jp/"
url = "http://www.mwsoft.jp/programming/"
resp = requests.get(url)

with DataFileWriter(open("contents.avro", "ab+"), DatumWriter()) as writer:
    writer.append({
        "url": url,
        "top_url": top_url,
        "header_encoding": requests.utils.get_encoding_from_headers(resp.headers),
        "contents": resp.content
    })
from avro.datafile import DataFileReader
from avro.io import DatumReader

with DataFileReader(open("contents.avro", "rb"), DatumReader()) as reader:
    for resp in reader:
        print(resp['url'])
            #=> http://www.mwsoft.jp/
            #=> http://www.mwsoft.jp/programming/

圧縮

codecを設定していない場合は、圧縮されていない状態でファイルが出力される。今回作成したファイルをcatで見てみると、生データが読める状態で格納されていることがわかる。

codecを指定しておけば、deflateもしくはsnappyで圧縮して出力できる。

import avro.schema, requests
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

url = "http://www.mwsoft.jp/"
resp = requests.get(url)

with open("example.avsc", "rt") as avsc:
    schema = avro.schema.Parse(avsc.read())

with DataFileWriter(open("contents.avro", "wb"), DatumWriter(), writer_schema=schema, codec='deflate') as writer:
    writer.append({
        "url": url,
        "top_url": url,
        "header_encoding": requests.utils.get_encoding_from_headers(resp.headers),
        "contents": resp.content
    })

これでdeflateで圧縮された状態で保存される。

追記する際も圧縮しないケースと同じで、ab+で追記モードにして実行すればいい。追記の場合はcodecを明記する必要はない。追記対象のファイルのヘッダ部分を読み込んでcodecを自動で設定してくれる処理が付いている。codecは指定しても既存ファイルの内容が優先され、指定は無視される。

url = "http://www.mwsoft.jp/programming/"
resp = requests.get(url)

with DataFileWriter(open("contents.avro", "ab+"), DatumWriter()) as writer:
    writer.append({
        "url": url,
        "top_url": top_url,
        "header_encoding": requests.utils.get_encoding_from_headers(resp.headers),
        "contents": resp.content
    })

圧縮形式についてはファイルのヘッダに記述されているので、読み込む際は特に形式を意識する必要はない。