amagaeru blog

Sansan DSOC 分析コンペ勉強会参加メモ

connpas リンク

https://sansan.connpass.com/event/202716/

発表

脱! Deepでポン🎶 ハイパラチューニング芸人を卒業するために

Hidehisa Araiさん @kaggle_araisan

speakerdeck.com

  • 対象視聴者(初級〜)
    • 誰もが思いつくことを越えるためには?
    • 解法の背景(なぜやったのか)を掴むためには?
  • 面白くなるステップ
    • 課題の難しいところ理解する
      • train/testの分布の違い(データのノイズ量、クラスの比率)
      • testにはtrainになかったラベルがある
      • CV/LBが相関しない
    • 課題を小さな課題に分解する
      • 対策可能になるまで小さくする
    • 小さな課題を置換する
  • まとめ
    • 出てきた課題を潰していく->いつの間にか他の人がやっていないことに至る
    • 個々の課題を潰した人が上位にいく

質問

  • 英語が苦手な場合の取り組み方
  • 抽出された課題の管理
    • Trelloを活用
    • タグをつける(Todo、doing、worked、not worked、pending)
  • やることが見つからなかった場合
    • 今回の話がそれに当たるはず
    • (もっとやっていた場合)コンペをやる中で得た知識の元に改めて課題を見直す
  • 既存の分野をそもそも知らない場合

memo

  • Zenhubでコンペチケット管理していたけど、「解像度が良くない」話は参考になった
  • not workedをIceBoxに入れてしまっていたので、後から見てやったかどうかわからなくなったのを思い出した

その他参考

qiita.com

atmaCup#6 を開催したときの体制や流れについて

黒木裕鷹さん @kur0cky / Sansan株式会社 DSOC 研究員

speakerdeck.com

前提の話
実務よりの裏側
  • 携わった部署
    • R&Dと人事部が表だってatmaとコミュニケーション
    • 法務部(情報公開のチェックや契約関連)
    • 広報
  • 企画からふりかえり
    • 企画から当日まで半年近い期間かかってる
    • 企画
      • サマーインターンの見通しが不明瞭
      • オンラインイベントでもコンペ開催したい(オフは経験あり)
    • 選定の基準
      • 媒体の選択(参加人数、費用、秘密保持契約が結べるか、などなど)
      • 強いDS人材。広く技術者がターゲット -> SNSでも盛り上がれる!
    • タスク選定
      • R&D数名から複数案だし(問題が出せないことをまず避ける)
      • Sansanらしいデータを使いたい(名刺)
      • 仮タスク設計で実際に解いてみる
      • 企業が特定されないようにマスクの工夫
      • データ正規化(human computationのエキスパートが活躍)
    • atmaさんとのやりとり
      • atam、sansanでお互いにタスクを解いてみてleak確認
    • 周知・集客
      • connpas
      • 抽選(強い人が漏れても、広く参加者を取った)
    • コンペ期間中
      • Twitter実況で盛り上げ
      • 複数賞用意
    • ふりかえり
      • KPT実地
      • P:データのハンドリングが難しかったかも?
      • T:データ量調整で短期間化が望めるかも?

質問

  • シェイクやleakを回避するために意識したこと
    • 過去にあったleak事例を調査
    • データが生まれる背景を考えて漏れを調べた
    • 自分が実際に解いてみたり、強い人に解いてもらう
  • 今後のコンペ開催はあるか
    • 黒木さんは開催したい!
  • コンペが採用に繋がったか
    • 実際に繋がった例がある
  • コンペ参加者数と強く相関するのは賞金
    • kaggleは強い傾向がある
    • 各媒体の参加人数は大きくブレないかも
  • コンペ開催で大変だったこと
    • このタスクが行けるかどうか
    • 情報ごとに出して良いかの判断
  • よかったこと
    • コンペとして成立していること(leakなさそう)
    • 盛り上がっている様は嬉しい

memo

  • 開催側が楽しんでいる様子がビシビシ伝わってくる
  • leak問題はどこのコンペサイトでも問題になっているので開催側複数でコンペ設定失敗事例を共有する会面白そう

その他参考

Colaboratoryで分析コンペをする時のテクニック集

カレーちゃんさん @currypurin

speakerdeck.com

  • colabの概要
  • テクニック
    • colabかkaggle notebookかを判定可能(同じコードで動かせる)
    • notebook名を取得できる
    • 学習速度比較(content直下の方がやや速いがセッション切断時にリセット)
    • content直下にデータを置くのにkaggle apiを使おう
    • githubでcolab管理できる

質問

  • コンペでの計算機資源配分
    • colab 5: その他(GCP/AWS): 5
  • RAM不足回避するには
    • バッチサイズを下げる
  • colab proと同じスペックのGCPならどっち使うか
    • V100使ってたら1weekで5万くらいいった
    • colab proの方がお得

memo

  • githubと連携してコード履歴残すの良さそう
  • colab proが待ち遠しい

その他参考

recruit.gmo.jp


Noisy label 対抗記 ~Cassava Leaf Disease Classification~

齋藤慎一朗さん @sinchir0 / Sansan株式会社 DSOC 研究員

speakerdeck.com

  • コンペ概要
    • cassavaの画像から病気判別
    • 評価指標は Accuracy
    • 重要ポイント
      • 画像のラベリングミスが多い(Noisy label)
      • PublicLBの評価もNoisy
      • LB上でスコアに大きな差がなかった
  • なぜ取り組んだか
    • 画像に取り組みたい
    • testもNoisy対策できれば良いのでは?
  • どう対抗するか
    • 学習データにfitさせすぎない
      • 多くのモデルでアンサンブル
      • noisyを考慮できるlossを使う
      • noisy labelのデータを処理する
    • testもnoisyなデータに対抗
      • cleanlabの考え方を応用
      • 誤り分布を利用
  • 反省点
    • discussionやnotebookを追うのに時間を使ってしまった
    • CVの計算など一部適当になった

質問

  • ノイズの乗り方はtrain/testで同じと仮定するか
    • Yes
  • シェイクするモデルとシェイクしないモデルはどういう違いがあるか
    • アンサンブルをたくさんしていると生き残りやすい?
    • testにしか存在しないラベルを考慮できたか
  • noisyラベルの先行研究でもtestのノイズがあるとしていたか
    • trainがノイジーでtestがクリーンが多い(というかそれ以外みたことない)

memo

  • 「testもnoisy」は良くわからない..
  • CV計算が適当になった、は身につまされる話

その他参考

github.com

ai.googleblog.com

atmaCupなど短期間コンペのススメと戦い方について

paoさん @pppaaaooo

speakerdeck.com

