All examples in this article were run on Starburst Galaxy with clusters based on trino 432.

Overview

Predicate pushdown is the term used in Trino to describe when a filter in a query is “pushed” down to a connector so that it can be used to reduce the amount of data read from the underlying data source.

Predicate pushdown is performed automatically by Trino when possible, which is typically whenever a SARGable filter expression is used and the connector supports predicate pushdown. Without predicate pushdown, the filtering is performed by Trino on each row after reading all data for a table via the connector.

For example, take this query executed in Trino against a MySQL catalog:

SELECT name FROM customer WHERE acctbal = 4231.45

The source stage in the output of EXPLAIN ANALYZE looks like:

Fragment 1 [SOURCE]
    CPU: 3.70ms, Scheduled: 6.56ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 1 row (23B); per task: avg.: 1.00 std.dev.: 0.00, Output: 1 row (23B)
    Output layout: [name]
    Output partitioning: SINGLE []
    TableScan[table = mysql:tpch.customer tpch.customer constraint on [acctbal] columns=[name:varchar(255):TINYTEXT]]
        Layout: [name:varchar(255)]
        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        CPU: 2.00ms (100.00%), Scheduled: 5.00ms (100.00%), Blocked: 0.00ns (?%), Output: 1 row (23B)
        Input avg.: 1.00 rows, Input std.dev.: 0.00%
        name := name:varchar(255):TINYTEXT
        Input: 1 row (23B), Physical input time: 3.39ms

Note that the operator in the source stage is a TableScan operator meaning there is no filtering occurring in Trino. The other item to note in this source stage in the number of input rows.

Now lets update our simple example and construct a query on a MySQL catalog with a predicate that cannot be pushed down:

SELECT name FROM customer
WHERE acctbal = 4231.45 OR name = 'Customer#000000376'

The source stage in the output of EXPLAIN ANALYZE looks like:

Fragment 1 [SOURCE]
    CPU: 40.07ms, Scheduled: 43.26ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 1500 rows (0B); per task: avg.: 1500.00 std.dev.: 0.00, Output: 1 row (23B)
    Output layout: [name]
    Output partitioning: SINGLE []
    ScanFilterProject[table = mysql:tpch.customer tpch.customer columns=[name:varchar(255):TINYTEXT, acctbal:double:DOUBLE], filterPredicate = (("acctbal" = 4.23145E3) OR ("name" = CAST('Customer#000000376' AS varchar(255))))]
        Layout: [name:varchar(255)]
        Estimates: {rows: 1480 (79.49kB), cpu: 92.50k, memory: 0B, network: 0B}/{rows: ? (?), cpu: 92.50k, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        CPU: 39.00ms (100.00%), Scheduled: 42.00ms (100.00%), Blocked: 0.00ns (?%), Output: 1 row (23B)
        Input avg.: 1500.00 rows, Input std.dev.: 0.00%
        name := name:varchar(255):TINYTEXT
        acctbal := acctbal:double:DOUBLE
        Input: 1500 rows (0B), Filtered: 99.93%, Physical input time: 2.37ms

Note the operator is now a ScanFilterProject operator which means that a filter will occur in Trino. Also the number of input rows is now 1500 which is how large this particular table. Thus, the connector is scanning all data for this table and filtering is occurring in Trino.

Predicate pushdown on parquet files

There are three operations that are performed by the parquet reader in Trino to reduce the amount of data that is read:

  • Row group pruning
  • Data page filtering
  • Bloom filters

All of these operations are performed automatically by Trino when possible.

Row group pruning uses the min/max statistics for all columns and dictionary filtering for dictionary encoded columns.

The file footer in a parquet file contains metadata with min/max statistics for each column in a row group. Using this metadata, the parquet reader in Trino can decide to skip reading a row group if the value being searched for is not in the min/max range for that column. This can result in entire files being skipped if none of the row groups in that file have a value that is in the min/max range for the column that is being searched on.

Dictionary filtering can skip reading a row group for dictionary encoded columns. In each row group, for each dictionary encoded column, there is a dictionary page that the Trino parquet reader looks at first. If the value being searched for does not have a key in the dictionary, then the row group can be skipped.

For data page filtering, Parquet added support for column indexes which track min/max statistics for each data page (the default page size for parquet is 1MB). This can be leveraged to skip individual data pages for a column in a row group. It is essentially the same as row group min/max filtering but just at a finer granularity.

Finally, bloom filters are a probabilistic data structure used for set membership tests. Bloom filters in parquet files are typically useful when a field has too many distinct values to use dictionary encoding. The parquet writer in Trino does support writing bloom filters so in order to take advantage of bloom filters, the parquet files would need to be produced by another engine such as Spark. The option to control whether bloom filters will be created in Spark is parquet.bloom.filter.enabled

Using the previous examples, we will create a version of the customer table with the hive connector and parquet files:

create table parquet_sf100_customer
with (
  type = 'hive',
  format = 'parquet'
) as select * from tpch.sf100.customer

To demonstrate predicate pushdown with parquet files we need to make sure we have mulitple parquet files with each being at least 3MB in size. If a parquet file is less than 3MB in size then by default, trino will just read the entire file.

Row group pruning

There are 2 techniques used by the parquet reader in Trino to perform row group pruning: 1) filtering via min/max statistics and 2) dictionary filtering on dictionary encoded fields.

