Python
- Python
- CSV読み込み
- 普通のファイル読み込み
- csv -> json
- jsonを読み込む
- dict -> jsonファイル出力する
- ディレクトリ配下を再帰的に検索する
- __init__をオーバーロードする
- dataclassの初期化
- Enumの実装例
- Enum/dotenv
- dataclassのsort
- クラス宣言とコンストラクタ
- 配列の展開
- 文字列の配列を何らかの文字列で連結する
- str -> int -> bool
- datetime
- 型の判定
- APIを実行する
- 部分配列の取得
- @staticmethodと@classmethodの違い
- 依存モジュールの一覧化
- venv(仮想環境)
- virtualenv
- format
- for
- 重複排除(list -> set)
- メソッド呼び出し時に副作用的に別の処理を呼び出す
- マルチプロセス
- unittest
- boto3
- pyenv
- sqlite3
- ラムダ式
- フォーマットを読み込んで置換する
- グラフを生成する
- ssh接続する
- mainの戻り値を指定する
- pandas
- oracledb
CSV読み込み
# -*- coding: utf-8 -*-
import csv
import sys
from model import ReleaseModel
from util import Util
if __name__ == '__main__':
args = sys.argv
filepath = args[1]
release_list = []
with open(filepath, encoding='utf8', newline='\n') as f:
csvreader = csv.reader(f, quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
for row in csvreader:
model = ReleaseModel(row)
release_list.append(model)
print(model)
for job in Util.get_deploy_job(release_list):
print(f'- {job}')
# -*- coding: utf-8 -*-
from dataclasses import dataclass
@dataclass
class ReleaseModel:
title: str
repository: str
deploy: set
def __init__(self, line: list):
index = 0
self.title = line[index]; index += 1
self.repository = line[index]; index += 1
self.deploy = line[index].split('|'); index += 1
"title1","batch","ST0|IKR01|IKR03"
"title2","print","ST0|IKR01|IKR03"
"title3","batch","ST0|IKR01|SIT01"
"title4","webapps","ST0|IKR01|SIT01|ST101"
実行結果
ReleaseModel(title='title1', repository='batch', deploy=['ST0', 'IKR01', 'IKR03'])
ReleaseModel(title='title2', repository='print', deploy=['ST0', 'IKR01', 'IKR03'])
ReleaseModel(title='title3', repository='batch', deploy=['ST0', 'IKR01', 'SIT01'])
ReleaseModel(title='title4', repository='webapps', deploy=['ST0', 'IKR01', 'SIT01', 'ST101'])
- batch-IKR01
- batch-IKR03
- batch-SIT01
- batch-ST0
- print-IKR01
- print-IKR03
- print-ST0
- webapps-IKR01
- webapps-SIT01
- webapps-ST0
- webapps-ST101
- webapps-eob-IKR01
- webapps-eob-SIT01
- webapps-eob-ST0
- webapps-eob-ST101
Util.py
# -*- coding: utf-8 -*-
# from model import ReleaseModel
class Util:
@staticmethod
def get_deploy_job(relase_list: list):
job_list = []
for release in relase_list:
for env in release.deploy:
job_list.append((release.repository, env))
if release.repository == 'webapps':
job_list.append((release.repository + '-eob', env))
return_list = []
for job in set(job_list):
return_list.append(job[0] + '-' + job[1])
return_list.sort()
return return_list
ヘッダ行を読み飛ばす
csvreader = csv.reader(f, quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
next(csvreader)
普通のファイル読み込み
def import_format():
return_list = []
with open(FORMAT_PATH, 'r') as f:
# readlineとか、readlines等の関数もあるが、末尾に改行コードが残る(仕様)。
# 改行を除去したいため、下記を使用する。
for line in f.read().splitlines():
return_list.append(line)
return return_list
csv -> json
pandasを使用する。
(当初はcsvに重複する項目があった場合、json側はリストにするつもりだったが未確認。)
data.csv
a,b,b,c
hoge,piyo,1,fuga
aaa,bb,2,ccc
import pandas as pd
df_data = pd.read_csv('data.csv')
df_data.to_json('data.json', orient='records')
data.json
[{"a":"hoge","b":"piyo","b.1":1,"c":"fuga"},{"a":"aaa","b":"bb","b.1":2,"c":"ccc"}]
csvもどきを読み込む
こういうやつだが・・・
こんなファイルを作ってはいけない!!
これはCSVではない!!
作りたいのであれば別ファイルに分けるべき。
PATTERN_A
a,b,b,c
hoge,piyo,1,fuga
aaa,bb,2,ccc
PATTERN_B
a,a,b,c,c
hoge,1,piyo,fuga,2
jsonを読み込む
dict
import json
with open(REPLACE_DICT_PATH, 'r') as d:
replace_dict = json.load(d)
dataclass
Simple
{
"id" : 1,
"name" : "hogehoge"
}
from dataclasses import dataclass
@dataclass
class SimpleModel:
id: int
name: str
from model SimpleModel
import json
with open("simple.json", "r") as fp:
json_data = json.load(fp)
print(SimpleModel(**json_data))
List
[
{
"id" : 1,
"name" : "top",
"sub" : [
{
"id" : "A1",
"name" : "sub_A1"
},
{
"id" : "A2",
"name" : "sub_A2"
}
]
},
{
"id" : 2,
"name" : "second",
"sub" : [
{
"id" : "B1",
"name" : "sub_B1"
},
{
"id" : "B2",
"name" : "sub_B2"
}
]
}
]
from dataclasses import dataclass, field
from dataclasses_json import dataclass_json
from typing import List
@dataclass
class SubModel:
id: str
name: str
value: str
@dataclass
class JsonModel:
id: int
name: str
sub: List[SubModel] = field(default_factory=list)
from model import JsonModel
import json
with open("sample.json", "r") as fp:
json_data = json.load(fp=fp)
print([JsonModel.from_dict(model) for model in json_data])
dict -> jsonファイル出力する
import json
dict_datas = {}
# dict_datasに値を詰める(省略)
with open('data.json', 'w') as f:
json.dump(dict_datas, f, ensure_ascii=False)
# ensure_asciiは文字をエスケープするかどうかの設定。Falseでしない。
ディレクトリ配下を再帰的に検索する
import glob
files = glob.glob("./**/*.py", recursive=True)
for file in files:
print(file)
__init__をオーバーロードする
Pythonの場合、__init__は1クラスに1つしか作れないため、オーバーロードできない。
@dataclass
をつけた場合、__init__
メソッドが生成されるが、CSV -> dataclassしたい場合は、listで渡したい場合もある。
from dataclasses import dataclass
@dataclass
class SampleDataClass:
id: str
hoge: str
def sets(self, line: list):
index = 0
self.id = list[index]; index += 1
self.hoge = list[index]; index += 1
dataclassの初期化
dataclass
は初期値がない場合は、引数なしでコンストラクタを呼び出せない。
なるべく初期値は設定したほうが良い。
from dataclasses import dataclass
@dataclass
class HogeModel():
id: int = 0
v: str = None
Enumの実装例
Enum/dotenv
pip install python-dotenv
src
.env
HOGE=hoge
config.py
from dotenv import load_dotenv
load_dotenv()
import os
from enum import Enum
class SampleEnum(Enum):
HOGE = os.getenv("HOGE")
app.py
from config import SampleEnum
print(SampleEnum.HOGE)
print(SampleEnum.HOGE.value)
実行結果
$ python app.py
SampleEnum.HOGE
hoge
参考
dataclassのsort
from operator import attrgetter
# ageはdataclassのメンバ変数名。
sorted(person_list, key=attrgetter('age'))
クラス宣言とコンストラクタ
class SampleClass:
def __init__(self, v):
self.v = v
配列の展開
ソース
# -*- coding: utf-8 -*-
import itertools
def unpack_01(original):
""" もっといい書き方があると思うんだけど・・・"""
unpacked = []
for l in original:
for i in l:
unpacked.append(i)
return unpacked
def unpack_02(original):
return list(itertools.chain(*original))
def unpack_03(original):
"""あとから展開したものを追加するだけならこれでOK。"""
return_list = []
for l in original:
return_list.extend(l)
return return_list
def main():
original = [[1,2],[3,4],[5,6]]
print(original)
print(unpack_01(original))
print(unpack_02(original))
print(unpack_03(original))
if __name__ == '__main__':
main()
実行結果
[[1, 2], [3, 4], [5, 6]]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6]
参考
文字列の配列を何らかの文字列で連結する
','.join(str_list)
str -> int -> bool
あくまで整数。
値 | True/False |
---|---|
1 | True |
0 | False |
上記以外 | True |
実行例
import sys
def main(args):
i = int(args[1]);
if i:
print(True)
else:
print(False)
if __name__ == '__main__':
main(sys.argv)
$ python app.py 1
True
$ python app.py 0
False
$ python app.py 2
True
$ python app.py -1
True
datetime
str -> datetime , timezone
from datetime import datetime, timezone, timedelta
converted = datetime.strptime("2021-09-11T12:45:18.448117+00:00", "%Y-%m-%dT%H:%M:%S.%f%z")
print(converted)
print(converted.tzinfo)
# timezone変換
# timezoneインスタンスの生成
jst_timezone = timezone(timedelta(hours=9))
jst = converted.astimezone(jst_timezone)
print(jst)
print(jst.tzinfo)
<class 'datetime.datetime'>
2021-09-11 12:45:18.448117+00:00
UTC
2021-09-11 21:45:18.448117+09:00
UTC+09:00
datetime -> str
import datetime
now = datetime.datetime.now()
print(now)
print(now.strftime("%Y%m%d"))
2023-12-12 23:49:39.010985
20231212
int -> datetime -> str
import sys
import datetime
def convert_unixtime_to_hhmm(unixtime):
time = datetime.datetime.fromtimestamp(unixtime)
print(time)
formatted_time = time.strftime("%H:%M")
return formatted_time
if __name__ == '__main__':
args = sys.argv
print(convert_unixtime_to_hhmm(int(args[1])/1000000))
# 正しくはないが下記でも動く。
# print(convert_unixtime_to_hhmm(int(str(args[1][0:10]))))
$ python app.py 1704206837620459
2024-01-02 23:47:17.620459
23:47
日付計算
import datetime
now = datetime.datetime.now()
print(now)
print(now + datetime.timedelta(days=1))
2023-12-12 23:48:52.882272
2023-12-13 23:48:52.882272
型の判定
if type(value) is str:
#
参考
APIを実行する
requests
をinstallする。- Azure DevOpsのリポジトリIDを取得するサンプル。
import requests
def getURL(organization:str, project:str) -> str:
return f'https://dev.azure.com/{organization}/{project}/_apis/git/repositories?api-version=7.0'
if __name__ == '__main__':
# 起動引数の取得
# args = sys.argv
# args[0]はpythonのファイル名。
# 実際の引数はargs[1]から。
result = requests.get(
getURL(
ImportEnvKeyEnum.ORGANIZATION.value,
ImportEnvKeyEnum.PROJECT.value),
auth=('git', ImportEnvKeyEnum.Token.value)).json()
logger.info(result)
for repo in result['value']:
print(repo['id'])
参考
部分配列の取得
先頭を取得する
l = [1,2,3,4,5]
part = l[:3]
print(part)
[1, 2, 3]
末尾を取得する
l = [1,2,3,4,5]
part = l[3:]
print(part)
[4, 5]
間を取得する
l = [1,2,3,4,5]
part = l[1:3]
print(part)
[2, 3]
@staticmethodと@classmethodの違い
ChatGPT-3.5に下記の質問をした。
Pythonで@staticmethodと@classmethodの違いを教えてください。
回答
@staticmethod
と@classmethod
は、Pythonにおけるクラスメソッドの異なる種類を示すデコレータです。両者は、クラス自体から呼び出すことができるメソッドを定義するために使用されますが、異なる動作を持っています。
@staticmethod
(静的メソッド):- クラスのインスタンスとは関係なく、クラス自体から呼び出すことができるメソッドです。
- インスタンス変数やクラス変数へのアクセスは直接行えません。そのため、通常の関数として使用されることがあります。
- 引数に
self
やcls
を含まないため、メソッド内でこれらのキーワードを使用することはありません。
class MyClass:
@staticmethod
def static_method():
print("This is a static method.")
# クラスから直接呼び出し
MyClass.static_method()
@classmethod
(クラスメソッド):- クラス自体と関連がありますが、クラス変数へのアクセスが可能です。
- 引数の1つ目に
cls
(クラス自体)が渡され、クラス変数やメソッドにアクセスするために使用されます。 - サブクラスからも継承されるため、オーバーライドすることもできます。
class MyClass:
class_variable = 10
@classmethod
def class_method(cls):
print(f"This is a class method. Class variable: {cls.class_variable}")
# クラスから呼び出し
MyClass.class_method()
使い分け:
@staticmethod
は、クラスと密接な関係がなく、単に名前空間を提供するためのメソッドに適しています。@classmethod
は、クラスのコンテキストを必要とするメソッド(例えば、ファクトリーメソッド)や、クラス変数にアクセスが必要なメソッドに適しています。
どちらのデコレータも、クラスのインスタンスを作成せずに直接クラスから呼び出せる点が共通しています。
依存モジュールの一覧化
取得
ファイル名は何でもいいが、requirements.txtにするのが通例。
pip freeze > requirements.txt
導入
pip install -r requirements.txt
venv(仮想環境)
仮想環境構築
python3 -m venv ${環境名}
環境名は「venv」とかでOK。
アクティベート
アクティベートすると「${環境名}」が表示される。
Linux
source ${環境名}/bin/activate
Windows
.\${環境名}\Scripts\activate
デアクティベート
deactivate
参考
virtualenv
pip install -U pip
pip install --user --upgrade virtualenv
virtualenv aip-env
source aip-env/bin/activate
参考
format
x = 1
print(f"いまはこの表記が主流。 x = {x}")
for
# -*- coding: utf-8 -*-
dic = {0:'hoge', 1:'piyo'}
# キーを取得
for k in dic:
print('k : {}'.format(k))
# 値だけを取得
for v in dic.values():
print('v : {}'.format(v))
# 両方取得
for (k, v) in dic.items():
print('k : {} , v : {}'.format(k,v))
k : 0
k : 1
v : hoge
v : piyo
k : 0 , v : hoge
k : 1 , v : piyo
重複排除(list -> set)
deploy_jobs = []
deploy_jobs.append(('batch','ST0'))
deploy_jobs.append(('batch','IKR01'))
deploy_jobs.append(('batch','ST0'))
print(set(deploy_jobs))
メソッド呼び出し時に副作用的に別の処理を呼び出す
# -*- coding: utf-8 -*-
from logging import getLogger, config
import os
from logutil import LogUtil
PYTHON_APP_HOME = os.getenv('PYTHON_APP_HOME')
LOG_CONFIG_FILE = ['config', 'log_config.json']
log_conf = LogUtil.get_log_conf(os.path.join(PYTHON_APP_HOME, *LOG_CONFIG_FILE))
config.dictConfig(log_conf)
class SampleController():
def __init__(self) -> None:
pass
def print_log_info_only(self) -> None:
logger = getLogger("controller.SampleController.print_log_info_only")
logger.info("This is an INFO log")
logger.debug("This DEBUG log will not be shown")
def print_log_debug(self) -> None:
logger = getLogger("controller.SampleController.print_log_debug")
logger.info("This is an INFO log")
logger.debug("This is a DEBUG log")
デコレータを使う
from functools import wraps
from logging import getLogger, config
import os
def dynamic_logger(logger_name):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
logger = getLogger(logger_name)
setattr(args[0], 'logger', logger) # インスタンスにロガーを設定
return func(*args, **kwargs)
return wrapper
return decorator
class SampleController():
def __init__(self):
pass
@dynamic_logger("controller.SampleController.print_log_info_only")
def print_log_info_only(self):
self.logger.info("This is an INFO log")
self.logger.debug("This DEBUG log will not be shown")
@dynamic_logger("controller.SampleController.print_log_debug")
def print_log_debug(self):
self.logger.info("This is an INFO log")
self.logger.debug("This is a DEBUG log")
クラス全体にデコレータを適用する
def apply_logger(cls):
for attr_name, attr_value in cls.__dict__.items():
if callable(attr_value): # メソッドかどうか確認
logger_name = f"{cls.__name__}.{attr_name}"
decorated = dynamic_logger(logger_name)(attr_value)
setattr(cls, attr_name, decorated)
return cls
@apply_logger
class SampleController():
def __init__(self):
pass
def print_log_info_only(self):
self.logger.info("This is an INFO log")
self.logger.debug("This DEBUG log will not be shown")
def print_log_debug(self):
self.logger.info("This is an INFO log")
self.logger.debug("This is a DEBUG log")
__getattribute__
を使用する
def apply_logger(cls):
for attr_name, attr_value in cls.__dict__.items():
if callable(attr_value): # メソッドかどうか確認
logger_name = f"{cls.__name__}.{attr_name}"
decorated = dynamic_logger(logger_name)(attr_value)
setattr(cls, attr_name, decorated)
return cls
@apply_logger
class SampleController():
def __init__(self):
pass
def print_log_info_only(self):
self.logger.info("This is an INFO log")
self.logger.debug("This DEBUG log will not be shown")
def print_log_debug(self):
self.logger.info("This is an INFO log")
self.logger.debug("This is a DEBUG log")
マルチプロセス
ProcessPoolExecutor
import csv
from concurrent.futures import ProcessPoolExecutor
replace_dict_1 = {
"A" : "hoge",
"B" : "fuga",
"C" : "piyo",
"D" : "foo",
"E" : "bar",
"F" : "baz"
}
replace_dict_3 = {
1 : "one",
2 : "two",
3 : "three",
4 : "four",
5 : "five",
6 : "six",
7 : "seven",
8 : "eight",
9 : "nine",
10 : "ten"
}
def process_row(row):
# Replace the first column using replace_dict_1
row[0] = replace_dict_1.get(row[0], row[0])
# Replace the third column using replace_dict_3
try:
key = int(row[2])
row[2] = replace_dict_3.get(key, row[2])
except ValueError:
pass
return row
with open('list.tsv', 'r', newline='') as f:
reader = csv.reader(f, delimiter='\t')
rows = list(reader)
with ProcessPoolExecutor() as executor:
processed_rows = list(executor.map(process_row, rows))
print(processed_rows)
配列をマルチプロセスで処理する
# -*- coding: utf-8 -*-
from multiprocessing import Pool
SQUARE_COUNT = 10
def square(i):
return i*i
if __name__ == '__main__':
int_list = list(range(SQUARE_COUNT))
print(f'int_list : {int_list}')
# 第1引数に関数、第2引数に処理する配列を受ける
with Pool(4) as pool:
result = pool.map(square, int_list)
print(result)
int_list : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
配列ではないがマルチプロセスで処理する
# -*- coding: utf-8 -*-
from multiprocessing import Pool
import random
import datetime
RANDOM_SQUARE_COUNT = 10000000
def square(i):
return random.random() * random.random()
def multi_process_performance(process_count):
start = datetime.datetime.now()
with Pool(process_count) as pool:
pool.map(square, range(RANDOM_SQUARE_COUNT))
finish = datetime.datetime.now()
print(f'process {process_count} , time : {finish - start}')
if __name__ == '__main__':
multi_process_performance(1)
multi_process_performance(4)
multi_process_performance(100)
process 1 , time : 0:00:03.838766
process 4 , time : 0:00:00.768664
process 100 , time : 0:00:00.709970
unittest
テスト実行前にメソッドを実行する
import unittest
class TestSampleClass(unittest.TestClass):
@classmethod
def setUpClass(cls):
# ここにやりたいことを書く
boto3
サービスのclientを取得する
pyenv
インストール可能なバージョン一覧
pyenv install -l | less
特定バージョンのPythonインストール
pyenv install ${python_version}
参考
- pyenv:github
- インストールに失敗したら参照する。
インストール済みバージョンの一覧
pyenv versions
切り替え
pyenv local ${python_version}
アンインストール
pyenv uninstall ${python_version}
切り替わらないとき
set by PYENV_VERSION environment variable
$ pyenv version
3.10.8 (set by PYENV_VERSION environment variable)
PYENV_VERSIONが優先されるため、切り替わらない。
下記を実行する。
pyenv shell --unset
参考
sqlite3
バインドする
cur.execute("select * from hoge_table where id = ?",(id))
cur.execute("select * from hoge_table where id = :id",({"id": id}))
ラムダ式
配列から特定要素を削除する
# 空文字を削除
result_list = list(filter(lambda s: s != '', origin_list))
フォーマットを読み込んで置換する
至って普通。
config.json
{
"format" : "$hoge$"
}
app.py
import json
# ファイルパス
file_path = "config.json"
# ファイルを読み込む
with open(file_path, "r") as file:
config_data = json.load(file)
# 読み込んだデータを表示する
print(config_data)
print(config_data["format"].replace("$hoge$", "fuga"))
{'format': '$hoge$'}
fuga
グラフを生成する
ssh接続する
paramikoを使用する。
(実装例は使用する機会が合った時に記載する。)
mainの戻り値を指定する
基本的に0が正常終了。
import sys
def main():
try:
v1 = sys.argv[1]
v2 = sys.argv[2]
sys.exit(0)
except IndexError:
sys.exit("引数が足りません")
# ここに処理を記述する
if __name__ == "__main__":
main()
pandas
oracledb
selectを投げる
import oracledb
try:
with oracledb.connect(
user=,
password=,
dsn='IP:Port/接続名') as connection:
with connection.cursor() as cursor:
for r in cursor.execute(sql, parameter_dict):
# r[0] : 1項目
# r[1] : 2項目