📈

【.NET】最近マージされたLINQのMin/Maxの最適化

に公開

はじめに

最近dotnet/runtimeにマージされたPRで、個人的に面白いと思ったLINQのMin/Maxの最適化方法について紹介しておこうと思います。

LINQのMin/Maxの内部ロジック

https://github.com/dotnet/runtime/pull/127995

この最適化は、前提としてTIBinaryInteger<T>を実装している型、つまり数値型の場合(主にbyteshortなど)に適用されます。
以下のサンプルコード(byte配列の最小値を取得)を例に内部ロジックを解説します。

サンプル
var array = Enumerable.Range(1, 100).Shuffle().Select(x => (byte)x).ToArray();
var min = array.Min();
Console.WriteLine($"Min: {min}");

Min/Maxの内部ロジックはざっくりと以下の感じになっています。
見にくいですが、HorizontalMinMaxメソッドが今回最適化された部分です。

共通ロジック部分をざっくりと説明すると、まずbyte[100]から最小値を取得する際に、配列サイズに応じたSIMD機能による並列計算によって、byte[100]Vector128<byte>まで絞りこみます。

配列サイズが100の場合、以下のフローにより最小値を含めた16要素分をVector128<byte>として抽出します。

  • Vector512<byte>を用いたSIMD命令でbyte[100]からVector512<byte>にまで最小値を絞りこむ
  • Vector512<byte>の上位256bit/下位256bitを二つのVector256<byte>に切り出して、Vector256<byte>にまで最小値を絞りこむ
  • Vector256<byte>の上位128bit/下位128bitを二つのVector128<byte>に切り出して、Vector128<byte>にまで最小値を絞りこむ
byte[100]からVector128<byte>に絞りこむ処理
Vector128<byte> best128;

ReadOnlySpan<byte> data = span;
// byte[100]の先頭から64要素分をVector512<byte>として切り出す
Vector512<byte> best = Vector512.Create(data);
data = data.Slice(Vector512<byte>.Count);

while (data.Length > Vector512<byte>.Count)
{
    // byte[100]の64要素分ずつ切り出して、二つのVector512<byte>の最小値を要素ごとに計算する
    best = Vector512.Min(best, Vector512.Create(data));
    data = data.Slice(Vector512<byte>.Count);
}

// 余った要素は、重複した要素を含めた64要素分を切り出して、最小値を計算する
best = Vector512.Min(best, Vector512.Create(span.Slice(span.Length - Vector512<byte>.Count)));

// Vector512<byte>を二つのVector256<byte>として切り出して最小値を計算して、Vector256<byte>`にまで最小値を絞りこむ
Vector256<byte> best256 = Vector256.Min(best.GetLower(), best.GetUpper());

// Vector256<byte>を二つのVector128<byte>として切り出して最小値を計算して、Vector128<byte>にまで最小値を絞りこむ
best128 = Vector128.Min(best256.GetLower(), best256.GetUpper());

Vector512.min

最適化部分

そして、ここからが最適化された部分である、抽出した最小値を含めたVector128<byte>から最小値を取得するロジックになります。

まず最適化前は、Vector128<T>をbyteの一要素ずつ比較して最小値を取得するという処理になっており、15回の要素アクセスと15回の比較が行われます。

最適化前処理
byte value = best128[0];
// Vector128<byte>をbyteの一要素ずつ比較して最小値を取得
for (int i = 1; i < Vector128<T>.Count; i++)
{
    if (best128[i] < value)
    {
        value = best128[i];
    }
}

最適化後では、Vector128.Shuffleを用いた手法に変更されました。
今回のTbyteの場合には、以下のようになります。

