File size is probably the single largest contributor to poor query performance that I see when working with Trino. In this post, I wanted to cover the case where we are writing data through Trino using the Hive connector and how to control the size of files created in that case.

These notes are based on the latest Trino release at the time of writing - 370. Where appropriate, I have linked to open github issues that may change some of the behavior discussed here.

In this post, first we’ll discuss some of the pieces that affect file size when writing through Trino and then we’ll show some examples to get a feel for how this works in practice.

Writer Scaling

By default in Trino, the number of writer tasks is static: Trino will schedule a writer task on every node in the cluster, up to hash_partition_count, which defaults to 100. Writer scaling is a feature where Trino instead starts with a single writer task and adds more as needed. When evaluating whether to add additional writer tasks, Trino uses the following metrics:

  • minimum size of files to be created. This is configured by writer_min_size which defaults to 32MB
  • current number of workers writer tasks are running on
  • total amount of physical bytes written so far
  • percentage of source tasks that have overutilized output buffers

If at least 50% of the source tasks have output buffers that are overutilized (this is used as an indication that writing is a bottleneck) and the total number of physical bytes written so far is greater than or equal to current_worker_tasks * writer_min_size, then an additional writer task is scheduled.
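To make this concrete, here is a minimal sketch of the session properties involved (the values are illustrative), with the scaling rule spelled out in comments:

SET SESSION scale_writers = true;     -- opt in to writer scaling
SET SESSION writer_min_size = '32MB'; -- minimum data per existing writer before scaling up

-- With 3 writer tasks currently running and writer_min_size = 32MB, a 4th writer task
-- is only scheduled once at least 50% of source tasks have overutilized output buffers
-- and the total physical bytes written so far is >= 3 * 32MB = 96MB.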

There is an open PR to enable writer scaling by default.

Preferred write partitioning

By default, Trino creates a writer per partition on each worker. So if you are writing 10 partitions in a single write operation, there will be 10 * num_of_workers files created.

With preferred write partitioning, Trino will create a single writer for each partition. Files for a partition will be written until they reach the target file size. The target file size defaults to 1GB and can be configured with the hive.target-max-file-size property.

Preferred write partitioning is enabled by default in Trino but only kicks in once the number of partitions to write is greater than preferred-write-partitioning-min-number-of-partitions. This property defaults to 50.
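As a sketch, both thresholds can also be adjusted per session. The preferred_write_partitioning_min_number_of_partitions session property is exercised again in the examples below; hive.target_max_file_size is my understanding of the catalog session property corresponding to hive.target-max-file-size, so verify the name against your release:

SET SESSION preferred_write_partitioning_min_number_of_partitions = 10;
SET SESSION hive.target_max_file_size = '512MB';  -- assumed catalog session property name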

Preferred write partitioning can be problematic if you are writing many partitions but you have some data skew. That is, a small number of the partitions you are writing have significantly more data than the rest. In this case, you will see poor throughput as the write operation will be bottlenecked on the few partitions that have lots of data.

Unfortunately, writer scaling does not work when preferred write partitioning is enabled. There is an open github issue to address this limitation.

One option in this scenario is to write these skewed partitions in their own write operation (i.e. a separate INSERT) with writer scaling enabled, as sketched below.
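A minimal sketch of that workaround, assuming the skewed data lives in a hypothetical staging table (all table, column, and partition names here are illustrative):

SET SESSION scale_writers = true;
-- Write only the heavily skewed partitions in this operation so that writer scaling,
-- rather than preferred write partitioning, determines how many files are produced
INSERT INTO hive.schema.events_part
SELECT * FROM hive.schema.events_staging
WHERE event_date IN (DATE '2022-01-01', DATE '2022-01-02');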

Other considerations

By default, a writer task has one writer, but Trino has an option that allows you to configure multiple writers per writer task. This is controlled by the task_writer_count session property or the task.writer-count configuration property. Each additional writer added to a writer task will write its own file.
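For example, at the session level (the same property is exercised again in the examples below):

-- Four writers per writer task; since each writer produces its own file, an
-- unpartitioned write will now create roughly 4 files per worker
SET SESSION task_writer_count = 4;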

Hive writers cannot open more than 100 partitions by default. This is configurable with hive.max-partitions-per-writers. This is one of the reasons why preferred write partitioning kicks in when you are writing 50 partitions by default. The idea is to prevent query failure due to too many open writers. An open github issue exists to take this into account when considering whether to use preferred write partitioning or not.

Examples

Let's go through some examples to get a better feel for how some of this works in practice. All these examples were run on a 20-node cluster with 1 coordinator and 19 workers. We will use the sf100.orders table from the tpch connector as input.

Unpartitioned tables

Let's start by writing to an unpartitioned table with the default values for all configuration properties and see what happens. We are going to execute the following SQL statement to write some data:

CREATE TABLE hive.schema.orders AS
SELECT * from tpch.sf100.orders;

This will create a table with 150 million rows. Now let's see what files make up the table along with their respective file sizes.

trino> SELECT DISTINCT("$path"), "$file_size"/1024/1024 "file size (MB)"
    -> FROM hive.schema.orders;
                                   $path                                   | file size (MB)
---------------------------------------------------------------------------+----------------
 s3://bucket/schema/orders/000000_1148970250376273816915470453360745006588 |             68
 s3://bucket/schema/orders/000000_1215123384347621954917352838970147489146 |            218
 s3://bucket/schema/orders/000000_94932387121933732399061307661938279715   |            223
 s3://bucket/schema/orders/000000_117806137078366536011610830131126485742  |            215
 s3://bucket/schema/orders/000000_1001153647564003259415894971498455909464 |            231
 s3://bucket/schema/orders/000000_133414678567723366599987854974243915598  |            218
 s3://bucket/schema/orders/000000_104241354575902948509585735493038720361  |            200
 s3://bucket/schema/orders/000000_9959593748156202080139674613535097330    |            212
 s3://bucket/schema/orders/000000_934969781173770046110569100158810998349  |            170
 s3://bucket/schema/orders/000000_116238996385648034174033471464866464229  |             93
 s3://bucket/schema/orders/000000_1089771682823928534210552029250628502661 |             76
 s3://bucket/schema/orders/000000_124255962216982642672834186959532804536  |            232
 s3://bucket/schema/orders/000000_11917984803501896447758314434888550646   |             95
 s3://bucket/schema/orders/000000_1257820106788810439610628825335490890891 |            223
 s3://bucket/schema/orders/000000_136162185143624326563707220924434368862  |            230
 s3://bucket/schema/orders/000000_100091554983688721474073407018834215054  |             76
 s3://bucket/schema/orders/000000_1256622394392217552415841354466181267646 |            238
 s3://bucket/schema/orders/000000_1219136186713346171415531183136280889695 |            240
 s3://bucket/schema/orders/000000_9637899140263774860499238271213719254    |            232
(19 rows)

trino>

Notice there are 19 files created, as expected: as we mentioned earlier, Trino will schedule a writer task on every node in the cluster up to hash_partition_count.

Let's drop the table and set some session properties now:

DROP TABLE hive.schema.orders;
SET SESSION scale_writers=true;
CREATE TABLE hive.schema.orders AS
SELECT * from tpch.sf100.orders;

If you examine the files created, you will see there is almost no difference. There will be 19 files of pretty much the same sizes as when scale_writers was false. The reason is that writer_min_size defaults to 32MB. So we did start with one writer, but writer scaling kicked in very quickly and scaled to one writer on every node in the cluster: scaling from 1 to 19 writers only requires roughly 19 * 32MB of data to be written, which happens almost immediately for a table of this size.

Let's increase writer_min_size and see what happens:

DROP TABLE hive.schema.orders;
SET SESSION scale_writers = true;
SET SESSION writer_min_size = '256MB';
CREATE TABLE hive.schema.orders AS
SELECT * from tpch.sf100.orders;

Now, the file listing for the table looks different:

trino> SELECT DISTINCT("$path"), "$file_size"/1024/1024 "file size (MB)"
    -> FROM hive.schema.orders;
                                   $path                                   | file size (MB)
---------------------------------------------------------------------------+----------------
 s3://bucket/schema/orders/000000_1377595777372344627116636555983565637996 |             33
 s3://bucket/schema/orders/000000_99009178714256612358893726592261639547   |             68
 s3://bucket/schema/orders/000000_13472050365964937787172296248786177422   |            106
 s3://bucket/schema/orders/000000_1235983261586334774415807092221720413995 |            247
 s3://bucket/schema/orders/000000_1290934766225256968918093139793381378849 |             44
 s3://bucket/schema/orders/000000_99666591088221896837512665438916461593   |            522
 s3://bucket/schema/orders/000000_1314592899941435688912102934398036230458 |            270
 s3://bucket/schema/orders/000000_93516165430063608919362987023330987077   |             68
 s3://bucket/schema/orders/000000_127796541167598661858541674897111794985  |            179
 s3://bucket/schema/orders/000000_97541421057189906701251290742318121084   |            366
 s3://bucket/schema/orders/000000_96638551161615777301182887331064989471   |            465
 s3://bucket/schema/orders/000000_995172082584028822516636345442201128353  |            896
 s3://bucket/schema/orders/000000_1310607426014842381610388882407432078718 |            134
 s3://bucket/schema/orders/000000_969936561665743341218015719903539776034  |             95
(14 rows)

trino>

Now there are 14 files for the table, with varying sizes. This is because it took longer for writer scaling to kick in. Remember, writer scaling adds a new writer when the total amount of bytes written so far is greater than or equal to current_worker_tasks * writer_min_size. So with a larger writer_min_size, not as many writers were required.

Finally, let's see what happens if we set task_writer_count to 4.

DROP TABLE hive.schema.orders;
SET SESSION scale_writers = false;
SET SESSION task_writer_count = 4;
CREATE TABLE hive.schema.orders AS
SELECT * from tpch.sf100.orders;

Examining the files in the table now:

trino> SELECT DISTINCT("$path"), "$file_size"/1024/1024 "file size (MB)"
    -> FROM hive.schema.orders;
                                   $path                                   | file size (MB)
---------------------------------------------------------------------------+----------------
 s3://bucket/schema/orders/000000_1327778650584285583914638612389897650257 |             46
 s3://bucket/schema/orders/000000_94789734096748700767106657631149640245   |             46
 s3://bucket/schema/orders/000000_133316333924262072775421837119926648989  |             47
 s3://bucket/schema/orders/000000_103945055866900400305793036419297658186  |             46
 s3://bucket/schema/orders/000000_1352096330589007547313025749534328638021 |             47
 s3://bucket/schema/orders/000000_11741659409465051430275721966299661714   |             47
 s3://bucket/schema/orders/000000_930015843200006077215938462065578362118  |             48
 s3://bucket/schema/orders/000000_130472521844905959243143795957553317277  |             45
 s3://bucket/schema/orders/000000_1235200687630939159713725338262038529276 |             41
 s3://bucket/schema/orders/000000_94949123118577522958958140797445228575   |             48
 s3://bucket/schema/orders/000000_119717423415929640441547267762871685047  |             45
 s3://bucket/schema/orders/000000_10356034476499632116853388213431584148   |             44
 ...
 s3://bucket/schema/orders/000000_106889506735014793328862626510383760736  |             43
 s3://bucket/schema/orders/000000_127973413398668769087683224367368388632  |             46
 s3://bucket/schema/orders/000000_9479488288009668113409431449173704708    |             48
 s3://bucket/schema/orders/000000_1044722728954301272514380956339957024693 |             50
 s3://bucket/schema/orders/000000_137819937315507510504351574064435249970  |             43
(76 rows)

trino>

I have not shown all files in the above output, but now we have 76 files. This matches what we would expect, as we have 19 workers and 4 writers per writer task (19 * 4 = 76). Notice how the files are also smaller now.

Partitioned tables

We are going to create a table that is partitioned by the orderstatus column of the orders table. First, let's see how many partitions there will be and how many rows each partition contains:

trino> SELECT orderstatus, count(*) FROM tpch.sf100.orders GROUP BY orderstatus;
 orderstatus |  _col1
-------------+----------
 O           | 73086053
 F           | 73072502
 P           |  3841445
(3 rows)

trino>

There will be 3 partitions. O and F have roughly the same amount of data and P has significantly less.

Let's start by writing to a partitioned table with the default values for all configuration properties and see what happens. We are going to execute the following SQL statement to write some data:

CREATE TABLE hive.schema.orders_part WITH (
    PARTITIONED_BY = ARRAY['orderstatus']
) AS
SELECT
  orderkey, custkey,
  totalprice, orderdate,
  orderpriority, clerk,
  shippriority, "comment", orderstatus
FROM tpch.sf100.orders;

Let's look at what files make up the table:

trino> SELECT DISTINCT("$path"), "$file_size"/1024/1024 "file size (MB)"
    -> FROM hive.schema.orders_part;
                                            $path                                             | file size (MB)
----------------------------------------------------------------------------------------------+----------------
 s3://bucket/schema/orders_part/orderstatus=O/000000_128584668682339574879802670643519570829  |             96
 s3://bucket/schema/orders_part/orderstatus=P/000000_123319395652705810246138819005516890606  |              5
 s3://bucket/schema/orders_part/orderstatus=F/000000_112406935902812657386577229574238848654  |             93
 s3://bucket/schema/orders_part/orderstatus=P/000000_127316066648024189353330844171795252872  |              3
 s3://bucket/schema/orders_part/orderstatus=P/000000_107837501527688671693884590968487690637  |              5
 s3://bucket/schema/orders_part/orderstatus=O/000000_112652705648896950256952745918657939195  |             96
 s3://bucket/schema/orders_part/orderstatus=P/000000_96401151262324353316613301189825482478   |              5
 s3://bucket/schema/orders_part/orderstatus=O/000000_1252745054876985227015755303221707622310 |             73
 s3://bucket/schema/orders_part/orderstatus=O/000000_109198811615840855187654035085899155872  |             95
 s3://bucket/schema/orders_part/orderstatus=O/000000_10982354965074499624397721986189051911   |             77
 s3://bucket/schema/orders_part/orderstatus=F/000000_117596718225736561815480560455431177255  |            101
 s3://bucket/schema/orders_part/orderstatus=O/000000_101551444329366153847054119236203530366  |            101
 s3://bucket/schema/orders_part/orderstatus=O/000000_136895592240784148214939346167992699414  |            102
 s3://bucket/schema/orders_part/orderstatus=F/000000_1021403776743514575115577786482170350623 |             66
 s3://bucket/schema/orders_part/orderstatus=P/000000_1276088785191487719511571484253424667019 |              4
...
 s3://bucket/schema/orders_part/orderstatus=O/000000_102010231565598635379308522615862477435  |             95
 s3://bucket/schema/orders_part/orderstatus=P/000000_94698171298649936721502727724916032741   |              5
(57 rows)

trino>

I have not shown all files in the above output, but you can see that there are 57 files in total. This matches our expectations, since we have 3 partitions to write and 19 workers to schedule writers on: 3 * 19 = 57 files.

Next, let's see what happens when we use preferred write partitioning. To have preferred write partitioning kick in for this table, we need to set preferred_write_partitioning_min_number_of_partitions to 3 or less. So let's set it to 1.

DROP TABLE hive.schema.orders_part;
SET SESSION preferred_write_partitioning_min_number_of_partitions = 1;
CREATE TABLE hive.schema.orders_part WITH (
    PARTITIONED_BY = ARRAY['orderstatus']
) AS
SELECT
  orderkey, custkey,
  totalprice, orderdate,
  orderpriority, clerk,
  shippriority, "comment", orderstatus
FROM tpch.sf100.orders;

Examining the list of files created for the table, we now see:

trino> SELECT DISTINCT("$path"), "$file_size"/1024/1024 "file size (MB)"
    -> FROM hive.schema.orders_part;
                                            $path                                             | file size (MB)
----------------------------------------------------------------------------------------------+----------------
 s3://bucket/schema/orders_part/orderstatus=O/000000_1254472926280513888711848091454450452149 |            692
 s3://bucket/schema/orders_part/orderstatus=F/000000_1068576034620793011018243320222690066787 |            692
 s3://bucket/schema/orders_part/orderstatus=P/000000_126485365865919680615562458566484181697  |             91
(3 rows)

trino>

Now there is one file created per partition.
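If you want to verify this from SQL rather than by eyeballing the listing, here is a quick sketch using the same hidden columns as above:

SELECT orderstatus, count(DISTINCT "$path") AS files
FROM hive.schema.orders_part
GROUP BY orderstatus;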

Summary

These are my individual notes that I have collected while working with Trino. I am following development of the Iceberg connector and will write up my notes around writing with Iceberg once I have some more experience with it in real world deployments.