Commit fb0981c6 authored by Joseph

Added the files used

parent 9c21a868
FROM python:3.7-alpine
RUN apk add --no-cache git make gcc g++ bash postgresql-dev python3-dev musl-dev curl
# for cryptography, need to install libffi-dev
RUN apk add --no-cache libffi-dev
# install poetry
RUN curl -sSL https://raw.githubusercontent.com/sdispater/poetry/master/get-poetry.py | python
WORKDIR /opt/sage-engine/
RUN pip install pybind11==2.2.4
# `source` is unavailable in /bin/sh and would not persist across layers;
# put poetry on the PATH instead
ENV PATH="/root/.poetry/bin:${PATH}"
COPY poetry.lock pyproject.toml ./
# install grpcio first to be faster than a simple poetry install
# super hack-ish from: https://github.com/grpc/grpc/issues/18150
# brings the build from about 20 minutes down to under a minute.. thank you :D
# temporary fix until someone releases a python3.7-alpine grpcio wheel
RUN echo 'manylinux1_compatible = True' > /usr/local/lib/python3.7/site-packages/_manylinux.py
RUN pip install grpcio
# roll back
RUN rm /usr/local/lib/python3.7/site-packages/_manylinux.py
# generate the requirements.txt from poetry and then use pip to install
RUN ~/.poetry/bin/poetry export -f requirements.txt -v > requirements.txt
# install the exported requirements using pip
RUN pip install -r requirements.txt
COPY . /opt/sage-engine
# now re-run poetry to install the project itself, without creating a virtualenv
# (no need, we are in a container) and without dev dependencies (production image)
RUN ~/.poetry/bin/poetry config virtualenvs.create false && ~/.poetry/bin/poetry install --no-dev --extras "hdt postgres"
CMD [ "sage" ]
MIT License
Copyright (c) 2018 Thomas Minier
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# Query Parallelism

/!\ This is a fork of the original SaGe, used for experimentation. /!\
# Sage: a SPARQL query engine for public Linked Data providers
[![Build Status](https://travis-ci.com/sage-org/sage-engine.svg?branch=master)](https://travis-ci.com/sage-org/sage-engine) [![PyPI version](https://badge.fury.io/py/sage-engine.svg)](https://badge.fury.io/py/sage-engine) [![Docs](https://img.shields.io/badge/docs-passing-brightgreen)](https://sage-org.github.io/sage-engine/)
[Read the online documentation](https://sage-org.github.io/sage-engine/)
SaGe is a SPARQL query engine for public Linked Data providers that implements *Web preemption*. It comprises a smart SaGe client
and a SaGe SPARQL query server hosting RDF datasets (stored using [HDT](http://www.rdfhdt.org/)).
This repository contains the **Python implementation of the SaGe SPARQL query server**.
SPARQL queries are suspended by the web server after a fixed quantum of time and resumed upon client request. Using Web preemption, Sage ensures stable response times for query execution and completeness of results under high load.
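The suspend-and-resume cycle can be sketched as follows. This is an illustrative model only, not the actual SaGe server code; the function names and the quantum value are made up:

```python
import time

def preemptive_eval(iterator, quantum=0.075):
    """Consume `iterator` for at most `quantum` seconds, then suspend.

    Returns (results, saved_state); saved_state is None once exhausted.
    """
    results = []
    start = time.monotonic()
    for item in iterator:
        results.append(item)
        if time.monotonic() - start >= quantum:
            return results, iterator  # suspended: the client resumes later
    return results, None  # query complete

def drain(iterator, quantum=0.075):
    """Client side: resume the query until the server reports completion."""
    all_results, state = [], iterator
    while state is not None:
        chunk, state = preemptive_eval(state, quantum)
        all_results.extend(chunk)
    return all_results
```

Under this model, a long-running scan never occupies a worker for more than one quantum at a time, which is what gives the server stable response times under load.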
The complete approach and experimental results are available in a Research paper accepted at The Web Conference 2019, [available here](https://hal.archives-ouvertes.fr/hal-02017155/document). *Thomas Minier, Hala Skaf-Molli and Pascal Molli. "SaGe: Web Preemption for Public SPARQL Query services" in Proceedings of the 2019 World Wide Web Conference (WWW'19), San Francisco, USA, May 13-17, 2019*.
Please send your feedback, comments, or questions to our [mailing list](mailto:sage@univ-nantes.fr) or [our issue tracker on GitHub](https://github.com/sage-org/sage-engine/issues).
# Table of contents
* [Installation](#installation)
* [Getting started](#getting-started)
* [Server configuration](#server-configuration)
* [Starting the server](#starting-the-server)
* [Sage Docker image](#sage-docker-image)
* [Command line utilities](#command-line-utilities)
* [Documentation](#documentation)
# Installation
Installation in a [virtualenv](https://virtualenv.pypa.io/en/stable/) is **strongly advised!**
Requirements:
* Python 3.7 (*or higher*)
* [pip](https://pip.pypa.io/en/stable/)
* **gcc/clang** with **c++11 support**
* **Python Development headers**
> You should have the `Python.h` header available on your system.
> For example, for Python 3.7, install the `python3.7-dev` package on Debian/Ubuntu systems.
## Installation using pip
The core engine of the SaGe SPARQL query server with [HDT](http://www.rdfhdt.org/) as a backend can be installed as follows:
```bash
pip install sage-engine[hdt,postgres]
```
The SaGe query engine uses various **backends** to load RDF datasets.
The available backends are installed as extra dependencies; the above command installs both the HDT and PostgreSQL backends.
## Manual Installation using poetry
The SaGe SPARQL query server can also be manually installed using the [poetry](https://github.com/sdispater/poetry) dependency manager.
```bash
git clone https://github.com/sage-org/sage-engine
cd sage-engine
poetry install --extras "hdt postgres"
```
As with pip, the various SaGe backends are installed as extras dependencies, using the `--extras` flag.
# Getting started
## Server configuration
A Sage server is configured using a configuration file in [YAML syntax](http://yaml.org/).
You will find below a minimal working example of such a configuration file.
A full example is available [in the `config_examples/` directory](https://github.com/sage-org/sage-engine/blob/master/config_examples/example.yaml)
```yaml
name: SaGe Test server
maintainer: Chuck Norris
quota: 75
max_results: 2000
graphs:
  -
    name: dbpedia
    uri: http://example.org/dbpedia
    description: DBPedia
    backend: hdt-file
    file: datasets/dbpedia.2016.hdt
```
The `quota` and `max_results` fields are used to set the maximum time quantum and the maximum number of results
allowed per request, respectively.
Each entry in the `graphs` field declares an RDF graph with a name, a URI, a description, a backend and options specific to this backend.
Currently, **only** the `hdt-file` backend is supported, which allows a SaGe server to load RDF datasets from [HDT files](http://www.rdfhdt.org/). SaGe uses [pyHDT](https://github.com/Callidon/pyHDT) to load and query HDT files.
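As a sanity check, the structure of such a configuration can be validated once parsed. The sketch below assumes the YAML file has already been loaded into a Python dictionary; the field names follow the example above, but `validate_config` itself is hypothetical and not part of the server:

```python
# Illustrative sketch: checking a parsed configuration dictionary.
# Field names follow the YAML example above; this is not the server's
# actual loading code.
def validate_config(config):
    """Return a list of human-readable errors (empty if the config looks valid)."""
    errors = []
    if not isinstance(config.get("quota"), int) or config["quota"] <= 0:
        errors.append("quota must be a positive integer (time quantum per request)")
    if not isinstance(config.get("max_results"), int) or config["max_results"] <= 0:
        errors.append("max_results must be a positive integer")
    for i, graph in enumerate(config.get("graphs", [])):
        missing = {"name", "uri", "backend"} - graph.keys()
        if missing:
            errors.append(f"graph #{i} is missing fields: {sorted(missing)}")
        if graph.get("backend") == "hdt-file" and "file" not in graph:
            errors.append(f"graph #{i} uses the hdt-file backend but has no 'file' option")
    return errors

config = {
    "name": "SaGe Test server",
    "quota": 75,
    "max_results": 2000,
    "graphs": [{
        "name": "dbpedia",
        "uri": "http://example.org/dbpedia",
        "backend": "hdt-file",
        "file": "datasets/dbpedia.2016.hdt",
    }],
}
assert validate_config(config) == []
```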
## Starting the server
The `sage` executable, installed alongside the SaGe server, makes it easy to start a SaGe server from a configuration file using [Gunicorn](http://gunicorn.org/), a Python WSGI HTTP server.
```bash
# launch Sage server with 4 workers on port 8000
sage my_config.yaml -w 4 -p 8000
```
The full usage of the `sage` executable is detailed below:
```
Usage: sage [OPTIONS] CONFIG

  Launch the Sage server using the CONFIG configuration file

Options:
  -p, --port INTEGER              The port to bind  [default: 8000]
  -w, --workers INTEGER           The number of server workers  [default: 4]
  --log-level [debug|info|warning|error]
                                  The granularity of log outputs  [default: info]
  --help                          Show this message and exit.
```
# SaGe Docker image
The Sage server is also available through a [Docker image](https://hub.docker.com/r/callidon/sage/).
To use it, do not forget to [mount into the container](https://docs.docker.com/storage/volumes/) the directory that contains your configuration file and your datasets.
```bash
docker pull callidon/sage
docker run -v path/to/config-file:/opt/data/ -p 8000:8000 callidon/sage sage /opt/data/config.yaml -w 4 -p 8000
```
# Documentation
To generate the documentation, navigate to the `docs` directory and build it:
```bash
cd docs/
make html
open build/html/index.html
```
Copyright 2017-2019 - [GDD Team](https://sites.google.com/site/gddlina/), [LS2N](https://www.ls2n.fr/?lang=en), [University of Nantes](http://www.univ-nantes.fr/)
# bgp_interface_test.py
# Author: Thomas MINIER - MIT License 2017-2018
import copy
import multiprocessing

import pytest
from sage.http_server.server import run_app
from starlette.testclient import TestClient
from tests.http.utils import post_sparql
bgp_queries = [
    ("""
    SELECT * WHERE {
      ?s ?p ?o.
    }
    """, 242256),
]

QUERY_LIMIT = multiprocessing.cpu_count()
CONVERGENCE_RUNS = 1  # Credit - Debit
globalRate = 0
queryToSplit = None
globalResults = set()
Debit = 0
running = set()
Credit = 1
def emit(query):
    """Placeholder: hand `query` over to the execution engine (harness-provided)."""
    pass

def exQuery(query):
    global running
    running = running.union({query})
    emit(query)

def onResults(query, results, state, time):
    global globalResults, queryToSplit, running
    globalResults = globalResults.union(results)
    query.localRate = time
    query.end_read = len(results)
    queryToSplit = rateCheck()
    if queryToSplit == query:
        running = running.difference({query})
        for q in split(query):
            if CONVERGENCE_RUNS > 0:
                exQuery(q)
    elif CONVERGENCE_RUNS > 0:
        exQuery(query)

def onFailure(query, state):
    global queryToSplit
    queryToSplit = rateCheck()
    if queryToSplit == query:
        for q in split(query):
            if CONVERGENCE_RUNS > 0:
                exQuery(q)
    elif CONVERGENCE_RUNS > 0:
        exQuery(query)

def rateCheck():
    global Credit, Debit, CONVERGENCE_RUNS, globalRate
    slowestQuery = None
    rates = set()
    val = 0
    for q in running:
        if q.localRate is None:
            return None
        if slowestQuery is None or q.localRate > slowestQuery.localRate:
            slowestQuery = q
        rates = rates.union({q.rate})
        val = val + q.rate
    currentRate = val / len(rates) if rates else 0
    ROI = 0
    if max(globalRate, currentRate) != 0:
        ROI = (globalRate - currentRate) / max(globalRate, currentRate)
    Credit = Credit + (ROI * QUERY_LIMIT)
    Debit = Debit + abs(ROI) * QUERY_LIMIT
    CONVERGENCE_RUNS = Credit - Debit
    globalRate = currentRate
    return slowestQuery

def split(query):
    """Split `query` in two halves over its [last_read, end_read) range."""
    query2 = copy.copy(query)  # copy instead of aliasing the same object
    query2.last_read = query.last_read + (query.end_read - query.last_read) // 2
    query.end_read = query2.last_read
    return {query, query2}
class TestBGPInterface(object):
    @classmethod
    def setup_class(self):
        self._app = run_app('config.yaml')
        self._client = TestClient(self._app)

    @classmethod
    def teardown_class(self):
        pass

    @pytest.mark.parametrize("query,cardinality", bgp_queries)
    def test_bgp_interface(self, query, cardinality):
        nbResults = 0
        nbCalls = 0
        hasNext = True
        next_link = None
        while hasNext:
            response = post_sparql(self._client, query, next_link, 'http://example.org/dbpedia')
            assert response.status_code == 200
            response = response.json()
            nbResults += len(response['bindings'])
            hasNext = response['hasNext']
            next_link = response['next']
            exQuery(next_link)
            nbCalls += 1
        assert nbResults == cardinality
        assert nbCalls >= 1
# bgp_interface_test.py
# Author: Thomas MINIER - MIT License 2017-2018
import pytest
import multiprocessing
from sage.http_server.server import run_app
from starlette.testclient import TestClient
from tests.http.utils import post_sparql
from sage.http_server.utils import decode_saved_plan, encode_saved_plan
from sage.query_engine.protobuf.iterators_pb2 import (RootTree,
SavedBagUnionIterator,
SavedFilterIterator,
SavedIndexJoinIterator,
SavedProjectionIterator,
SavedScanIterator)
bgp_queries = [
    ("""
    SELECT * WHERE {
      ?s ?p ?o.
    }
    """, 100000),
]

QUERY_LIMIT = multiprocessing.cpu_count()
CONVERGENCE_RUNS = 1  # Credit - Debit
globalRate = 0
queryToSplit = None
globalResults = []
Debit = 0
running = []
Credit = 1
def exQuery(query):
    running.append(query)  # append mutates in place and returns None

def onResults(query, results, state, time):
    global queryToSplit
    globalResults.append(results)
    query.localRate = time
    query.end_read = len(results)
    queryToSplit = rateCheck()
    if queryToSplit == query:
        running.remove(query)
        for q in split(query):
            if CONVERGENCE_RUNS > 0:
                exQuery(q)
    elif CONVERGENCE_RUNS > 0:
        exQuery(query)

def onFailure(query, state):
    global queryToSplit
    queryToSplit = rateCheck()
    if queryToSplit == query:
        for q in split(query):
            if CONVERGENCE_RUNS > 0:
                exQuery(q)
    elif CONVERGENCE_RUNS > 0:
        exQuery(query)
def rateCheck():
    global Credit, Debit, CONVERGENCE_RUNS, globalRate
    slowestQuery = None
    rates = []
    val = 0
    currentRate = 0
    for q in running:
        if q.localRate is None:
            return None
        if slowestQuery is None or q.localRate > slowestQuery.localRate:
            slowestQuery = q
        rates.append(q.rate)
        val = val + q.rate
    if len(rates) != 0:
        currentRate = val / len(rates)
    ROI = 0
    if max(globalRate, currentRate) != 0:
        ROI = (globalRate - currentRate) / max(globalRate, currentRate)
    Credit = Credit + (ROI * QUERY_LIMIT)
    Debit = Debit + abs(ROI) * QUERY_LIMIT
    CONVERGENCE_RUNS = Credit - Debit
    globalRate = currentRate
    return slowestQuery
def split(query):
    """Split the saved plan in two over its [last_read, end_read) scan range."""
    query2 = RootTree()
    query2.CopyFrom(query)  # protobuf deep copy instead of aliasing the same message
    scan = query.proj_source.scan_source
    scan2 = query2.proj_source.scan_source
    mid = int(scan.last_read) + (int(scan.end_read) - int(scan.last_read)) // 2
    scan2.last_read = str(mid)
    scan.end_read = scan2.last_read
    return [query, query2]
class TestBGPInterface(object):
    @classmethod
    def setup_class(self):
        self._app = run_app('config.yaml')
        self._client = TestClient(self._app)

    @classmethod
    def teardown_class(self):
        pass

    @pytest.mark.parametrize("query,cardinality", bgp_queries)
    def test_bgp_interface(self, query, cardinality):
        nbResults = 0
        nbCalls = 0
        hasNext = True
        next_link = None
        while hasNext:
            response = post_sparql(self._client, query, next_link, 'http://example.org/dbpedia')
            assert response.status_code == 200
            response = response.json()
            nbResults += len(response['bindings'])
            hasNext = response['hasNext']
            next_link = response['next']
            if next_link is not None:
                plan = decode_saved_plan(next_link)
                root = RootTree()
                root.ParseFromString(plan)
                print(root)
                onFailure(root, True)
                root.proj_source.scan_source.end_read = '100000'
                print(root)
                next_link = encode_saved_plan(root)
            nbCalls += 1
        assert nbResults == cardinality
        assert nbCalls >= 1
# U.py
# Author: Thomas MINIER - MIT License 2017-2018
from sage.database.db_connector import DatabaseConnector
from sage.database.db_iterator import DBIterator
# from itertools import filter
class DummyDataset:
    def __init__(self, doc, name):
        self._name = name
        self._doc = doc

    def get_graph(self, name):
        return self._doc

    def has_graph(self, name):
        return self._name == name
class SimpleIterator(DBIterator):
    """A DBIterator that iterates over a set of triples"""

    def __init__(self, triples, pattern, offset=0):
        super(SimpleIterator, self).__init__(pattern)
        self._triples = triples
        self._popped = offset

    def has_next(self):
        return len(self._triples) > 0

    def last_read(self):
        return "{}".format(self._popped)

    def next(self):
        self._popped += 1
        return self._triples.pop()
class MemoryDatabase(DatabaseConnector):
    """An in-memory RDF database"""

    def __init__(self):
        super(MemoryDatabase, self).__init__()
        self._triples = list()

    @staticmethod
    def from_config(config):
        return MemoryDatabase()

    def search(self, subject, predicate, obj, last_read=None, as_of=None):
        def __filter(triple):
            s, p, o = triple
            return (subject.startswith('?') or subject == s) \
                and (predicate.startswith('?') or predicate == p) \
                and (obj.startswith('?') or obj == o)
        pattern = {
            "subject": subject,
            "predicate": predicate,
            "object": obj
        }
        results = list(filter(__filter, self._triples))
        offset = 0 if last_read is None else int(float(last_read))
        results = results[offset:]
        return SimpleIterator(results, pattern, offset), len(results)

    def insert(self, subject, predicate, obj):
        self._triples.append((subject, predicate, obj))

    def delete(self, subject, predicate, obj):
        self._triples.remove((subject, predicate, obj))

    def close(self):
        self._triples = list()


if __name__ == '__main__':
    # small usage example: variables are strings starting with '?'
    db = MemoryDatabase()
    db.insert('a', 'b', 'c')
    iterator, card = db.search('?s', 'b', '?o')
    print(iterator.next(), card)
# bgp_interface_test.py
# Author: Thomas MINIER - MIT License 2017-2018
import pytest
from sage.http_server.server import run_app
from starlette.testclient import TestClient
from tests.http.utils import post_sparql
bgp_queries = [
("""
SELECT * WHERE {