OkBublewrap

Spark (3): 데이터 핸들링, 최적화 기법, 멀티프로세싱 및 검증 본문

Python/학습용

Spark (3): 데이터 핸들링, 최적화 기법, 멀티프로세싱 및 검증

옥뽁뽁 2025. 3. 3. 23:27

Data Handling

read_parquet

%%time
df = pd.read_parquet("df_optimized.parquet")

CPU times: user 12.4 s, sys: 3.9 s, total: 16.3 s
Wall time: 14.6 s

 

method chaning

# 중간 변수를 활용한 방식
grouped = df.groupby("app_name").review_rating.agg(["size", "mean"])
reset = grouped.reset_index()
sorted_df = reset.sort_values("mean", ascending=False)

## method chaning
df.groupby("app_name").review_rating.agg(["size", "mean"]).reset_index().sort_values("mean", ascending=False)

 

1. 가독성이 좋음 (Readable & Concise)

  • 중간 변수 없이 한 줄로 처리 가능 → 코드가 직관적임
  • 위에서 아래로 흐름이 자연스럽게 이어짐
  • SQL 쿼리처럼 데이터 변형 과정을 한눈에 파악할 수 있음

2. 불필요한 중간 변수 제거

  • 코드가 길어질수록 중간 변수를 사용하면 관리하기 어려워짐
  • 메모리 사용을 줄일 수 있음 (특히 큰 데이터프레임에서는 효과적)
  • 불필요한 변수 선언 없이 바로 처리 가능

3. Pandas 내부 최적화 활용 가능

  • Pandas는 메서드 체이닝을 활용하면 백엔드에서 최적화된 연산을 수행할 수 있음
  • 중간 변수를 만들면 일부 연산이 반복될 가능성이 있음
    (예: reset_index() 후 다시 정렬하면 추가적인 연산 발생 가능)
  • 체이닝을 사용하면 필요 없는 연산을 최소화하여 성능 최적화 효과를 얻을 수 있음

4. 유지보수 및 디버깅 용이

  • 한 줄씩 주석을 달아가며 단계별로 이해 가능
  • .pipe() 등을 활용하면 체이닝을 더 깔끔하게 만들 수도 있음
(df.groupby("app_name")
   .review_rating
   .agg(["size", "mean"])
   .reset_index()
   .sort_values("mean", ascending=False)
)

 

함수내 함수 연속적용 (pipe)

 

03-03. 함수내 함수 연속적용 (pipe)

####DataFrame.pipe(func, args, kwargs) ##개요 pipe 메서드는 함수를 연속적으로 사용할 때 유용한 메서드입니다. 특히 함수가 인수를 사용할 …

wikidocs.net

 

기본 사용법
df.pipe(func, args, kwargs)
func : 함수입니다.
arg : 함수의 인수입니다.
kwargs : dict 형태의 함수의 인수입니다.

 

f1(data , x), f2(data, y), f3(data, x, y) 일 때

result = f3(f2(f1(data, x), y), x, y)

 

함수 호출이 중첩될수록 가독성이 떨어짐중간 함수의 결과를 확인하려면 코드를 수정해야 함

(result := (
    data
    .pipe(f1, x)
    .pipe(f2, y)
    .pipe(f3, x, y)
))

 

pipe 함수를 쓸 수 있을 때 직관적으로 표현코드 유지보수 및 디버깅이 용이하다.

 

groupby iteration

## groupby iteration
for app_name, group in df.groupby("app_name"):
    review_first = group.review_timestamp.min().strftime("%Y-%m")
    review_last = group.review_timestamp.max().strftime("%Y-%m")
    print(f"{app_name}: {review_first} ~ {review_last}")

 

그 외에도 

# 2. itertuples (행 단위 반복, namedtuple 형태로)
print("\n2. itertuples")
for row in df.itertuples(index=False):
    print(row.app_name, row.review_rating)
    