最適化後処理(再現コード)
private static byte HorizontalMinMax(Vector128<byte> x)
{
    if (Vector128<byte>.Count == 16)
    {
        x = Vector128.Min(x, Vector128.Shuffle(x.AsByte(),
            Vector128.Create((byte)8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7)));
        x = Vector128.Min(x, Vector128.Shuffle(x.AsByte(),
            Vector128.Create((byte)4, 5, 6, 7, 0, 1, 2, 3, 8, 9, 10, 11, 12, 13, 14, 15)));
        x = Vector128.Min(x, Vector128.Shuffle(x.AsByte(),
            Vector128.Create((byte)2, 3, 0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)));
        x = Vector128.Min(x, Vector128.Shuffle(x.AsByte(),
            Vector128.Create((byte)1, 0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)));
    }
    else if (Vector128<byte>.Count == 8)
    {
        x = Vector128.Min(x, Vector128.Shuffle(x.AsInt16(),
            Vector128.Create(4, 5, 6, 7, 0, 1, 2, 3)).As<short, byte>());
        x = Vector128.Min(x, Vector128.Shuffle(x.AsInt16(),
            Vector128.Create(2, 3, 0, 1, 4, 5, 6, 7)).As<short, byte>());
        x = Vector128.Min(x, Vector128.Shuffle(x.AsInt16(),
            Vector128.Create(1, 0, 2, 3, 4, 5, 6, 7)).As<short, byte>());
    }
    else if (Vector128<byte>.Count == 4)
    {
        x = Vector128.Min(x, Vector128.Shuffle(x.AsInt32(), Vector128.Create(2, 3, 0, 1)).As<int, byte>());
        x = Vector128.Min(x, Vector128.Shuffle(x.AsInt32(), Vector128.Create(1, 0, 3, 2)).As<int, byte>());
    }
    else
    {
        Debug.Assert(Vector128<byte>.Count == 2);
        x = Vector128.Min(x, Vector128.Shuffle(x.AsInt64(), Vector128.Create(1, 0)).As<long, byte>());
    }
    return x.ToScalar();
}

Vector128.Shuffleは、ソースとなるVector128<T>Vector128<T>で作成したインデックスのセットをもとに要素の位置を変更するメソッドです。
以下のコードだと、インデックスのセットが[15, 14, ..., 0]と逆順になっているため、Vector128.Shuffleを実行すると要素の並びが逆になっているVector128<T>が作成されます。

Vector128.Shuffleのサンプルコード
byte[] bytes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
var byteVector = Vector128.Create(bytes);

// Vector128.Shuffleに指定するインデックス情報
var indices = Vector128.Create((byte)15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0);

// bytesの要素をindicesのインデックスを使用して値をセットする
// <16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1>
var shuffled = Vector128.Shuffle(byteVector, indices);

それを踏まえて、再度最適化処理(HorizontalMinMaxメソッド)を確認すると、

if (Vector128<byte>.Count == 16)
{
    // 一回目
    x = Vector128.Min(x, Vector128.Shuffle(x.AsByte(),
        Vector128.Create((byte)8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7)));
    // 二回目
    x = Vector128.Min(x, Vector128.Shuffle(x.AsByte(),
        Vector128.Create((byte)4, 5, 6, 7, 0, 1, 2, 3, 8, 9, 10, 11, 12, 13, 14, 15)));
    // 三回目
    x = Vector128.Min(x, Vector128.Shuffle(x.AsByte(),
        Vector128.Create((byte)2, 3, 0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)));
    // 四回目
    x = Vector128.Min(x, Vector128.Shuffle(x.AsByte(),
        Vector128.Create((byte)1, 0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)));
}

HorizontalMinMax

というように、インデックスを上下反転させたVector128<byte>と4回最小値計算をするというロジックになっています。
いわゆる二分探索木的な形で要素を16、8、4、2、1と圧縮しつつ最小値を出力するため、計算量も最適前と比較すると、要素数に対するステップ数がO(N)からO(log N)と改善されています。
PRにて記載されているベンチマーク結果からも、byteshortの場合、数倍から数十倍の高速化されました。

EgorBoさんのPRコメントには「Inspired by a similar helper in Tensors」があったことから確認したところ、おそらくSystem.Numerics.TensorsTensorPrimitives.Min/Maxで使われているTensorPrimitives.HorizontalAggregateをベースにされたと思われます。

https://github.com/dotnet/runtime/blob/v11.0.0-preview.4.26230.115/src/libraries/System.Numerics.Tensors/src/System/Numerics/Tensors/netcore/Common/TensorPrimitives.IBinaryOperator.cs#L2733-L2766

まとめ

SIMD関連による最適化はあまり知識が無い部分だったので、最初にソース変更箇所を見たときに、「ナニコレ?」という感想でした。
ただ、一つ一つ調べるとなるほどと思いましたので、今後はSystem.Numerics.Tensorsの実装も少しずつ確認していこうと思います。

それでは、今回はこのへんで。

Discussion