This commit is contained in:
Lsong 2024-09-06 01:22:37 +08:00
parent a48acc6862
commit ca45426639
2 changed files with 153 additions and 213 deletions

View File

@ -1,109 +0,0 @@
package me.lsong.mytv.epg
import android.util.Log
import android.util.Xml
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import okhttp3.OkHttpClient
import okhttp3.Request
import org.xmlpull.v1.XmlPullParser
import me.lsong.mytv.epg.fetcher.EpgFetcher
import java.io.BufferedReader
import java.io.ByteArrayInputStream
import java.io.InputStreamReader
import java.io.StringReader
import java.text.SimpleDateFormat
import java.util.Locale
import java.util.zip.GZIPInputStream
/**
* 节目单获取
*/
class EpgRepository {
suspend fun getEpgList(url: String): EpgList = withContext(Dispatchers.Default) {
try {
return@withContext parseEpgXml(request(url))
} catch (ex: Exception) {
Log.e("epg", "获取节目单失败", ex)
throw Exception(ex)
}
}
}
fun request(url: String): String{
val client = OkHttpClient()
val request = Request.Builder().url(url).build()
client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw Exception("request failed: $response.code")
}
val contentType = response.header("content-type")
if (contentType?.startsWith("text/")!!) {
return response.body!!.string()
}
val gzData = response.body!!.bytes()
val stringBuilder = StringBuilder()
val gzipInputStream = GZIPInputStream(ByteArrayInputStream(gzData));
val reader = BufferedReader(InputStreamReader(gzipInputStream));
var line: String?
while (reader.readLine().also { line = it } != null) {
stringBuilder.append(line).append("\n")
}
response.close()
return stringBuilder.toString()
}
}
/**
* 解析节目单xml
*/
private fun parseEpgXml(xmlString: String): EpgList {
val parser: XmlPullParser = Xml.newPullParser()
parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, false)
parser.setInput(StringReader(xmlString))
val epgMap = mutableMapOf<String, EpgChannel>()
var eventType = parser.eventType
while (eventType != XmlPullParser.END_DOCUMENT) {
when (eventType) {
XmlPullParser.START_TAG -> {
if (parser.name == "channel") {
val channelId = parser.getAttributeValue(null, "id")
parser.nextTag()
val channelDisplayName = parser.nextText()
val channel = EpgChannel(
id = channelId,
title = channelDisplayName,
)
// Log.d("epg", "${channel.id}, ${channel.title}")
epgMap[channelId] = channel
} else if (parser.name == "programme") {
val channelId = parser.getAttributeValue(null, "channel")
val startTime = parser.getAttributeValue(null, "start")
val stopTime = parser.getAttributeValue(null, "stop")
parser.nextTag()
val title = parser.nextText()
fun parseTime(time: String): Long {
if (time.length < 14) return 0
return SimpleDateFormat("yyyyMMddHHmmss Z", Locale.getDefault()).parse(time)?.time ?: 0
}
val programme = EpgProgramme(
channelId = channelId,
startAt = parseTime(startTime),
endAt = parseTime(stopTime),
title = title,
)
if (epgMap.containsKey(channelId)) {
// Log.d("epg", "${programme.channelId}, ${programme.title}")
epgMap[channelId] = epgMap[channelId]!!.copy(
programmes = epgMap[channelId]!!.programmes + listOf(programme)
)
}
}
}
}
eventType = parser.next()
}
Log.i("epg","解析节目单完成,共${epgMap.size}个频道")
return EpgList(epgMap.values.toList())
}

View File