Filtering via min/max statistics

The previous query we used for our simple example with MySQL does not have effective predicate pushdown with Parquet files. Lets look at the source stage when the query is run against the parquet_sf100_customer table we created:

Fragment 1 [SOURCE]
    CPU: 822.62ms, Scheduled: 5.37s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 15000000 rows (131.51MB); per task: avg.: 7500000.00 std.dev.: 2023362.00, Output: 17 rows (391B)
    Output layout: [name]
    Output partitioning: SINGLE []
    ScanFilterProject[table = datalake:hive:parquet_sf100_customer, filterPredicate = ("acctbal" = 4.23145E3)]
        Layout: [name:varchar(25)]
        Estimates: {rows: 15000000 (329.02MB), cpu: 457.76M, memory: 0B, network: 0B}/{rows: 14 (312B), cpu: 457.76M, memory: 0B, network: 0B}/{rows: 14 (312B), cpu: 312, memory: 0B, network: 0B}
        CPU: 820.00ms (100.00%), Scheduled: 5.36s (100.00%), Blocked: 0.00ns (?%), Output: 17 rows (391B)
        Input avg.: 416666.67 rows, Input std.dev.: 137.17%
        name := name:varchar(25):REGULAR
        acctbal := acctbal:double:REGULAR
        Input: 15000000 rows (131.51MB), Filtered: 100.00%, Physical input: 83.09MB, Physical input time: 4.50s

Notice the number of input rows is 15000000 indicating that predicate pushdown was not effective in this case.

This is because the acctbal predicate value is within the min and max values for this field in all row groups across all parquet files. So it is not possible for the parquet reader to skip reading any row groups based on this predicate. We can view the min/max statistics stored in parquet files using the parquet CLI (see this post for an in-depth overview of using the parquet CLI). For example, here is what the parquet CLI shows us for a row group for one of the parquet files that make up the parquet_sf100_customer table:

Row group 0:  count: 1793097  53.60 B records  start: 4  total(compressed): 91.659 MB total(uncompressed):279.601 MB
--------------------------------------------------------------------------------
            type      encodings count     avg size   nulls   min / max