# 3. items (컬럼 단위 반복)
print("\n3. items")
for column_name, column_data in df.items():
    print(f"Column: {column_name}")
    print(column_data[:5])  # 첫 5개만 출력

 

다양하게 활용할 수 있다.

 

nlargets vs sort_values

%%time
## top likes
df.review_likes.nlargest()

%%time
## top likes
df.review_likes.sort_values(ascending=False).head(5)

둘은 똑같은 결과를 내고 있지만 속도 측정 만큼은 다르다.

 

함수 CPU times sys total Wall time
nlargets 150 ms 77 ms 227 ms 226 ms
sort_values 549 ms 123 ms 672 ms 671 ms

 

코드를 실행을 할 때 메모리와 시간을 항시 주목하자

 

Vectorize

%%time
(df.review_timestamp
    .apply(lambda x: x.month)
    .value_counts()
    .sort_index()
    .plot())
    
%%time
(df.review_timestamp
    .dt.month
    .value_counts()
    .sort_index()
    .plot())
    
%%time
(df.review_timestamp
    .dt
    .to_period('M')
    .value_counts()
    .sort_index()
    .plot())
    
%%time
(df.review_timestamp
    .apply(lambda x: x.strftime('%Y-%m-01'))
    .value_counts()
    .sort_index().plot())
함수 CPU times sys total Wall time
apply(lambda) 19.1 s 985 ms 20.1 s 20.3 s
dt.month 865 ms 54.5 ms 919 ms 520 ms
dt.to_period('M') 626 ms 93.9 ms 719 ms 720 ms
apply(strftime()) 50.1 s 1.55 s 51.7 s 51.4 s

 

1. apply(lambda) : 한 줄 씩 실행

2. dt.month 벡터화 접근 방식

3. dt.to_period('M')은 dt.month보다 느린결과를 보임

4. apply(strftime()) : 문자열로 날짜를 변환하는 작업이고, 한 줄 씩 실행되서 성능이 매우 낮음

 

연-월로 계산을 할 때 좋지 않은 코드

%%time
(df.review_timestamp
    .apply(lambda x: x.strftime('%Y-%m-01'))
    .value_counts()
    .sort_index().plot())

 

월별 평가 집계

# review_timestamp_month 컬럼 생성
df = df.assign(review_timestamp_month = df.review_timestamp.dt.to_period('M'))

# 시각화
plt.figure(figsize=(25,15))
for idx, (app_name, group) in enumerate(df.groupby("app_name")):
    monthly_rating = group.groupby("review_timestamp_month").review_rating.mean() # 그룹별 평점 평균
    plt.subplot(2, 4, idx+1) # 
    monthly_rating.plot(alpha=0.2) # 본 데이터 연하게
    monthly_rating.rolling(3).mean().plot() # 3일 이동평균 시각화
    plt.title(app_name) # 그래프 이름
    plt.legend(["Raw", "Smoothing"]) # 그래프 범례
    plt.xticks(rotation=15) # x축 이름 살짝 틀게
    plt.ylim(0, 5) # y축 고정
plt.show()

 

 

 

트렌드

우리의 앱 평점이 상승 추세인지 하강 추세인지 감별하고 싶다.

이를 위해 주식에서 많이 사용하는 골든, 데드 크로스를 사용하기로 하였다

일 단위로 평균 rating 집계

리뷰가 5개 이하인 일자는 제외(조금 있음)

20일자의 이평선이, 60일자의 이평선을 뚫고 내려가거나(데드크로스), 돌파하여 올라갈 때 (골든 크로스)를 감지한다.

 

# 날짜 컬럼 할당
df = df.assign(review_timestamp_day = df.review_timestamp.dt.date)

# app name, 날짜별 평점 개수, 평균 집계
df_daily = df.groupby(["app_name", "review_timestamp_day"]).review_rating.agg(["size", "mean"])
df_daily.head()

