๋ฐ์ดํฐ ์์ง๋์ด๋ก์ ๋ณต์กํ ๋ถ์ฐ ์์คํ ์ ๊ตฌ์ถํ๋ฉด์ ๋ง์ง๋ง ์ฅ์ ๋ฌผ์ ๋์์ ๋ ๋๋ผ๋ ์ฑ์ทจ๊ฐ์ ์ ๋ง ํน๋ณํฉ๋๋ค.
์ค๋์ Apache Spark Structured Streaming + Kafka๋ฅผ ์ฌ์ฉํ ๋ ๊ต์ฅํ ๋ต๋ตํ ๋ฌธ์ ๋ฅผ ๊ณต์ ํ๋ ค๊ณ ํฉ๋๋ค:
๐ ๋ฐ๋ก ์
๋ช
๋์ Failed to find data source: kafka
์๋ฌ์
๋๋ค.
๐งจ ๋ฌธ์ : ๋ชจ๋ ๊ฒ ์ ์์ธ๋ฐ Kafka๋ง ์ ๋๋ค
์ํฉ์ ๋ ์ฌ๋ ค๋ด ์๋ค:
- Spark ํด๋ฌ์คํฐ ์ ์ ๊ตฌ๋
- Postgres ์ฐ๊ฒฐ๋ ๋ฌธ์ ์์
- Kafka์์ ์ด๋ฒคํธ๋ ์ ๋ฐํ๋จ
- ์ฝ๋์์
.readStream.format("kafka")
ํธ์ถ
๊ทธ๋ฐ๋ฐ ๊ฐ์๊ธฐ ๋ค์๊ณผ ๊ฐ์ ์๋ฌ๊ฐ ๋ฐ์:
pyspark.errors.exceptions.captured.AnalysisException: Failed to find data source: kafka.
๊ทธ๋์ ๋ชจ๋ ๊ฑธ ๋ค์ ํ์ธํด ๋ด ๋๋ค:
- โ JAR ํ์ผ ์ ๋ง์ดํธ๋จ
- โ classpath๋ ์ ์
- โ Kafka์ Spark ๋ฒ์ ํธํ
๊ทธ๋ฐ๋ฐ๋ Spark๋ ๊ณ์ ๋ถํํฉ๋๋ค. ์ด๊ฒ ๋ฐ๋ก Spark์์ ์์ฃผ ๊ฒช๋ ์จ๊ฒจ์ง ํจ์ ์ ๋๋ค.
๐งช ๊ทผ๋ณธ ์์ธ: ๋จ์ JAR ๋ก๋ฉ vs. ํ๋ฌ๊ทธ์ธ ๋ฑ๋ก
๋ฌธ์ ๋ JAR ํ์ผ์ด ์๋ ๊ฒ ์๋๋๋ค. Spark๊ฐ ํ๋ฌ๊ทธ์ธ์ ๋ด๋ถ์ ์ผ๋ก ์ด๋ป๊ฒ ๋ฑ๋กํ๋๊ฐ์ ์ฐจ์ด์ ๋๋ค.
์กฐ๊ธ ๋ ์์ธํ ์ดํด๋ณผ๊ฒ์:
JDBC: ์ฌ์ด ์ผ์ด์ค
JDBC๋ฅผ ์ฌ์ฉํ ๋:
1๏ธโฃ df.write.jdbc()
ํธ์ถ
2๏ธโฃ Spark๊ฐ classpath์์ JDBC ๋๋ผ์ด๋ฒ ๊ฒ์
3๏ธโฃ ๋ โ --jars
๋ง์ผ๋ก ์ถฉ๋ถ
Kafka: ํน๋ณํ ์ผ์ด์ค
Kafka (Structured Streaming)๋ ๋ค๋ฆ ๋๋ค:
1๏ธโฃ df.readStream.format("kafka")
ํธ์ถ
2๏ธโฃ Spark๊ฐ Kafka๋ฅผ DataSourceV2 ํ๋ฌ๊ทธ์ธ์ผ๋ก ๋ฑ๋ก ์๋
3๏ธโฃ Spark ์ด๊ธฐํ ๊ณผ์ ์์ ํ๋ฌ๊ทธ์ธ ๋ฑ๋ก ์งํ
4๏ธโฃ ๋จ์ํ classpath์ JAR ์ถ๊ฐํ๋ ๊ฒ๋ง์ผ๋ก๋ ๋ฑ๋ก์ด ์๋ฃ๋์ง ์์
๊ทธ๋์ Kafka์์๋ --jars
๋ง์ผ๋ก ์ ๋๋ ๊ฒฝ์ฐ๊ฐ ๋ง์ต๋๋ค.
๐ง ํด๊ฒฐ์ฑ : ์ค์ ๋ก ๋์ํ๋ ๋ ๊ฐ์ง ๋ฐฉ๋ฒ
โ ๋ฐฉ๋ฒ 1: ํ์ด๋ธ๋ฆฌ๋ ๋ฐฉ์ (ํ๋ก๋์ ์ถ์ฒ)
Kafka ์ปค๋ฅํฐ๋ --packages
๋ก, ๋๋จธ์ง ๋๋ผ์ด๋ฒ๋ ๋ก์ปฌ JAR๋ก ๊ด๋ฆฌ:
/opt/spark/bin/spark-submit \
--master spark://spark-master:7077 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
--jars /opt/spark/extra-jars/postgresql-42.6.0.jar,/opt/spark/extra-jars/kafka-clients-3.5.0.jar \
--conf spark.driver.extraClassPath="/opt/spark/extra-jars/*" \
--conf spark.executor.extraClassPath="/opt/spark/extra-jars/*" \
transaction_streaming.py
์ ์ด๊ฒ ์ ๋์ํ๋?
--packages
๋ ๋จ์ํ JAR๋ง ์ถ๊ฐํ๋ ๊ฒ ์๋๋ผ, ํ๋ฌ๊ทธ์ธ ๋ฑ๋ก๊น์ง ๋ด๋ถ์ ์ผ๋ก ์ฒ๋ฆฌํด ์ค๋๋ค.
โ ๋ฐฉ๋ฒ 2: ์์ ๋ก์ปฌ JAR ๋ฐฉ์ (ํ ๋์ปค ํ๊ฒฝ์์ ์ ํธ)
์ ์ฒด ์ ์ด๊ถ์ ์ ์งํ๊ณ ์ถ๋ค๋ฉด:
/opt/spark/bin/spark-submit \
--master spark://spark-master:7077 \
--jars /opt/spark/extra-jars/postgresql-42.6.0.jar,/opt/spark/extra-jars/spark-sql-kafka-0-10_2.12-3.5.0.jar \
--conf spark.sql.streaming.kafka.allowAutoTopicCreation=true \
--conf spark.driver.extraClassPath="/opt/spark/extra-jars/*" \
--conf spark.executor.extraClassPath="/opt/spark/extra-jars/*" \
transaction_streaming.py
โ ๏ธ ์ฃผ์: Kafka์ ๋ชจ๋ ์ข ์ JAR (transitive dependencies) ๋ฅผ ๋ก์ปฌ์ ์ง์ ์ค๋นํด์ผ ํฉ๋๋ค.
๐งญ ์ด ์ฐจ์ด๊ฐ ์ค์ํ ์ด์
- ๐ ๋๋ฒ๊น โ ํ๋ฌ๊ทธ์ธ ๋ฑ๋ก ๋ฐฉ์์ ์๋ฉด ๋์๋ ์ฝ์ง์ ์ค์ผ ์ ์์
- ๐ข ๋ฐฐํฌ ์ ํ์ง โ Docker ํ๊ฒฝ์ ๋ก์ปฌ JAR ์ ํธ, ํด๋ผ์ฐ๋ ๋ค์ดํฐ๋ธ๋
--packages
๊ฐ ๋ ๊ฐํธ - โฑ ์ฑ๋ฅ โ
--packages
๋ ์คํ ์ ๋ค์ด๋ก๋ โ ์ด๊ธฐ ๋ถํ ์ฝ๊ฐ ๋๋ฆฌ์ง๋ง ๋ฒ์ ํธํ ๋ณด์ฅ - ๐ฅ ํ ํ์ โ ๋ช ํํ ๋ฌธ์ํ๊ฐ “๋ด ์ปดํจํฐ์์ ๋๋๋ฐ?” ์ํฉ ๋ฐฉ์ง
๐ ํ๋ก๋์ ์์ ๋ด๊ฐ ๋ฐ๋ฅด๋ ๋ฃฐ
์ปดํฌ๋ํธ | ์ถ์ฒ ๋ฐฉ์ |
---|---|
Kafka ์ปค๋ฅํฐ | --packages ์ฌ์ฉ |
DB ๋๋ผ์ด๋ฒ | ๋ก์ปฌ --jars ์ฌ์ฉ |
๋ด๋ถ ์ปค์คํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ | ๋ก์ปฌ --jars ์ฌ์ฉ |
ํ ํ๊ฒฝ | ์ฌ์ฉ ๋ฐฉ์์ ๋ฐ๋์ ๋ฌธ์ํ |
๐งต ํ ๋จ๊ณ ๋ ๊น์ด
์ด ๋ฌธ์ ๋ Spark ๋ด๋ถ ๋์์ ์ดํดํ๋ ๊ฒ์ด ์ ์ค์ํ์ง๋ฅผ ์ ๋ณด์ฌ์ค๋๋ค.
classpath์ JAR์ด ์๋๋ vs. data source๊ฐ ํ๋ฌ๊ทธ์ธ์ผ๋ก ๋ฑ๋ก๋๋๋ โ ์ด ์ฐจ์ด๊ฐ ์คํธ๋ฆฌ๋ฐ ํ์ดํ๋ผ์ธ์ด ์ ๋๋์ง, ๋๋ฒ๊น ์ง์ฅ์ ๋น ์ง๋์ง๋ฅผ ๊ฒฐ์ ํฉ๋๋ค.
๋ค์๋ฒ์ ์๋ ์๋ฌ๋ฅผ ๋ง๋๋ฉด:
Failed to find data source: kafka
โ JAR์ด ์๋์ง๋ง ๋ณด์ง ๋ง๊ณ , โ Spark๊ฐ ์ด๋ป๊ฒ ํ๋ฌ๊ทธ์ธ์ ๋ฑ๋กํ๋์ง ํ์ธํ์ธ์.
๋ง์ง๋ง ์ฅ์ ๋ฌผ์ ์ฌ์ค ์ด๋ ค์ด ๊ฒ ์๋๋ผ ๊ทธ๋ฅ ์์ ๋ฐ์ผ ๋ฟ์ ๋๋ค.