custkey     INT64     G   _     1793097   1.52 B     0       "24604" / "14879964"
name        BINARY    G   _     1793097   2.60 B     0       "Customer#000024604" / "Customer#014879964"
address     BINARY    G   _     1793097   21.16 B    0       "   ,qJqVsHDVWLs6mv6S7Hwh9H" / "zzzmVsI9jUl6Wqk6oSl"
nationkey   INT64     G _ R     1793097   0.62 B     0       "0" / "24"
phone       BINARY    G   _     1793097   7.31 B     0       "10-100-124-6236" / "34-999-990-8682"
acctbal     DOUBLE    G   _     1793097   3.79 B     0       "-999.99" / "9999.99"
mktsegment  BINARY    G _ R     1793097   0.34 B     0       "AUTOMOBILE" / "MACHINERY"
comment     BINARY    G   _     1793097   16.27 B    0       " Tiresias about the furio..." / "zzle. slyly regular depos..."

Notice the min/max values for the acctbal field in this row group are -999.99/9999.99.

Let’s try a query on the custkey field that we would expect to be able to do some row group pruning. This is the query we will execute:

SELECT custkey FROM parquet_sf100_customer WHERE custkey = 1

The source stage from EXPLAIN ANALYZE looks like:

Fragment 1 [SOURCE]
    CPU: 243.27ms, Scheduled: 2.67s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 2434552 rows (20.90MB); per task: avg.: 1217276.00 std.dev.: 1217276.00, Output: 1 row (9B)
    Amount of input data processed by the workers for this stage might be skewed
    Output layout: [custkey]
    Output partitioning: SINGLE []
    ScanFilter[table = datalake:hive:parquet_sf100_customer, filterPredicate = ("custkey" = BIGINT '1')]
        Layout: [custkey:bigint]
        Estimates: {rows: 15000000 (128.75MB), cpu: 128.75M, memory: 0B, network: 0B}/{rows: 1 (9B), cpu: 128.75M, memory: 0B, network: 0B}
        CPU: 241.00ms (100.00%), Scheduled: 2.67s (100.00%), Blocked: 0.00ns (?%), Output: 1 row (9B)
        Input avg.: 67626.44 rows, Input std.dev.: 591.61%
        custkey := custkey:bigint:REGULAR
        Input: 2434552 rows (20.90MB), Filtered: 100.00%, Physical input: 3.57MB, Physical input time: 268.29ms

Notice that input rows is 2434552. This indicates some row groups were skipped by the parquet reader. This makes sense because using the min/max stats from the row group we showed previously the min/max values for custkey in that row group were 24604/14879964. In this case, the predicate value of 1 is less than the min value for the custkey field in this row group. Thus, the parquet reader does not need to read this row group from this parquet file.

Now if we look at the row group that does match this predicate:

Row group 0:  count: 2434552  53.60 B records  start: 4  total(compressed): 124.449 MB total(uncompressed):379.587 MB
--------------------------------------------------------------------------------
            type      encodings count     avg size   nulls   min / max
custkey     INT64     G   _     2434552   1.52 B     0       "1" / "14981289"
name        BINARY    G   _     2434552   2.59 B     0       "Customer#000000001" / "Customer#014981289"
address     BINARY    G   _     2434552   21.16 B    0       "   ,CpQVsCA2ou" / "zzzrRIDbmZKVG9O,JaHPU7"
nationkey   INT64     G _ R     2434552   0.62 B     0       "0" / "24"
phone       BINARY    G   _     2434552   7.31 B     0       "10-100-114-7214" / "34-999-998-5763"
acctbal     DOUBLE    G   _     2434552   3.79 B     0       "-999.99" / "9999.99"
mktsegment  BINARY    G _ R     2434552   0.34 B     0       "AUTOMOBILE" / "MACHINERY"
comment     BINARY    G   _     2434552   16.27 B    0       " Tiresias above the foxes..." / "zzle? furiously regular p..."

Notice the min value for custkey in this row group is 1 and also notice that the number of rows in this row group is 2434552 which is the same number of rows read by Trino:

Input: 2434552 rows (20.90MB), Filtered: 100.00%, Physical input: 3.57MB, Physical input time: 268.29ms