# 집계가 없는 것 확인
df_daily.nsmallest(3, "size")

# 컬럼명 변경 및 인덱스 초기화
df_daily = df_daily.rename(columns={"size":"review_count", "mean":"review_mean"}).reset_index()
df_daily.head()

# review_count가 없는것은 제외
df_daily = df_daily[df_daily['review_count'] != 0]

# 5점 이하의 리뷰는 삭제
filter_count = (df_daily["review_count"] <= 5).sum()
if filter_count > 0:
    filter_ratio = filter_count / len(df_daily)
    print(f"{filter_count}개 row droped")
    df_daily = df_daily[df_daily["review_count"] > 5]

 

 

코드 비교

df_daily_facebook = df_daily[df_daily.app_name == "FACEBOOK_REVIEWS"]
df_daily_facebook = df_daily_facebook.reset_index()

 

# 1 안좋은 코드
%%time
trends = []
ma_longs = []
ma_shorts = []
for i in range(1, len(df_daily_facebook)+1):
    if i < 61:
        trends.append(None)
        continue
    ma_long = df_daily_facebook.review_mean.iloc[i-60:i].mean()
    ma_short = df_daily_facebook.review_mean.iloc[i-20:i].mean()
    if ma_short > ma_long:
        trend = "Up"
    elif ma_short < ma_long:
        trend = "Down"
    else:
        trend = "Neutral"
    trends.append(trend)
    
    ma_longs.append(ma_long)
    ma_shorts.append(ma_short)
# 2
%%time
ma_long = df_daily_facebook.review_mean.rolling(60).mean()
ma_short = df_daily_facebook.review_mean.rolling(20).mean()
trend = np.where(ma_short == ma_long, 'Neutral', np.where(ma_short > ma_long, 'Up', 'Down'))
trend[:60] = None

 

  CPU times sys total Wall time
# 1 148 ms 1.4 ms 150 ms 150 ms
# 2 922 µs 664 µs 1.59 ms 1.08 ms

 

# 1 : O(n) 반복문 수행 → 느림

# 2 : 벡터 연산(O(1)) → 빠름

 

# 값 할당
df_daily_facebook = df_daily_facebook.assign(trend_np = trend)
df_daily_facebook = df_daily_facebook.assign(trend = trends)

df_daily_facebook[df_daily_facebook["trend"] != df_daily_facebook["trend_np"]]
  • for 루프와 rolling().mean() 계산 방식에서 차이가 발생했음

 

df_daily_facebook["trend"].equals(df_daily_facebook["trend_np"])

다른 방법으로는 equals() 방법을 사용해서 동일한결과가 나오는지 확인할 수 있음

  • True이면 완전히 동일함
  • False이면 일부 값이 다름

 

plt.figure(figsize=(20, 5))
color_map = {"Up":"Red", "Down":"Blue", "Neutral":"Grey"}
for trend, group in df_daily_facebook.groupby("trend"):
    color = color_map[trend]
    plt.plot(group.review_timestamp_day, group.review_mean, color=color, alpha=0.2)
    plt.plot(group.review_timestamp_day, group.review_mean.rolling(14).mean(), color=color, alpha=0.8)
plt.show()

 

값에는 up, down으로 빨간색, 파란색으로 나오는데 수상하게 이어지는 부분이 있다. 

 

함수화와 검증

→ 위에는 FACEBOOK만 했음. 다른 카테고리도 봐야하니 함수화하는게 효율적

def trend_calculator(df_target):
    df_target = df_target.reset_index(drop=True)
    ma_long = df_target.review_mean.rolling(60).mean()
    ma_short = df_target.review_mean.rolling(20).mean()
    trend = np.where(ma_short == ma_long, 'Neutral', np.where(ma_short > ma_long, 'Up', 'Down'))
    df_target = df_target.assign(trend = trend)
    df_target.loc[df_target.index < 60, "trend"] = None
    return df_target

 

