본문 바로가기
데엔- Udacity

Udacity - Data Lakes w/ Spark - PySpark

by 녕나 2022. 2. 16.

PySpark 기본적인 사용법

pySpark 라이브러리 load

  • from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('XXX').getOrCreate()
  • pySpark 사용하기 위해서는 먼저 SparkSession을 만들어야함
  • SparkSession을 생성한다는 뜻은 spark 시스템을 생성하고 load 하겠다는 뜻.
    • 즉 MasterNode, WorkerNode 등등이 생성된다 ---> 전체적인 Spark 시스템 생성 및 구성

파일 읽기(Read)

spark.read.csv('test.csv', inferSchema=True)
spark.read.json('test.json')
spark.read.load('test.json')​

 

  • 여러 포맷의 파일을 읽을 수 있음
    • inferSchema 를 추가해서 데이터타입을 pyspark가 유추해서 int는 int로 str는 str로 데이터 타입을 저장하게 할 수 있음
  • 만약 특정 디렉토리 안에 들어있는 모든 서브-디렉토리의 모든 json 파일을 읽고싶다면
  • spark.read.json('s3a://udacity-dend/song_data/*/*/*/*.json
    • * 을 써서 모든 <디렉토리> 및 <파일>을 선택할 수 있음
    • 그냥 "s3a://udacity-dend" 만 쓰면 자동으로 서브-디렉토리 검색 안함!!

데이터프레임 보기 (pandasDF.head())

df.show() : 모든 레코드 보기
df.take() : 몇개만 보기
df.head() : 몇개만 보기
  • pandas의 head() 도 가능하고, pySpark의 take() 및 show() 모두 가능

데이터프레임 정보 (pandasDF.info())

df.printSchema()
df.describe().show()

데이터프레임 열(Column) 선택 ( pandasDF['age'], pandasDF.age )

df.select('age')             # 1개 선택
df.select(['age', 'name'])   # 2개 이상 선택

df['age'] 는 안됨!! .select 를 꼭 써야함

데이터프레임 열 추가 (pandasDF['newColumnName'])

df = df.withColumn('NewAge', df['age']+2)
  • df 에 새로운 열 'NewAge'원래 존재하던 'age' 열을 이용하여 만들 때

데이터프레임 열 제거 (pandasDF.drop('age', axis=1)

df = df.drop('newAge')
  • 열 제거 할때는 pandas와 비슷하게 .drop() 을 쓰지만, axis=1 을 추가 할 필요 없음

Null 값 제거

df.na.drop(how="all" or "any").show()
  • all: 레코드의 열 중 1개라도 Null 값이 있다면 제거
  • any: 레코드의 모든 열이 Null 값이라면 제거
df.na.drop(how='any', subset=['Age', 'Name']).show()
  • subset: 해당하는 열 가지고만 필터링함

Null 값 채우기

df = df.na.fill('Ohoh', ['age']).show()

from pyspark.ml.feature import Imputer
		imputer = Imputer(
		inputCols=['age', 'Exp'],
        outputCols=['{}_imputed".format(c) for c in ['age','Exp']]
            ).setStrategy("mean")
            
# Null 값을 mean 값으로 대체하여 새로운 {}_imputed 열 생성 #
imputer.fit(df).transform(df).show()

Filter 필터링

df.filter( ("age <= 20") & ("exp>=100") ).select(["age", "exp"]).show()
  • <'age' 20 이하> AND <'exp' 100 이상>인 레코드만 필터링해서 ['age', 'exp'] 열만 보여줌

GroupBy and Aggregate 함수

df.groupBy("age").sum().show()
  • "age" column을 기준으로 모든 데이터가 묶여지게 되고, 그 중 integer type인 열은 자동으로 합해준다