Open7

Spark Scala

TFTF

➜ 1brc git:(main) ✗ export JAVA_HOME=/opt/homebrew/Cellar/openjdk@11/11.0.27/libexec/openjdk.jdk/Contents/Home

TFTF
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions._

/** 1BRC solution with hard-coded I/O paths (single-node dev convenience).
  *
  * Reads `InputPath` as text lines of the form `station;temperature`,
  * aggregates min / mean / max per station (mean rounded to one decimal) and
  * emits one `station=min/mean/max` line per station, ordered by station name.
  * Output goes to `OutputPath` as text files when set, otherwise to stdout.
  */
object OneBillionRows {
    // ── adjust these two constants only ────────────────────────────────────
    private val InputPath = "measurements.txt" // <- your file
    private val OutputPath = None: Option[String] // Some("/tmp/1brc") to write

    // case-class schema for the fast manual parser
    private case class Record(station: String, temperature: Double)

    /** Entry point; `args` is ignored because all paths are hard-coded above. */
    def main(args: Array[String]): Unit = {
        // NOTE(review): the Spark configuration docs state that
        // spark.driver.memory cannot be set through SparkConf in client mode
        // because the driver JVM has already started; if 4g is really needed
        // for local runs, confirm it is passed via --driver-memory / -Xmx.
        val spark = SparkSession.builder
          .appName("1BillionRowChallenge")
          .master("local[*]") // leave hard-coded for IDE runs
          .config("spark.driver.memory", "4g")
          .config("spark.executor.memory", "4g")
          .config("spark.sql.shuffle.partitions", "512")
          .getOrCreate()

        // Stop the session even when the job fails, so the local Spark
        // context is not left running after an exception.
        try {
            import spark.implicits._

            // ── 1. parse text file very cheaply ──────────────────────────────
            val data: Dataset[Record] = spark.read
              .textFile(InputPath)
              .mapPartitions { it =>
                  it.flatMap { line =>
                      val p = line.indexOf(';')
                      if (p > 0) {
                          val station = line.substring(0, p)
                          // A non-numeric temperature is a corrupt row too:
                          // skip it instead of failing the whole job.
                          try Some(Record(station, line.substring(p + 1).toDouble))
                          catch { case _: NumberFormatException => None }
                      } else None // no separator or empty station: skip
                  }
              }(Encoders.product[Record])

            // ── 2. min / mean / max per station ──────────────────────────────
            val stats = data.groupBy($"station")
              .agg(
                  min("temperature").as("min"),
                  round(avg("temperature"), 1).as("mean"),
                  max("temperature").as("max")
              )
              .orderBy($"station")

            // ── 3. format exactly as “station=min/mean/max” ──────────────────
            // The station is joined to the statistics with "=", and only the
            // three statistics are joined to each other with "/".
            val out = stats.select(
                concat(
                    $"station",
                    lit("="),
                    concat_ws("/", $"min", $"mean", $"max")
                ).as("formatted")
            )

            // ── 4. emit ──────────────────────────────────────────────────────
            OutputPath match {
                case Some(dir) => out.write.mode("overwrite").text(dir)
                case None => out.collect().foreach(r => println(r.getString(0)))
            }
        } finally {
            spark.stop()
        }
    }
}
TFTF

rg gunnarmorling futasoft

/Users/tafu/futasoft/1brc

➜ 1brc git:(main) ✗ javac src/main/java/dev/morling/onebrc/CreateMeasurements.java
java -cp src/main/java dev.morling.onebrc.CreateMeasurements 100

TFTF

Rust Spark Connector

use spark_connect_rs::{SparkSession, SparkSessionBuilder};
use spark_connect_rs::functions::{col, concat, lit, round, split};
use spark_connect_rs::types::DataType;
use std::error::Error;

/// Reads `measurements.txt` through a Spark Connect session, aggregates the
/// minimum, maximum and rounded average temperature per station, and prints
/// one `{station}={min}/{mean}/{max}` line per station in station order.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Path handed to the reader (adjust for your storage system,
    // e.g., "hdfs://path/to/measurements.txt").
    let input_path = "measurements.txt";

    // Open the Spark Connect session.
    let spark: SparkSession = SparkSessionBuilder::remote("sc://localhost:15002")
        .build()
        .await?;

    // Load the file with the "text" format; the lines are read from the
    // `value` column below.
    let raw_lines = spark
        .read()
        .format("text")
        .load(vec![input_path.to_string()])?;

    // Column expressions: the part before the semicolon is the station, the
    // part after it is the temperature cast to a double.
    let station_col = split(col("value"), ";").get_item(0).alias("station");
    let temperature_col = split(col("value"), ";")
        .get_item(1)
        .cast(DataType::Double)
        .alias("temperature");
    let measurements = raw_lines.select(vec![station_col, temperature_col])?;

    // Per-station min, max and average.
    let aggregated = measurements
        .group_by(vec![col("station")])
        .agg(vec![
            col("temperature").min().alias("min_temp"),
            col("temperature").max().alias("max_temp"),
            col("temperature").avg().alias("avg_temp"),
        ])?;

    // Reorder to station / min / mean / max, rounding the mean to one decimal.
    let rounded = aggregated.select(vec![
        col("station"),
        col("min_temp"),
        round(col("avg_temp"), 1).alias("avg_temp"),
        col("max_temp"),
    ])?;

    // Sort by station name, ascending.
    let sorted = rounded.order_by(vec![col("station").asc()])?;

    // Build the single output column: {station}={min}/{mean}/{max}.
    let output_col = concat(
        col("station"),
        lit("="),
        col("min_temp"),
        lit("/"),
        col("avg_temp"),
        lit("/"),
        col("max_temp"),
    )
    .alias("output");
    let formatted = sorted.select(vec![output_col])?;

    // Fetch the rows and print each one; rows whose first column yields
    // `None` are skipped.
    for row in formatted.collect().await? {
        if let Some(text) = row.get_str(0)? {
            println!("{}", text);
        }
    }

    // Shut the session down before returning.
    spark.stop().await?;

    Ok(())
}