검증

df_daily_facebook2 = df_daily[df_daily.app_name == "FACEBOOK_REVIEWS"]
df_daily_facebook2 = trend_calculator(df_daily_facebook2)

from pandas.testing import assert_frame_equal

column_to_check = ["review_timestamp_day", "review_mean", "trend"]
assert_frame_equal(df_daily_facebook[column_to_check], df_daily_facebook2[column_to_check])

 

데이터 정합성 검증을 하는 것은 중요한일임

 

Mulitprocessing

# for 구문
list_df_target_trend = []
for app_name, df_target in df_daily.groupby("app_name"):
    df_target_trend = trend_calculator(df_target)
    list_df_target_trend.append(df_target_trend)
    
# 리스트 컴프리헨션
%%time
df_agg = pd.concat([trend_calculator(df_target) for app_name, df_target in df_daily.groupby("app_name")])

# 카테고리 7개이니 나눠서 하면 빠르지 않을까?
%%time
from joblib import Parallel, delayed
df_agg = pd.concat(Parallel(n_jobs=-1)(delayed(trend_calculator)(df_target) for app_name, df_target in df_daily.groupby("app_name")))

 

  CPU times sys total Wall time
리스트
컴프리헨션
28.5 ms 2.04 ms 30.5 ms 10.1 ms
멀티 프로세싱 181 ms 123 ms 304 ms 1.73 ms

 

리스트 컴프리헨션이 더 빠름
멀티프로세싱은 데이터 크기가 크고 연산이 무거울 때 사용해야 효과적
벡터 연산을 활용하는 것이 가장 효율적인 최적화 방법

 

 

Entropy

정보학 이론에서 데이터가, 자료가 얼마나 퍼져 있는지를 나타내는 것 중에 하나고 카테고리에서도 적용이 가능하다.

# 엔트로피 계산
def get_entropy(group):
    entropy = sum([-rate * np.log(ratio) for rate, ratio in group.review_rating.value_counts(normalize=True).items()])
    return entropy

# 카테고리별로 엔트로피 계산
%%time
ser_entropy = pd.Series({app_name:get_entropy(group) for app_name, group in df.groupby("app_name")})
print(ser_entropy)

CPU times: user 1.48 s, sys: 1.77 s, total: 3.25 s
Wall time: 3.96 s

 

FACEBOOK_REVIEWS     31.455511
FLIPKART_REVIEWS     28.376202
INSTAGRAM_REVIEWS    29.376620
SPOTIFY_REVIEWS      28.390700
TIKTOK_REVIEWS       32.137737
TWITTER_REVIEWS      27.621901
WHATSAPP_REVIEWS     27.886915
dtype: float64

 

FACEBOOK_REVIEWS가 제일 많이 퍼져 있고, WHATSAPP_REVIEWS 제일 적게 퍼져 있다.

 

시각화

import seaborn as sns
# app_name 별 review_rating별 개수
df_review_rating_by_app = df.groupby(["app_name", "review_rating"]).size().dropna().reset_index(name="size")

# 왓츠앱, 페이스북만
df_review_rating_by_app_target = df_review_rating_by_app[
    df_review_rating_by_app.app_name.isin(["FACEBOOK_REVIEWS", "WHATSAPP_REVIEWS"])
]
df_review_rating_by_app_target

# 불필요한 카테고리 제거
df_review_rating_by_app_target.loc[:, "app_name"] = df_review_rating_by_app_target.app_name.cat.remove_unused_categories()

# 시각화
sns.barplot(x="review_rating", y="size", hue="app_name", data = df_review_rating_by_app_target)

 

불필요한 카테고리 제거를 하지 않으면

category 타입에서 다른 값이 남아 있음

 

실제로 FACEBOOK은 1, 5에 많이 되있고 그에 비해 WHATSAPP은 극단적으로 몰려있지는 않다.