・kaggleのコンペ期間は2~3ヶ月が大半。長くない?

  • コンペの長さによる比較
    • 長い
      • 新しいことに時間をかけられる/失敗をカバー可能
      • 外れた時のダメージ
    • 短い
      • 効率的に学びを得られる/限られた時間で精度向上を学べる
      • 新しいチャレンジの難易度が高い
    • おすすめは両方に参加
  • 精度の分解
    • 基本的手法やデータ・タスクに応じたアイデアの積み重ねで成立
    • 精度の最終到達点は期間によって違う
    • 長期間コンペは全部のアイデアを組み込まないと勝てない
    • 短期間コンペは重要なアイデアを押さえておけば勝ちに近づく
  • 限られた時間で何をすべきか
    • 思いつくアイデアからやることを効果的に絞る
    • 効果 x 実装時間 x 成功確率で決める
  • どこで実力に差が出るか
    • どんな技が使えるか(経験やinput量、サーベイEDAで技を増やせる)
    • どの技を選ぶか(時間が短いほど有効、相性大事)
    • 技の消費MP(実装時間を減らす、スニペットやパイプラインにしておく)
    • 命中確率や効果をいかに正しく見積もるか(仮説根拠、使用経験、Feature Importance、Discussionなどなど)

質問

  • 成功した際の効果の推定は当たるか
    • 比較的当たる
    • データの質やタスクを分解することで当たりやすくなる
  • 効果が高そうな技(手法)を覚える方法
    • 一個ずつやっていくしかない
    • コンペ参加はおすすめ
    • 解法だけだと身に付きづらい

memo

  • 思いついたものを思いついた順にやっていた(脳死でした) -> zenhubのestimate使えるか?
  • 技マシン欲しい
  • 技マシン欲しい

所感

  • 分析コンペの取り組み方の話が多く、参考になった。
  • しかも、短期間コンペのatmaCupが今週末(3/5)から始まるのですぐに試せるのでは!?
  • そもそもsolafuneとprobspace放置しているのどうにかしろ

YoutubeDataAPIから取得したデータをBigQueryに入れるまでの備忘録

はじめに

昨年末に勉強として作ったデータパイプラインについての備忘録。

目次

構想

データ取得元

  • YoutubeDataAPI v3
    • タイトルや説明文などのテキストデータ、視聴回数やレーティングなどの数値データど色々ある
    • 自分の視聴履歴も取得して推薦したい(vtuberの切り抜き動画の見逃しを減らす)

使用クラウド

  • GCP
    • YoutubeDataAPIがそもそもGCP管轄
    • 今回使っていないがCloud Composerに少し触れたことがあった

やりたいこと

vtuberの切り抜き動画情報の取得と蓄積

  • 一日前に投稿された動画の現時点での情報を一回取得する -> Cloud Scheduler
  • サーバーを立てて動かしておくほどの継続的、もしくは複数回の処理ではないのでイベント駆動で実行 -> Cloud Functions
  • 取得時のデータはそのまま残しておきたい -> Cloud Storage -Cloud SchedulerとCloud Functionsを繋げる方法にhttpリクエストとPub/Subを使うやり方があるが、情報の多いPub/Subでやろう -> Pub/Sub
  • データをBigQueryに入れるまでやってみたい -> BigQuery
  • Cloud Storageにファイルが出現したらBigQueryにデータを追加する操作もCloud Functionsでやろう

使用サービスについての概要

YoutubeDataAPI v3

Cloud Scheduler

Pub/Sub

Cloud Functions

Cloud Storage

BigQuery

実際に作ったアーキテクチャ

f:id:amagaeru1113:20210122151222j:plain

  1. Cloud Schedulerで毎日0時に発火
  2. Pub/Subを通してCloud Functions(動画情報取得)を実行
  3. YoutubeDataAPIから情報諸々取得してCloud StorageにJSONを出力
  4. Cloud Storageに新規ファイルが出現したことをトリガーにCloud Functions(BigQuery格納)を実行

分かったこと

サービス間の接続部分は要事前調査

  • 最初csv形式で出力していたが、カンマのせいでカラム数が合わなかったりする
  • BQに挿入できるファイル形式の確認(jsonもいける)
  • jsonで入出力できるかをサービス間で確認すると見通しが良い(GCF→GCS、GCF→BQ)
  • ローカルからコマンドでテストファイルをアップロードしてそれが流れるかを確認したりした

コード実行時に想定している環境が異なる場合がある

  • ファイル名はfile_{実行時刻}.jsonとしており、GCFを即時実行をして日本時間か確認したところ、出来ていなかった
  • ログを確認したところ、即時実行の際使用されるresource.regionがus-centralだったためと判明
  • コード中で現在時刻を使うことはなかったが場合によっては注意

取得した動画情報には検索キーワードに関連しないものが取得されることがある

  • 何回か試してデータをプールして、関連のない動画が含まれないかを検証することが大切
  • ただし動画情報の取得にかかるコストが一日の上限を超えてしまい、取得できなくなることが何回かあった
  • これを許容してBQでの確認も場合によっては仕方ないかもしれない
  • 関連のない動画の判定には検索キーワードがタイトル、説明文、タグのどれにも含まれていないことを利用して除外

今後やりたいこと

また機会を見て次のことを試したい

  • 自分のYoutube視聴履歴を取得
  • 視聴傾向から、おすすめ動画を推薦
  • 特定動画における視聴回数の時間推移

おわりに

扱うデータは小さいが、APIからデータ収集する流れを通しでやる経験になりました。
特にファイル形式の問題や実行時のtimezoneの問題・BigQueryに挿入するテーブルの想定など、考えなければいけないことが多かったです。
見返してみて、自分しか使わないということもあって諸々詰めが甘いですが、本来はステークホルダーと共にテーブル一つについても共有理解を深めることが重要かと思いました。

また、Cloud Functionsを使いましたが実行時間やメモリの制限もあるため実際には他の選択肢も十分に見る必要があります。 例えば、サーバーレスのまま融通のきくCloud Runを使うとか、定期実行やデータ加工まで一箇所にまとめられるCloud Composerを使うとか...
それぞれ料金体系についての知識が必要ですがコスト換算して何が見合うかを考えるとよりサービス利用に関する理解が深まりそうです。 ちなみにCloud Functionsでは呼び出し回数や実行時間で料金発生し、Cloud Composerでは実行環境やネットワーク使用量に基づき分単位で料金発生するそうです。

参考

YoutubeDataAPI関連

GCP関連

エラー関連

付録

Cloud Functionsで使用しているrequirements.txtとmain.pyベタハリ

collect-videos-function

requirements.txt

google-api-python-client
google-cloud-storage
oauth2client

main.py

import os
import sys
import json

from random import random
from datetime import datetime, timedelta

from google.cloud import storage
from apiclient.discovery import build


