In this section, we present our first two contributions: the formal description of a subset of Apache Flink's transformations and the semantic correspondence between the SPARQL Algebra operators and Apache Flink's set transformations.
3.2. Formalization of Apache Flink Transformations
In this section, we propose a formal interpretation of the PACT transformations implemented in the Apache Flink DataSet API. That interpretation will be used to establish a correspondence with the SPARQL Algebra operators. This correspondence is necessary before establishing an encoding to translate SPARQL queries to Flink programs in order to exploit the capabilities of Apache Flink for RDF data processing.
In order to define the PACT transformations, we need some auxiliary notions. First, we define the record projection, which builds a new record made up of the key-value pairs associated with some specific keys. Second, we define the record value projection, which allows obtaining the values associated with some specific keys. Next, we define the single dataset partition, which creates groups of records where the values associated with some keys are the same. Finally, we define the multiple dataset partition as a generalization of the single dataset partition. The single and multiple dataset partitions are crucial to define the reduce and cogroup transformations, because these transformations apply user functions over groups of records.
Record projection is defined as follows:
Definition 4 (Record Projection). Let $r$ be a record and $K$ be a set of keys; we define the projection of $r$ over $K$ (denoted as $r_{|K}$) as follows:
$$ r_{|K} = \{ (k, v) \in r \mid k \in K \} $$
In this way, by means of a record projection, a new record is obtained containing only the key-value pairs associated with some key in the set $K$.
Record value projection is defined as follows:
Definition 5 (Record Value Projection). Let $r$ be a record and $\bar{k} = (k_1, \ldots, k_n)$ be a tuple of keys such that each $k_i$ occurs as a key in $r$; we define the value projection of $r$ over $\bar{k}$ (denoted as $r_{[\bar{k}]}$) as follows:
$$ r_{[\bar{k}]} = (v_1, \ldots, v_n), \quad \text{where } (k_i, v_i) \in r \text{ for } 1 \le i \le n. $$
It is worth specifying that the record value projection takes a record and produces a tuple of values. In this operation, the order of the keys in the tuple is considered in the construction of the result. Likewise, the result of the record value projection may contain repeated elements. Let $t_1$ and $t_2$ be tuples of values; we say that $t_1$ and $t_2$ are equivalent ($t_1 \equiv t_2$) if both $t_1$ and $t_2$ contain exactly the same elements.
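As an informal illustration, the two projections of Definitions 4 and 5 can be sketched in Python, modeling a record as a dictionary from keys to values. The function names and the dict-based encoding are ours, chosen only for illustration:

```python
def project_record(r, keys):
    """Record projection (Definition 4): keep only the key-value pairs
    whose key belongs to the given set of keys."""
    return {k: v for k, v in r.items() if k in keys}

def project_values(r, key_tuple):
    """Record value projection (Definition 5): tuple of values taken in
    the order given by the key tuple; repeated values may occur."""
    return tuple(r[k] for k in key_tuple)

r = {"s": "ex:alice", "p": "foaf:knows", "o": "ex:bob"}
print(project_record(r, {"s", "o"}))  # {'s': 'ex:alice', 'o': 'ex:bob'}
print(project_values(r, ("o", "s")))  # ('ex:bob', 'ex:alice')
```

Note that `project_record` returns a record (a dict), whereas `project_values` returns a tuple whose order follows the key tuple, mirroring the distinction drawn above.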
The notion of single dataset partition is defined as follows:
Definition 6 (Single Dataset Partition). Let $D$ be a dataset and $K$ a non-empty set of keys; we define a single dataset partition of $D$ over keys $K$ as:
$$ \mathcal{P}_K(D) = \{ G_1, \ldots, G_n \} $$
where $\{ G_1, \ldots, G_n \}$ is a set partition of $D$ such that two records $r, r'$ belong to the same group $G_i$ if and only if $r_{|K} = r'_{|K}$. Intuitively, the single dataset partition creates groups of records where the values associated with the keys in $K$ are the same.
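A minimal Python sketch of the single dataset partition, modeling records as dicts and a dataset as a list of records (the names are illustrative, not part of the formalization):

```python
from collections import defaultdict

def partition(dataset, keys):
    """Single dataset partition (Definition 6): records whose projections
    over `keys` coincide end up in the same group."""
    groups = defaultdict(list)
    for r in dataset:
        # The projected key-value pairs, sorted for a canonical group key.
        groups[tuple(sorted((k, r[k]) for k in keys))].append(r)
    return list(groups.values())

D = [{"a": 1, "b": 2}, {"a": 1, "b": 3}, {"a": 2, "b": 2}]
print(partition(D, {"a"}))
# two groups: the records with a=1, and the record with a=2
```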
Analogous to the single dataset partition, we define the multiple dataset partition below. Note that the multiple dataset partition is a generalization of the single dataset partition.
Definition 7 (Multiple Dataset Partition). Let $D_1, D_2$ be two datasets and $K_1, K_2$ two non-empty sets of keys; we define a multiple dataset partition of $D_1$ and $D_2$ over keys $K_1, K_2$ as:
$$ \mathcal{P}_{K_1, K_2}(D_1, D_2) = \{ G_1, \ldots, G_n \} $$
where $\{ G_1, \ldots, G_n \}$ is a set partition of $D_1 \cup D_2$ such that a record $r \in D_1$ and a record $r' \in D_2$ belong to the same group $G_i$ if and only if $r_{[K_1]} \equiv r'_{[K_2]}$. After defining the auxiliary notions, we can define the map, reduce, filter, project, match, outer match, cogroup, and union PACT transformations.
Definition 8 (Map Transformation). Let $D$ be a dataset and $f$ a function ranging over records, i.e., $f : \mathcal{R} \rightarrow \mathcal{R}$; we define a map transformation of $f$ over $D$ as follows:
$$ \mathrm{map}(f, D) = \{ f(r) \mid r \in D \} $$
Correspondingly, the map transformation takes each record of a dataset and produces a new record by means of a user function $f$. Records produced by function $f$ can differ from the original records. First, the number of key-value pairs can be different. Second, the keys do not have to match the original keys. Last, the datatype associated with each value can differ.
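Under a dict-based encoding of records, the map transformation of Definition 8 can be sketched as follows (names are illustrative):

```python
def map_transformation(f, dataset):
    """Map (Definition 8): apply the user function f to every record."""
    return [f(r) for r in dataset]

# f may change the number of pairs, the keys, and the value types:
out = map_transformation(lambda r: {"y": str(r["x"] * 10)},
                         [{"x": 1}, {"x": 2}])
print(out)  # [{'y': '10'}, {'y': '20'}]
```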
Accordingly, we define the reduce transformation as follows:
Definition 9 (Reduce Transformation). Let $D$ be a dataset, $K$ a non-empty set of keys, and $f$ a function ranging over the power set of records, i.e., $f : 2^{\mathcal{R}} \rightarrow \mathcal{R}$; we define a reduce transformation as follows:
$$ \mathrm{reduce}(f, K, D) = \{ f(G) \mid G \in \mathcal{P}_K(D) \} $$
In this way, the reduce transformation takes a dataset and groups records by means of the single dataset partition. In each group, the records have the same values for the keys in set $K$. Then, it applies user function $f$ over each group and produces a new record.
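A Python sketch of the reduce transformation of Definition 9, built on a dict-based encoding of records and a single-dataset-partition helper (all names are illustrative):

```python
from collections import defaultdict

def partition(dataset, keys):
    """Single dataset partition (Definition 6)."""
    groups = defaultdict(list)
    for r in dataset:
        groups[tuple(sorted((k, r[k]) for k in keys))].append(r)
    return list(groups.values())

def reduce_transformation(f, keys, dataset):
    """Reduce (Definition 9): one output record per group of the partition."""
    return [f(group) for group in partition(dataset, keys)]

D = [{"k": "a", "n": 1}, {"k": "a", "n": 2}, {"k": "b", "n": 5}]
out = reduce_transformation(
    lambda g: {"k": g[0]["k"], "sum": sum(r["n"] for r in g)}, {"k"}, D)
print(out)  # [{'k': 'a', 'sum': 3}, {'k': 'b', 'sum': 5}]
```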
Definition 10 (Filter Transformation). Let $D$ be a dataset and $f$ a function ranging from records to boolean values, i.e., $f : \mathcal{R} \rightarrow \{true, false\}$; we define a filter transformation as follows:
$$ \mathrm{filter}(f, D) = \{ r \in D \mid f(r) = true \} $$
The filter transformation evaluates predicate $f$ on every record of a dataset and selects only those records for which $f$ returns $true$.
Definition 11 (Project Transformation). Let $D$ be a dataset and $K$ a set of keys; we define a project transformation as follows:
$$ \mathrm{project}(K, D) = \{ r_{|K} \mid r \in D \} $$
While the filter transformation allows selecting specific records according to criteria expressed in the semantics of a function $f$, the project transformation enables us to obtain specific fields from the records of a dataset $D$. For this purpose, we apply a record projection to each record in $D$ with respect to a set of keys $K$. It is worth highlighting that the result of a project transformation is a multiset, since several records can have the same values for the keys in $K$.
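The filter and project transformations of Definitions 10 and 11 can be sketched over the same dict-based encoding of records; note how project keeps duplicates, reflecting the multiset result (names are illustrative):

```python
def filter_transformation(f, dataset):
    """Filter (Definition 10): keep the records for which f returns True."""
    return [r for r in dataset if f(r)]

def project_transformation(keys, dataset):
    """Project (Definition 11): record projection applied record by record;
    the result is a multiset, so duplicates are kept."""
    return [{k: v for k, v in r.items() if k in keys} for r in dataset]

D = [{"a": 1, "b": 2}, {"a": 1, "b": 9}, {"a": 3, "b": 2}]
print(filter_transformation(lambda r: r["a"] == 1, D))
# [{'a': 1, 'b': 2}, {'a': 1, 'b': 9}]
print(project_transformation({"a"}, D))
# [{'a': 1}, {'a': 1}, {'a': 3}]  (duplicates survive)
```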
The previous PACT transformations take as a parameter a dataset $D$ and produce as a result a new dataset according to specific semantics. Nevertheless, many data sources are available, and it is eventually necessary to process and combine multiple datasets. In consequence, some PACT transformations take two or more datasets as parameters [14]. In the following, we present a formal interpretation of the essential multi-dataset transformations, including matching, grouping, and union.
Definition 12 (Match Transformation). Let $D_1, D_2$ be datasets, $f$ a function ranging over pairs of records, i.e., $f : \mathcal{R} \times \mathcal{R} \rightarrow \mathcal{R}$, and $K_1, K_2$ sets of keys; we define a match transformation as follows:
$$ \mathrm{match}(f, K_1, K_2, D_1, D_2) = \{ f(r_1, r_2) \mid r_1 \in D_1,\ r_2 \in D_2,\ r_{1[K_1]} \equiv r_{2[K_2]} \} $$
Thus, the match transformation takes each pair of records built from datasets $D_1$ and $D_2$ and applies user function $f$ to those pairs for which the values of $r_1$ with respect to the keys in $K_1$ coincide with the values of $r_2$ with respect to the keys in $K_2$. This correspondence is checked through a record value projection. Intuitively, the match transformation enables us to group and process pairs of records related by some specific criterion.
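A Python sketch of the match transformation of Definition 12, with the key sets given as tuples so the value projections have a fixed order (names and sample data are illustrative):

```python
def match_transformation(f, k1, k2, d1, d2):
    """Match (Definition 12): apply f to every pair of records whose value
    projections over k1 and k2 coincide (k1, k2 given as key tuples)."""
    return [f(r1, r2)
            for r1 in d1 for r2 in d2
            if tuple(r1[k] for k in k1) == tuple(r2[k] for k in k2)]

people = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
emails = [{"pid": 1, "mail": "alice@example.org"}]
out = match_transformation(
    lambda p, e: {"name": p["name"], "mail": e["mail"]},
    ("id",), ("pid",), people, emails)
print(out)  # [{'name': 'alice', 'mail': 'alice@example.org'}]
```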
In some cases, it is necessary to match and process a record in a dataset even if a corresponding record does not exist in the other dataset. The outer match transformation extends the match transformation to enable such a matching. Outer match transformation is defined as follows:
Definition 13 (Outer Match Transformation). Let $D_1, D_2$ be datasets, $f$ a function ranging over pairs of records, i.e., $f : \mathcal{R} \times \mathcal{R} \rightarrow \mathcal{R}$, and $K_1, K_2$ sets of keys; we define an outer match transformation as follows:
$$ \mathrm{outerMatch}(f, K_1, K_2, D_1, D_2) = \mathrm{match}(f, K_1, K_2, D_1, D_2) \cup \{ f(r_1, \bot) \mid r_1 \in D_1,\ \nexists\, r_2 \in D_2 : r_{1[K_1]} \equiv r_{2[K_2]} \} $$
In this manner, the outer match transformation is similar to the match transformation, but it allows us to apply the user function $f$ to a record $r_1 \in D_1$ even if there does not exist a record $r_2 \in D_2$ that matches $r_1$ with respect to the keys $K_1$ and $K_2$.
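The outer match of Definition 13 can be sketched as a left outer variant of match, passing `None` for the missing partner (the use of `None` for the absent record and all names are our own modeling choices):

```python
def outer_match(f, k1, k2, d1, d2):
    """Outer match (Definition 13): like match, but a record of d1 without
    a partner in d2 is still passed to f, paired with None (left outer)."""
    out = []
    for r1 in d1:
        partners = [r2 for r2 in d2
                    if tuple(r1[k] for k in k1) == tuple(r2[k] for k in k2)]
        if partners:
            out.extend(f(r1, r2) for r2 in partners)
        else:
            out.append(f(r1, None))
    return out

people = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
emails = [{"pid": 1, "mail": "alice@example.org"}]
out = outer_match(
    lambda p, e: {"name": p["name"], "mail": e["mail"] if e else None},
    ("id",), ("pid",), people, emails)
print(out)
# [{'name': 'alice', 'mail': 'alice@example.org'}, {'name': 'bob', 'mail': None}]
```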
In addition to the match and outer match transformations, the cogroup transformation enables us to group records in two datasets. Those records must coincide with respect to a set of keys. The cogroup transformation is defined as follows:
Definition 14 (CoGroup Transformation). Let $D_1, D_2$ be datasets, $f$ a function ranging over pairs of groups of records, i.e., $f : 2^{\mathcal{R}} \times 2^{\mathcal{R}} \rightarrow \mathcal{R}$, and $K_1, K_2$ sets of keys; we define a cogroup transformation as follows:
$$ \mathrm{cogroup}(f, K_1, K_2, D_1, D_2) = \{ f(G \cap D_1, G \cap D_2) \mid G \in \mathcal{P}_{K_1, K_2}(D_1, D_2) \} $$
Intuitively, the cogroup transformation builds groups with the records of datasets $D_1$ and $D_2$ for which the values of the keys in $K_1$ and $K_2$ are equal. Then, it applies a user function $f$ over each one of those groups.
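A Python sketch of the cogroup transformation of Definition 14, where each joint group is handed to the user function as a pair of lists, one per input dataset (names and sample data are illustrative):

```python
from collections import defaultdict

def cogroup(f, k1, k2, d1, d2):
    """CoGroup (Definition 14): f is applied once per pair of groups of
    records from d1 and d2 sharing the same projected key values."""
    groups = defaultdict(lambda: ([], []))
    for r in d1:
        groups[tuple(r[k] for k in k1)][0].append(r)
    for r in d2:
        groups[tuple(r[k] for k in k2)][1].append(r)
    return [f(g1, g2) for g1, g2 in groups.values()]

orders = [{"cust": 1, "total": 10}, {"cust": 1, "total": 5}]
custs = [{"id": 1, "name": "alice"}]
out = cogroup(lambda os, cs: {"name": cs[0]["name"] if cs else None,
                              "orders": len(os)},
              ("cust",), ("id",), orders, custs)
print(out)  # [{'name': 'alice', 'orders': 2}]
```

Unlike match, which calls the function once per matching pair, cogroup calls it once per key group, which is why the function receives collections rather than single records.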
Finally, the union transformation creates a new dataset with every record in two datasets $D_1$ and $D_2$. It is defined as follows:
Definition 15 (Union Transformation). Let $D_1, D_2$ be datasets; we define a union transformation as follows:
$$ \mathrm{union}(D_1, D_2) = D_1 \cup D_2 $$
It is essential to highlight that the records in datasets $D_1$ and $D_2$ can differ in the number of key-value pairs and in the types of the values.
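In the dict-based sketch, the union of Definition 15 is simply multiset concatenation; as the definition notes, the two inputs need not share keys or value types:

```python
def union(d1, d2):
    """Union (Definition 15): multiset union, i.e. simple concatenation;
    records may differ in their keys and in the types of their values."""
    return d1 + d2

print(union([{"a": 1}], [{"b": "x", "c": 2.0}]))
# [{'a': 1}, {'b': 'x', 'c': 2.0}]
```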
3.3. Correspondence between SPARQL Algebra Operators and Apache Flink Transformations
In this section, we propose a semantic correspondence between SPARQL algebra operators and the PACT transformations implemented in the Apache Flink DataSet API. We use the formalization of PACT transformations presented in the previous section to provide an intuitive and correct mapping of the semantic elements of SPARQL queries. It is important to remember that in this formalization a record is an unordered list of $n$ key-value pairs. However, as described in Section 2.1, an RDF dataset is a set of triples, each composed of three elements $(s, p, o)$. Hence, for this particular case, a record will be understood as an unordered list of three key-value pairs. Besides, we assume that each field of a record $r$ can be accessed using indexes 0, 1, and 2. Likewise, we assume that RDF triple patterns are triples $(s, p, o)$ where $s$, $p$, and $o$ can be variables or values. Finally, the result of the application of each PACT transformation is intended to be a set of solution mappings, i.e., sets of key-value pairs with RDF variables as keys, which will be represented as records with $n$ key-value pairs.
In the following, we present the definition of our encoding of SPARQL queries as PACT transformations. First, we define the encoding of the graph pattern evaluation as follows:
Definition 16 (Graph Pattern PACT Encoding). Let $P$ be a graph pattern and $D$ be an RDF dataset; the PACT encoding of the evaluation of $P$ over $D$, denoted by $[\![P]\!]_D$, is defined recursively as follows:
1. If $P$ is a triple pattern $t = (s, p, o)$, then:
$$ [\![P]\!]_D = \mathrm{map}(f_m, \mathrm{filter}(f_t, D)) $$
where function $f_t$ returns $true$ for a record if and only if each non-variable component of $t$ coincides with the corresponding component of the record, and repeated variables in $t$ are bound to the same value; and function $f_m$ builds the solution mapping that associates each variable of $t$ with the corresponding value of the record.
2. If $P$ is $(P_1 \text{ AND } P_2)$, then:
$$ [\![P]\!]_D = \mathrm{match}(f, K, K, [\![P_1]\!]_D, [\![P_2]\!]_D) $$
where $K = \mathrm{var}(P_1) \cap \mathrm{var}(P_2)$ and function $f$ merges two compatible solution mappings into a single one.
3. If $P$ is $(P_1 \text{ OPT } P_2)$, then:
$$ [\![P]\!]_D = \mathrm{outerMatch}(f, K, K, [\![P_1]\!]_D, [\![P_2]\!]_D) $$
where $K = \mathrm{var}(P_1) \cap \mathrm{var}(P_2)$ and function $f$ is defined as in the previous case, merging a solution mapping with the empty mapping when no matching record exists.
4. If $P$ is $(P_1 \text{ UNION } P_2)$, then:
$$ [\![P]\!]_D = \mathrm{union}([\![P_1]\!]_D, [\![P_2]\!]_D) $$
5. If $P$ is $(P_1 \text{ FILTER } R)$, then:
$$ [\![P]\!]_D = \mathrm{filter}(f_R, [\![P_1]\!]_D) $$
where $R$ is a boolean expression and function $f_R$ evaluates $R$ over each solution mapping.
In this way, the graph pattern PACT evaluation is encoded according to the recursive definition of a graph pattern $P$. More precisely, we have that:
If $P$ is a triple pattern, then the records of dataset $D$ are filtered (by means of function $f_t$) to obtain only the records that are compatible with the variables and values in $P$. Then, the filtered records are mapped (by means of function $f_m$) to obtain solution mappings that relate each variable to each possible value.
If $P$ is a join (left join) pattern, i.e., it uses the SPARQL operator AND (OPT), then a match (outer match) transformation is performed between the recursive evaluations of subgraphs $P_1$ and $P_2$ with respect to a set $K$ formed by the variables shared by $P_1$ and $P_2$.
If $P$ is a union graph pattern, then a union transformation is performed between the recursive evaluations of subgraphs $P_1$ and $P_2$.
Finally, if $P$ is a filter graph pattern, then a filter transformation is performed over the recursive evaluation of subgraph $P_1$, where the user function $f$ is built according to the structure of the filter expression $R$.
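The triple-pattern case can be sketched in Python, assuming triples are records indexed by 0, 1, 2 and variable names start with "?" (the helper names and the variable convention are ours):

```python
def is_var(t):
    """A term is a variable if it is a string starting with '?'."""
    return isinstance(t, str) and t.startswith("?")

def eval_triple_pattern(tp, dataset):
    """Triple-pattern case of Definition 16: filter the compatible
    triples, then map each one to a solution mapping."""
    def compatible(r):
        binding = {}
        for i in (0, 1, 2):
            if is_var(tp[i]):
                if binding.setdefault(tp[i], r[i]) != r[i]:
                    return False  # repeated variable bound inconsistently
            elif tp[i] != r[i]:
                return False      # constant does not coincide
        return True
    def to_mapping(r):
        return {tp[i]: r[i] for i in (0, 1, 2) if is_var(tp[i])}
    return [to_mapping(r) for r in dataset if compatible(r)]

D = [{0: "ex:alice", 1: "foaf:knows", 2: "ex:bob"},
     {0: "ex:alice", 1: "foaf:name", 2: "Alice"}]
print(eval_triple_pattern(("ex:alice", "foaf:knows", "?x"), D))
# [{'?x': 'ex:bob'}]
```

The `compatible` predicate plays the role of $f_t$ and `to_mapping` the role of $f_m$ in the definition above.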
In addition to the graph pattern evaluation, we present an encoding of the evaluation of SELECT and DISTINCT SELECT queries, as well as of the ORDER BY and LIMIT modifiers. The selection encoding is defined as follows:
Definition 17 (Selection PACT Encoding). Let $D$ be an RDF dataset, $P$ a graph pattern, $K$ a finite set of variables, and $Q = (\text{SELECT } K \text{ WHERE } P)$ a selection query over $D$; the PACT encoding of the evaluation of $Q$ over $D$ is defined as follows:
$$ [\![Q]\!]_D = \mathrm{project}(K, [\![P]\!]_D) $$
Correspondingly, the selection query is encoded as a project transformation over the evaluation of the graph pattern $P$ associated with the query, with respect to a set of keys $K$ formed by the variables in the SELECT part of the query. We make a subtle variation in defining the distinct selection as follows:
Definition 18 (Distinct Selection PACT Encoding). Let $D$ be an RDF dataset, $P$ a graph pattern, $K$ a finite set of variables, and $Q = (\text{SELECT DISTINCT } K \text{ WHERE } P)$ a distinct selection query over $D$; the PACT encoding of the evaluation of $Q$ over $D$ is defined as follows:
$$ [\![Q]\!]_D = \mathrm{reduce}(f_{first}, K, \mathrm{project}(K, [\![P]\!]_D)) $$
where function $f_{first}$ takes a set of records and returns the first of them. The definition of the distinct selection PACT encoding is similar to the general selection query encoding. The main difference corresponds to a reduction step (reduce transformation) in which the duplicate records, i.e., records with the same values for the keys in set $K$ (the distinct keys), are reduced to a single occurrence by means of the function $f_{first}$, which takes as a parameter a set of records whose values for the keys in $K$ coincide and returns the first of them (actually, it could return any of them).
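The deduplication step of Definition 18 can be sketched as a reduce over groups of equal records, keeping one representative per group (names are illustrative):

```python
from collections import defaultdict

def distinct(keys, dataset):
    """Distinct selection step of Definition 18: group records sharing the
    same values over `keys` and keep one representative per group (the
    first here, though any record of the group would do)."""
    groups = defaultdict(list)
    for r in dataset:
        groups[tuple(r[k] for k in sorted(keys))].append(r)
    return [g[0] for g in groups.values()]

M = [{"x": 1}, {"x": 1}, {"x": 2}]
print(distinct({"x"}, M))  # [{'x': 1}, {'x': 2}]
```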
The encoding of the evaluation of an order-by query is defined as follows:
Definition 19 (Order By PACT Encoding). Let $D$ be an RDF dataset, $P$ a graph pattern, $k$ a variable, and $Q = (P \text{ ORDER BY } k)$ an order-by query over $D$; the PACT encoding of the evaluation of $Q$ over $D$ is defined as follows:
$$ [\![Q]\!]_D = f_{order}([\![P]\!]_D) $$
where function $f_{order}$ is defined as follows: given $M = \{r_1, \ldots, r_n\}$, $f_{order}(M)$ is a permutation $(r'_1, \ldots, r'_n)$ of $M$ such that $r'_i[k] \le r'_{i+1}[k]$ if the order is ascending, or $r'_i[k] \ge r'_{i+1}[k]$ if the order is descending, for each $1 \le i < n$. Thereby, the graph pattern associated with the query is first evaluated according to the encoding of its precise semantics. Then, the resulting solution mappings are ordered by means of the function $f_{order}$. Currently, we only consider ordering with respect to one key, which is a simplification of the ORDER BY operator in SPARQL. Finally, the encoding of the evaluation of a limit query is defined as follows:
Definition 20 (Limit PACT Encoding). Let $D$ be an RDF dataset, $P$ a graph pattern, $m$ an integer such that $m \ge 0$, and $Q = (P \text{ LIMIT } m)$ a limit query over $D$; the PACT encoding of the evaluation of $Q$ over $D$ is defined as follows:
$$ [\![Q]\!]_D = f_{limit}([\![P]\!]_D) $$
where function $f_{limit}$ is defined as follows: given $M = \{r_1, \ldots, r_n\}$, $f_{limit}(M) = M'$ such that $M' \subseteq M$ and $|M'| = \min(m, |M|)$. In this way, once the graph pattern associated with the query is evaluated, the result is shortened to consider only $m$ records, according to the query. According to the SPARQL semantics, if $m \ge |M|$, the result is equal to $M$.
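The two solution modifiers of Definitions 19 and 20 can be sketched together, with order-by restricted to a single key as in the simplified encoding (names are illustrative):

```python
def order_by(dataset, key, ascending=True):
    """Order By (Definition 19): a permutation of the solution mappings,
    sorted on a single key."""
    return sorted(dataset, key=lambda r: r[key], reverse=not ascending)

def limit(dataset, m):
    """Limit (Definition 20): keep at most m records; if m exceeds the
    size of the dataset, the whole dataset is returned."""
    return dataset[:m]

M = [{"n": 3}, {"n": 1}, {"n": 2}]
print(limit(order_by(M, "n"), 2))  # [{'n': 1}, {'n': 2}]
print(limit(M, 10))                # the whole dataset
```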