@ -4,20 +4,26 @@ import android.util.Log
import android.util.Xml
import androidx.compose.runtime.Immutable
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import me.lsong.mytv.epg.EpgChannel
import me.lsong.mytv.epg.EpgList
import me.lsong.mytv.epg.EpgProgramme
import me.lsong.mytv.epg.EpgRepository
import me.lsong.mytv.utils.Constants
import me.lsong.mytv.utils.Settings
import okhttp3.OkHttpClient
import okhttp3.Request
import org.xmlpull.v1.XmlPullParser
import java.io.BufferedReader
import java.io.ByteArrayInputStream
import java.io.InputStreamReader
import java.io.StringReader
import java.text.SimpleDateFormat
import java.util.Locale
import java.util.zip.GZIPInputStream
// 数据类定义
@ -98,104 +104,6 @@ data class TVChannelList(val value: List<TVChannel> = emptyList()) : List<TVChan
}
}
class IPTVProvider(private val epgRepository: EpgRepository) : TVProvider {
private var groupList: TVGroupList = TVGroupList()
private var epgList: EpgList = EpgList()
override suspend fun load() {
val (sources, epgUrls) = fetchIPTVSources()
groupList = process(sources)
epgList = fetchEPGData(epgUrls)
}
override fun groups(): TVGroupList {
return groupList
}
override fun channels(groupTitle: String): TVChannelList {
return groupList.find { it.title == groupTitle }?.channels ?: TVChannelList()
}
private suspend fun fetchIPTVSources(): Pair<List<TVSource>, List<String>> {
val allSources = mutableListOf<TVSource>()
val epgUrls = mutableListOf<String>()
val iptvUrls = Settings.iptvSourceUrls.ifEmpty { listOf(Constants.IPTV_SOURCE_URL) }
iptvUrls.forEach { url ->
val m3u = retry { getM3uChannels(sourceUrl = url) }
allSources.addAll(m3u.channels.map {
TVSource(
tvgId = it.attributes["tvg-id"],
tvgLogo = it.attributes["tvg-logo"],
tvgName = it.attributes["tvg-name"],
groupTitle = it.attributes["group-title"],
title = it.title,
url = it.url,
)
})
epgUrls += m3u.headers["x-tvg-url"]?.split(",").orEmpty()
}
if (epgUrls.isEmpty()) epgUrls.add(Constants.EPG_XML_URL)
return Pair(allSources, epgUrls.distinct())
}
private suspend fun fetchEPGData(epgUrls: List<String>): EpgList {
val epgChannels = mutableListOf<EpgChannel>()
epgUrls.forEach { url ->
val epg = retry { epgRepository.getEpgList(url) }
epgChannels.addAll(epg.value)
}
return EpgList(epgChannels.distinctBy { it.id })
}
private fun process(sources: List<TVSource>): TVGroupList {
val channels = sources.groupBy { it.name }
.map { (name, sources) ->
TVChannel(
name = name,
title = sources.first().title,
sources = sources,
)
}
return TVGroupList(
channels.groupBy { it.groupTitle ?: "其他" }
.map { (title, channels) -> TVGroup(title = title, channels = TVChannelList(channels)) }
)
}
private suspend fun <T> retry(fn: suspend () -> T): T {
repeat(Constants.HTTP_RETRY_COUNT) {
try {
return fn()
} catch (e: Exception) {
if (it == Constants.HTTP_RETRY_COUNT) throw e
delay(Constants.HTTP_RETRY_INTERVAL)
}
}
throw IllegalStateException("Failed to fetch data after ${Constants.HTTP_RETRY_COUNT} attempts")
}
private suspend fun request(url: String) = withContext(Dispatchers.IO) {
Log.d("request", "request start: $url")
val client = OkHttpClient()
val request = Request.Builder().url(url).build()
try {
client.newCall(request).execute().use { response ->
if (!response.isSuccessful) throw Exception("failed: ${response.code}")
response.body?.string()?.trim() ?: throw Exception("Empty response body")
}
} catch (ex: Exception) {
val e = Exception("request failed $url", ex)
Log.d("request", "${e.message}")
throw e;
}
}
private suspend fun getM3uChannels(sourceUrl: String): M3uData {
return parseM3u(request(sourceUrl))
}
}
// Interface definition
interface TVProvider {
suspend fun load()
@ -205,7 +113,7 @@ interface TVProvider {
class MyTvProviderManager : TVProvider {
private val providers: List<TVProvider> = listOf(
IPTVProvider(EpgRepository())
IPTVProvider()
)
override suspend fun load() {
providers.forEach { it.load() }
@ -283,11 +191,14 @@ fun parseAttributes(input: String): Map<String, String> {
}
fun parseEpgXML(xmlString: String): List<EpgChannel> {
val epgMap = mutableMapOf<String, EpgChannel>()
/**
* 解析节目单xml
*/
private fun parseEpgXml(xmlString: String): EpgList {
val parser: XmlPullParser = Xml.newPullParser()
parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, false)
parser.setInput(StringReader(xmlString))
val epgMap = mutableMapOf<String, EpgChannel>()
var eventType = parser.eventType
while (eventType != XmlPullParser.END_DOCUMENT) {
when (eventType) {
@ -300,7 +211,7 @@ fun parseEpgXML(xmlString: String): List<EpgChannel> {
id = channelId,
title = channelDisplayName,
)
Log.d("epg", "${channel.id}, ${channel.title}")
// Log.d("epg", "${channel.id}, ${channel.title}")
epgMap[channelId] = channel
} else if (parser.name == "programme") {
val channelId = parser.getAttributeValue(null, "channel")
@ -329,6 +240,144 @@ fun parseEpgXML(xmlString: String): List<EpgChannel> {
}
eventType = parser.next()
}
Log.i("epg","解析节目单完成,共${epgMap.size}个频道")
return epgMap.values.toList()
return EpgList(epgMap.values.toList())
}
private suspend fun request(url: String) = withContext(Dispatchers.IO) {
Log.d("request", "request start: $url")
val client = OkHttpClient()
val request = Request.Builder().url(url).build()
try {
client.newCall(request).execute().use { response ->
if (!response.isSuccessful) throw Exception("failed: ${response.code}")
val contentType = response.header("content-type")
if (contentType?.startsWith("text/") == true || url.endsWith(".m3u")) {
response.body?.string() ?: throw Exception("Empty response body")
} else {
val gzData = response.body?.bytes() ?: throw Exception("Empty response body")
BufferedReader(InputStreamReader(GZIPInputStream(ByteArrayInputStream(gzData)))).use { reader ->
reader.lineSequence().joinToString("\n")
}
}
}
} catch (ex: Exception) {
val e = Exception("request failed $url", ex)
Log.d("request", "${e.message}")
throw e;
}
}
class IPTVProvider : TVProvider {
private var groupList: TVGroupList = TVGroupList()
private var epgList: EpgList = EpgList()
override suspend fun load() {
val (sources, epgUrls) = fetchIPTVSources()
groupList = processSources(sources)
epgList = fetchEPGData(epgUrls)
}
override fun groups(): TVGroupList = groupList
override fun channels(groupTitle: String): TVChannelList =
groupList.find { it.title == groupTitle }?.channels ?: TVChannelList()
private suspend fun fetchIPTVSources(): Pair<List<TVSource>, List<String>> = coroutineScope {
val iptvUrls = Settings.iptvSourceUrls.ifEmpty { listOf(Constants.IPTV_SOURCE_URL) }
val deferredResults = iptvUrls.map { url ->
async { fetchM3uData(url) }
}
val results = deferredResults.awaitAll()
val allSources = results.flatMap { it.first }
val allEpgUrls = results.flatMap { it.second }.distinct()
Pair(allSources, if (allEpgUrls.isEmpty()) listOf(Constants.EPG_XML_URL) else allEpgUrls)
}
private suspend fun fetchM3uData(url: String): Pair<List<TVSource>, List<String>> {
val m3u = retry { parseM3u(request(url)) }
val sources = m3u.channels.map { it.toTVSource() }
val epgUrls = m3u.headers["x-tvg-url"]?.split(",").orEmpty()
return Pair(sources, epgUrls)
}
private suspend fun fetchEPGData(epgUrls: List<String>): EpgList = coroutineScope {
val deferredEpgChannels = epgUrls.map { url ->
async { retry { getEpgList(url) } }
}
val epgChannels = deferredEpgChannels.awaitAll().flatten()
EpgList(epgChannels.distinctBy { it.id })
}
private suspend fun getEpgList(url: String): List<EpgChannel> = withContext(Dispatchers.Default) {
try {
parseEpgXml(request(url)).value
} catch (ex: Exception) {
Log.e("epg", "Failed to fetch EPG data", ex)
emptyList()
}
}
private fun processSources(sources: List<TVSource>): TVGroupList {
val channels = sources.groupBy { it.name }
.map { (name, sources) -> TVChannel(name, sources.first().title, sources) }
return TVGroupList(
channels.groupBy { it.groupTitle ?: "Others" }
.map { (title, channels) -> TVGroup(title, TVChannelList(channels)) }
)
}
private suspend fun request(url: String): String = withContext(Dispatchers.IO) {
Log.d("request", "Request start: $url")
val client = OkHttpClient()
val request = Request.Builder().url(url).build()
try {
client.newCall(request).execute().use { response ->
if (!response.isSuccessful) throw Exception("Request failed: ${response.code}")
val contentType = response.header("content-type")
val body = response.body ?: throw Exception("Empty response body")
when {
contentType?.startsWith("text/") == true || url.endsWith(".m3u") -> body.string()
else -> decodeGzipContent(body.bytes())
}
}
} catch (ex: Exception) {
Log.d("request", "Request failed: $url", ex)
throw Exception("Request failed: $url", ex)
}
}
private fun decodeGzipContent(gzData: ByteArray): String {
return BufferedReader(InputStreamReader(GZIPInputStream(ByteArrayInputStream(gzData)))).use { reader ->
reader.lineSequence().joinToString("\n")
}
}
private suspend fun <T> retry(attempts: Int = Constants.HTTP_RETRY_COUNT, block: suspend () -> T): T {
repeat(attempts - 1) { attempt ->
try {
return block()
} catch (e: Exception) {
Log.w("retry", "Attempt ${attempt + 1} failed", e)
delay(Constants.HTTP_RETRY_INTERVAL)
}
}
return block() // Last attempt
}
}
// Extension function to convert M3uChannel to TVSource
private fun M3uSource.toTVSource() = TVSource(
tvgId = attributes["tvg-id"],
tvgLogo = attributes["tvg-logo"],
tvgName = attributes["tvg-name"],
groupTitle = attributes["group-title"],
title = title,
url = url
)