def search_videos(query, api_key, max_pages=10, maxResults=50):
    """APIからデータ取得
    """
    today = datetime.today()
    publishedBefore = datetime.strftime(today, '%Y-%m-%d') + 'T00:00:00Z'
    yesterday = today - timedelta(days=1)
    publishedAfter = datetime.strftime(yesterday, '%Y-%m-%d') + 'T00:00:00Z'

    youtube = build('youtube', 'v3', developerKey=api_key, cache_discovery=False)

    search_request = youtube.search().list(
        part='id',
        q=query,
        publishedAfter=publishedAfter,
        publishedBefore=publishedBefore,
        type='video',
        maxResults=maxResults,
    )

    i = 0
    while search_request and i < max_pages:
        search_response = search_request.execute()
        video_ids = [item['id']['videoId'] for item in search_response['items']]

        videos_response = youtube.videos().list(
            part='snippet,statistics',
            id=','.join(video_ids)
        ).execute()

        yield videos_response['items']

        search_request = youtube.search().list_next(search_request, search_response)
        i += 1


def _check_value(video_info, key1, key2):
    """情報が取得できない場合Noneを代入
    """
    return video_info[key1][key2] if video_info[key1].get(key2) else None


def _make_one_dict(video_info):
    """取得情報を辞書に変換
    """
    tmp_dict = {}

    tmp_dict['video_id'] = video_info['id']
    tmp_dict['video_url'] = "http://youtube.com/watch?v=" + video_info['id']
    tmp_dict['etag'] = video_info['etag']
    tmp_dict['kind'] = video_info['kind']

    tstr = video_info['snippet']['publishedAt'].replace('T', ' ').replace('Z', '')
    tmp_dict['publishedAt'] = tstr

    tmp_dict['channelId'] = _check_value(video_info, 'snippet', 'channelId')
    tmp_dict['title'] = _check_value(video_info, 'snippet', 'title')
    tmp_dict['description'] = _check_value(video_info, 'snippet', 'description')
    tmp_dict['channelTitle'] = _check_value(video_info, 'snippet', 'channelTitle')
    tmp_list = video_info['snippet']['tags'] if video_info['snippet'].get('tags') else []
    tmp_dict['tags'] = '|'.join(tmp_list)
    tmp_dict['categoryId'] = _check_value(video_info, 'snippet', 'categoryId')
    tmp_dict['defaultAudioLanguage'] = _check_value(video_info, 'snippet', 'defaultAudioLanguage')

    tmp_dict['commentCount'] = _check_value(video_info, 'statistics', 'commentCount')
    tmp_dict['favoriteCount'] = _check_value(video_info, 'statistics', 'favoriteCount')
    tmp_dict['likeCount'] = _check_value(video_info, 'statistics', 'likeCount')
    tmp_dict['viewCount'] = _check_value(video_info, 'statistics', 'viewCount')

    return tmp_dict


def _select_one_that_includes_keyword(KEYWORD, video_infos_dict):
    """検索キーワードを含まないデータを除外
    """
    def _check_keyword(keyword, string):
        """検索キーワードが含まれるか判定
        """
        return True if keyword in string else False

    ret = []
    for i in range(len(video_infos_dict)):
        title = video_infos_dict[i]['title'] if video_infos_dict[i]['title'] else ''
        desc = video_infos_dict[i]['description'] if video_infos_dict[i]['description'] else ''
        tags = video_infos_dict[i]['tags'] if video_infos_dict[i]['tags'] else ''
        checks = [
            _check_keyword(KEYWORD, title),
            _check_keyword(KEYWORD, desc),
            _check_keyword(KEYWORD, tags)
        ]

        if KEYWORD in title or KEYWORD in desc or KEYWORD in tags:
            ret.append(video_infos_dict[i])
        else:
            continue

    return ret


def main(data, context):

    YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY')
    QUERY = 'にじさんじ, 切り抜き'
    KEYWORD = 'にじさんじ'
    MAX_PAGES = 10
    MAX_RESULTS = 50

    video_infos, count = {}, 0
    for items_per_page in search_videos(QUERY, YOUTUBE_API_KEY, MAX_PAGES, MAX_RESULTS):
        for item in items_per_page:
            video_infos[str(count)] = dict(item)
            count += 1

    video_infos_dict = []
    for i in range(len(video_infos)):
        tmp_data = video_infos[str(i)]
        tmp_dict = _make_one_dict(tmp_data)
        video_infos_dict.append(tmp_dict)

    video_infos_include_keyword = _select_one_that_includes_keyword(KEYWORD, video_infos_dict)

    if video_infos_include_keyword == []:
        sys.exit()

    video_infos_json = [json.dumps(i) for i in video_infos_include_keyword]
    res = '\n'.join(video_infos_json)

    client = storage.Client()
    bucket_name = 'youtube_video_data'
    blob_name = 'kirinuki_{}.json'.format(datetime.now().strftime('%Y%m%d_%H%M%S'))
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.upload_from_string(data=res, content_type='application/json')

GithubActionsでBotの定期実行ワークフロー作成

はじめに

リポジトリを見直していたら先月作成したGithubActionsを使ったTweet取得&Discord投稿Botの定期実行ワークフローを見つけたので、
これについて書きます。Discordで作成しましたが、webhookURLを取得できれば多分Slackでもできます。(要確認)

改めて見返すとコード汚な!どこかで綺麗にできないか試そう...

コードはこちらのgithubリポジトリ

目次

概要

作成したのはTweet取得Botで、GithubActionsを使って定期実行しています。 f:id:amagaeru1113:20210106213720p:plain

特徴は大まかに以下の通りです。

  • GithubActionsでワークフローを定期実行
  • Twitterから指定の単語を含むツイートの取得
  • 取得したツイートの内、favが5つ以上のツイート及びURLを含むもののみ保持
  • Discordチャンネルに投稿

動かしてみると画像のようにチャンネルに投稿されます。

f:id:amagaeru1113:20210106213550p:plain

GithubActionsについて

今回ワークフローはGithubActionsを使って定義しています。
GithubActionsとはGithub謹製のCI/CDツールです。Githubにpushされたコードの自動テスト・デプロイであったり、イベント(pull request、push, etc)などに紐付けてプログラムを実行するなど様々なことができます。もちろん定期実行もできます。

github.co.jp

Tweet取得&Discord投稿Botについて

名前の通り、Tweetを取得してDiscordチャンネルに投稿してくれるBotです。

今回のBotを動かすには以下の2つの情報が必要なので事前に用意しておきます。

  • TwitterAPIのアクセストーク
  • DiscordチャンネルのwebhookURL

上記を取得するのに参考にしたページを記載しておきます。

developer.twitter.com

また、ファイルは以下の通りです。

DiscordBot_MLTweets/
├── README.md
├── config.py
├── functions.py
├── main.py
├── my_dataclasses.py
├── poetry.lock
└── pyproject.toml

使用するライブラリなどはpyproject.tomlに記載してあります。

discordwebhook = "^1.0.0"
requests = "^2.24.0"
tweepy = "^3.9.0"
toml = "^0.10.1"

