Commit 10596f16 authored by Guillaume CLOCHARD's avatar Guillaume CLOCHARD
Browse files

Merge branch 'heuristic_builder' into 'master'

Ajout détection/remplacement par raccourcis 


- Ajout d'un module `heuristic_builder.py` qui génère une liste de raccourcis
  Output d'un JSON `shortcuts.json` de la forme : 
 
    ```js
  {
      "<suite de mouvements>": [<suite de mouvements plus courte qui mène au même état>],
      ...
  }
    ```
- Détection et remplacement des sous-listes de mouvements qui peuvent être raccourcis dans `algo_cfop()`

On se contente de détecter des mouvements de longueur 2 (comme "R R" --> "R2", "Ri R" --> "", etc.) pour ne pas avoir d'impact sur le temps de résolution.

See merge request !23
parents a9a30fc3 b3f533b0
......@@ -188,16 +188,19 @@ class Cube():
return '\n'.join(''.join(l) for l in result) #on convertit la liste en chaîne
def to_line(self):
def to_line(self, colors=True):
"""
to_line
:Args:
colors {Boolean} Afficher la chaîne en couleur. Defaut True.
:Returns:
{String} la représentation du cube format one line
ex: OGRBWYBGBGYYOYOWOWGRYOOOBGBRRYRBWWWRBWYGROWGRYBRGYWBOG
"""
up, left, front, right, back, down = build_faces(self, colors=True)
up, left, front, right, back, down = build_faces(self, colors=colors)
lines = [[]]*5
lines[0] = ''.join(sum(up, []))
......
......@@ -26,12 +26,16 @@ http://ruwix.com/puzzle-mouvements-generator/
'''
import json
import re
from sys import argv
from Cube import Cube
from lire_entree import lecture_cube
from utils import croix_valide, ftl_valide, cfop_valide
from utils import croix_valide, ftl_valide, cfop_valide, replace_sublist
from test import tableaux_test
SHORTCUTS = "shortcuts.json"
def algo_cfop(c):
'''
......@@ -70,7 +74,24 @@ def algo_cfop(c):
else:
return "Le cube est insolvable", None
return None, mouv
mouv = list(mouv)
#Détection des pattern qui peuvent être raccourcis
with open(SHORTCUTS) as dataFile:
shortcuts = json.load(dataFile)
#on tri par la taille des chaînes de mouvements
shortcuts = sorted(
shortcuts.items(),
key=lambda l: len(l[0]),
reverse=True
)
#On remplace chaque pattern dont on connait un raccourcis
for (sublist, remplacement) in shortcuts:
replace_sublist(mouv, sublist.split(), remplacement)
return None, mouv
def cross_facile(c):
'''
......
Construction d'une base d'heuristiques
--------------------------------------
# Fonctionnement
Le script `heuristic_builder.py` est un générateur de raccourcis.
Il construit un fichier `json` de la forme :
```json
{
<suite de mouvements> : <suite de mouvements équivalente mais plus courte>,
...
}
```
Il parcourt toutes les combinaisons de mouvements possibles (jusqu'à une
certaine limite) et déduit, pour chaque combinaison, quelle est la
suite `A` de mouvements la plus courte qui y amène.
Ainsi, pour toutes les autres suites `Bi` de mouvements qui amènent au même
état du cube on retient que `Bi` peuvent être remplacée par `A`.
Lancer le script (prévoir du café pour patienter) :coffee: :
```bash
python heuristic_builder.py --max 5 --output-file shortcuts-5.json
```
- `--max` : la taille max de la suite de mouvements. Défaut : `3`
- `--output-file` : le fichier de sortie. Défaut : `shortcuts.json`
| max | temps d'éxécution¹ | nombre de combinaisons | nombre de raccourcis |
| :-: | :---------------: | :--------------------: | :------------------: |
| 1 | instantané | 18 | 0 |
| 2 | instantané | 342 | 54 |
| 3 | ~10sec | 6174 | 1998 |
| 4 | ~5min | 11150 | 50598 |
| 5 | ~30min | 2000718 | 1124358 |
| 130 | ? | ~10^164 | ? |
*¹ Mac-mini i5, 8Go RAM*
Le script utilise le module `multiprocessing` de Python pour répartir le travail
sur les CPUs, tout en ayant des données partagées (dictionnaire des états,
dictionnaire des raccourcis, compteur, etc.).
# Quelques stats
On voit que ce n'est pas pertinent de chercher à remplacer des raccourcis longs.
Il y en a beaucoup et donc le temp de recherche/remplacement devient vite
important.
| Taille max shortcuts | Gain moyen solution | Temps d’éxécution de algo.py |
| :------------------: | :-----------------: | :--------------------------: |
| 1 | 0 | 0.6s |
| 2 | 6 | 0.6s |
| 3 | 7 | 6s |
| 4 | 8 | 38s |
| 5 | 8 | 24min |
from threading import Thread
from datetime import datetime, timedelta
import multiprocessing as mp
import sys
import getopt
import time
import json
import os
#Comme Cube utilise utils.py, qui utilise aussi getopt, mais avec d'autres
#options, on lit les arguments ici et on les efface
optlist, _ = getopt.getopt(sys.argv[1:], [], [
'output-file=',
'max=',
'ratio='
])
sys.argv = [sys.argv[0]]
#fix pour aller chercher un module dans le dossier parent
#@see http://stackoverflow.com/a/279338/2058840
sys.path.append(os.path.dirname(__file__) + "../")
from Cube import Cube
MAX_LENGTH = 3 #taille max de la chaîne de mouvements par défaut
REFRESH_TIME = 1 #1sec refresh affichage procession
RATIO = 2 #ratio entre longueur suite de mouvements et longueur raccourci
MOUVEMENTS = [
"U", "Ui", "U2", "L", "Li", "L2",
"F", "Fi", "F2", "R", "Ri", "R2",
"B", "Bi", "B2", "D", "Di", "D2"
]
SHORTCUTS_FILE = 'shortcuts.json'
def readArgs():
"""
readArgs
Lecture des arguments passés au script, version avancée.
En particulier, on veut lire --max=<taille max movements>
:Returns:
{Dict}
"""
return {k: v for k, v in optlist}
def watchProgress(count, max):
"""
watchProgress
Affiche la progression du process
ie. count/max
:Example:
[=== ] 100 / 10000 combinaisons
:Args:
count {multiprocessing.Value}
max {Number}
"""
INTERVAL = 30
def printLine(p, a, b, start, end):
#temps d'exécution
t = (datetime.now().replace(microsecond=0) - start).total_seconds()
#vitesse d'éxécution
v = a / t if t > 0 else None
#temps restant
restant = str(timedelta(seconds=int((b-a) / v))) if v else '?'
sys.stdout.write(
#représentation avancement
"[" + "=" * p + " " * (30-p) + "] "
#nombre done / nombre total
+ str(a) + "/" + str(b) + " combinaisons - "
#évaluation temps restant
+ ' Remaining : ' + (restant) + 's'
+ (" \r" if not end else " \n")
)
sys.stdout.flush()
start = datetime.now().replace(microsecond=0) #date de début
while count.value < max:
p = int(counter.value / max * INTERVAL) #rapport d'avancement sur INTERVAL
printLine(p, count.value, max, start, False) #afficher les infos
time.sleep(REFRESH_TIME) #on attend
#fin du travail, on affiche une dernière ligne
printLine(INTERVAL, max, max, start, True)
print('Done in', str(datetime.now().replace(microsecond=0) - start) + 's')
time.sleep(0.1)
def makeMove(queue, lock, counter, states, shortcuts, ration, maximum):
"""
makeMove
Un worker. Consomme les tâches de `queue` et enregistre le résultat
dans `states et `shortcuts`.
`queue` représente une liste de rotation à effectuer sur le cube.
Une action est de la forme (cube, history, longueur, mvt) où `mvt` est
la rotation à effectuer sur `cube`.
`historique` représente l'historique des mouvements faits sur ce cube, et
`longueur` le nombre de rotation de l'historique.
:Args:
queue {multiprocessing.manager.Queue}
lock {multiprocessing.Lock}
counter {multiprocessing.Value}
states {multiprocessing.manager.Dict}
shortcuts {multiprocessing.manager.Dict}
ratio {float} Rapport minimal entre la
taille de suite de mouvements
et le raccourci
maximum {int} Taille max de la liste de
mouvements
"""
while not queue.empty():
#on récupère ce qu'on doit faire
cube, history, longueur, mvt = queue.get()
#on applique la rotation
method = getattr(cube, 'rot_' + mvt)
method()
#l'état obtenu
state = cube.to_line(colors=False)
h = history + [mvt]
if state in states: #si on a déjà rencontré l'état
#on regarde quelle suite de mouvements amène à cet état
mouvements, l = states[state]
if longueur + 1 < l / ratio: #si notre solution actuelle est meilleure
#on retient cette suite de mouvements pour arriver à cet état
states[state] = h, longueur + 1
#on ajoute un shortcut pour utiliser notre version plutôt que mouvements
shortcuts[' '.join(mouvements)] = h
elif longueur + 1 > l * ratio: #si la solution historique est meilleure
#on ajoute un shortcut pour utiliser mouvements plutôt que notre version
shortcuts[' '.join(history) + ' ' + mvt] = mouvements
#sinon, on ne fait rien, car ne sert à presque rien de remplacer quoi que ce soit
else: #sinon, nouvel état
states[state] = h, longueur + 1
lock.acquire()
counter.value += 1
lock.release()
queue.task_done() #tâche effectuée
#si on n'atteint pas la limite de taille des mouvements à recherché,
#on relance un niveau supplémentaire
if longueur < maximum - 1:
for m in MOUVEMENTS:
#on ajoute à la queue
queue.put((cube, h, longueur + 1, m))
return
def saveResultShortcuts(shortcuts, file):
"""
saveResultShortcuts
:Args:
shortcuts {Dict}
file {String}
"""
with open(file, 'w') as outfile:
json.dump(shortcuts, outfile)
def calcNbCombinaisons(q, max):
"""
calcNbCombinaisons
Calcule le nombre de combinaisions que l'on va traiter en fonction
de la taille max de la liste de mouvements.
C'est une somme de d'une suite géométrique:
q + q^2 + ... q^MAX_LENGTH
où q = 18 (le nombre de mouvements)
:Args:
q {int}
max {int}
"""
return int((1 - q**(max+1)) / (1 - q) - 1)
if __name__ == '__main__':
args = readArgs()
maximum = int(args['--max']) if '--max' in args else MAX_LENGTH
shortcutsFile = args['--output-file'] if '--output-file' in args else SHORTCUTS_FILE
ratio = float(args['--ratio']) if '--ratio' in args else RATIO
cube = Cube() #un cube résolu
with mp.Manager() as manager:
"""
states
Stocke la liste des état, et le plus court jeu de mouvements
qui mènent à l'état
{
<state> : <list de mouvements>,
YYYY......WWWW: ('U R ... ', <longueur mouvement>)
....
}
"""
states = manager.dict()
#on remplit l'état inital
states[cube.to_line(colors=False)] = ([], 0)
"""
shortcuts
{
<suite de mouvements> : <suite mouvements plus courte>
"U Ui" : '',
...
}
"""
shortcuts = manager.dict()
counter = mp.Value('i', 0) #nombre de combinaisons traitées
queue = manager.Queue()
lock = manager.Lock()
#on commence à remplir queue avec les 18 premiers mouvements
for m in MOUVEMENTS:
queue.put((cube, [], 0, m))
#on lance un watcher de progression
watcher = Thread(
target=watchProgress,
args=(counter, calcNbCombinaisons(len(MOUVEMENTS), maximum)),
daemon=True
)
watcher.start()
#on crée un process par CPU pour distribuer autant que possible le
#travail
cpus = mp.cpu_count()
processes = [
mp.Process(
target=makeMove,
args=(queue, lock, counter, states, shortcuts, ratio, maximum),
daemon=True
) for i in range(cpus - 1 if cpus > 1 else 1)
]
for proc in processes:
proc.start()
#on attend que la queue soit vide
queue.join()
#on attend watche aussi histoire de ne pas avoir de pb d'affichage
watcher.join()
for proc in processes:
proc.terminate()
# listStates = sorted(states.items(), key=lambda l: l[1][1])
saveResultShortcuts(dict(shortcuts), shortcutsFile)
print('Output saved in', shortcutsFile)
{"D Di": [], "F2 F": ["Fi"], "U2 U2": [], "B2 B": ["Bi"], "R2 R": ["Ri"], "Bi B2": ["B"], "Bi B": [], "L L2": ["Li"], "U U2": ["Ui"], "Fi Fi": ["F2"], "L L": ["L2"], "D2 D2": [], "L Li": [], "U Ui": [], "R Ri": [], "Bi Bi": ["B2"], "R R2": ["Ri"], "Ui U": [], "D D2": ["Di"], "Ri R": [], "B2 Bi": ["B"], "R2 Ri": ["R"], "D2 Di": ["D"], "Ri R2": ["R"], "L2 L2": [], "Di Di": ["D2"], "R R": ["R2"], "F2 F2": [], "F F2": ["Fi"], "R2 R2": [], "F2 Fi": ["F"], "Di D2": ["D"], "F F": ["F2"], "L2 L": ["Li"], "Ui Ui": ["U2"], "L2 Li": ["L"], "Li L": [], "B B": ["B2"], "U2 U": ["Ui"], "Fi F2": ["F"], "D D": ["D2"], "Li L2": ["L"], "B B2": ["Bi"], "Di D": [], "Fi F": [], "Ui U2": ["U"], "Ri Ri": ["R2"], "B Bi": [], "D2 D": ["Di"], "U2 Ui": ["U"], "F Fi": [], "B2 B2": [], "U U": ["U2"], "Li Li": ["L2"]}
\ No newline at end of file
import numpy as np
import sys
from os import name as os_name
import sys
import subprocess
import getopt
......@@ -410,6 +410,60 @@ def newGetch():
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
def find_sublists(seq, sublist):
"""
find_sublists
Recherche d'une sous-liste dans une liste
:Args:
seq {List} La liste complète
sublist {List} La sous-liste qu'on recherche dans seq
:Returns:
{Generator} Va retourner ({Int}, {Int}) : positions début/fin
des sous-listes trouvées dans seq si il y en a
:Source:
Copier/Coller de http://stackoverflow.com/a/12898180/2058840
Ajout des commentaires
Voir http://stackoverflow.com/a/231855/2058840 pour explications
sur les générateurs et yield
"""
length = len(sublist) #taille de la sous liste recherchée
for index, value in enumerate(seq):
#quand on a trouvé la sublist dans seq
#(ie. à la position [index:index + length])
#on yield sa position
#si plusieurs match, on yieldera la prochaine position au prochain appel
if value == sublist[0] and seq[index:index+length] == sublist:
yield index, index+length
def replace_sublist(seq, target, replacement):
"""
replace_sublist
Remplacement de la sous-liste `target` de `seq` par `replacement`
:Args:
seq {List} La liste complète
target {List} La sous-liste qu'on recherche dans seq
replacement {List} La sous-liste qui va remplacer target
:Source:
Copier/Coller de http://stackoverflow.com/a/12898180/2058840
Ajout des commentaires
Voir http://stackoverflow.com/a/231855/2058840 pour explications
sur les générateurs et yield
"""
#on récupère un générateurs des emplacements de target dans seq
sublists = find_sublists(seq, target)
#pour chaque emplacement, on remplace
for start, end in sublists:
seq[start:end] = replacement
if __name__ == '__main__':
print("Test unixTermColors")
c = unixTermColors()
......@@ -444,3 +498,7 @@ if __name__ == '__main__':
print("Test colorize")
print("Red", colorize('R'))
a = [1, 2, 6, 7, 8, 4]
replace_sublist(a, [6, 7, 8], [3])
print(a)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment