Commit 118dd683 authored by Joseph's avatar Joseph
Browse files

remise à niveau

parent e024fbe4
# /!\ this is a fork of the original sage, for weird experimentations /!\
# Sage: a SPARQL query engine for public Linked Data providers
[![Build Status](https://travis-ci.com/sage-org/sage-engine.svg?branch=master)](https://travis-ci.com/sage-org/sage-engine) [![PyPI version](https://badge.fury.io/py/sage-engine.svg)](https://badge.fury.io/py/sage-engine) [![Docs](https://img.shields.io/badge/docs-passing-brightgreen)](https://sage-org.github.io/sage-engine/)
......@@ -52,7 +50,7 @@ The SaGe SPARQL query server can also be manually installed using the [poetry](h
```bash
git clone https://github.com/sage-org/sage-engine
cd sage-engine
poetry install --extras "hdt postgre"
poetry install --extras "hdt postgres"
```
As with pip, the various SaGe backends are installed as extras dependencies, using the `--extras` flag.
......
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
http://0.0.0.0:8000/sparql?query=Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia
......@@ -196,12 +196,12 @@ async def main():
global queue
nbCalls = 0
hasNext = True
maxValue = 242256
next_link = "EkUKAj9wCgI/bwoCP3MSNwooCgI/cxICP3AaAj9vIhpodHRwOi8vZXhhbXBsZS5vcmcvZGJwZWRpYRIBMBoENjAwMCDQ5A4="
maxValue = 500000
next_link = "EkYKAj9zCgI/cAoCP28SOAooCgI/cxICP3AaAj9vIhpodHRwOi8vZXhhbXBsZS5vcmcvZGJwZWRpYRIDMjU1GgItMSCHlNMD"
plan = decode_saved_plan(next_link)
root = RootTree()
root.ParseFromString(plan)
root.proj_source.scan_source.end_read = str(maxValue)
root.proj_source.scan_source.last_read = str(0)
next_link = encode_saved_plan(root)
queue.put_nowait(exQuery("Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D", next_link, session))
print("nbresults = " + str(NbResults))
......@@ -226,10 +226,18 @@ if __name__ == '__main__':
taskers = [
do_work('task1', queue),
do_work('task2', queue),
do_work('task3', queue)
do_work('task3', queue),
do_work('task4', queue),
do_work('task5', queue),
do_work('task6', queue),
do_work('task7', queue),
do_work('task8', queue),
do_work('task9', queue),
do_work('task10', queue)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(taskers))
print("nbresults = " + str(NbResults))
loop.close()
# bgp_interface_test.py
# Author: Thomas MINIER - MIT License 2017-2018
import pytest
import time
import asyncio
import aiohttp
import multiprocessing
import tracemalloc
from datetime import datetime
from asyncio import Queue
from sage.http_server.server import run_app
from starlette.testclient import TestClient
from tests.http.utils import post_sparql
from sage.http_server.utils import decode_saved_plan, encode_saved_plan
from sage.query_engine.protobuf.iterators_pb2 import (RootTree,
SavedBagUnionIterator,
SavedFilterIterator,
SavedIndexJoinIterator,
SavedProjectionIterator,
SavedScanIterator)
bgp_queries = [
("""
SELECT * WHERE {
?s ?p ?o.
}
""", 100000),
]
QUERY_LIMIT = multiprocessing.cpu_count()
CONVERGENCE_RUNS = 1 #Credit - Debit
globalRate = 0
queryToSplit = None
globalResults = []
Debit = 0
niveauActuel = 0
restant = 1
running = []
Credit = 1
NbResults = 0
tasks = []
async def exQuery(query, next_link, session, niveau):
global running
if running is None:
running = []
running.append([query,None])
print(len(running))
value = ''
if next_link is not None:
value = '&next=' + next_link
start = time.time()
now = datetime.now()
now.strftime("%H:%M:%S.%f")
async with session.get('http://0.0.0.0:8000/sparql?query='+query+'&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia'+value) as resp:
response = await resp.json()
end = time.time()
nd = datetime.now()
nd.strftime("%H:%M:%S.%f")
global NbResults
NbResults += len(response['bindings'])
next_link = response['next']
if next_link is not None:
plan = decode_saved_plan(next_link)
root = RootTree()
root.ParseFromString(plan)
if root.proj_source.scan_source.end_read=="-1":
root.proj_source.scan_source.end_read = str(root.proj_source.scan_source.cardinality)
if int(root.proj_source.scan_source.end_read) > int(root.proj_source.scan_source.last_read):
next_link = encode_saved_plan(root)
await onResults(query, response['bindings'], True, end-start, next_link, session, niveau)
else:
global globalResults
if globalResults is None:
globalResults = []
if running is not None and [query,None] in running:
running.remove([query,None])
globalResults.append(response['bindings'])
async def onResults(query, results, state, time, next_link, session, niveau):
global queue
global globalResults
if globalResults is None:
globalResults = []
globalResults.append(results)
global running
if running is not None and [query,None] in running:
running.remove([query,None])
if running is None:
running = []
running.append([query,len(results)/time])
global queryToSplit
queryToSplit = rateCheck()
if running is not None:
running.remove([query,len(results)/time])
global CONVERGENCE_RUNS
if queryToSplit == query:
queries = split(query, next_link, niveau)
if CONVERGENCE_RUNS > 0:
if len(queries)==3:
niveau = queries[2]
queue.put_nowait(exQuery(query, queries[0], session, niveau))
queue.put_nowait(exQuery(query, queries[1], session, niveau))
else:
queue.put_nowait(exQuery(query, queries[0], session, niveau))
else:
if CONVERGENCE_RUNS > 0:
queue.put_nowait(exQuery(query, next_link, session, niveau))
async def onFailure(query, state, next_link):
if running is not None and [query,None] in running:
running.remove([query,None])
if running is None:
running = []
global queryToSplit
queryToSplit = rateCheck()
global CONVERGENCE_RUNS
if queryToSplit==query:
queries = split(query, next_link)
if CONVERGENCE_RUNS > 0:
if len(queries)==2:
await asyncio.gather(
exQuery(query, queries[0], session),
exQuery(query, queries[1], session),
)
else:
await exQuery(query, queries[0], session)
else:
if CONVERGENCE_RUNS > 0:
await exQuery(query, next_link)
def rateCheck():
slowestQuery = None
rates = []
val = 0
currentRate = 0
for q in running:
if q[1] == None:
return None
else:
if slowestQuery == None:
slowestQuery = q
else:
if q[1] < slowestQuery[1]:
slowestQuery = q
rates = rates.append(q[1])
val = val + q[1]
if rates is not None and len(rates) != 0:
currentRate = val/len(rates)
global globalRate
ROI = 0
if max(globalRate, currentRate) != 0:
ROI = (globalRate - currentRate) / max(globalRate, currentRate)
global Credit
Credit = Credit + (ROI*QUERY_LIMIT)
global Debit
if ROI>=0:
Debit = Debit + (ROI*QUERY_LIMIT)
else:
Debit = Debit + (-ROI)*QUERY_LIMIT
global CONVERGENCE_RUNS
CONVERGENCE_RUNS = Credit - Debit
globalRate = currentRate
return slowestQuery[0]
def split(query, next_link, niveau):
plan = decode_saved_plan(next_link)
root = RootTree()
root.ParseFromString(plan)
global niveauActuel
if int(root.proj_source.scan_source.end_read)-int(root.proj_source.scan_source.last_read) > 200 and niveauActuel==niveau:
global restant
if restant == 1:
niveauActuel = niveauActuel+1
restant = pow(2,niveauActuel)
else:
restant = restant - 1
niveau = niveau + 1
plan = decode_saved_plan(next_link)
root2 = RootTree()
root2.ParseFromString(plan)
root2.proj_source.scan_source.last_read = str(int(((float(root2.proj_source.scan_source.end_read)-float(root2.proj_source.scan_source.last_read))/2)+float(root2.proj_source.scan_source.last_read)))
root.proj_source.scan_source.end_read = root2.proj_source.scan_source.last_read
next_link = encode_saved_plan(root)
next_link2 = encode_saved_plan(root2)
print ("niveau : " + str(niveau))
return [next_link, next_link2, niveau]
return [next_link]
class TestBGPInterface(object):
@classmethod
def setup_class(self):
self._app = run_app('config.yaml')
self._client = TestClient(self._app)
@classmethod
def teardown_class(self):
pass
session = aiohttp.ClientSession()
async def main():
global queue
nbCalls = 0
hasNext = True
maxValue = 500000
next_link = "EkYKAj9zCgI/cAoCP28SOAooCgI/cxICP3AaAj9vIhpodHRwOi8vZXhhbXBsZS5vcmcvZGJwZWRpYRIDMjU1GgItMSCHlNMD"
plan = decode_saved_plan(next_link)
root = RootTree()
root.ParseFromString(plan)
root.proj_source.scan_source.last_read = str(0)
next_link = encode_saved_plan(root)
queue.put_nowait(exQuery("Select%20%2A%20where%20%7B%3Fs%20%3Fp%20%3Fo%7D", next_link, session, 0))
print("nbresults = " + str(NbResults))
assert nbCalls >= 0
async def do_work(task_name, work_queue):
while not work_queue.empty():
queue_item = work_queue.get_nowait()
print('{0} got item: {1}'.format(task_name, queue_item))
await queue_item
print('{0} finished processing item: {1}'.format(task_name, queue_item))
if __name__ == '__main__':
queue = Queue()
# Load initial jobs into queue
queue.put_nowait(main())
# use 3 workers to consume tasks
taskers = [
do_work('task1', queue),
do_work('task2', queue),
do_work('task3', queue),
do_work('task4', queue),
do_work('task5', queue),
do_work('task6', queue),
do_work('task7', queue),
do_work('task8', queue),
do_work('task9', queue),
do_work('task10', queue)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(taskers))
print("nbresults = " + str(NbResults))
loop.close()
# bgp_interface_test.py
# Author: Thomas MINIER - MIT License 2017-2018
import pytest
import time
import asyncio
import aiohttp
import multiprocessing
import tracemalloc
from datetime import datetime
from asyncio import Queue
from sage.http_server.server import run_app
from starlette.testclient import TestClient
from tests.http.utils import post_sparql
from sage.http_server.utils import decode_saved_plan, encode_saved_plan
from sage.query_engine.protobuf.iterators_pb2 import (RootTree,
SavedBagUnionIterator,
SavedFilterIterator,
SavedIndexJoinIterator,
SavedProjectionIterator,
SavedScanIterator)
bgp_queries = [
("""
SELECT * WHERE {
?s ?p ?o.
}
""", 100000),
]
QUERY_LIMIT = multiprocessing.cpu_count()
CONVERGENCE_RUNS = 1 #Credit - Debit
globalRate = 0
queryToSplit = None
globalResults = []
niveauActuel = 0
restant = 1
Debit = 0
running = []
Credit = 1
NbResults = 0
tasks = []
async def exQuery(query, next_link, session, vitessePrec, niveau):
global running
if running is None:
running = []
running.append([query,None])
print(len(running))
value = ''
if next_link is not None:
value = '&next=' + next_link
start = time.time()
now = datetime.now()
now.strftime("%H:%M:%S.%f")
async with session.get('http://0.0.0.0:8000/sparql?query='+query+'&default-graph-uri=http%3A%2F%2Fexample.org%2Fdbpedia'+value) as resp:
response = await resp.json()
end = time.time()
nd = datetime.now()
nd.strftime("%H:%M:%S.%f")
global NbResults
NbResults += len(response['bindings'])
next_link = response['next']
if next_link is not None:
plan = decode_saved_plan(next_link)
root = RootTree()
root.ParseFromString(plan)
if root.proj_source.scan_source.end_read=="-1":
root.proj_source.scan_source.end_read = str(root.proj_source.scan_source.cardinality)
if int(root.proj_source.scan_source.end_read) > int(root.proj_source.scan_source.last_read):
next_link = encode_saved_plan(root)
await onResults(query, response['bindings'], True, end-start, next_link, session, vitessePrec, niveau)
else:
global globalResults
if globalResults is None:
globalResults = []
if running is not None and [query,None] in running:
running.remove([query,None])
globalResults.append(response['bindings'])
async def onResults(query, results, state, time, next_link, session, vitessePrec, niveau):
global queue
global globalResults
if globalResults is None:
globalResults = []
globalResults.append(results)
global running
if running is not None and [query,None] in running:
running.remove([query,None])
if running is None:
running = []
running.append([query,len(results)/time])
global queryToSplit
queryToSplit = rateCheck()
if running is not None:
running.remove([query,len(results)/time])
global CONVERGENCE_RUNS
if queryToSplit == query:
queries = split(query, next_link, vitessePrec, len(results)/time, niveau)
if CONVERGENCE_RUNS > 0:
if len(queries)==3:
niveau = queries[2]
queue.put_nowait(exQuery(query, queries[0], session, len(results)/time, niveau))
queue.put_nowait(exQuery(query, queries[1], session, len(results)/time, niveau))
else:
queue.put_nowait(exQuery(query, queries[0], session, len(results)/time, niveau))
else:
if CONVERGENCE_RUNS > 0:
queue.put_nowait(exQuery(query, next_link, session, len(results)/time, niveau))
async def onFailure(query, state, next_link):
if running is not None and [query,None] in running:
running.remove([query,None])
if running is None:
running = []
global queryToSplit
queryToSplit = rateCheck()
global CONVERGENCE_RUNS
if queryToSplit==query:
queries = split(query, next_link)
if CONVERGENCE_RUNS > 0:
if len(queries)==2:
await asyncio.gather(
exQuery(query, queries[0], session),
exQuery(query, queries[1], session),
)
else:
await exQuery(query, queries[0], session)
else:
if CONVERGENCE_RUNS > 0:
await exQuery(query, next_link)
def rateCheck():
slowestQuery = None
rates = []
val = 0
currentRate = 0
for q in running:
if q[1] == None:
return None
else:
if slowestQuery == None:
slowestQuery = q
else:
if q[1] < slowestQuery[1]:
slowestQuery = q
rates = rates.append(q[1])
val = val + q[1]
if rates is not None and len(rates) != 0:
currentRate = val/len(rates)
global globalRate
ROI = 0
if max(globalRate, currentRate) != 0:
ROI = (globalRate - currentRate) / max(globalRate, currentRate)
global Credit
Credit = Credit + (ROI*QUERY_LIMIT)
global Debit
if ROI>=0:
Debit = Debit + (ROI*QUERY_LIMIT)
else:
Debit = Debit + (-ROI)*QUERY_LIMIT
global CONVERGENCE_RUNS
CONVERGENCE_RUNS = Credit - Debit
globalRate = currentRate
return slowestQuery[0]
def split(query, next_link, vitessePrec, vitesseAct, niveau):
plan = decode_saved_plan(next_link)
root = RootTree()
root.ParseFromString(plan)
global niveauActuel
if int(root.proj_source.scan_source.end_read)-int(root.proj_source.scan_source.last_read) > 200 and vitessePrec<vitesseAct and niveauActuel==niveau:
global restant
if restant == 1:
niveauActuel = niveauActuel+1
restant = pow(2,niveauActuel)
else:
restant = restant - 1
niveau = niveau + 1
plan = decode_saved_plan(next_link)
root2 = RootTree()
root2.ParseFromString(plan)
root2.proj_source.scan_source.last_read = str(int(((float(root2.proj_source.scan_source.end_read)-float(root2.proj_source.scan_source.last_read))/2)+float(root2.proj_source.scan_source.last_read)))
root.proj_source.scan_source.end_read = root2.proj_source.scan_source.last_read
next_link = encode_saved_plan(root)
next_link2 = encode_saved_plan(root2)
return [next_link, next_link2, niveau]
return [next_link]
class TestBGPInterface(object):
@classmethod
def setup_class(self):
self._app = run_app('config.yaml')
self._client = TestClient(self._app)
@classmethod
def teardown_class(self):
pass