def 小児科医():

かけだし小児科医が仕事の合間にプログラミングを勉強するブログです。

当直表作成プログラムを作りたい話

 

 

言い訳


…お久しぶりになってしまった。

 

言い訳させていただくと、

 

①仕事が忙しい

全国の小児科医の皆様もそうだと思うが6月入ってから感染症の流行がえぐい。

午前外来が午後まで終わらないのは当たり前、病棟も満床が基本。

母数が増えたせいで重症例もちょいちょい。

というわけでお昼休みに勉強してたのが昼飯すら食えず。

 

②学会準備

なんか学会発表が重なった関係で仕事後も永遠にスライド作りに明け暮れる日々(現在進行形)。来月には一旦落ち着くはず。

 

③プログラミングで沼にハマった

これが今回の話。

最近本読んだり講義を聞いてばっかりのインプットばかりだったので、何か作ってみようということに。

何を作っているか悩んでいる時に、隣で上司が当直表作りに頭を抱えていたので、最近勉強した数理最適化を使って自動化できるんじゃね?と。

んで案の定うまくいかなくてウンウン悩んでたらブログのこととか忘れてました。

 

 

①目的関数・制約条件の設定

最終的な目標は

スタッフが1ヶ月の不在日、当直不可日を記入する→1ヶ月分の当直、当番表の出来上がり。

が、まずはスタッフ5人(+非常勤1人)として当直の日だけでも決めれるようにしたい。

というわけで、

 

【目的関数】

各フタッフごとに当直不可日を0、当直可能日を1としたcsvファイルAを作って(ゆくゆくは作るのも自動化したい)、実際に当直する日を1、しない日を0とした当直表Bを作った時に任意のスタッフx、日付yで

A(x,y)-B(x,y)の合計が最小となること

 

【制約条件】

・同じスタッフが2日連続当直にはならない

・各スタッフの当直数の差が1以内

 

でやってみた。初学者が恐ろしいものに手を出してしまったのでそもそもの設定が間違っている可能性はある。

②挫折:最大流問題

とりあえず前回数理最適化を勉強した時、最大流問題がシフトスケジュールと相性良かったのを思い出した。↓

 

defpediatric.hatenablog.com

のでまずは最大流問題でできるかやってみた。

が、当直表作成において最も必要な制約条件「同じスタッフが2日連続当直にはならない」が反映できない。っていうかこの時点で、「あ、思ったよりむずい。」と絶望したのを覚えている。

なんかいろいろ試したり、しまいにはchatGPTに書かせてみたりもしたんだが、こっちの知識不足でうまく指示を伝えられずに何回やっても、

こんな感じ。いや2連直だめって言ってんじゃん。
ここから2週間くらい進まなくなったので、最大流問題はあきらめた。

頭いい人誰か教えてください。

 

 

 

③なんかぐちゃぐちゃやってたらできた:遺伝アルゴリズム

最適化アルゴリズムには向き不向きがあることは勉強したので、次は一番汎用性が高そうな遺伝アルゴリズムでやってみることにした↓

defpediatric.hatenablog.com

結論から言うと、効率がいいかは不明(多分良くはない)が、とりあえず動くものはできた。

 

パラメータ設定
import numpy as np
import pandas as pd
import openpyxl

#希望表の読み込み
"""当直日を決める
不可日を0としてそれ以外を1とする"""
x = pd.read_csv('当直表.csv', index_col=0, parse_dates=[0])

schedule = x.fillna(1)
F_data = x["F"]

#一世代あたりの個体数
n = 10
#突然変異率
mutation_rate = 0.05
#交差確率
crossover_rate = 0.5

#確率調整
probability = [0.2, 0.2, 0.2, 0.2, 0.2,]


#第一世代の作成
staff = ['A','B','C','D','E']

generation_1 = np.array([np.random.choice(staff, len(x.index), p=probability) for _ in range(n)])

 

A〜Eが常勤、Fが非常勤。

非常勤の当直日はこっちから指定できるはずなので、とりあえずA〜Eで作って世代決定の直前に組み込む。

 

各パラメータはググりながら何となくうまくいく数値にした。

 

評価関数
class GATSP:
#評価
def evaluation(generation):

parent_score = np.zeros(generation.shape[0])
for i in range(0, generation.shape[0]):
parent = generation[i, :]
for b in range(len(x.index)):
select_staff = schedule[f"{generation[i, b]}"]
#当直不可日がに当直になっていたらペナルティ2
if select_staff[b] == 0:

parent_score[i] += 2
for a in range(len(x.index) -1):

#2連直になったらペナルティ2
if parent[a] == parent[a + 1]:

parent_score[i] += 2
count_A = np.count_nonzero(parent == 'A')

count_B = np.count_nonzero(parent == 'B')
count_C = np.count_nonzero(parent == 'C')
count_D = np.count_nonzero(parent == 'D')
count_E = np.count_nonzero(parent == 'E')
if count_A > 5:
parent_score[i] +=1
if count_B > 5:

parent_score[i] +=1
if count_C > 5:

parent_score[i] +=1
if count_D > 5:

parent_score[i] +=1
if count_E > 5:

parent_score[i] +=1
return parent_score

 

なんか目的関数と制約条件うまく分けれなかったからペナルティスコアを作って満たせなかったら+ポイントにしてポイント0を目指す方向にした。

