Spark Scala
➜ 1brc git:(main) ✗ export JAVA_HOME=/opt/homebrew/Cellar/openjdk@11/11.0.27/libexec/openjdk.jdk/Contents/Home
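Spark 3.x only runs on specific Java versions (11 among them), hence the export above pins JAVA_HOME to the Homebrew OpenJDK 11 before launching anything Spark-related.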
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions._

/** 1BRC solution with hard-coded I/O paths (single-node dev convenience). */
object OneBillionRows {

  // ── adjust these two constants only ────────────────────────────────────
  private val InputPath                  = "measurements.txt" // <- your file
  private val OutputPath: Option[String] = None               // Some("/tmp/1brc") to write

  // case-class schema for the fast manual parser
  private case class Record(station: String, temperature: Double)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("1BillionRowChallenge")
      .master("local[*]") // leave hard-coded for IDE runs
      .config("spark.driver.memory", "4g")
      .config("spark.executor.memory", "4g")
      .config("spark.sql.shuffle.partitions", "512")
      .getOrCreate()
    import spark.implicits._

    // ── 1. parse text file very cheaply ──────────────────────────────────
    val data: Dataset[Record] = spark.read
      .textFile(InputPath)
      .mapPartitions { it =>
        it.flatMap { line =>
          val p = line.indexOf(';')
          if (p > 0) {
            val station = line.substring(0, p)
            val temp    = line.substring(p + 1).toDouble
            Some(Record(station, temp))
          } else None // skip corrupt rows
        }
      }(Encoders.product[Record])

    // ── 2. min / mean / max per station ──────────────────────────────────
    val stats = data.groupBy($"station")
      .agg(
        min("temperature").as("min"),
        round(avg("temperature"), 1).as("mean"),
        max("temperature").as("max")
      )
      .orderBy($"station")

    // ── 3. format exactly as “station=min/mean/max” ──────────────────────
    // concat_ws alone would also put "/" after the station name, so the "="
    // separator is concatenated explicitly.
    val out = stats.select(
      concat($"station", lit("="), concat_ws("/", $"min", $"mean", $"max")).as("formatted")
    )

    // ── 4. emit ──────────────────────────────────────────────────────────
    OutputPath match {
      case Some(dir) => out.write.mode("overwrite").text(dir)
      case None      => out.collect().foreach(r => println(r.getString(0)))
    }

    spark.stop()
  }
}
rg gunnarmorling futasoft
/Users/tafu/futasoft/1brc
➜ 1brc git:(main) ✗ javac src/main/java/dev/morling/onebrc/CreateMeasurements.java
java -cp src/main/java dev.morling.onebrc.CreateMeasurements 100
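The trailing argument is the number of rows CreateMeasurements writes to measurements.txt; 100 is just a smoke test, and the full challenge uses 1000000000.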
Run the Spark job locally (the master is already hard-coded to local[*]).
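Outside the IDE, something like the following works once the object is packaged into a jar (the jar path and Scala version are illustrative, not from the original setup):

➜ 1brc git:(main) ✗ spark-submit --class OneBillionRows --master 'local[*]' --driver-memory 4g target/scala-2.12/onebrc_2.12-0.1.jar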
Rust mmap, rayon
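A minimal sketch of that approach, assuming the memmap2 and rayon crates (the Stats struct and the chunking scheme are mine, not from a reference solution): memory-map the file, cut it into newline-aligned chunks, reduce each chunk to its own per-station map in parallel, then merge the maps.

use memmap2::Mmap;
use rayon::prelude::*;
use std::collections::HashMap;
use std::fs::File;

struct Stats { min: f64, max: f64, sum: f64, count: u64 }

fn main() -> std::io::Result<()> {
    let file = File::open("measurements.txt")?;
    // SAFETY: the file must not be mutated while mapped.
    let mmap = unsafe { Mmap::map(&file)? };
    let data: &[u8] = &mmap;

    // Cut the mapping into newline-aligned chunks, roughly one per core.
    let n = rayon::current_num_threads();
    let mut bounds = vec![0usize];
    for i in 1..n {
        let mut pos = data.len() * i / n;
        while pos < data.len() && data[pos] != b'\n' { pos += 1; }
        bounds.push((pos + 1).min(data.len()));
    }
    bounds.push(data.len());

    // Reduce each chunk to its own station -> stats map, then merge the maps.
    let merged = bounds
        .par_windows(2)
        .map(|w| {
            let mut map: HashMap<&[u8], Stats> = HashMap::new();
            for line in data[w[0]..w[1]].split(|&b| b == b'\n') {
                if line.is_empty() { continue; }
                let sep = line.iter().position(|&b| b == b';').expect("missing ';'");
                let temp: f64 = std::str::from_utf8(&line[sep + 1..]).unwrap().parse().unwrap();
                let s = map.entry(&line[..sep])
                    .or_insert(Stats { min: f64::MAX, max: f64::MIN, sum: 0.0, count: 0 });
                if temp < s.min { s.min = temp; }
                if temp > s.max { s.max = temp; }
                s.sum += temp;
                s.count += 1;
            }
            map
        })
        .reduce(HashMap::new, |mut a, b| {
            for (k, v) in b {
                let s = a.entry(k)
                    .or_insert(Stats { min: f64::MAX, max: f64::MIN, sum: 0.0, count: 0 });
                if v.min < s.min { s.min = v.min; }
                if v.max > s.max { s.max = v.max; }
                s.sum += v.sum;
                s.count += v.count;
            }
            a
        });

    // Sort alphabetically and print in the station=min/mean/max format.
    let mut stations: Vec<_> = merged.into_iter().collect();
    stations.sort_by(|a, b| a.0.cmp(b.0));
    for (name, s) in stations {
        println!("{}={:.1}/{:.1}/{:.1}",
                 String::from_utf8_lossy(name), s.min, s.sum / s.count as f64, s.max);
    }
    Ok(())
}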
Rust Spark Connector
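The code below targets the community spark-connect-rs crate, whose DataFrame API mirrors PySpark over the Spark Connect protocol. The crate is young and function names and signatures shift between versions, so treat the exact calls as indicative rather than authoritative.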
use spark_connect_rs::{SparkSession, SparkSessionBuilder};
use spark_connect_rs::functions::{col, concat, lit, round, split};
use spark_connect_rs::types::DataType;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Initialize SparkSession against a Spark Connect endpoint
    let spark: SparkSession = SparkSessionBuilder::remote("sc://localhost:15002")
        .build()
        .await?;

    // Input file path (adjust for your storage system, e.g., "hdfs://path/to/measurements.txt")
    let input_path = "measurements.txt";

    // Read the text file as a DataFrame with a single `value` column
    let df = spark
        .read()
        .format("text")
        .load(vec![input_path.to_string()])?;

    // Parse each line into (station, temperature)
    let parsed_df = df.select(vec![
        // Split each line on the semicolon
        split(col("value"), ";").get_item(0).alias("station"),
        // Cast the temperature to double
        split(col("value"), ";")
            .get_item(1)
            .cast(DataType::Double)
            .alias("temperature"),
    ])?;

    // Group by station and compute min, max, and mean
    let result_df = parsed_df
        .group_by(vec![col("station")])
        .agg(vec![
            col("temperature").min().alias("min_temp"),
            col("temperature").max().alias("max_temp"),
            col("temperature").avg().alias("avg_temp"),
        ])?
        // Round the mean to one decimal place
        .select(vec![
            col("station"),
            col("min_temp"),
            round(col("avg_temp"), 1).alias("avg_temp"),
            col("max_temp"),
        ])?
        // Sort by station name
        .order_by(vec![col("station").asc()])?;

    // Format output as {station}={min}/{mean}/{max}.
    // Rust has no variadic functions, so the columns go to concat as one Vec.
    let formatted_df = result_df.select(vec![
        concat(vec![
            col("station"),
            lit("="),
            col("min_temp"),
            lit("/"),
            col("avg_temp"),
            lit("/"),
            col("max_temp"),
        ])
        .alias("output"),
    ])?;

    // Collect results and print
    let rows = formatted_df.collect().await?;
    for row in rows {
        if let Some(output) = row.get_str(0)? {
            println!("{}", output);
        }
    }

    // Stop the SparkSession
    spark.stop().await?;
    Ok(())
}
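Spark Connect listens on port 15002 by default, which is what the sc://localhost:15002 URL above assumes. A plain Spark distribution can expose that endpoint with the bundled script (the version coordinates here are an example; match them to the installed Spark):

➜ 1brc git:(main) ✗ ./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1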
