Archived
1
0
Fork 0

Improve Scheduler performance, fix ByteBuf leak and cache TagsPacket encoding

This commit is contained in:
Moritz Ruth 2020-12-04 20:17:06 +01:00
parent 978daa37db
commit f915955caf
No known key found for this signature in database
GPG key ID: AFD57E23E753841B
8 changed files with 95 additions and 52 deletions

View file

@ -18,6 +18,7 @@ val coroutinesVersion = properties["version.kotlinx-coroutines"].toString()
val nettyVersion = properties["version.netty"].toString()
val junitVersion = properties["version.junit"].toString()
val striktVersion = properties["version.strikt"].toString()
val guavaVersion = properties["version.guava"].toString()
dependencies {
// Kotlin
@ -32,7 +33,7 @@ dependencies {
api("io.netty:netty-buffer:${nettyVersion}")
// Other
api("com.google.guava:guava:30.0-jre")
api("com.google.guava:guava:$guavaVersion")
// Testing
testImplementation("io.strikt:strikt-core:${striktVersion}")

View file

@ -13,6 +13,7 @@ repositories {
val nettyVersion = properties["version.netty"].toString()
val junitVersion = properties["version.junit"].toString()
val striktVersion = properties["version.strikt"].toString()
val guavaVersion = properties["version.guava"].toString()
dependencies {
api(project(":blokk-packets"))
@ -20,6 +21,9 @@ dependencies {
// Netty
api("io.netty:netty-buffer:${nettyVersion}")
// Other
api("com.google.guava:guava:$guavaVersion")
// Testing
testImplementation("io.strikt:strikt-core:${striktVersion}")
testImplementation("org.junit.jupiter:junit-jupiter-api:${junitVersion}")

View file

@ -1,26 +1,44 @@
package space.blokk.net.packet.play
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.cache.LoadingCache
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import space.blokk.net.MinecraftProtocolDataTypes.writeString
import space.blokk.net.MinecraftProtocolDataTypes.writeVarInt
import space.blokk.net.packet.OutgoingPacketCodec
import space.blokk.tags.Tag
object TagsPacketCodec : OutgoingPacketCodec<TagsPacket>(0x5C, TagsPacket::class) {
override fun TagsPacket.encode(dst: ByteBuf) {
listOf(
tags.filter { it.type == Tag.Type.BLOCKS },
tags.filter { it.type == Tag.Type.ITEMS },
tags.filter { it.type == Tag.Type.FLUIDS },
tags.filter { it.type == Tag.Type.ENTITY_TYPES }
).forEach { tags ->
dst.writeVarInt(tags.size)
private val cache: LoadingCache<TagsPacket, ByteBuf> = CacheBuilder.newBuilder()
.maximumSize(3)
.weakKeys()
.removalListener<TagsPacket, ByteBuf> { it.value.release() }
.build(object : CacheLoader<TagsPacket, ByteBuf>() {
override fun load(packet: TagsPacket): ByteBuf {
val dst = Unpooled.buffer()
tags.forEach { tag ->
dst.writeString(tag.name)
dst.writeVarInt(tag.values.size)
tag.numericIDs.forEach { dst.writeVarInt(it) }
listOf(
packet.tags.filter { it.type == Tag.Type.BLOCKS },
packet.tags.filter { it.type == Tag.Type.ITEMS },
packet.tags.filter { it.type == Tag.Type.FLUIDS },
packet.tags.filter { it.type == Tag.Type.ENTITY_TYPES }
).forEach { tags ->
dst.writeVarInt(tags.size)
tags.forEach { tag ->
dst.writeString(tag.name)
dst.writeVarInt(tag.values.size)
tag.numericIDs.forEach { dst.writeVarInt(it) }
}
}
return dst
}
}
})
override fun TagsPacket.encode(dst: ByteBuf) {
dst.writeBytes(cache.get(this))
}
}

View file

@ -13,6 +13,7 @@ import space.blokk.net.packet.login.*
import space.blokk.net.packet.play.*
import space.blokk.player.BlokkPlayer
import space.blokk.player.GameMode
import space.blokk.tags.TagRegistry
import space.blokk.util.*
import space.blokk.world.Chunk
import java.security.MessageDigest
@ -22,6 +23,8 @@ import javax.crypto.spec.SecretKeySpec
import kotlin.random.Random
class LoginAndJoinProcedure(val session: BlokkSession) {
private val tagsPacket by lazy { TagsPacket(TagRegistry.tags.values) }
suspend fun start(packet: LoginStartPacket) {
session.state.getOrFail<Session.State.WaitingForLoginStart>()
@ -173,7 +176,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) {
session.send(SetSelectedHotbarSlotPacket(state.selectedHotbarSlot))
session.send(DeclareRecipesPacket(session.server.recipes))
// TODO: Send Tags packet
session.send(tagsPacket)
// TODO: Send Entity Status packet with OP permission level
session.send(PlayerPositionAndLookPacket(state.initialWorldAndLocation.location))

View file

@ -27,12 +27,11 @@ class PacketMessageCodec(private val session: BlokkSession) : MessageToMessageCo
override fun decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: MutableList<Any>) {
val packetID = msg.readVarInt()
val data = msg.readBytes(msg.readableBytes())
val codec = session.currentProtocol!!.incomingPacketCodecsByID[packetID]
?: throw IOException("Client sent an unknown packet (ID: $packetID)")
out.add(IncomingPacketMessage(session, codec.decode(data)))
out.add(IncomingPacketMessage(session, codec.decode(msg)))
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {

View file

@ -35,7 +35,7 @@ class BlokkPluginManager(private val server: BlokkServer) : PluginManager {
@Suppress("UNCHECKED_CAST")
val pluginClass = loader.loadClass(pluginClassName) as Class<Plugin>
return@mapNotNull pluginClass.newInstance()
return@mapNotNull pluginClass.getDeclaredConstructor().newInstance()
}
} catch (e: ClassNotFoundException) {
error = LoadError.PluginClassNotFound(e.message!!)

View file

@ -1,12 +1,13 @@
package space.blokk.util
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import space.blokk.Scheduler
import space.blokk.logging.Logger
import space.blokk.server.Server
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import kotlin.collections.LinkedHashSet
import kotlin.coroutines.resume
@ -14,9 +15,7 @@ import kotlin.coroutines.resume
* Basically ExecutorService but for coroutines and with ticks.
*/
class BlokkScheduler : Scheduler {
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val scope = CoroutineScope(dispatcher)
private var job: Job? = null
private var thread: Thread? = null
private val tasks = ConcurrentHashMap.newKeySet<Task<out Any>>()
private val shutdownTasks = Collections.synchronizedSet(LinkedHashSet<suspend () -> Unit>())
@ -38,47 +37,64 @@ class BlokkScheduler : Scheduler {
fun startTicking() {
val interval = 1000L / Server.TICKS_PER_SECOND
job?.cancel()
job = scope.launch {
var lastTickEndTime = System.currentTimeMillis()
thread = Thread {
runBlocking {
var lastTickEndTime = System.currentTimeMillis()
val t = Thread.currentThread()
while (isActive) {
for (task in tasks) {
if (!task.cancelled) {
if (task.ticksUntilExecution == 0) {
try {
task.fn()
while (!t.isInterrupted) {
for (task in tasks) {
if (!task.cancelled) {
if (task.ticksUntilExecution == 0) {
try {
task.fn()
if (task.interval != null) {
task.ticksUntilExecution = task.interval
} else tasks.remove(task)
} catch (e: Exception) {
tasks.remove(task)
e.printStackTrace()
if (task.interval != null) {
task.ticksUntilExecution = task.interval
} else tasks.remove(task)
} catch (e: Exception) {
tasks.remove(task)
e.printStackTrace()
}
} else {
task.ticksUntilExecution -= 1
}
} else {
task.ticksUntilExecution -= 1
}
}
var first = true
var time: Long
do {
time = System.currentTimeMillis()
val elapsedSinceLastTick = time - lastTickEndTime
val timeUntilNextTick = interval - elapsedSinceLastTick
if (first && timeUntilNextTick < 0) {
logger.warn("Last tick took ${elapsedSinceLastTick}ms, but should only take ${interval}ms")
// TODO: Do something when this happens too often or suppress the warning for subsequent occurrences
}
first = false
} while (timeUntilNextTick > 0)
lastTickEndTime = time
}
val time = System.currentTimeMillis()
val elapsedSinceLastTick = time - lastTickEndTime
val timeUntilNextTick = interval - elapsedSinceLastTick
lastTickEndTime = time
if (timeUntilNextTick < 0) {
logger.warn("Last tick took ${elapsedSinceLastTick}ms, but should only take ${interval}ms")
// TODO: Do something when this happens too often or suppress the warning for subsequent occurrences
} else delay(timeUntilNextTick)
}
}.apply {
name = "Scheduler"
start()
}
}
suspend fun shutdown() {
job?.cancelAndJoin()
dispatcher.close()
thread?.let {
it.interrupt()
@Suppress("BlockingMethodInNonBlockingContext")
it.join()
}
shutdownTasks.forEach { it.invoke() }
}

View file

@ -1,3 +1,4 @@
# suppress inspection "UnusedProperty" for whole file
kotlin.code.style=official
version.kotlin=1.4.20
version.netty=4.1.54.Final
@ -6,3 +7,4 @@ version.kotlinx-coroutines=1.4.2
version.slf4j=1.7.30
version.junit=5.7.0
version.strikt=0.28.0
version.guava=30.0-jre