なぜかペナルティに重み付けがされてるけ別に0目指すから関係なくね、と今になって思う。

 

交差

# 選択
def selection(generation, parent_score):

sum_score = 0
sup_parents = None
for i in range(len(parent_score)):

sum_score += parent_score[i]
ave = sum_score / len(parent_score)
for i in range(len(parent_score)):
if parent_score[i] <= ave:
if sup_parents is None:
sup_parents = np.array(generation[i])
else:
sup_parents = np.vstack*1
return sup_parents

#交差
def crossover(pair):

if pair.ndim > 1:
for i in range(0, len(pair) - 1):
parent1 = generation[i]
parent2 = generation[i + 1]
for b in range(len(x.index)):
if np.random.rand() < crossover_rate:
crossover_point = b
crossover_1 = parent1[crossover_point]
crossover_2 = parent2[crossover_point]
parent1[crossover_point] = crossover_2
parent2[crossover_point] = crossover_1
add_childs = np.vstack((parent1, parent2))
if 'childs' in locals():
childs = np.vstack((childs, add_childs))
else:
childs = add_childs
else:
childs = np.array([np.random.choice(staff, len(x.index), p=probability) for _ in range(1)])
return childs

ここでの選択は「次世代個体の選択」もそうだけど「交叉と突然変異に使う親の選択」。一応優秀なやつ同士で交差させたほうがいいかなぁと思って。(その結果めっちゃ初期収束したりもした。)

初学者なので当初「一点交差」でやろうとしていたので、死ぬほど効率が悪い、と言うかもう全くの別物ができてしまう状態であった。

んで色々調べた結果「一様交差」を採用。少し計算量が減った。

 

突然変異
#突然変異
def mutation(pair, mutation_rate):

# 個体の各遺伝子に対して突然変異を適用
if pair.ndim > 1:

for i in range(0, pair.shape[0]):
for m in range(len(pair[i])):
if np.random.rand() < mutation_rate:
pair[i, m] = np.random.choice(['A', 'B', 'C', 'D', 'E'])
else:
pair = np.array([np.random.choice(staff, len(x.index), p=probability) for _ in range(1)])
return pair

まぁここは特に。

交差のところでもそうなんだけど世代を経る時に個体数が減ってたまに親がいなくなる(選ばれなくなる)ことがあったので個体数減ったら勝手にランダムなやつが生えるようにしている。多分正しいやり方ではないがうまくいったのでよしとしてしまっている。

 

次世代作成
def new_generation(generation, pair, childs):
generation = np.vstack((generation, pair, childs))
for i in range(len(generation)-1):
for a in range(len(x.index)-1):
if F_data[a] == 1 :
generation[i, a] = 'F'
generation_first = generation[0]

smart_generation = generation_first
for i in range(len(generation) -1):
if (generation[i] != generation[0]).any() and (generation[i] != generation_first).any():
smart_generation = np.vstack((smart_generation, generation[i]))
generation_first = generation[i]
if len(smart_generation.shape) <= 1 :
add_gene = np.array([np.random.choice(staff, len(x.index), p=probability) for _ in range(4)])
smart_generation = np.vstack((smart_generation, add_gene))
evaluation = GATSP.evaluation(smart_generation)

df_generation = pd.DataFrame(smart_generation, index=evaluation)
df_sorted = df_generation.sort_index()
sorted_generation = df_sorted.values
top_generation = sorted_generation[:8]
# 行数からランダムにインデックスを選びます。
randomRows = np.random.choice(smart_generation.shape[0], n-8, replace=False)

# 選んだインデックスの行を抽出します。
other_generation = smart_generation[randomRows, :]

new_generation = np.vstack((top_generation, other_generation))
return new_generation

1世代10個体として、

優秀な親+交差してできた子供+突然変異したやつの中で、スコアが低いtop8を選ぶ。それらは次世代に採用。

残り2枠は敗者復活戦で選ばれなかったやつからランダムで2個選ぶことにした。

あ、ここで非常勤枠を強制入力しています。

 

繰り返し処理
max_iterations = 100  # 最大繰り返し回数を設定
generation = generation_1

for _ in range(max_iterations):
generation_score = GATSP.evaluation(generation)
if _ % 20 == 0:
print(generation_score)
zeroCount = np.count_nonzero(generation_score == 0)
# 0の数があるかどうかを判定します。
if zeroCount >= n:

print(generation_score)
break
pair = GATSP.selection(generation, generation_score)

childs = GATSP.crossover(pair) # 選択された親に対して交叉を行う
new_pair = GATSP.mutation(pair, mutation_rate)

generation = GATSP.new_generation(pair, new_pair, childs) # 交叉で生成された子供を新しい世代の作成に使用する
else:

print("Max iterations reached without finding a perfect score.")

 

今回は個体全てのスコアが0になるまでやってるけどそれでも30世代くらいでできる。一つ作るだけなら5回くらい。

最初は10000世代とかやってもうまくいかなくてパソコンがグイングイン鳴ってた。

ここから制約増やすとうまくいかなくなったりするんだろうな。

 

④今後

とりあえず動くものはできたけど、全然実用化できないので要修正な感じ。

だけどこればっかりやってると機械学習とかの勉強が進まないので、ペースダウンして暇な時にいじるくらいにしたい。

でも勉強したこと使って色々作ってみるのは楽しい。

 

*1:sup_parents, generation[i]