Dictionary filtering

This filtering method is based on dictionary encoding. We can leverage dictionary filtering if a column is encoded as a dictionary. Parquet has a default dictionary size limit of 1 MB. If the size of a column for a row group exceeds 1 MB, it will fall back to plain encoding.

For this example, we will use the customer table again. From the previous examples, we know that the nationkey field is a dictionary encoded field. We will insert some data to create new parquet files that do not contain a few of nation keys that will be between the min and max nation keys. This is to ensure that min/max filtering will not be effective for queries on nationkey.

insert into parquet_sf10_customer
select * from parquet_sf10_customer
where nationkey <> 10 and nationkey <> 20

Now if we execute a query like the following:

select custkey
from parquet_sf10_customer
where nationkey = 10

We would expect dictionary filtering to skip reading row groups that do not contain any data where nationkey is 10. Min/max filtering cannot be used since the min and max for nationkey is 0 and 24 respectively for all row groups in all parquet files.

The source stage for our query looks like:

Fragment 1 [SOURCE]
    CPU: 125.89ms, Scheduled: 1.11s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 1500000 rows (25.75MB); per task: avg.: 375000.00 std.dev.: 406705.40, Output: 60101 rows (528.23kB)
    Amount of input data processed by the workers for this stage might be skewed
    Output layout: [custkey]
    Output partitioning: SINGLE []
    ScanFilterProject[table = datalake:hive:parquet_sf10_customer, filterPredicate = ("nationkey" = BIGINT '10')]
        Layout: [custkey:bigint]
        Estimates: {rows: 2880096 (24.72MB), cpu: 49.44M, memory: 0B, network: 0B}/{rows: 115204 (1012.53kB), cpu: 49.44M, memory: 0B, network: 0B}/{rows: 115204 (1012.53kB), cpu: 1012.53k, memory: 0B, network: 0B}
        CPU: 126.00ms (100.00%), Scheduled: 1.11s (100.00%), Blocked: 0.00ns (?%), Output: 60101 rows (528.23kB)
        Input avg.: 187500.00 rows, Input std.dev.: 183.10%
        nationkey := nationkey:bigint:REGULAR
        custkey := custkey:bigint:REGULAR
        Input: 1500000 rows (25.75MB), Filtered: 95.99%, Physical input: 3.20MB, Physical input time: 542.87ms

Notice the the number of inputs rows is 1500000. The total rows in the table is 2880096. This indicates we did not read all data. We can verify the data is in the dictionary page for the nationkey field by using the parquet CLI to view dictionary information for this field.

Row group 0 dictionary for "nationkey":
     0: 1
     1: 14
     2: 7
     3: 2
     4: 8
     5: 6
     6: 24
     7: 18
     8: 19
     9: 3
    10: 12
    11: 23
    12: 5
    13: 21
    14: 9
    15: 13
    16: 0
    17: 17
    18: 4
    19: 15
    20: 16
    21: 11
    22: 22

The nationkey values are the value in this dictionary. Notice that there is no 10 value in this dictionary. This allows the parquet reader to skip reading this row group entirely.

Data page filtering

Note that using parquet column indexes is currently only supported in the hive connector. There is an open PR to add support for this to Iceberg.

Data page filtering requires column indexes to be present in the parquet files being queried. The native parquet writer in newer versions of Trino (> 422) does not support writing column indexes. Older versions of Trino that used a different parquet writer support writing column indexes. Thus, I am going to query a table in this example created using an older version of Trino.

For this example, we will use this query:

SELECT orderkey
FROM "datalake"."hive"."old_writer_lineitem"
WHERE partkey = 1

This query uses both row group pruning and data page filtering. The source stage from the output of EXPLAIN ANALYZE looks like:

Fragment 1 [SOURCE]
    CPU: 1.12s, Scheduled: 8.16s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 460000 rows (4.98MB); per task: avg.: 460000.00 std.dev.: 0.00, Output: 23 rows (207B)
    Output layout: [orderkey]
    Output partitioning: SINGLE []
    ScanFilterProject[table = datalake:hive.old_writer_lineitem, filterPredicate = ("partkey" = BIGINT '1')]
        Layout: [orderkey:bigint]
        Estimates: {rows: 342300666 (2.87GB), cpu: 5.74G, memory: 0B, network: 0B}/{rows: ? (?), cpu: 5.74G, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        CPU: 1.11s (100.00%), Scheduled: 8.15s (100.00%), Blocked: 0.00ns (?%), Output: 23 rows (207B)
        Input avg.: 1825.40 rows, Input std.dev.: 315.54%
        partkey := partkey:bigint:REGULAR
        orderkey := orderkey:bigint:REGULAR
        Input: 460000 rows (4.98MB), Filtered: 100.00%, Physical input: 9.00MB, Physical input time: 2.58s

Notice the number of input rows is 460000. The total number of rows in this table is 6000000000. This means we are filtering a lot of rows in the parquet reader. To understand why, lets look at the row group information in one of the parquet files for the table we are querying:

Row group 0:  count: 4730100  28.25 B records  start: 4  total(compressed): 127.436 MB total(uncompressed):314.723 MB
--------------------------------------------------------------------------------
               type      encodings count     avg size   nulls   min / max
orderkey       INT64     G _ R_ F  4730100   0.62 B     0       "1526974944" / "2174365665"
partkey        INT64     G   _     4730100   4.44 B     0       "75" / "199999958"
suppkey        INT64     G   _     4730100   3.89 B     0       "5" / "9999993"
linenumber     INT32     G _ R     4730100   0.17 B     0       "1" / "7"
quantity       DOUBLE    G _ R     4730100   0.75 B     0       "1.0" / "50.0"
extendedprice  DOUBLE    G   _     4730100   4.10 B     0       "903.88" / "104860.0"
discount       DOUBLE    G _ R     4730100   0.44 B     0       "-0.0" / "0.1"
tax            DOUBLE    G _ R     4730100   0.41 B     0       "-0.0" / "0.08"
returnflag     BINARY    G _ R     4730100   0.18 B     0       "A" / "R"
linestatus     BINARY    G _ R     4730100   0.11 B     0       "F" / "O"
shipdate       INT32     G _ R     4730100   1.50 B     0       "1992-01-02" / "1998-12-01"
commitdate     INT32     G _ R     4730100   1.49 B     0       "1992-01-31" / "1998-10-31"
receiptdate    INT32     G _ R     4730100   1.50 B     0       "1992-01-03" / "1998-12-30"
shipinstruct   BINARY    G _ R     4730100   0.25 B     0       "COLLECT COD" / "TAKE BACK RETURN"
shipmode       BINARY    G _ R     4730100   0.37 B     0       "AIR" / "TRUCK"
comment        BINARY    G   _     4730100   8.04 B     0       " Tiresias " / "zzle? express, final sauter"

Row group 1:  count: 4730100  28.25 B records  start: 133626543  total(compressed): 127.448 MB total(uncompressed):314.719 MB
--------------------------------------------------------------------------------
               type      encodings count     avg size   nulls   min / max
orderkey       INT64     G _ R_ F  4730100   0.62 B     0       "1529611680" / "2176597922"
partkey        INT64     G   _     4730100   4.44 B     0       "1" / "199999974"
suppkey        INT64     G   _     4730100   3.89 B     0       "2" / "10000000"
linenumber     INT32     G _ R     4730100   0.17 B     0       "1" / "7"
quantity       DOUBLE    G _ R     4730100   0.75 B     0       "1.0" / "50.0"
extendedprice  DOUBLE    G   _     4730100   4.10 B     0       "900.61" / "104928.0"
discount       DOUBLE    G _ R     4730100   0.44 B     0       "-0.0" / "0.1"
tax            DOUBLE    G _ R     4730100   0.41 B     0       "-0.0" / "0.08"
returnflag     BINARY    G _ R     4730100   0.18 B     0       "A" / "R"
linestatus     BINARY    G _ R     4730100   0.11 B     0       "F" / "O"
shipdate       INT32     G _ R     4730100   1.50 B     0       "1992-01-02" / "1998-12-01"
commitdate     INT32     G _ R     4730100   1.49 B     0       "1992-01-31" / "1998-10-31"
receiptdate    INT32     G _ R     4730100   1.50 B     0       "1992-01-03" / "1998-12-30"
shipinstruct   BINARY    G _ R     4730100   0.25 B     0       "COLLECT COD" / "TAKE BACK RETURN"
shipmode       BINARY    G _ R     4730100   0.37 B     0       "AIR" / "TRUCK"
comment        BINARY    G   _     4730100   8.04 B     0       " Tiresias " / "zzle? blithel"

Row group 2:  count: 4730100  28.25 B records  start: 267265365  total(compressed): 127.426 MB total(uncompressed):314.708 MB
--------------------------------------------------------------------------------
               type      encodings count     avg size   nulls   min / max
orderkey       INT64     G _ R_ F  4730100   0.61 B     0       "1532289632" / "2177798306"
partkey        INT64     G   _     4730100   4.44 B     0       "25" / "199999967"
suppkey        INT64     G   _     4730100   3.89 B     0       "1" / "10000000"
linenumber     INT32     G _ R     4730100   0.17 B     0       "1" / "7"
quantity       DOUBLE    G _ R     4730100   0.75 B     0       "1.0" / "50.0"
extendedprice  DOUBLE    G   _     4730100   4.10 B     0       "902.06" / "104898.5"
discount       DOUBLE    G _ R     4730100   0.44 B     0       "-0.0" / "0.1"
tax            DOUBLE    G _ R     4730100   0.41 B     0       "-0.0" / "0.08"
returnflag     BINARY    G _ R     4730100   0.18 B     0       "A" / "R"
linestatus     BINARY    G _ R     4730100   0.11 B     0       "F" / "O"
shipdate       INT32     G _ R     4730100   1.50 B     0       "1992-01-02" / "1998-12-01"
commitdate     INT32     G _ R     4730100   1.49 B     0       "1992-01-31" / "1998-10-31"
receiptdate    INT32     G _ R     4730100   1.50 B     0       "1992-01-04" / "1998-12-30"
shipinstruct   BINARY    G _ R     4730100   0.25 B     0       "COLLECT COD" / "TAKE BACK RETURN"
shipmode       BINARY    G _ R     4730100   0.37 B     0       "AIR" / "TRUCK"
comment        BINARY    G   _     4730100   8.03 B     0       " Tiresias " / "zzle? slyly unusual depos..."

Row group 3:  count: 1520210  28.54 B records  start: 400880731  total(compressed): 41.384 MB total(uncompressed):99.829 MB
--------------------------------------------------------------------------------
               type      encodings count     avg size   nulls   min / max
orderkey       INT64     G _ R_ F  1520210   0.90 B     0       "1535122564" / "2178671012"
partkey        INT64     G   _     1520210   4.44 B     0       "58" / "199999997"
suppkey        INT64     G   _     1520210   3.89 B     0       "6" / "10000000"
linenumber     INT32     G _ R     1520210   0.17 B     0       "1" / "7"
quantity       DOUBLE    G _ R     1520210   0.75 B     0       "1.0" / "50.0"
extendedprice  DOUBLE    G   _     1520210   4.10 B     0       "905.68" / "104825.0"
discount       DOUBLE    G _ R     1520210   0.44 B     0       "-0.0" / "0.1"
tax            DOUBLE    G _ R     1520210   0.41 B     0       "-0.0" / "0.08"
returnflag     BINARY    G _ R     1520210   0.18 B     0       "A" / "R"
linestatus     BINARY    G _ R     1520210   0.11 B     0       "F" / "O"
shipdate       INT32     G _ R     1520210   1.50 B     0       "1992-01-02" / "1998-12-01"
commitdate     INT32     G _ R     1520210   1.50 B     0       "1992-01-31" / "1998-10-31"
receiptdate    INT32     G _ R     1520210   1.50 B     0       "1992-01-04" / "1998-12-30"
shipinstruct   BINARY    G _ R     1520210   0.26 B     0       "COLLECT COD" / "TAKE BACK RETURN"
shipmode       BINARY    G _ R     1520210   0.37 B     0       "AIR" / "TRUCK"
comment        BINARY    G   _     1520210   8.04 B     0       " Tiresias " / "zzle; final packages"

This parquet file has 4 row groups. Notice that only row group 1 will be read because it is the only row group with min/max values for the partkey field that match the predicate value. So with row group pruning, we would read at least 4730100 rows.

Next, we will look at the column indexes for the partkey field in row group 1 using the parquet CLI:

column index for column partkey:
Boundary order: UNORDERED
                      null count  min                                       max
page-0                         0  3946                                      199999039
page-1                         0  17965                                     199998705
page-2                         0  3594                                      199991556
page-3                         0  26870                                     199996833
page-4                         0  1649                                      199981961
...
page-88                        0  1                                         199991447
...

Notice that page-88 is the only page with a minimum value that matches the value of the partkey field in the predicate. Thus, all other data pages in this row group will be skipped by the parquet reader. Page 88 contains 460000 rows and that matches the input count in the source stage for our query:

Input: 460000 rows (4.98MB), Filtered: 100.00%, Physical input: 9.00MB, Physical input time: 2.58s

To compare, let’s look at the same query against a table that was created with no column indexes in the underlying parquet files. The source stage for the same query against such a table looks like:

Fragment 1 [SOURCE]
    CPU: 2.38s, Scheduled: 8.27s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 36636395 rows (316.07MB); per task: avg.: 36636395.00 std.dev.: 0.00, Output: 23 rows (207B)
    Output layout: [orderkey]
    Output partitioning: SINGLE []
    ScanFilterProject[table = datalake:parquetteststpch.no_col_indexes, filterPredicate = ("partkey" = BIGINT '1')]
        Layout: [orderkey:bigint]
        Estimates: {rows: 36636395 (314.45MB), cpu: 628.91M, memory: 0B, network: 0B}/{rows: 1 (9B), cpu: 628.91M, memory: 0B, network: 0B}/{rows: 1 (9B), cpu: 9, memory: 0B, network: 0B}
        CPU: 2.39s (100.00%), Scheduled: 8.27s (100.00%), Blocked: 0.00ns (?%), Output: 23 rows (207B)
        Input avg.: 872295.12 rows, Input std.dev.: 119.07%
        partkey := partkey:bigint:REGULAR
        orderkey := orderkey:bigint:REGULAR
        Input: 36636395 rows (316.07MB), Filtered: 100.00%, Physical input: 183.67MB, Physical input time: 4.61s

Notice the number of input rows is larger now due to the absence of column indexes.

While this artificial example shows some benefit to column indexes, in practice it’s not clear that there is much of a performance improvement from column indexes. These page level min/max column indexes don’t tend to be very selective unless the data is sorted by the field we are searching on. However, if the data is sorted then row group pruning tends to provide the majority of the benefit. All this to say I wouldn’t worry too much about whether or not the parquet files being queried contain column indexes.

Bloom filters

I do not have any parquet files with bloom filters at the moment to test with. However, the idea is same as dictionary filtering. In order for bloom filters to be used by the parquet reader in trino you will need to use version 406 or newer as that is when support for these was added.