ワークフローの実装

Twitterの取得について多くの記事があるため、参考にした記事を記事最下部に記載します。
ここではGithubActionsの設定についてコメントを挿入していく形で紹介します。

name: Sent Tweets News

# onでトリガーとなるイベントの設定
# ここでは定期実行のためcronで設定します
on:
    schedule:
        - cron: '0 0 * * *'

# 実行するjobの設定
jobs:
    sent:

        # 実行環境の設定:ubuntu
        name: Sent News
        runs-on: ubuntu-latest

        # 細かい処理について設定
        steps:
            - name: Checkout
              uses: actions/checkout@v2.0.0

            # pythonのバージョンを指定
            - name: Set up Python 3.8
              uses: actions/setup-python@v2
              with: 
                python-version: 3.8

            # poetryをinstall    
            - name: Install Poetry
              run: |
                curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python

            # poetryのパスを使用可能にする
            - name: Add path for Poetry
              run: echo "$HOME/.poetry/bin" >> $GITHUB_PATH

            # poetry install
            - name: Install Dependencies
              run: poetry install --no-interaction

            # 環境変数の設定と実行
            - name: Run py
              env:
                DISCORD_CHANNEL_WEBHOOK: ${{ secrets.DISCORD_CHANNEL_WEBHOOK }}
                TWITTER_API_KEY: ${{ secrets.TWITTER_API_KEY }}
                TWITTER_API_TOKEN: ${{ secrets.TWITTER_API_TOKEN}}
                TWITTER_SECRET_KEY: ${{ secrets.TWITTER_SECRET_KEY}}
                TWITTER_SECRET_TOKEN: ${{ secrets.TWITTER_SECRET_TOKEN}}
              run: poetry run python main.py $DISCORD_CHANNEL_WEBHOOK $TWITTER_API_KEY $TWITTER_API_TOKEN $TWITTER_SECRET_KEY $TWITTER_SECRET_TOKEN

大まかな手順としては以下のようになっています。

  • ワークフロー名の決定とトリガーイベントの設定
  • 実行環境の指定(ubuntu-latest)
  • jobsの作成
  • stepsで行う処理の作成

また、今回使用した環境変数(アクセストークン、webhookURL)の部分ですが、元々ローカルでtomlに格納していましたが、
そのままではgithubにpushすると他人に見えてしまいます。 そこでGithub -> Settings -> Secretsにこれらを設定し、GithubActions上で注入することで見えないようにしました。

おわりに

簡単ではありますが、定期実行するワークフローの実装について紹介しました。 Botの実装としてはまだまだ直さないといけない点がいくつもあるのですが、全体としてはこの機能で終わりです。


GithubActionsは様々なaction(処理の塊)を企業が作成したりしていますが、その他のactionについては意図せずバグが入っていたりするなど使用には注意が必要です。 現状は自作した方が良いという話ですが、セキュリティやバグなどの問題をクリアしてこれらactionも使えるようになるともっと利便性が増しそうです。

参考

最後に参考にしたページを列挙させていただきます。

Discordチャンネルへの通知例

はじめに

Discordに通知する方法を調べる機会があったので、簡単な通知例を作成しました。

コード等はこちらです。 blogCode/DiscordNotify at master · amagaeru1113/blogCode · GitHub

実行環境は以下です。

  • pyenv version = 3.8.5
  • poetry:ライブラリ管理
    • requests
    • toml
    • discordwebhook

Discord

Discordはゲームに特化したチャットツールです。Slackと同じような使い方ができますが、特徴としてはVC(ボイスチャンネル)に入って複数人で話すことが簡単な点です。リモートワークで少なくなった雑談が可能になります。

実際、チームレベルでDiscordを使用してVCに基本ログインしておく等の使用をしている事例を聞きます。

通知

webhook url取得

通知を行うために必要なwebhook urlをまずは取得します。 詳しくは公式 docs:Webhooksへの序章にありますが、手順を確認します。

1.Discordサーバーの通知したいチャンネルで「チャンネルを編集」をクリック

クリックするとチャンネル説明や権限、招待周りの設定ができる画面が表示されます。 f:id:amagaeru1113:20201231204206p:plain

2.画像左側の「連携サービス」をクリック

連携サービスで導入したBotやwebhook urlの取得ができます。

3.ウェブフックを作成

画像では「ウェブフックの確認」となっていますが、一つも取得していない場合、作成ボタンのみ表示されます。 f:id:amagaeru1113:20201231204235p:plain

4.ウェブフックURLをコピーをクリック

コピーボタンクリックでwebhook urlを取得できます。

f:id:amagaeru1113:20201231204323p:plain

通知例

requestsを使う

webhook urlを取得したらrequestsを使って通知できます。 次の場合に適しているかもしれません。

  • discordのためにライブラリインポートするのもなあって場合
  • 凝った表示を作成したい場合

以下のコードでは

  • テキストのみの通知
  • URLを含むテキストの通知
  • 画像・テキストを含み埋め込み表現を用いた通知

の三種類の通知を実行します。

参考:埋め込み表現

import requests
import json
import toml


def simple_message(webhook_url, message):
    main_content = {'content': message}
    headers = {'Content-Type': 'application/json'}
    requests.post(webhook_url, json.dumps(main_content), headers=headers)


def message_and_image(webhook_url, image_url, message):
    embeds = [
        {
            'description': 'タラモアデュー',
            'color': 16776960, # サイドバーの色
            'image': {
                'url': image_url
            }
        }
    ]

    main_content = {
        'content': message,
        'embeds': embeds
    }

    headers = {'Content-Type': 'application/json'}
    requests.post(webhook_url, json.dumps(main_content), headers=headers)


if __name__ == '__main__':
    obj = toml.load('config.toml')
    webhook_url = obj['webhookurl']
    image_url = obj['image_url']
    qiita_url = obj['qiita_url']

    message1 = '\n sample message from requests'
    message2 = f'\n sample url\n  {qiita_url}'

    simple_message(webhook_url, message1)
    simple_message(webhook_url, message2)
    message_and_image(webhook_url, image_url, message1)

上記のコードを実行すると、以下画像のように表示されます。 f:id:amagaeru1113:20201231204441p:plain

discordwebhooksを使う

discord webhook用のライブラリである10mohi6/discord-webhook-pythonを使って通知できます。

次の場合に適しているかもしれません。

  • サクッと通知したい場合
  • 自由度がそんなにいらない場合

requestsのコードと同様に

  • テキストのみの通知
  • URLを含むテキストの通知
  • 画像・テキストを含み埋め込み表現を用いた通知

の三種類の通知を実行します。

参考:埋め込み表現

import toml
from discordwebhook import Discord

def simple_message(webhook_url, message):
    discord = Discord(url=webhook_url)
    discord.post(content=message)

