最近工作是在太忙了,无奈,也没空更新博客,职业上也从研发变成了产品,有小半年没写代码了,怕自己手生的不行,给自己两天时间,写了点东西,之前做搞机器学习,搞深度学习,但一直对依赖全场景数据喂模型的方向有点感冒,因为数据又贵又难搞全,企业靠这个发家有点难,且本身需要企业具有很大的体量,另收集数据-训练-部署三板斧就当做AI的自进化说法感觉有点勉强,不谈特定场景妄图一个AI模型解决通用问题的都是大忽悠,咱们还是等待学术界的大佬们能真正提出新的理论来造福业内吧。
一直在想,有没有一种更好适应的算法方向可以不那么依赖数据的,或者说能依赖数据少一些,这样对于一个产品也许有更好的成本控制和生命力加成,因此想了想是不是强化学习有可能,顺势就了解了下,花了几天时间简单学习了下入门皮毛,对最简单的Q-Learning先进行了复现。
设定环境为一个5x5的棋盘,墙壁四周无法通行,碰到炸弹直接死亡,找到宝石就算过关。
关于理论知识和排版我下次总结补充哈,先把我前两天写的代码放在这,先上传个效果:
训练刚开始随机探索,:
训练几十轮之后,基本直奔终点:
代码部分:
环境Env(为啥不使用我之前写的pycharm加pyqt5.0进行环境开发,我表示,忘光了,赶时间,选个最简单tk来用~):
import tkinter as tk from PIL import ImageTk from PIL import Image import time class Env: def __init__(self): self.grid_size = 100 self.win = tk.Tk() self.pic_player, self.pic_diamond, self.pic_boom1, self.pic_boom2, self.pic_boom3, self.pic_boom4 = self.__load_img() self.__init_win() self.canvas = self.__init_rc() self.texts= self.__produce_text() self.canvas.pack() # self._init_test_case() # self.win.mainloop() def __init_win(self): self.win.title('Grid World') # self.win.geometry("500x300") def __init_rc(self): canvas = tk.Canvas(self.win, width=500, height=720, bg='white') for h in range(5): for v in range(5): canvas.create_rectangle(self.grid_size * v, self.grid_size * h, self.grid_size * (v + 1), self.grid_size * (h + 1)) trans_pixel = int(self.grid_size / 2) self.player = canvas.create_image(trans_pixel + self.grid_size * 0, trans_pixel + self.grid_size * 0, image = self.pic_player) self.diamond = canvas.create_image(trans_pixel + self.grid_size * 4, trans_pixel + self.grid_size * 4, image=self.pic_diamond) self.boom1 = canvas.create_image(trans_pixel + self.grid_size * 1, trans_pixel + self.grid_size * 1, image=self.pic_boom1) self.boom2 = canvas.create_image(trans_pixel + self.grid_size * 3, trans_pixel + self.grid_size * 1, image=self.pic_boom2) self.boom3 = canvas.create_image(trans_pixel + self.grid_size * 1, trans_pixel + self.grid_size * 3, image=self.pic_boom3) self.boom4 = canvas.create_image(trans_pixel + self.grid_size * 3, trans_pixel + self.grid_size * 3, image=self.pic_boom4) return canvas def __load_img(self): pic_resize = int (self.grid_size / 2) player = ImageTk.PhotoImage(Image.open("player.png").resize((pic_resize, pic_resize))) diamond = ImageTk.PhotoImage(Image.open("diamond.png").resize((pic_resize, pic_resize))) boom1 = ImageTk.PhotoImage(Image.open('boom.png').resize((pic_resize, pic_resize))) boom2 = ImageTk.PhotoImage(Image.open('boom.png').resize((pic_resize, pic_resize))) boom3 = ImageTk.PhotoImage(Image.open('boom.png').resize((pic_resize, pic_resize))) boom4 = ImageTk.PhotoImage(Image.open('boom.png').resize((pic_resize, pic_resize))) return player, diamond, boom1, boom2, boom3, boom4 def __produce_text(self): texts = [] x = self.grid_size / 2 y = self.grid_size / 6 for h in range(5): for v in range(5): up = self.canvas.create_text(x + h * self.grid_size, y + v * self.grid_size,text=0) down = self.canvas.create_text(x + h *self.grid_size, self.grid_size - y + v * self.grid_size, text = 0) left = self.canvas.create_text(y + h*self.grid_size, x + v*self.grid_size,text = 0) right = self.canvas.create_text(self.grid_size-y + h*self.grid_size, x + v*self.grid_size,text = 0) texts.append({"up": up, "down": down, "left": left, "right": right}) return texts def _win_d_update(self): self.win.update() time.sleep(0.1) class GridWorld(Env): def __init__(self): super().__init__() self._win_d_update() def player_move(self, x, y): # x横向移动向右,y纵向移动向下 self.canvas.move(self.player, x * self.grid_size, y*self.grid_size) self._win_d_update() def reset(self): # 重置为起始位置 x, y = self.canvas.coords(self.player) self.canvas.move(self.player, -x + self.grid_size/2, -y + self.grid_size/2) self._win_d_update() return self.get_state(self.player) def get_state(self, who): x, y = self.canvas.coords(who) state = [int(x/self.grid_size), int(y/self.grid_size)] return state def update_val(self, num, arrow, val): pos = num[0] * 5 + num[1] x, y = self.canvas.coords(self.texts[pos][arrow]) self.canvas.delete(self.texts[pos][arrow]) self.texts[pos][arrow] = self.canvas.create_text(x, y, text=val) # self._win_d_update() def exec_calc(self, action): # 执行一次决策 feedback = 'alive' # alive, stop, dead 分别对应通过,撞墙,炸死 next_state = [] next_h, next_v, reward = 0.0, 0.0, 0.0 h, v = self.get_state(self.player) if action == 0: # up next_h = h next_v = v - 1 # self.player_move(0, -1) elif action == 1: # down next_h = h next_v = v + 1 # self.player_move(0, 1) elif action == 2: # left next_h = h - 1 next_v = v # self.player_move(-1, 0) elif action == 3: # right next_h = h + 1 next_v = v # self.player_move(1, 0) else: print('programmer bug ...') next_state = [next_h, next_v] boom1, boom2, boom3, boom4 = self.get_state(self.boom1), self.get_state(self.boom2), self.get_state( self.boom3), self.get_state(self.boom4) diamond = self.get_state(self.diamond) if next_h < 0 or next_v < 0 or next_h > 4 or next_v >4: # 超过边界 reward = -1 feedback = 'stop' elif next_state == boom1 or next_state == boom2 or next_state == boom3 or next_state == boom4: # 炸弹区域 reward = -100 feedback = 'dead' elif next_state == diamond: # 获得的通关物品 reward = 500 else: reward = 0 return feedback, next_state, reward def update_view(self, state, action, next_state, q_val): action_list = ['up', 'down', 'left', 'right'] self.player_move(next_state[0]-state[0], next_state[1]-state[1]) self.update_val(state, action_list[action], round(q_val, 2)) def attach(self): # 到达终点,返回True , 未到达,返回False return str(self.get_state(self.player)) == str(self.get_state(self.diamond))智能体Agent代码:
import numpy as np import env class Agent: def __init__(self): self.actions = [0, 1, 2, 3] # up down left right self.q_table = dict() self.__init_q_table() self.epsilon = 0.1 self.learning_rate = 0.1 self.gamma = 0.8 # print(self.q_table) def __init_q_table(self): for v in range(5): for h in range(5): self.q_table[str([h, v])] = [0.0, 0.0, 0.0, 0.0] def get_action(self, state): # 根据状态选取下一个动作,但不对无法通过的区域进行选取 action_list = self.q_table[str(state)] pass_action_index = [] for index, val in enumerate(action_list): if val >= 0: pass_action_index.append(index) # 使用epsilon greedy来进行动作选取 if np.random.rand() <= self.epsilon: # 进行探索 return np.random.choice(pass_action_index) else: # 直接选取q最大值 max_val = action_list[pass_action_index[0]] max_list = [] for i in pass_action_index: # 最大值相同且不止一个则随机选个最大值 if max_val < action_list[i]: max_list.clear() max_val = action_list[i] max_list.append(i) elif max_val == action_list[i]: max_list.append(i) return np.random.choice(max_list) def update_q_table(self, feedback, state, action, reward, next_state): # Q(s,a) = Q(s,a) + lr * { reward + gamma * max[Q(s`,a`)] - Q(s,a) } q_s_a = self.q_table[str(state)][action] # 取出对应当前状态动作的q值 if feedback == 'stop': q_ns_a = 0 # 撞墙时不存在下一状态,属于原地不变 else : q_ns_a = np.max(self.q_table[str(next_state)]) # 贝尔曼方程更新 # self.q_table[str(state)][action] = q_s_a + self.learning_rate * ( # reward + self.gamma * q_ns_a - q_s_a # ) self.q_table[str(state)][action] = (1 - self.learning_rate) * q_s_a + self.learning_rate * (reward + self.gamma * q_ns_a) # print(self.q_table) return self.q_table[str(state)][action] if __name__ == '__main__': np.random.seed(0) env = env.GridWorld() agent = Agent() for ep in range(2000): if ep < 100 : agent.epsilon = 0.2 else: agent.epsilon = 0.1 state = env.reset() print('第{}轮训练开始 ... '.format(ep + 1)) while not env.attach(): action = agent.get_action(state) # 产生动作 # print(action) feedback, next_state, reward = env.exec_calc(action) # 计算状态 q_val = agent.update_q_table(feedback, state, action, reward, next_state) # 更新Q表 if feedback == 'stop': env.update_view(state, action, state, q_val) continue elif feedback == 'dead': env.update_view(state, action, next_state, q_val) break else: env.update_view(state, action, next_state, q_val) state = next_state # 状态改变代码里出现的细节点和博客要讲的理论知识,我后续有空补充哈!
