cloud9_note

cloud9に限らないメモ

View on GitHub

Python

CSV読み込み

# -*- coding: utf-8 -*-
import csv
import sys
from model import ReleaseModel
from util import Util

if __name__ == '__main__':
    args = sys.argv
    filepath = args[1]
    
    release_list = []
    with open(filepath, encoding='utf8', newline='\n') as f:
        csvreader = csv.reader(f, quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
        for row in csvreader:
            model = ReleaseModel(row)
            release_list.append(model)
            print(model)
            
    for job in Util.get_deploy_job(release_list):
        print(f'- {job}')
    
# -*- coding: utf-8 -*-

from dataclasses import dataclass

@dataclass
class ReleaseModel:
    title: str
    repository: str
    deploy: set
    def __init__(self, line: list):
        index = 0
        self.title = line[index]; index += 1
        self.repository = line[index]; index += 1
        self.deploy = line[index].split('|'); index += 1
"title1","batch","ST0|IKR01|IKR03"
"title2","print","ST0|IKR01|IKR03"
"title3","batch","ST0|IKR01|SIT01"
"title4","webapps","ST0|IKR01|SIT01|ST101"

実行結果

ReleaseModel(title='title1', repository='batch', deploy=['ST0', 'IKR01', 'IKR03'])
ReleaseModel(title='title2', repository='print', deploy=['ST0', 'IKR01', 'IKR03'])
ReleaseModel(title='title3', repository='batch', deploy=['ST0', 'IKR01', 'SIT01'])
ReleaseModel(title='title4', repository='webapps', deploy=['ST0', 'IKR01', 'SIT01', 'ST101'])
- batch-IKR01
- batch-IKR03
- batch-SIT01
- batch-ST0
- print-IKR01
- print-IKR03
- print-ST0
- webapps-IKR01
- webapps-SIT01
- webapps-ST0
- webapps-ST101
- webapps-eob-IKR01
- webapps-eob-SIT01
- webapps-eob-ST0
- webapps-eob-ST101

Util.py

# -*- coding: utf-8 -*-

# from model import ReleaseModel

class Util:
    @staticmethod
    def get_deploy_job(relase_list: list):
        job_list = []

        for release in relase_list:
            for env in release.deploy:
                job_list.append((release.repository, env))
                if release.repository == 'webapps':
                    job_list.append((release.repository + '-eob', env))

        return_list = []
        for job in set(job_list):
            return_list.append(job[0] + '-' + job[1])
        
        return_list.sort()
        return return_list

ヘッダ行を読み飛ばす

csvreader = csv.reader(f, quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
next(csvreader)

普通のファイル読み込み

def import_format():
     return_list = []
    with open(FORMAT_PATH, 'r') as f:
        # readlineとか、readlines等の関数もあるが、末尾に改行コードが残る(仕様)。
        # 改行を除去したいため、下記を使用する。
        for line in f.read().splitlines():
            return_list.append(line)
    return return_list

csv -> json

pandasを使用する。
(当初はcsvに重複する項目があった場合、json側はリストにするつもりだったが未確認。)

data.csv

a,b,b,c
hoge,piyo,1,fuga
aaa,bb,2,ccc
import pandas as pd

df_data = pd.read_csv('data.csv')
df_data.to_json('data.json', orient='records')

data.json

[{"a":"hoge","b":"piyo","b.1":1,"c":"fuga"},{"a":"aaa","b":"bb","b.1":2,"c":"ccc"}]

csvもどきを読み込む

FakeCSV_convert_to_Json

こういうやつだが・・・
こんなファイルを作ってはいけない!!
これはCSVではない!!
作りたいのであれば別ファイルに分けるべき。

PATTERN_A
a,b,b,c
hoge,piyo,1,fuga
aaa,bb,2,ccc
PATTERN_B
a,a,b,c,c
hoge,1,piyo,fuga,2

jsonを読み込む

dict

import json

with open(REPLACE_DICT_PATH, 'r') as d:
    replace_dict = json.load(d)

dataclass

Simple

{
    "id" : 1,
    "name" : "hogehoge"
}
from dataclasses import dataclass

@dataclass
class SimpleModel:
    id: int
    name: str
from model SimpleModel
import json

with open("simple.json", "r") as fp:
    json_data = json.load(fp)
    print(SimpleModel(**json_data))

List

[
    {
        "id" : 1,
        "name" : "top",
        "sub" : [
            {
                "id" : "A1",
                "name" : "sub_A1"
            },
            {
                "id" : "A2",
                "name" : "sub_A2"
            }
        ]
    },
    {
        "id" : 2,
        "name" : "second",
        "sub" : [
            {
                "id" : "B1",
                "name" : "sub_B1"
            },
            {
                "id" : "B2",
                "name" : "sub_B2"
            }
        ]
    }
]
from dataclasses import dataclass, field
from dataclasses_json import dataclass_json
from typing import List

@dataclass
class SubModel:
    id: str
    name: str
    value: str
    
@dataclass
class JsonModel:
    id: int 
    name: str
    sub: List[SubModel]  = field(default_factory=list)
from model import JsonModel
import json

with open("sample.json", "r") as fp:
    json_data = json.load(fp=fp)
    print([JsonModel.from_dict(model) for model in json_data])

dict -> jsonファイル出力する

import json
dict_datas = {}
# dict_datasに値を詰める(省略)
with open('data.json', 'w') as f:
    json.dump(dict_datas, f, ensure_ascii=False)
    # ensure_asciiは文字をエスケープするかどうかの設定。Falseでしない。

ディレクトリ配下を再帰的に検索する

import glob

files = glob.glob("./**/*.py", recursive=True)
for file in files:
    print(file)

__init__をオーバーロードする

Pythonの場合、__init__は1クラスに1つしか作れないため、オーバーロードできない。
@dataclassをつけた場合、__init__メソッドが生成されるが、CSV -> dataclassしたい場合は、listで渡したい場合もある。


from dataclasses import dataclass

@dataclass
class SampleDataClass:
    id: str
    hoge: str

    def sets(self, line: list):
        index = 0
        self.id = list[index]; index += 1
        self.hoge = list[index]; index += 1

dataclassの初期化

dataclassは初期値がない場合は、引数なしでコンストラクタを呼び出せない。
なるべく初期値は設定したほうが良い。

from dataclasses import dataclass

@dataclass
class HogeModel():
    id: int = 0
    v: str = None

Enumの実装例

Enum/dotenv

pip install python-dotenv

src

.env

HOGE=hoge

config.py

from dotenv import load_dotenv
load_dotenv()

import os
from enum import Enum

class SampleEnum(Enum):
    HOGE = os.getenv("HOGE")

app.py

from config import SampleEnum

print(SampleEnum.HOGE)
print(SampleEnum.HOGE.value)

実行結果

$ python app.py
SampleEnum.HOGE
hoge

参考

dataclassのsort

from operator import attrgetter

# ageはdataclassのメンバ変数名。
sorted(person_list, key=attrgetter('age'))

クラス宣言とコンストラクタ

class SampleClass:
    def __init__(self, v):
        self.v = v

配列の展開

ソース

# -*- coding: utf-8 -*-
import itertools

def unpack_01(original):
    """ もっといい書き方があると思うんだけど・・・"""
    unpacked = []
    for l in original:
        for i in l:
            unpacked.append(i)
    return unpacked

def unpack_02(original):
    return list(itertools.chain(*original))

def unpack_03(original):
    """あとから展開したものを追加するだけならこれでOK。"""
    return_list = []
    for l in original:
        return_list.extend(l)
    return return_list

def main():
    original = [[1,2],[3,4],[5,6]]

    print(original)
    print(unpack_01(original))
    print(unpack_02(original))
    print(unpack_03(original))
    
if __name__ == '__main__':
    main()

実行結果

[[1, 2], [3, 4], [5, 6]]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6]

参考

文字列の配列を何らかの文字列で連結する

','.join(str_list)

str -> int -> bool

あくまで整数。

True/False
1 True
0 False
上記以外 True

実行例

import sys

def main(args):
    i = int(args[1]);
    if i:
        print(True)
    else:
        print(False)

if __name__ == '__main__':
    main(sys.argv)
$ python app.py 1
True
$ python app.py 0
False
$ python app.py 2
True
$ python app.py -1
True

datetime

str -> datetime , timezone

from datetime import datetime, timezone, timedelta

converted = datetime.strptime("2021-09-11T12:45:18.448117+00:00", "%Y-%m-%dT%H:%M:%S.%f%z")
print(converted)
print(converted.tzinfo)

# timezone変換
# timezoneインスタンスの生成
jst_timezone = timezone(timedelta(hours=9))
jst = converted.astimezone(jst_timezone)
print(jst)
print(jst.tzinfo)
<class 'datetime.datetime'>
2021-09-11 12:45:18.448117+00:00
UTC
2021-09-11 21:45:18.448117+09:00
UTC+09:00

datetime -> str

import datetime

now = datetime.datetime.now()
print(now)
print(now.strftime("%Y%m%d"))
2023-12-12 23:49:39.010985
20231212

int -> datetime -> str

import sys
import datetime

def convert_unixtime_to_hhmm(unixtime):
    time = datetime.datetime.fromtimestamp(unixtime)
    print(time)
    formatted_time = time.strftime("%H:%M")
    return formatted_time

if __name__ == '__main__':
    args = sys.argv

    print(convert_unixtime_to_hhmm(int(args[1])/1000000))
    
    # 正しくはないが下記でも動く。
    # print(convert_unixtime_to_hhmm(int(str(args[1][0:10]))))
$ python app.py 1704206837620459
2024-01-02 23:47:17.620459
23:47

日付計算

import datetime

now = datetime.datetime.now()
print(now)
print(now + datetime.timedelta(days=1))
2023-12-12 23:48:52.882272
2023-12-13 23:48:52.882272

型の判定

if type(value) is str:
    # 

参考

APIを実行する

import requests

def getURL(organization:str, project:str) -> str:
    return f'https://dev.azure.com/{organization}/{project}/_apis/git/repositories?api-version=7.0'

if __name__ == '__main__':
    # 起動引数の取得
    # args = sys.argv
    # args[0]はpythonのファイル名。
    # 実際の引数はargs[1]から。

    result = requests.get(
        getURL(
            ImportEnvKeyEnum.ORGANIZATION.value,
            ImportEnvKeyEnum.PROJECT.value),
        auth=('git', ImportEnvKeyEnum.Token.value)).json()
    
    logger.info(result)
    
    for repo in result['value']:
        print(repo['id'])

参考

部分配列の取得

先頭を取得する

l = [1,2,3,4,5]
part = l[:3]
print(part)
[1, 2, 3]

末尾を取得する

l = [1,2,3,4,5]
part = l[3:]
print(part)
[4, 5]

間を取得する

l = [1,2,3,4,5]
part = l[1:3]
print(part)
[2, 3]

@staticmethodと@classmethodの違い

ChatGPT-3.5に下記の質問をした。

Pythonで@staticmethodと@classmethodの違いを教えてください。

回答

@staticmethod@classmethodは、Pythonにおけるクラスメソッドの異なる種類を示すデコレータです。両者は、クラス自体から呼び出すことができるメソッドを定義するために使用されますが、異なる動作を持っています。

  1. @staticmethod(静的メソッド):
    • クラスのインスタンスとは関係なく、クラス自体から呼び出すことができるメソッドです。
    • インスタンス変数やクラス変数へのアクセスは直接行えません。そのため、通常の関数として使用されることがあります。
    • 引数に selfcls を含まないため、メソッド内でこれらのキーワードを使用することはありません。
class MyClass:
    @staticmethod
    def static_method():
        print("This is a static method.")

# クラスから直接呼び出し
MyClass.static_method()
  1. @classmethod(クラスメソッド):
    • クラス自体と関連がありますが、クラス変数へのアクセスが可能です。
    • 引数の1つ目に cls(クラス自体)が渡され、クラス変数やメソッドにアクセスするために使用されます。
    • サブクラスからも継承されるため、オーバーライドすることもできます。
class MyClass:
    class_variable = 10
    
    @classmethod
    def class_method(cls):
        print(f"This is a class method. Class variable: {cls.class_variable}")

# クラスから呼び出し
MyClass.class_method()

使い分け:

どちらのデコレータも、クラスのインスタンスを作成せずに直接クラスから呼び出せる点が共通しています。

依存モジュールの一覧化

取得

ファイル名は何でもいいが、requirements.txtにするのが通例。

pip freeze > requirements.txt

導入

pip install -r requirements.txt

venv(仮想環境)

仮想環境構築

python3 -m venv ${環境名}

環境名は「venv」とかでOK。

アクティベート

アクティベートすると「${環境名}」が表示される。

Linux

source ${環境名}/bin/activate

Windows

.\${環境名}\Scripts\activate

デアクティベート

deactivate

参考

virtualenv

pip install -U pip
pip install --user --upgrade virtualenv
virtualenv aip-env
source aip-env/bin/activate

参考

format

x = 1
print(f"いまはこの表記が主流。 x = {x}")

for

# -*- coding: utf-8 -*-

dic = {0:'hoge', 1:'piyo'}

# キーを取得
for k in dic:
    print('k : {}'.format(k))
    
# 値だけを取得
for v in dic.values():
    print('v : {}'.format(v))

# 両方取得
for (k, v) in dic.items():
    print('k : {} , v : {}'.format(k,v))
k : 0
k : 1
v : hoge
v : piyo
k : 0 , v : hoge
k : 1 , v : piyo

重複排除(list -> set)

deploy_jobs = []

deploy_jobs.append(('batch','ST0'))
deploy_jobs.append(('batch','IKR01'))
deploy_jobs.append(('batch','ST0'))

print(set(deploy_jobs))

メソッド呼び出し時に副作用的に別の処理を呼び出す

# -*- coding: utf-8 -*-
from logging import getLogger, config
import os
from logutil import LogUtil

PYTHON_APP_HOME = os.getenv('PYTHON_APP_HOME')
LOG_CONFIG_FILE = ['config', 'log_config.json']

log_conf = LogUtil.get_log_conf(os.path.join(PYTHON_APP_HOME, *LOG_CONFIG_FILE))
config.dictConfig(log_conf)

class SampleController():
    def __init__(self) -> None:
        pass

    def print_log_info_only(self) -> None:
        logger = getLogger("controller.SampleController.print_log_info_only")
        logger.info("This is an INFO log")
        logger.debug("This DEBUG log will not be shown")

    def print_log_debug(self) -> None:
        logger = getLogger("controller.SampleController.print_log_debug")
        logger.info("This is an INFO log")
        logger.debug("This is a DEBUG log")

デコレータを使う

from functools import wraps
from logging import getLogger, config
import os

def dynamic_logger(logger_name):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            logger = getLogger(logger_name)
            setattr(args[0], 'logger', logger)  # インスタンスにロガーを設定
            return func(*args, **kwargs)
        return wrapper
    return decorator


class SampleController():
    def __init__(self):
        pass

    @dynamic_logger("controller.SampleController.print_log_info_only")
    def print_log_info_only(self):
        self.logger.info("This is an INFO log")
        self.logger.debug("This DEBUG log will not be shown")

    @dynamic_logger("controller.SampleController.print_log_debug")
    def print_log_debug(self):
        self.logger.info("This is an INFO log")
        self.logger.debug("This is a DEBUG log")

クラス全体にデコレータを適用する

def apply_logger(cls):
    for attr_name, attr_value in cls.__dict__.items():
        if callable(attr_value):  # メソッドかどうか確認
            logger_name = f"{cls.__name__}.{attr_name}"
            decorated = dynamic_logger(logger_name)(attr_value)
            setattr(cls, attr_name, decorated)
    return cls


@apply_logger
class SampleController():
    def __init__(self):
        pass

    def print_log_info_only(self):
        self.logger.info("This is an INFO log")
        self.logger.debug("This DEBUG log will not be shown")

    def print_log_debug(self):
        self.logger.info("This is an INFO log")
        self.logger.debug("This is a DEBUG log")

__getattribute__を使用する

def apply_logger(cls):
    for attr_name, attr_value in cls.__dict__.items():
        if callable(attr_value):  # メソッドかどうか確認
            logger_name = f"{cls.__name__}.{attr_name}"
            decorated = dynamic_logger(logger_name)(attr_value)
            setattr(cls, attr_name, decorated)
    return cls


@apply_logger
class SampleController():
    def __init__(self):
        pass

    def print_log_info_only(self):
        self.logger.info("This is an INFO log")
        self.logger.debug("This DEBUG log will not be shown")

    def print_log_debug(self):
        self.logger.info("This is an INFO log")
        self.logger.debug("This is a DEBUG log")

マルチプロセス

ProcessPoolExecutor

import csv
from concurrent.futures import ProcessPoolExecutor

replace_dict_1 = {
    "A" : "hoge",
    "B" : "fuga",
    "C" : "piyo",
    "D" : "foo",
    "E" : "bar",
    "F" : "baz"
}
replace_dict_3 = {
    1 : "one",
    2 : "two",
    3 : "three",
    4 : "four",
    5 : "five",
    6 : "six",
    7 : "seven",
    8 : "eight",
    9 : "nine",
    10 : "ten"
}

def process_row(row):
    # Replace the first column using replace_dict_1
    row[0] = replace_dict_1.get(row[0], row[0])
    # Replace the third column using replace_dict_3
    try:
        key = int(row[2])
        row[2] = replace_dict_3.get(key, row[2])
    except ValueError:
        pass
    return row

with open('list.tsv', 'r', newline='') as f:
    reader = csv.reader(f, delimiter='\t')
    rows = list(reader)

with ProcessPoolExecutor() as executor:
    processed_rows = list(executor.map(process_row, rows))

print(processed_rows)

配列をマルチプロセスで処理する

# -*- coding: utf-8 -*-
from multiprocessing import Pool

SQUARE_COUNT = 10

def square(i):
    return i*i

if __name__ == '__main__':
    int_list = list(range(SQUARE_COUNT))
    
    print(f'int_list : {int_list}')
    
    # 第1引数に関数、第2引数に処理する配列を受ける
    with Pool(4) as pool:
        result = pool.map(square, int_list)
        
    print(result)
int_list : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

配列ではないがマルチプロセスで処理する

# -*- coding: utf-8 -*-
from multiprocessing import Pool
import random
import datetime 

RANDOM_SQUARE_COUNT = 10000000

def square(i):
    return random.random() * random.random()

def multi_process_performance(process_count):
    start = datetime.datetime.now()
    with Pool(process_count) as pool:
        pool.map(square, range(RANDOM_SQUARE_COUNT))
    finish = datetime.datetime.now()
    print(f'process {process_count} , time : {finish - start}')

if __name__ == '__main__':
    multi_process_performance(1)
    multi_process_performance(4)
    multi_process_performance(100)

process 1 , time : 0:00:03.838766
process 4 , time : 0:00:00.768664
process 100 , time : 0:00:00.709970

unittest

テスト実行前にメソッドを実行する

import unittest

class TestSampleClass(unittest.TestClass):
    @classmethod
    def setUpClass(cls):
        # ここにやりたいことを書く

boto3

サービスのclientを取得する

pyenv

インストール可能なバージョン一覧

pyenv install -l | less

特定バージョンのPythonインストール

pyenv install ${python_version}

参考

インストール済みバージョンの一覧

pyenv versions

切り替え

pyenv local ${python_version}

アンインストール

pyenv uninstall ${python_version}

切り替わらないとき

set by PYENV_VERSION environment variable

$ pyenv version
3.10.8 (set by PYENV_VERSION environment variable)

PYENV_VERSIONが優先されるため、切り替わらない。
下記を実行する。

pyenv shell --unset
参考

sqlite3

バインドする

cur.execute("select * from hoge_table where id = ?",(id))
cur.execute("select * from hoge_table where id = :id",({"id": id}))

ラムダ式

配列から特定要素を削除する

# 空文字を削除
result_list = list(filter(lambda s: s != '', origin_list))

フォーマットを読み込んで置換する

至って普通。

config.json

{
    "format" : "$hoge$"
}

app.py

import json

# ファイルパス
file_path = "config.json"

# ファイルを読み込む
with open(file_path, "r") as file:
    config_data = json.load(file)

# 読み込んだデータを表示する
print(config_data)
print(config_data["format"].replace("$hoge$", "fuga"))
{'format': '$hoge$'}
fuga

グラフを生成する

ssh接続する

paramikoを使用する。
(実装例は使用する機会が合った時に記載する。)

mainの戻り値を指定する

基本的に0が正常終了。

import sys

def main():
    try:
        v1 = sys.argv[1]
        v2 = sys.argv[2]
        sys.exit(0)
    except IndexError:
        sys.exit("引数が足りません")

    # ここに処理を記述する

if __name__ == "__main__":
    main()

pandas

oracledb

selectを投げる

import oracledb

try:
    with oracledb.connect(
        user=,
        password=,
        dsn='IP:Port/接続名') as connection:

        with connection.cursor() as cursor:
            for r in cursor.execute(sql, parameter_dict):
                # r[0] : 1項目
                # r[1] : 2項目