def message_and_image(webhook_url, image_url, message):
    discord = Discord(url=webhook_url)
    discord.post(
        content=message,
        embeds=[
            {
                'description': 'タラモアデュー',
                'color': 65535, # サイドバーの色
                'image': {'url': image_url},
                'footer': {
                    'text': message,
                },
            }
        ],
    )


if __name__ == '__main__':
    obj = toml.load('config.toml')
    webhook_url = obj['webhookurl']
    image_url = obj['image_url']
    qiita_url = obj['qiita_url']

    message1 = 'sample message from discordwebhook'
    message2 = f'\n sample url\n  {qiita_url}'

    simple_message(webhook_url, message1)
    simple_message(webhook_url, message2)
    message_and_image(webhook_url, image_url, message1)

上記のコードを実行すると、以下画像のように表示されます。 f:id:amagaeru1113:20201231204511p:plain

おわりに

今回、requestsとdiscordwebhookを使った場合の通知例を作成しました。

コード数・コード内容がほぼ変わりませんでしたが、discord通知の部分であることがdiscordwebhookを使うとわかりやすいか?とは思いました。 (どちらがより通知周りで嬉しいかに至れてません...)

requestsとdiscordwebhookでの違いが顕著に出る例が見つかればまた追記します。

ちなみに通知ではなく、諸々機能を搭載したBotを作成したい場合はwebhook urlではなくアクセストークンの取得が必要らしいです。
以下のページが参考になるかと思います。 - Pythonで実用Discord Bot(discordpy解説) - Rapptz/discord.py

参考

kedro + mlflowを用いた実験の備忘録

はじめに

今回は以前作成したDockerコンテナ上でtitanicデータを用いた実験を行います。
実験ステップを追っていくので長いです。

目次

環境

以前作成したDockerイメージです。 こちらを起動し、以降マウントしたディレクトリでファイル作成など行っていきます。

github.com

データ

titanicデータはkaggleのページからはダウンロードしてきます。

www.kaggle.com

kedroの流れ

ファイル構成

kedro newで作成したファイル構成(pycacheは除外)です。 丸のついた数字は行う捜査の説明する箇所と順番に対応しています。

kedro_project/titanic_tutorial/
├── README.md
├── __init__.py
├── conf
│   ├── README.md
│   ├── base
│   │   ├── catalog.yml----------------------------②
│   │   ├── credentials.yml
│   │   ├── logging.yml
│   │   └── parameters.yml-------------------------③
│   └── local
├── data
│   ├── 01_raw-------------------------------------①
│   │   ├── gender_submission.csv
│   │   ├── test.csv
│   │   └── train.csv
│   ├── 02_intermediate
│   ├── 03_primary
│   ├── 04_feature
│   ├── 05_model_input
│   ├── 06_models
│   ├── 07_model_output
│   └── 08_reporting
├── docs
├── kedro_cli.py
├── logs
├── mlruns-----------------------------------------⑧
├── notebooks
│   ├── check_catalog.ipynb------------------------④
│   └── titanic_on_jupyterlab.ipynb----------------⑤
├── setup.cfg
└── src
    ├── __init__.py
    ├── requirements.txt
    ├── setup.py
    ├── tests
    └── titanic_tutorial---------------------------⑨、⑩
        ├── __init__.py
        ├── hooks.py-------------------------------⑦
        ├── pipelines------------------------------⑥
        │   ├── __init__.py
        │   ├── data_engineering
        │   │   ├── node.py
        │   │   └── pipeline.py
        │   └── data_science
        │       ├── LinearSVM
        │       └── __init__.py
        └── run.py

① データの配置

ファイル構成

入手したデータを01_rawに配置します。

data
└── 01_raw
    ├── gender_submission.csv
    ├── test.csv
    └── train.csv

② カタログ登録

ファイル構成

配置したデータをカタログに記載します。 出力した中間データを使用したい場合、改めてこちら記載します。

conf
├── README.md
└── base
     └── catalog.yml

今回、ダウンロードしたデータはcsv形式なので、次にように書きます。
ローカルに保存したデータであれば、名前・データタイプ・ファイルパスを設定すれば呼び出せます。

train:
  type: pandas.CSVDataSet
  filepath: data/01_raw/train.csv

test:
  type: pandas.CSVDataSet
  filepath: data/01_raw/test.csv

gender:
  type: pandas.CSVDataSet
  filepath: data/01_raw/gender_submission.csv

上記以外に次のファイルシステムからデータを呼び出せます。
詳細はThe Data Catalog

③ パラメータ登録

ファイル構成

使用するパラメータの登録をします。 登録は適宜行います。

conf
  ├── README.md
  └── base
       └── parameters.yml

ここではtest_sizeとrandom_stateを登録しておきます。

test_size: 0.2
random_state: 24

④ データの確認

ファイル構成

catalogに記載されているデータをノートブックで確認します。 jupyter notebook/labを起動し、notebooks/にipynbを作成します

notebooks
   └── check_catalog.ipynb

作成後、データとパラメータを確認します。
%reload_kedro はcatalog、contextが見れるようになるマジックコマンドです。

⑤ プロトタイプ作成

ファイル構成

続いて、notebooksにデータ探索用のノートブックを作成します。

notebooks
  └── prototype_LinearSVM.ipynb

データの前処理をした後、線形SVMで分類を行います。

⑥ パイプライン作成

ファイル構成

プロトタイプのコード作成後、kedroのパイプラインのコードを落とし込みます。
この時、src/{プロジェクト名}/pipelines配下でdata_engineering(データの整形・加工)、data_science(モデルの訓練・検証)を作成します。
こうすることでデータの整形は共通し、モデルの訓練部分のみ変更することができます。

今回は分かりやすくするために、data_science配下にモデル名のディレクトリを作成し、そこに使用する関数とパイプラインを格納します。

pipelines
    ├── __init__.py
    ├── data_engineering
    │       ├── node.py
    │       └── pipeline.py
    └── data_science
            ├── LinearSVM
            │      ├ node.py
            │      └ pipeline.py
            └── __init__.py

pipelineの作成に当たってCreating a pipelineを参考にしました。

data_engineering

node

prototypeで行った処理を関数に分割し、最終的にpreprocessを呼び出すことで処理を複数適用するようにします。

import pandas as pd
from sklearn import preprocessing


def _label_encoding(df: pd.DataFrame) -> (pd.DataFrame, dict):

    df_le = df.copy()
    list_columns_object = df_le.columns[df_le.dtypes == 'object']

    dict_encoders = {}
    for column in list_columns_object:
        le = preprocessing.LabelEncoder()
        mask_nan = df_le[column].isnull()
        df_le[column] = le.fit_transform(df_le[column].fillna('NaN'))

        df_le.loc[mask_nan, column] *= -1
        dict_encoders[column] = le

    return df_le, dict_encoders


def _drop_columns(df: pd.DataFrame) -> pd.DataFrame:
    df_prep = df.copy()
    drop_cols = ['Name', 'Ticket', 'PassengerId']
    df_prep = df_prep.drop(drop_cols, axis=1)
    return df_prep


def _process_Age_column(series: pd.Series) -> pd.Series:
    series_prep = series.copy()
    series_prep = series_prep.fillna(series_prep.mean())
    return series_prep


def _process_Embarked_column(series: pd.Series) -> pd.Series:
    series_prep = series.copy()
    series_prep = series_prep.fillna(series_prep.mode()[0])
    return series_prep


def _process_Pclass_column(series: pd.Series) -> pd.Series:
    series_prep = series.copy()
    series_prep = series_prep.astype(str)
    return series_prep


def _process_Cabin_column(series: pd.Series) -> pd.Series:
    series_prep = series.copy()
    series_prep = series_prep.str[0]
    return series_prep


def preprocess(df: pd.DataFrame) -> pd.DataFrame:
    df_prep = df.copy()

    df_prep = _drop_columns(df_prep)
    df_prep['Age'] = _process_Age_column(df_prep['Age'])
    df_prep['Embarked'] = _process_Embarked_column(df_prep['Embarked'])
    df_prep['Pclass'] = _process_Pclass_column(df_prep['Pclass'])
    df_prep['Cabin'] = _process_Cabin_column(df_prep['Cabin'])

    df_prep, _ = _label_encoding(df_prep)

    return df_prep

pipeline

pipeline.pyではnode.pyで定義した関数を呼び出します。
この際、次の4つを定めます。

  • func:使用する関数
  • inputs:入力
  • outputs:出力
from kedro.pipeline import node, Pipeline
from .node import preprocess

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=preprocess,
                inputs="train",
                outputs="train_prep",
                name="preprocess",
            ),
        ],
    )

data_science

node

こちらのnodeでは、データの分割、モデルの学習、モデルの検証及び記録でそれぞれ関数にしました。
mlflowはevaluate_model内で使用し、logをとっています。

import os
from typing import Any, Dict, List
import pandas as pd
import numpy as np
import logging
from datetime import datetime

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.svm import LinearSVC

import mlflow


def split_data(data: pd.DataFrame, parameters: Dict) -> List:
    """Splits data into training and test sets.
        Args:
            data: Source data.
            parameters: Parameters defined in parameters.yml.
        Returns:
            A list containing split data.
    """
    target_col = 'Survived'
    X = data.drop(target_col, axis=1).values
    y = data[target_col].values

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )

    return [X_train, X_test, y_train, y_test]


def train_model(X_train: np.ndarray, y_train: np.ndarray, parameters: Dict[str, Any]) -> LinearSVC:

    train_x, valid_x, train_y, valid_y = train_test_split(X_train,
                                                          y_train,
                                                          test_size=parameters['test_size'],
                                                          random_state=parameters['random_state'])

    model = LinearSVC(C=0.1).fit(train_x, train_y)
    return model


def evaluate_model(model: LinearSVC, X_test: np.ndarray, y_test: np.ndarray, parameters: Dict[str, Any]):
    """Calculate the F1 score and log the result.
        Args:
            model: Trained model.
            X_test: Testing data of independent features.
            y_test: Testing data for price.
    """

    prediction = model.predict(X_test)
    prediction = np.where(prediction < 0.5, 0, 1)
    score = round(accuracy_score(y_test, prediction), 3)

    logger = logging.getLogger(__name__)
    logger.info("accuracy score : %.3f.", score)

    file_path = os.path.abspath(__file__).split('/')
    model_name = file_path[-2]
    runName = datetime.now().strftime('%Y%m%d%H%M%S')
    mlflow.start_run(run_name=runName, nested=True)
    mlflow.log_metric("accuracy_score", score)
    mlflow.log_param("test_size",
                     parameters['test_size'])

    mlflow.log_param("random_state",
                     parameters['random_state'])

    mlflow.log_param("model_name",
                     model_name)

    mlflow.end_run()

pipeline

data_engineeringと同様にnodeから関数を呼び出し、pipelineを作成します。
各nodeとfuncは一対一で対応します。
一番最初のnodeのinputsはdata_engineeringの出力とparameters.ymlです。
その後のnodeでは以前のnodeのoutputsをinputsに入れていく形になります。

from kedro.pipeline import node, Pipeline
from .node import split_data, train_model, evaluate_model


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=split_data,
                inputs=["train_prep", "parameters"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
            ),
            node(
                 func=train_model,
                 inputs=["X_train", "y_train", "parameters"],
                 outputs="model"),
            node(
                func=evaluate_model,
                inputs=["model", "X_test", "y_test", "parameters"],
                outputs=None,
            ),
        ],
    )

⑦パイプラインの結合

data_engineering及びdata_scienceのパイプラインの作成後、それらを結合して 全体のパイプラインを作成します。これはhooks.pyで行います

src
 ├── __init__.py
 ├── requirements.txt
 ├── setup.py
 ├── tests
 └── titanic_tutorial
        ├── __init__.py
        ├── hooks.py
        └── pipelines

hooks.pyの大部分はプロジェクト作成時のもので、今回コードを変更するのはimportregister_pipelinesです。

  • importでは作成したdata_engineering/data_scienceパイプラインを呼び出します。
  • register_pipelinesで呼び出したパイプラインを結合します。

結合後、kedro_project/{プロジェクト名}配下でkedro runコマンドを実行することで実験を行います。

参考: Create a pipeline

# Copyright 2020 QuantumBlack Visual Analytics Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
# NONINFRINGEMENT. IN NO EVENT WILL THE LICENSOR OR OTHER CONTRIBUTORS
# BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF, OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# The QuantumBlack Visual Analytics Limited ("QuantumBlack") name and logo
# (either separately or in combination, "QuantumBlack Trademarks") are
# trademarks of QuantumBlack. The License does not grant you any right or
# license to the QuantumBlack Trademarks. You may not use the QuantumBlack
# Trademarks or any confusingly similar mark as a trademark for your product,
# or use the QuantumBlack Trademarks in any other manner that might cause
# confusion in the marketplace, including but not limited to in advertising,
# on websites, or on software.
#
# See the License for the specific language governing permissions and
# limitations under the License.

"""Project hooks."""
from typing import Any, Dict, Iterable, Optional

from kedro.config import ConfigLoader
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.versioning import Journal


#=====================変更箇所=======================
from titanic_tutorial.pipelines.data_engineering import pipeline as de
from titanic_tutorial.pipelines.data_science.LinearSVM import pipeline as ds
#===================================================



