ML | Reinforcement Learning Algorithm: Python Implementation Using Q-Learning

Reinforcement learning is a machine learning paradigm in which the learning algorithm is trained not on a preset dataset but through a feedback system of rewards. These algorithms are touted as the future of machine learning because they avoid much of the cost of collecting and cleaning training data.

In this article, we're going to demonstrate how to implement a basic reinforcement learning algorithm called Q-Learning. In this demo, we teach a bot to reach its destination using Q-Learning.

Step 1: Import the required libraries

import numpy as np
import pylab as pl
import networkx as nx

Step 2: Define and render the graph

edges = [(0, 1), (1, 5), (5, 6), (5, 4), (1, 2),
         (1, 3), (9, 10), (2, 4), (0, 6), (6, 7),
         (8, 9), (7, 8), (1, 7), (3, 9)]

goal = 10

G = nx.Graph()
G.add_edges_from(edges)
pos = nx.spring_layout(G)
nx.draw_networkx_nodes(G, pos)
nx.draw_networkx_edges(G, pos)
nx.draw_networkx_labels(G, pos)
pl.show()

Note: The plot may look different when you run the code, because nx.spring_layout starts from random node positions. The graph itself, defined by the edges above, is always the same.
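If a stable picture is needed from run to run, one option is to fix the layout's random seed. The sketch below assumes a networkx version whose `spring_layout` accepts a `seed` argument; the seed value 42 is an arbitrary choice and not part of the original demo.

```python
import networkx as nx
import numpy as np

edges = [(0, 1), (1, 5), (5, 6), (5, 4), (1, 2),
         (1, 3), (9, 10), (2, 4), (0, 6), (6, 7),
         (8, 9), (7, 8), (1, 7), (3, 9)]

G = nx.Graph()
G.add_edges_from(edges)

# Assumption: 42 is an arbitrary seed chosen for illustration.
pos_a = nx.spring_layout(G, seed=42)
pos_b = nx.spring_layout(G, seed=42)

# With the same seed, every node gets the same coordinates in both layouts.
same_layout = all(np.allclose(pos_a[n], pos_b[n]) for n in G.nodes)
print(same_layout)
```

Passing the resulting `pos` dictionary to the three `draw_networkx_*` calls would then draw the nodes in the same places on every run.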

Step 3: Define the reward system for the bot

MATRIX_SIZE = 11

# Reward matrix: -1 marks "no edge between these two nodes"
M = np.matrix(np.ones(shape=(MATRIX_SIZE, MATRIX_SIZE)))
M *= -1

for point in edges:
    print(point)
    # Forward direction of the edge
    if point[1] == goal:
        M[point] = 100
    else:
        M[point] = 0

    # Reverse direction of the edge
    if point[0] == goal:
        M[point[::-1]] = 100
    else:
        M[point[::-1]] = 0

# Staying at the goal is rewarded too (goal-to-goal round trip)
M[goal, goal] = 100
print(M)
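To make the reward scheme easier to inspect, the same rules can be written as a small function on a plain NumPy array. This is only a sketch: `build_reward_matrix` is a hypothetical helper name, and using `np.full` on an ndarray instead of `np.matrix` is a substitution made here for brevity.

```python
import numpy as np

def build_reward_matrix(edges, goal, size):
    """Hypothetical helper: -1 = no edge, 0 = ordinary move, 100 = move into the goal."""
    R = np.full((size, size), -1.0)
    for a, b in edges:
        R[a, b] = 100 if b == goal else 0   # forward direction a -> b
        R[b, a] = 100 if a == goal else 0   # reverse direction b -> a
    R[goal, goal] = 100                      # staying at the goal is rewarded
    return R

edges = [(0, 1), (1, 5), (5, 6), (5, 4), (1, 2),
         (1, 3), (9, 10), (2, 4), (0, 6), (6, 7),
         (8, 9), (7, 8), (1, 7), (3, 9)]

R = build_reward_matrix(edges, goal=10, size=11)

print(R[9, 10])   # 100.0: moving from node 9 into the goal
print(R[10, 9])   # 0.0: moving out of the goal back to node 9
print(R[0, 2])    # -1.0: nodes 0 and 2 are not connected
```

Only entries that are 0 or larger count as legal moves, which is what the action-selection code in the next step relies on.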

Step 4: Define some utilities to use in training

Q = np.matrix(np.zeros([MATRIX_SIZE, MATRIX_SIZE]))

# Discount factor applied to future rewards
gamma = 0.75
initial_state = 1


# Determines the available actions for a given state
def available_actions(state):
    current_state_row = M[state, ]
    available_action = np.where(current_state_row >= 0)[1]
    return available_action


available_action = available_actions(initial_state)


# Selects one of the available actions at random
def sample_next_action(available_actions_range):
    next_action = int(np.random.choice(available_actions_range))
    return next_action


action = sample_next_action(available_action)


# Updates the Q-Matrix according to the selected path
def update(current_state, action, gamma):
    # Best known follow-up action from the next state (ties broken at random)
    max_index = np.where(Q[action, ] == np.max(Q[action, ]))[1]
    if max_index.shape[0] > 1:
        max_index = int(np.random.choice(max_index))
    else:
        max_index = int(max_index[0])
    max_value = Q[action, max_index]
    Q[current_state, action] = M[current_state, action] + gamma * max_value

    # Return a normalized score so training progress can be plotted
    if np.max(Q) > 0:
        return np.sum(Q / np.max(Q) * 100)
    else:
        return 0


update(initial_state, action, gamma)
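The rule that `update` applies is Q[state, action] = M[state, action] + gamma * max(Q[action, :]), where `action` doubles as the index of the next state. It is the special case of the general Q-learning update with a learning rate of 1. A minimal worked sketch on a three-node chain follows; the 3x3 reward matrix `R` and the helper name `q_update` are made up for illustration and are not part of the demo above.

```python
import numpy as np

gamma = 0.75

# Toy reward matrix for the chain 0 - 1 - 2, with node 2 as the goal (assumed example).
R = np.array([[-1.0,  0.0,  -1.0],
              [ 0.0, -1.0, 100.0],
              [-1.0,  0.0, 100.0]])
Q = np.zeros((3, 3))

def q_update(Q, R, state, action, gamma):
    """Hypothetical helper: one update; the chosen action is also the next state."""
    Q[state, action] = R[state, action] + gamma * Q[action].max()
    return Q[state, action]

print(q_update(Q, R, 2, 2, gamma))  # 100 + 0.75 * 0   = 100.0
print(q_update(Q, R, 1, 2, gamma))  # 100 + 0.75 * 100 = 175.0
print(q_update(Q, R, 0, 1, gamma))  # 0   + 0.75 * 175 = 131.25
```

Future rewards are discounted by gamma for each extra step away from the goal, so following the largest entry in each row leads toward it.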

Step 5: Train and evaluate the bot using the Q-Matrix

scores = []
for i in range(1000):
    current_state = np.random.randint(0, int(Q.shape[0]))
    available_action = available_actions(current_state)
    action = sample_next_action(available_action)
    score = update(current_state, action, gamma)
    scores.append(score)

# print("Trained Q matrix:")
# print(Q / np.max(Q) * 100)
# You can uncomment the above two lines to view the trained Q matrix

# Testing
current_state = 0
steps = [current_state]

while current_state != 10:
    # Follow the highest Q value from the current state (ties broken at random)
    next_step_index = np.where(Q[current_state, ] == np.max(Q[current_state, ]))[1]
    if next_step_index.shape[0] > 1:
        next_step_index = int(np.random.choice(next_step_index))
    else:
        next_step_index = int(next_step_index[0])
    steps.append(next_step_index)
    current_state = next_step_index

print("Most efficient path:")
print(steps)

pl.plot(scores)
pl.xlabel('No of iterations')
pl.ylabel('Reward')
pl.show()
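The `while` loop above keeps running until the goal is reached, so it can spin forever if the Q-Matrix has not learned a route yet. One way to guard against that, sketched below, is to cap the number of steps. The helper name `greedy_path`, the `max_steps` parameter, and the small hand-written Q table are assumptions for illustration; ties are broken by `np.argmax` (first maximum) rather than at random as in the article's loop.

```python
import numpy as np

def greedy_path(Q, start, goal, max_steps=50):
    """Hypothetical helper: follow the largest Q value per row; None if the goal is not reached."""
    Q = np.asarray(Q)
    path = [start]
    state = start
    for _ in range(max_steps):
        if state == goal:
            return path
        state = int(np.argmax(Q[state]))
        path.append(state)
    return path if state == goal else None

# Hand-written Q table for four states; state 3 is the goal (assumed example values).
Q_demo = np.array([[0.0, 50.0, 10.0,   0.0],
                   [0.0,  0.0,  0.0, 100.0],
                   [0.0,  0.0,  0.0,  20.0],
                   [0.0,  0.0,  0.0, 100.0]])

print(greedy_path(Q_demo, start=0, goal=3))            # [0, 1, 3]
print(greedy_path(np.zeros((4, 4)), start=0, goal=3))  # None
```

Called on the trained matrix as `greedy_path(Q, 0, goal)`, the same helper would return either a route to node 10 or `None` when training has not converged.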

Now let's bring this bot to a more realistic setting. Let's pretend that the bot is a detective trying to find the location of a large drug racket. He naturally concludes that the drug dealers will not sell their products in places the police are known to frequent, and that the selling points are near the racket's location. In addition, the sellers leave traces of their products wherever they sell them, which can help the detective find the required location. We want to train our bot to find the location using these environmental clues.

Step 6: Define and render a new graph with environmental clues

# Locations of the police and the drug traces
police = [2, 4, 5]
drug_traces = [3, 8, 9]

G = nx.Graph()
G.add_edges_from(edges)
mapping = {0: '0 - Detective', 1: '1', 2: '2 - Police', 3: '3 - Drug traces',
           4: '4 - Police', 5: '5 - Police', 6: '6', 7: '7', 8: '8 - Drug traces',
           9: '9 - Drug traces', 10: '10 - Drug racket location'}

H = nx.relabel_nodes(G, mapping)
pos = nx.spring_layout(H)
# One size for all 11 nodes
nx.draw_networkx_nodes(H, pos, node_size=200)
nx.draw_networkx_edges(H, pos)
nx.draw_networkx_labels(H, pos)
pl.show()

Note: The plot may be laid out differently from the previous one, but both show the same graph. The difference comes from the random placement of nodes by the networkx layout.

Step 7: Define some helper functions for the training process

Q = np.matrix(np.zeros([MATRIX_SIZE, MATRIX_SIZE]))
# Counts of police sightings and drug traces per (state, action) pair
env_police = np.matrix(np.zeros([MATRIX_SIZE, MATRIX_SIZE]))
env_drugs = np.matrix(np.zeros([MATRIX_SIZE, MATRIX_SIZE]))
initial_state = 1


# Same as above
def available_actions(state):
    current_state_row = M[state, ]
    av_action = np.where(current_state_row >= 0)[1]
    return av_action


# Same as above
def sample_next_action(available_actions_range):
    next_action = int(np.random.choice(available_actions_range))
    return next_action


# Exploring the environment
def collect_environmental_data(action):
    found = []
    if action in police:
        found.append('p')
    if action in drug_traces:
        found.append('d')
    return found


available_action = available_actions(initial_state)
action = sample_next_action(available_action)

  

def update(current_state, action, gamma):
    max_index = np.where(Q[action, ] == np.max(Q[action, ]))[1]
    if max_index.shape[0] > 1:
        max_index = int(np.random.choice(max_index))
    else:
        max_index = int(max_index[0])
    max_value = Q[action, max_index]
    Q[current_state, action] = M[current_state, action] + gamma * max_value

    # Record any clues found at the node the bot moves to
    environment = collect_environmental_data(action)
    if 'p' in environment:
        env_police[current_state, action] += 1
    if 'd' in environment:
        env_drugs[current_state, action] += 1

    # Normalized score, as in the Step 4 version of update
    if np.max(Q) > 0:
        return np.sum(Q / np.max(Q) * 100)
    else:
        return 0