Mapboxは毎日、モバイル(iOSとAndroid)SDKから3億マイル以上の匿名化された位置情報データを収集しています。このデータを使って、過去の観測データから生成された特定の時間と特定の道路の速度推定値を計算し、「金曜日の正午、サンフランシスコのマーケットストリートではどのくらいの速度が予想されるか」といった予測を立てています。
この記事では、Apache Sparkを使って、世界中の道路ネットワークを対象に、1週間に300億件の速度推定値を算出する方法を紹介します。
推定速度の算出
当社のSDKから収集された遠隔測定サービスは、匿名化やプライバシーフィルタリングが行われ、経度や緯度などの座標情報を含むトレースに繋がります。最終的には、連続した座標から距離、時間、速度、方位の情報が導き出され、これを”速度プローブ”と呼びます。
トレースから生成されたプローブは、全世界の道路ネットワークと照合されます。マッチングプロセスの最後には、各トレースに平均速度、5分間のタイムバケット、道路セグメントを割り当てることができます。同じ道路上で、同じ5分間のタイムバケット内にあるマッチは、速度ヒストグラムを作成するために集約されます。最後に、集約されたヒストグラムごとに速度を推定します。下記図は、ある週の、ある時間帯の道路状況をもとに推測されたものです。
膨大なデータ
全世界の道路ネットワークに対するすべての遠隔測定トレースを毎日照合し、過去の観測結果を集約することで、毎日、5分間の時間間隔ごとに1つの道路状況(速度)を推定することができます。
膨大な量のデータを処理するのは大変そうですが、実際にはどのくらいのデータ量なのでしょうか。
毎週、平均して22億件のトレースを23億件の道路にマッチングさせ、54億件のマッチングを行っています。そのデータをもとに510億通りの速度ヒストグラムを作成し、最終的に300億通りの速度推定値を算出します。
データの大きさ、変換や計算の複雑さを考えると、大規模なデータセットを高速に処理できる大規模分散コンピューティングのフレームワークを提供するpySpark*の実装は非常に理にかなっています。
*pySpark:Sparkを実行するためのPython APIです。Apache Spark とPython のコラボレーションをサポートするためにリリースされました。
データ処理パイプラインの設計
まず最初に取り組んだのは、パイプラインの設計と、生成される様々なデータセットのスキームの設計です。私たちのパイプラインでは、各pySparkアプリケーションで、下流のアプリケーションが使用できるように、ハイブテーブルに永続化されたデータセットを生成します。
1つのpySparkアプリケーションですべてのステップ(マップマッチング、アグリゲーション、速度推定など)を実行するのではなく、各ステップをそれぞれのアプリケーションに分離しました。そうすることで、各アプリケーションのデータセットフィクスチャをモックアップすることができ、チーム内での初期開発をスピードアップし、作業を分散させるこができます。また、複雑な変換の結果を実際のデータでテストし、評価することも可能になりました。さらに、中間的なデータセットによって、データサイエンティストはパイプラインのさまざまなコンポーネントに対してモデル実験を行うことができます。
Mapboxは、テーブルを可能な限り正規化し、関連するテーブル間の関係を通じて最終的なトラフィックプロファイルのデータセットにたどり着くことを可能にしました。
正規化することで、データセットを作成するアプリケーションがテーブルのスキーマを定義することができ、データの整合性を保ち、データの冗長性を取り除くことができます。もちろん、結合などの変換が非常にコスト高になった場合には、非正規化という選択肢も念頭に置いています。
データ・パーティショニング
パーティショニングによって、データの一部に対するクエリがより速く、簡単になります。結果として得られたすべてのデータセットを、時間的および空間的な次元で分割します。
Airflowを使用することで、空間的なパーティショニングをパイプラインのオーケストレーションに簡単に引き継ぐことができ、「世界全体」を一度に実行するのではなく、パーティショニングごとにパイプラインを実行することができます。
これにより、各パイプラインのデータサイズが小さくなり、スケーラビリティが向上します。
パイプライン全体をテストするために、より粗いパーティションを選ぶことができるので、より迅速な開発、反復、本番データに対する頻繁なテストが可能になります。
データの歪み
データがパーティションやキーに均等に分散していない場合、データは歪んだ状態になります。これは遠隔測定データの一般的な特徴で、常に特定の地理的位置に他よりも多くのデータが存在することになります。
結合のような変換を行う際、Sparkはパーティショニング式の評価に応じてデータをコロケーションします。データのキーとなる変数が不均等に分布している場合、非常に大きなパーティションがいくつかできてしまうことがあります。
これは、Sparkがパーティションごとに1つのタスクを割り当てるため、処理を行う時に問題となります。
そのため、非常に大きなパーティションが1つある場合、そのパーティションを処理するタスクが大半の時間を占め、他のタスクの90%以上はすぐに完了してしまいます。これでは分散処理の目的が果たせず、リソースを無駄にしてしまいます。適度な数のタスクがほぼ同じ時間で実行されることが望ましいです。
Mapboxは、データの歪みを軽減するためのいくつかの方法を見つけました。
・パーティションの数を増やす:
repartitionやspark.sql.shuffleを使ってデータフレームのパーティション数を増やすことは、データが極端に歪んでいない場合に有効です。
・新しいユニークIDの作成:
ユニークIDカラムを追加して再分割すると、ハッシュ・パーティショナーが歪んだ変数とは無関係に各行をパーティションに割り当てるため、バランスの取れたパーティションが作成されます。
・キーのランダム化:
歪んだキーに対して結合などの変換を行う必要がある場合、キーにランダム化を加えることで、より均等に分散させることができます。まず、許容できるバッチサイズを定義し、パッチ内のすべてのキーを同じランダムな整数にします。これにより、大きなパーティションを均等に分散された小さなグループに分割することができます。
まとめ
MapboxのpySparkパイプラインは毎日何百億行ものデータを処理しており、反復作業やモデルの改善、変更点の評価を迅速に行うことができます。
この規模のデータを扱うプロジェクトでSparkを使うには、Sparkの内部を深く理解し、基礎となるデータがパフォーマンスにどのような影響を与えるかを理解している必要がありました。
*本記事は、Mapbox Inc. Blog の翻訳記事です。