PySpark 기본적인 사용법
pySpark 라이브러리 load
-
from pyspark.sql import SparkSession spark = SparkSession.builder.appName('XXX').getOrCreate()
- pySpark 사용하기 위해서는 먼저 SparkSession을 만들어야함
- SparkSession을 생성한다는 뜻은 spark 시스템을 생성하고 load 하겠다는 뜻.
- 즉 MasterNode, WorkerNode 등등이 생성된다 ---> 전체적인 Spark 시스템 생성 및 구성
파일 읽기(Read)
spark.read.csv('test.csv', inferSchema=True)
spark.read.json('test.json')
spark.read.load('test.json')
- 여러 포맷의 파일을 읽을 수 있음
- inferSchema 를 추가해서 데이터타입을 pyspark가 유추해서 int는 int로 str는 str로 데이터 타입을 저장하게 할 수 있음
- 만약 특정 디렉토리 안에 들어있는 모든 서브-디렉토리의 모든 json 파일을 읽고싶다면
-
spark.read.json('s3a://udacity-dend/song_data/*/*/*/*.json
- * 을 써서 모든 <디렉토리> 및 <파일>을 선택할 수 있음
- 그냥 "s3a://udacity-dend" 만 쓰면 자동으로 서브-디렉토리 검색 안함!!
데이터프레임 보기 (pandasDF.head())
df.show() : 모든 레코드 보기
df.take() : 몇개만 보기
df.head() : 몇개만 보기
- pandas의 head() 도 가능하고, pySpark의 take() 및 show() 모두 가능
데이터프레임 정보 (pandasDF.info())
데이터프레임 열(Column) 선택 ( pandasDF['age'], pandasDF.age )
df.select('age') # 1개 선택
df.select(['age', 'name']) # 2개 이상 선택
df['age'] 는 안됨!! .select 를 꼭 써야함
데이터프레임 열 추가 (pandasDF['newColumnName'])
df = df.withColumn('NewAge', df['age']+2)
- df 에 새로운 열 'NewAge' 를 원래 존재하던 'age' 열을 이용하여 만들 때
데이터프레임 열 제거 (pandasDF.drop('age', axis=1)
df = df.drop('newAge')
- 열 제거 할때는 pandas와 비슷하게 .drop() 을 쓰지만, axis=1 을 추가 할 필요 없음
Null 값 제거
df.na.drop(how="all" or "any").show()
- all: 레코드의 열 중 1개라도 Null 값이 있다면 제거
- any: 레코드의 모든 열이 Null 값이라면 제거
df.na.drop(how='any', subset=['Age', 'Name']).show()
- subset: 해당하는 열 가지고만 필터링함
Null 값 채우기
df = df.na.fill('Ohoh', ['age']).show()
from pyspark.ml.feature import Imputer
imputer = Imputer(
inputCols=['age', 'Exp'],
outputCols=['{}_imputed".format(c) for c in ['age','Exp']]
).setStrategy("mean")
# Null 값을 mean 값으로 대체하여 새로운 {}_imputed 열 생성 #
imputer.fit(df).transform(df).show()
Filter 필터링
df.filter( ("age <= 20") & ("exp>=100") ).select(["age", "exp"]).show()
- <'age' 20 이하> AND <'exp' 100 이상>인 레코드만 필터링해서 ['age', 'exp'] 열만 보여줌
GroupBy and Aggregate 함수
df.groupBy("age").sum().show()
- "age" column을 기준으로 모든 데이터가 묶여지게 되고, 그 중 integer type인 열은 자동으로 합해준다
'데엔- Udacity' 카테고리의 다른 글
ETL 정리 (0) | 2022.02.25 |
---|---|
Unix Timestamp 처리하는법 (0) | 2022.02.17 |
Udacity - Data Lakes w/ Spark - Data Lakes (0) | 2022.02.10 |
Udacity - Data Lakes w/ Spark - Intro to Spark (0) | 2022.01.26 |
Udacity - Cloud Data Warehouse - Implementing DWH (0) | 2022.01.07 |