Spark DataFrame은 클러스터의 여러 노드에 분할되어 병렬로 작동할 수 있는 org.apache.spark.sql.Row 개체의 분산된 데이터세트입니다. DataFrame은 R 또는 Python의 DataFrame과 유사하지만 Spark 최적화와 유사한 행 및 열이 있는 데이터 테이블을 나타냅니다. DataFrame은 파티션으로 구성되며, 각 파티션은 데이터 노드의 캐시에 있는 행 범위입니다.
![](/content/dam/en-zz/Solutions/ai-data-science/spark-ebook/Page_14.png)
DataFrames는 csv, parquet, JSON 파일, Hive 테이블 또는 외부 데이터베이스와 같은 데이터 소스에서 구성할 수 있습니다. DataFrame은 관계형 변환 및 Spark SQL 쿼리를 사용하여 작동할 수 있습니다.
Spark 셸 또는 Spark 노트북은 Spark를 대화형으로 사용할 수 있는 간단한 방법을 제공합니다. 다음 명령으로 로컬 모드에서 셸을 시작할 수 있습니다.
$ /[installation path]/bin/spark-shell --master local[2]
그런 다음 이 챕터의 나머지 부분에 있는 코드를 셸에 입력하여 결과를 대화형으로 볼 수 있습니다. 코드 예제에서 셸의 출력은 결과가 맨 처음에 나옵니다.
애플리케이션 드라이버와 클러스터 관리자 간의 실행 조정을 위해, 프로그램에서 다음 코드 예제와 같이 SparkSession 개체를 만듭니다.
val spark = SparkSession.builder.appName("Simple Application").master("local[2]").getOrCreate()
Spark 애플리케이션이 시작되면 마스터 URL을 통해 클러스터 관리자에게 연결됩니다. 마스터 URL은 SparkSession 개체를 만들거나 SparkSession 애플리케이션을 제출할 때 N 스레드를 사용하여 로컬로 실행되도록 클러스터 관리자 또는 로컬[N]으로 설정할 수 있습니다. Spark 셸 또는 노트북을 사용하는 경우 SparkSession 개체가 이미 만들어져 있으며 변수 Spark로 사용할 수 있습니다. 연결되면 클러스터 관리자는 클러스터의 노드에 대해 구성된 대로 리소스를 할당하고 실행자 프로세스를 시작합니다. Spark 애플리케이션이 실행되면 SparkSession은 실행을 위해 작업을 실행자에 보냅니다.
![](/content/dam/en-zz/Solutions/ai-data-science/spark-ebook/Page_17.png)
SparkSession 읽기 방법을 사용하면 파일에서 DataFrame으로 데이터를 읽고 스키마에 대한 파일 유형, 파일 경로 및 입력 옵션을 지정할 수 있습니다.
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val schema =
StructType(Array(
StructField("vendor_id", DoubleType),
StructField("passenger_count", DoubleType),
StructField("trip_distance", DoubleType),
StructField("pickup_longitude", DoubleType),
StructField("pickup_latitude", DoubleType),
StructField("rate_code", DoubleType),
StructField("store_and_fwd", DoubleType),
StructField("dropoff_longitude", DoubleType),
StructField("dropoff_latitude", DoubleType),
StructField("fare_amount", DoubleType),
StructField("hour", DoubleType),
StructField("year", IntegerType),
StructField("month", IntegerType),
StructField("day", DoubleType),
StructField("day_of_week", DoubleType),
StructField("is_weekend", DoubleType)
))
val file = "/data/taxi_small.csv"
val df = spark.read.option("inferSchema", "false")
.option("header", true).schema(schema).csv(file)
result:
df: org.apache.spark.sql.DataFrame = [vendor_id: double, passenger_count:
double ... 14 more fields]
take 메서드는 이 DataFrame의 개체가 포함된 배열을 반환하며, 이는 org.apache.spark.sql.Row 형식입니다.
df.take(1)
result:
Array[org.apache.spark.sql.Row] =
Array([4.52563162E8,5.0,2.72,-73.948132,40.829826999999995,-6.77418915E8,-1.0,-73.969648,40.797472000000006,11.5,10.0,2012,11,13.0,6.0,1.0])