Distributed Engine V1.5 (Flotilla) is a redesign of Daft’s distributed, Ray-based execution engine to improve scalability, performance, and observability for multimodal data workloads.
This supports Daft’s mission to become THE data engine for anything multimodal.
Daft is a distributed query engine providing simple and reliable data processing for any modality and scale. Typical workloads include:
Users define their workloads using Daft’s Dataframe or SQL API. For example, an embedding workload could look like:
import daft
from sentence_transformers import SentenceTransformer

# The UDF returns a fixed-size embedding of 768 float32 values per row.
@daft.udf(return_dtype=daft.DataType.embedding(daft.DataType.float32(), 768))
def embedding(text):
    # `text` arrives as a batch (a daft.Series); convert it to Python strings.
    model = SentenceTransformer("BAAI/bge-base-en-v1.5")
    return [model.encode(t) for t in text.to_pylist()]

df = daft.read_json("s3://daft-public-data/redpajama-1t-sample/stackexchange_sample.jsonl")
df = df.select(embedding(df["text"]))
df.write_parquet("output")
Like traditional databases and other distributed query engines such as Spark and Trino, Daft executes lazily and has a query optimizer. The user’s query is expressed as a tree of operators and optimized using rules such as pushdowns and expression simplification. After optimization, it is transformed into a physical plan, which is also a tree of operators.
For example, the tree of operators for the above code looks something like:
read_json -> embedding_udf -> write
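To make the idea of an optimizer rule concrete, here is a minimal, self-contained sketch of a column pushdown over a toy operator tree. It is not Daft’s optimizer: the `Scan`, `Project`, and `Write` classes and the `push_down_columns` and `render` helpers are hypothetical names invented for illustration, and the input path is a placeholder.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Optional, Tuple, Union


@dataclass(frozen=True)
class Scan:
    """Leaf operator: reads a source. columns=None means 'read every column'."""
    path: str
    columns: Optional[Tuple[str, ...]] = None


@dataclass(frozen=True)
class Project:
    """Middle operator: computes `name` and needs only `inputs` from its child."""
    child: Plan
    name: str
    inputs: Tuple[str, ...]


@dataclass(frozen=True)
class Write:
    """Root operator: writes the child's output to `path`."""
    child: Plan
    path: str


Plan = Union[Scan, Project, Write]


def push_down_columns(plan: Plan) -> Plan:
    """Toy pushdown rule: a Project directly above a Scan narrows the Scan."""
    if isinstance(plan, Scan):
        return plan
    child = push_down_columns(plan.child)
    if isinstance(plan, Project) and isinstance(child, Scan):
        child = replace(child, columns=plan.inputs)
    return replace(plan, child=child)


def render(plan: Plan) -> str:
    """Render the tree leaf-first, in the same arrow notation as the text."""
    if isinstance(plan, Scan):
        cols = "*" if plan.columns is None else ",".join(plan.columns)
        return f"read_json[{cols}]"
    label = plan.name if isinstance(plan, Project) else "write"
    return f"{render(plan.child)} -> {label}"


# Hypothetical plan mirroring the embedding example: only "text" is needed.
logical = Write(Project(Scan("input.jsonl"), "embedding_udf", ("text",)), "output")
optimized = push_down_columns(logical)
print(render(logical))
print(render(optimized))
```

In this sketch the rule rewrites the scan so that it reads only the `text` column, which is the kind of saving a pushdown is meant to deliver. A real optimizer applies many such rules and handles far more operator types.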
To run this query across a distributed cluster, Daft chunks the input files into partitions and schedules one Ray task per partition. Each task performs the query’s full sequence of computations on its partition of data.
While seemingly simple, the current one-partition-per-Ray-task model has a number of issues.
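The partition-per-task model can be sketched in a few lines. This is an illustration under stated assumptions, not Daft’s scheduler: a standard-library thread pool stands in for Ray tasks, and `make_partitions`, `run_pipeline`, the file names, and the string "rows" are all hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def make_partitions(files: List[str], files_per_partition: int) -> List[List[str]]:
    """Chunk the input files into fixed-size groups (the last may be smaller)."""
    return [
        files[i : i + files_per_partition]
        for i in range(0, len(files), files_per_partition)
    ]


def run_pipeline(partition: List[str]) -> List[str]:
    """Stand-in for one task: runs every operator, in order, on one partition."""
    rows = [f"row-from-{name}" for name in partition]  # read step (simulated)
    embedded = [f"embedding({row})" for row in rows]   # UDF step (simulated)
    return embedded                                    # a write step would consume this


# Hypothetical input: five files, two files per partition.
files = [f"part-{i}.jsonl" for i in range(5)]
partitions = make_partitions(files, files_per_partition=2)

# One task per partition; the thread pool is only a stand-in for a Ray cluster.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(run_pipeline, partitions))

print(len(partitions), "partitions,", len(results), "task results")
```

Note that each task runs the whole operator chain end to end, so the partition is the only unit of scheduling and parallelism in this model.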
Consider the following query plan:
read_parquet -> url_download -> image_decode