class ProjectHooks:
    @hook_impl
    def register_pipelines(self) -> Dict[str, Pipeline]:
        """Register the project's pipeline.

        Returns:
            A mapping from a pipeline name to a ``Pipeline`` object.

        """

        #=====================変更箇所=======================
        de_pipline = de.create_pipeline()
        ds_pipline = ds.create_pipeline()

        return {
            "de": de_pipline,
            "ds": ds_pipline,
            "__default__": de_pipline + ds_pipline,
        }
        #===================================================

    @hook_impl
    def register_config_loader(self, conf_paths: Iterable[str]) -> ConfigLoader:
        return ConfigLoader(conf_paths)

    @hook_impl
    def register_catalog(
        self,
        catalog: Optional[Dict[str, Dict[str, Any]]],
        credentials: Dict[str, Dict[str, Any]],
        load_versions: Dict[str, str],
        save_version: str,
        journal: Journal,
    ) -> DataCatalog:
        return DataCatalog.from_config(
            catalog, credentials, load_versions, save_version, journal
        )


project_hooks = ProjectHooks()

⑧ mlflow uiでログ確認

実験後、kedro runを実行したディレクトリ(kedro_project/{プロジェクト名})で、
改めてmfuiコマンドを実行します。(mfui='mlflow ui --host 0.0.0.0'")

実行後、http://0.0.0.0:5000/にアクセスするとmlflow Experimentsの画面が表示されます。

実験実行後、アクセスするとparameters、accuracy_scoreが記録されていることが分かります。
以下の図ではrandom_stateの変更、モデルの変更を行った際のログも記録されています。

f:id:amagaeru1113:20201222204220p:plain

⑨ パイプライン可視化

mfuiを実行したディレクトリでkvizコマンドを実行します。
実行後、http://0.0.0.0:4141/kedro-vizの画面が表示されます。
(kviz='kedro viz --host 0.0.0.0'")

f:id:amagaeru1113:20201222202628p:plain

⑩ モデルやパラメータを変更して再実験

一度通しでパイプラインを作成できたら、モデルやパラメータを変えて⑤〜⑨のステップを繰り返すことで、実験を進めていきます。

気になった点

現時点で、気になった点は2つです。

  • 今後mlflowで色んなデータのログを取りたくなって記載場所が散逸した際どうするか
  • data_science、data_engineeringで複数の処理・モデルを作った際、組み合わせを試すことがコマンドからできるのか

前者についてはkedroと併用するmlflow周りを改善したpipelineXというライブラリがあるため、こちらを使用して見るのが一つだと思います。

後者については、Kedro’s command line interfaceで、kedro runのオプションでパイプラインを一つ指定して実行するコマンドがあったので、組み合わせで実行させるコマンドがあるかもしれませんがまだ見つけられていないです。

おわりに

kedroを使ってみて、分析初心者にとってこのようなパイプラインのテンプレートは非常にありがたいと感じました。

また、titanicチュートリアルを通してやって見ることで基本はnotebooksで試行錯誤し、data_engineering/data_scienceに諸々コードを固めつつ、パラメータチューニングも含め実験していくイメージを持てました。

今回特徴量生成は行わなかったのですが、もし行う場合はdata_engineeringでの処理は最小限にし、feature_engineeringディレクトリで諸々行うのが置き場所として正しいかと考えています。

src/pipelins
├── __init__.py
├── data_engineering
│   ├── node.py
│   └── pipeline.py
├── data_science
│   ├── LightGBM
│   ├── LinearSVM
│   └── __init__.py
└── feature_engineering
    ├── node.py
    └── pipeline.py

参考

kedro+mlflowで実験を行う環境構築

はじめに

実験管理・ワークフロー管理を行うフレームワークを試すため、今回はkedro・mlflowを導入したDockerコンテナの作成をしました。

作成した環境テンプレートはこちら→amagaeru1113/templates
コンテナ起動までの手順などはREADMEにあります。

背景

背景としては、先日参加したatmaCup#8で行った実験管理があまりにも雑だったので、その勉強やそもそもフレームワークを使って型にはめたいと考えたからです。

ちなみにコンペでは以下のような進め方でした。

  • 特徴量考えて処理に足し引きして評価値を見る
  • コンペ期間中は1day1notebook
  • スプレッドシートに実験を回したときの評価値のスコア、入れた特徴量のリスト、subしたらそのとき表示されたスコアを毎回記録する

f:id:amagaeru1113:20201217210624p:plain

試したいライブラリ

Kedro

Kedroは分析・実験の再現性・保守性・モジュール化されたコード作成のためのOSSです。
適用される概念には、モジュール性、関係の分離、バージョン管理などがあります。

実験を行う際のパイプライン構築のテンプレートとしての利用を考えています。
分析初心者なので、パイプラインを一から作るって難しいなと思っていたところヒットしました。

Welcome to Kedro’s documentation! — Kedro 0.16.6 documentation

github.com

MLflow

MLflow は、機械学習のライフサイクルをエンドツーエンドで管理するためのOSSです。 主に4つの機能があります。

  • パラメータと結果を記録し、比較するための実験の追尾(MLflow Tracking)。
  • 他のデータサイエンティストと共有したり、本番環境に転送したりするために、MLコードを再利用可能で再現性のある形にパッケージ化(MLflow Projects)。
  • 様々なMLライブラリからのモデルを管理し、様々なモデル提供・推論プラットフォームに展開(MLflow Models)。
  • モデルのバージョン管理、ステージ遷移、アノテーションなど、MLflow モデルのライフサイクル全体を共同で管理するための中央モデルストアを提供(MLflow Model Registry)。

主にMLflow Trackingを使って評価値などのログを取りたいです。

MLflow Documentation — MLflow 1.12.1 documentation

github.com

実験環境

実験環境はコンペごとにコンテナを立てることで管理します。
Dockerを使うにあたって、pyenv・poetryをローカル環境にインストール済みであることを想定しています。

また、よく使うライブラリ(pandas、numpyなど)はbuild時点で導入するため、poetryを使ってライブラリ管理します。
その上で実験で使いたいライブラリについてはコンテナ上でinstallします。

作成した環境テンプレートはこちら→amagaeru1113/templates コンテナ起動までの手順などはREADMEにあります。

実行手順やコンテナ起動についてはREADMEに記載があるので、以降では下記4点についてを書きます。

  • Dockerをbuildする前の準備
  • poetryで導入するライブラリ一覧
  • kedroで作成したプロジェクトの構成
  • kedro+mlflow使用した実験の流れ

Dockerをbuildする前の準備

  • pyenvとpoetryをローカル環境にインストール
  • pyenvでディレクトリのpythonバージョンを設定(今回は3.8.5)
  • poetry installでライブラリインストール(pyproject.tomlとpoetry.lockをコピーするため)

pyenv、poetry導入の参考記事

poetryで導入するライブラリ一覧

kedro・mlflow及び必須なライブラリを入れています。
kedro-vizはパイプラインの可視化、lightgbmは次回行うTitanicチュートリアルで使用します。

  • pandas
  • numpy
  • matplotlib
  • scikit-learn
  • mlflow
  • kedro
  • jupyterlab
  • kedro-viz
  • lightgbm

kedroで作成したプロジェクトの構成

Dockerを起動してコンテナに入ったらkedro newコマンドで実験プロジェクトを作成します。
以降、基本的にはそのプロジェクト内しか移動しないです。

プロジェクトの構成例として、Kedro DocsのIris dataset example projectから構成を引用します。

    get-started         # テンプレートの親ディレクトリ
    ├── conf            # プロジェクトの設定ファイル群
    ├── data            # ローカルデータ
    ├── docs            # ドキュメント
    ├── kedro_cli.py    # CLIコマンドコレクション
    ├── logs            # 出力ログ
    ├── notebooks       # jupyterノートブック(srcに移動する前の実験的なコード)
    ├── README.md       # README
    ├── setup.cfg       # kedro test、kedro lint実行する際の設定オプション
    └── src             # ソースコード

主に触るであろうディレクトリは次の3つです。

  • conf
    • catalog.yml:使用するデータの定義
    • parameters.yml: random_stateやモデルのパラメータ管理
  • notebooks
    • 実験的なコードの作成
    • ここで定義した関数もsrcで呼び出し可能
  • src
    • パイプラインの定義など

kedro+mlflow使用した実験の流れ

流れとしてはjupyterlabで分析をして、ある程度まとまってきたデータ処理やモデルの訓練の硬い実装をpiplinesで行います(多分)。
大まかに以下のステップをふむと考えています。

  1. データをdata/01_rawに配置
  2. conf/base/catalog.ymlにデータを記載
  3. notebooksで分析と処理を試し、関数化
  4. src/{プロジェクト名}/pipelines配下にデータ加工と学習のパイプラインを作成
    • data_engineering
      • node:関数化されたデータ加工処理
      • pipeline: データ加工をパイプライン化
    • data_science
      • node: 関数化されたモデルの訓練と検証
      • pipeline: モデルの訓練をパイプライン化
      • mlflow.log_XXXはnodeの関数内に記載
  5. pipelinsをhooks.pyのregister_pipelinesで結合
  6. kedro_project/{プロジェクト名}直下でkedro run
  7. mlflowでログを確認
  8. パイプラインを確認

詳しくはKedro DocsのIris dataset example projectが参考になります。 上のステップよりこっちを見た方がいいです。

おわりに

今回は実験環境をコンテナ上に作成しました。 また、Kedro・MLflowを使用した実験の流れの想定を行いました。

次回はTitanicのデータを使って、どんな手順を踏んだのかを順を追って行きたい...です。

また、kedro+mlflow以外にpipelineXgokartでの実験管理も試してみたいと考えているのでそちらについても備忘録を作成したいです。

最後にkedro+mlflowの参考にpipelineXを紹介してくださったチズチズ様、デモリポジトリのリンクをわざわざ貼ってくださったpipelineX作成者のYousuke Minami様にこの場をお借りして感謝申し上げます。

参考

メモ:pythonでtoml

はじめに

ちょくちょく使うtomlについてのメモです。

コードはこちらのrepoです

TOML

Tom's Obvious, Minimal Language (TOML) とは設定ファイルを記述するための小さな言語です。例えば、以下のような特徴があります。

  • 可読性が高い
  • 標準的なデータ型が利用可能
    • String
    • Integer
    • Float
    • Boolean
    • Datetime
    • Array
  • Table(キーと値からなる集合)と言うデータ型もある
  • 文字の大小を区別する
  • 標準でUTF-8が要求されている

詳しくは日本語docをどうぞ!

実行環境

pyenv、poetryを使用してローカルで実行。 python-version: 3.8.5

tomlをインストール poetry add toml pipを使う場合は pip install toml

ファイル構成

.
├── poetry.lock
├── pyproject.toml
└── src
    ├── example.toml
    ├── new_example.toml
    └── use_toml.py

example.toml

var_str = "var1" # comment out
var_int = 10
var_float = 10.0
var_bool_t = true
var_bool_f = false
var_list = ["a", "b", "c", "d"]


[table1]
t_var_1 = 'Orange'
t_var_2 = 'Banana'


[BigTable]
  name = "foo"

    [[BigTable.Movie]]
        name = "foo1"
        release = 1988

    [[BigTable.Movie]]
        name = "foo2"
        release = 2000
    
    [BigTable.Music]
        name = "mug1"
        author = 'artist'
        release = 2011

簡単な使用例

tomlのimportとファイル読み込み

import toml
example = toml.load('example.toml')

exampleの中身とデータ型

print(example)
# {'var_str': 'var1', 'var_int': 10, 'var_float': 10.0, 'var_bool_t': True, 'var_bool_f': False, 'var_list': ['a', 'b', 'c', 'd'], 'table1': {'t_var_1': 'Orange', 't_var_2': 'Banana'}, 'BigTable': [{'name': 'foo', 'Movide': [{'name': 'トトロ', 'release': 1988}], 'Music': [{'name': '無題', 'author': 'amazarashi', 'release': 2011}]}]}

print(type(example))
# <class 'dict'>

変数の取り出し

print(example["var_str"])
# var1

print(example["var_int"])
# 10

print(example["var_float"])
# 10.0

print('True' if example["var_bool_t"] else 'False')
# True

配列の取り出し

print(example['var_list'])  
# ['a', 'b', 'c', 'd']

x1, x2, x3, x4 = example['var_list']
print(x1, x2, x3, x4)  
# a b c d

テーブルの取り出し

print(example['table1']['t_var_1'])
# Orange

print(example['table1']['t_var_2'])
# Banana

テーブルの配列の取り出し

print(example['BigTable'])
# {'name': 'foo', 'Movie': [{'name': 'foo1', 'release': 1988}, {'name': 'foo2', 'release': 2000}], 'Music': {'name': 'mug1', 'author': 'artist', 'release': 2011}}

print(example['BigTable']['Movie'])
# [{'name': 'foo1', 'release': 1988}, {'name': 'foo2', 'release': 2000}]

ファイルへ追加

example['add_var'] = 'new'

ファイル書き出し

new_example = 'new_example.toml'

with open(new_example, 'wt') as f:
    toml.dump(example, f)

おわりに

tomlはちょくちょく使っていたのですが、改めて整理するために記事にしました。 私はローカルで秘密情報を格納するために使っていて、最近ではTwitterAPIのアクセスキーを格納していました。

ただ、その収集するコードをGithubActionsで動かそうとすると、config.tomlが丸見えになってしまうのでpushはできませんでした。
その時は結局Github側にSecretsとして持たせました。 (周りに見せたくないファイルを隠してくれる機能がGithubに実装されるとそのまま使えるのに)

秘密情報をまとめて置いておけるのがありがたいので今後も使わせていただきます。

参考