#!/usr/bin/env python3
|
|
"""
|
|
SCI Resource Text Extractor
|
|
|
|
Extracts text strings from Sierra's Creative Interpreter (SCI) resource files.
|
|
Supports SCI0 format (King's Quest IV) with LZW decompression.
|
|
"""
|
|
|
|
import os
|
|
import struct
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import List, Dict, Tuple, Optional
|
|
|
|
|
|
class TextEntry:
    """A single text entry decoded from an SCI message resource."""

    def __init__(self):
        # Message-addressing fields used by the interpreter to look a line up.
        self.noun: int = 0
        self.verb: int = 0
        self.condition: int = 0
        self.sequence: int = 0
        self.talker: int = 0
        self.style: int = 0
        # Decoded message body (latin-1 text).
        self.text: str = ""

    def __repr__(self):
        # Text is truncated to 50 characters so long messages stay readable.
        fields = ", ".join((
            f"noun={self.noun}",
            f"verb={self.verb}",
            f"cond={self.condition}",
            f"seq={self.sequence}",
            f"talker={self.talker}",
            f"text={self.text[:50]!r}",
        ))
        return f"TextEntry({fields}...)"
|
|
|
|
|
|
def decompress_lzw(src: bytes, length: int) -> bytes:
    """
    Decompress LZW compressed data (SCI0 method 1).

    Uses 9-12 bit LZW encoding.

    The token stream is read LSB-first across byte boundaries.  Token 0x100
    resets the dictionary, 0x101 terminates the stream; tokens 0x00-0xFF are
    literal bytes and tokens >= 0x102 index previously registered sequences.

    :param src: compressed payload (header already stripped by the caller)
    :param length: expected decompressed size; output is truncated/padded to it
    :return: decompressed bytes, exactly ``length`` long
    """
    dest = bytearray(length)

    bitlen = 9  # no. of bits to read (max. 12)
    bitmask = 0x01ff
    bitctr = 0  # current bit position
    bytectr = 0  # current byte position

    # Dictionary entries point back into dest[] instead of storing the
    # sequences themselves: (start offset, length) per token.
    tokenlist = [0] * 4096  # pointers to dest[]
    tokenlengthlist = [0] * 4096  # char length of each token
    tokenctr = 0x102  # no. of registered tokens (starts here)
    maxtoken = 0x200  # The biggest token representable at current bitlen

    tokenlastlength = 0
    destctr = 0
    complength = len(src)

    while bytectr < complength:
        # Read next token from bit stream: gather up to 3 bytes so a
        # 12-bit token straddling two byte boundaries is still covered.
        if bytectr >= complength:
            break

        tokenmaker = src[bytectr] >> bitctr
        if bytectr + 1 < complength:
            tokenmaker |= (src[bytectr + 1] << (8 - bitctr))
        if bytectr + 2 < complength:
            tokenmaker |= (src[bytectr + 2] << (16 - bitctr))

        token = int(tokenmaker & bitmask)

        # Advance the cursor by bitlen bits total: the loop below plus the
        # unconditional bytectr += 1 account for whole bytes consumed.
        bitctr += bitlen - 8

        while bitctr >= 8:
            bitctr -= 8
            bytectr += 1

        bytectr += 1

        if token == 0x101:  # terminator
            break

        if token == 0x100:  # reset command: drop back to 9-bit codes
            maxtoken = 0x200
            bitlen = 9
            bitmask = 0x01ff
            tokenctr = 0x0102
        else:
            if token > 0xff:
                # Dictionary token: copy a previously emitted sequence
                # plus one byte (the classic LZW +1 extension).
                if token < tokenctr:
                    tokenlastlength = tokenlengthlist[token] + 1
                    if destctr + tokenlastlength > length:
                        # Overflow protection
                        i = 0
                        while destctr < length and i < tokenlastlength:
                            dest[destctr] = dest[tokenlist[token] + i]
                            destctr += 1
                            i += 1
                    else:
                        for i in range(tokenlastlength):
                            dest[destctr] = dest[tokenlist[token] + i]
                            destctr += 1
            else:
                # Literal byte token.
                tokenlastlength = 1
                if destctr < length:
                    dest[destctr] = token
                    destctr += 1

            # Register new token; widen the code size once the table for the
            # current bit length fills up.
            if tokenctr == maxtoken:
                if bitlen < 12:
                    bitlen += 1
                    bitmask = (bitmask << 1) | 1
                    maxtoken <<= 1

            if tokenctr < 4096:
                tokenlist[tokenctr] = destctr - tokenlastlength
                tokenlengthlist[tokenctr] = tokenlastlength
                tokenctr += 1

    return bytes(dest)
|
|
|
|
|
|
def decompress_huffman(src: bytes, length: int) -> bytes:
    """
    Decompress Huffman compressed data (SCI0 method 2).

    Layout of *src*: one byte node count, one byte terminator value, then
    ``numnodes`` two-byte nodes (value byte, branch byte), followed by the
    bit stream.  In a branch byte the high nibble is the 0-bit offset and
    the low nibble the 1-bit offset; a branch byte of 0 marks a leaf.

    :param src: compressed payload including the embedded node table
    :param length: expected decompressed size
    :return: decompressed bytes, exactly ``length`` long
    """
    # Too short to even hold the node-count and terminator bytes.
    if len(src) < 2:
        return src

    dest = bytearray(length)
    destctr = 0

    numnodes = src[0]
    terminator = src[1]
    bytectr = 2 + (numnodes << 1)  # bit stream starts after the node table
    bitctr = 0

    nodes = src[2:2 + (numnodes << 1)]  # flat (value, branch) pairs

    while bytectr < len(src) and destctr < length:
        node_idx = 0

        # Walk the tree one input bit at a time until a leaf is reached.
        while nodes[node_idx * 2 + 1] != 0:
            if bytectr >= len(src):
                break

            # The next input bit ends up as the MSB of the shifted byte.
            value = (src[bytectr] << bitctr) & 0xFF
            bitctr += 1

            if bitctr == 8:
                bitctr = 0
                bytectr += 1

            if value & 0x80:
                # 1-bit: follow the offset stored in the low nibble.
                next_node = nodes[node_idx * 2 + 1] & 0x0f
                if next_node == 0:
                    # Low nibble 0 marks an escaped literal: the next
                    # 8 bits of the stream are the output byte itself.
                    if bytectr >= len(src):
                        break
                    result = (src[bytectr] << bitctr) & 0xFF
                    bytectr += 1
                    if bytectr < len(src):
                        result |= src[bytectr] >> (8 - bitctr)
                    result &= 0x0ff

                    # NOTE(review): this break only leaves the inner tree
                    # walk, so a terminator literal does not stop the outer
                    # loop — confirm against the reference decoder.
                    if result == terminator:
                        break

                    if destctr < length:
                        dest[destctr] = result
                        destctr += 1
                    break
            else:
                # 0-bit: follow the offset stored in the high nibble.
                next_node = nodes[node_idx * 2 + 1] >> 4

            node_idx += next_node

        if nodes[node_idx * 2 + 1] == 0:
            # Landed on a leaf: emit its value byte.
            value = nodes[node_idx * 2] | (nodes[node_idx * 2 + 1] << 8)
            # NOTE(review): the branch byte is 0 on this path, so value can
            # never equal 0x100 | terminator — this check looks dead; verify
            # against the original C implementation before removing.
            if value == (0x100 | terminator):
                break
            if destctr < length:
                dest[destctr] = value & 0xFF
                destctr += 1

    return bytes(dest)
|
|
|
|
|
|
class SCIResourceExtractor:
    """Extracts text resources from SCI game files."""

    # Resource type id -> human-readable name (SCI0 numbering).
    RESOURCE_TYPES = {
        0: "View", 1: "Pic", 2: "Script", 3: "Text", 4: "Sound",
        5: "Memory", 6: "Vocab", 7: "Font", 8: "Cursor", 9: "Patch",
    }

    # Compression method id (from package headers) -> name.
    COMPRESSION_METHODS = {
        0: "None", 1: "LZW", 2: "Huffman", 3: "LZW+Huffman",
    }

    def __init__(self, game_dir: str):
        self.game_dir = Path(game_dir)
        # RESOURCE.MAP indexes every resource across the RESOURCE.0xx packages.
        self.map_file = self.game_dir / "RESOURCE.MAP"
        self.resource_files = sorted(self.game_dir.glob("RESOURCE.0*"))
        self.resources = []
|
|
|
|
def read_resource_map(self) -> List[Dict]:
|
|
"""Read the RESOURCE.MAP file and return list of resource entries."""
|
|
resources = []
|
|
|
|
with open(self.map_file, 'rb') as f:
|
|
data = f.read()
|
|
|
|
# SCI0 format: 6 bytes per entry
|
|
# - 2 bytes: resource number (11 bits) + type (5 bits)
|
|
# - 4 bytes: offset (26 bits) + package number (6 bits)
|
|
entry_size = 6
|
|
|
|
for i in range(0, len(data), entry_size):
|
|
if i + entry_size > len(data):
|
|
break
|
|
|
|
entry = data[i:i+entry_size]
|
|
|
|
# Unpack SCI0 entry
|
|
word = struct.unpack('<H', entry[0:2])[0]
|
|
res_number = word & 0x7FF # 11 bits
|
|
res_type = (word >> 11) & 0x1F # 5 bits
|
|
|
|
# Check for terminator (all 1s)
|
|
if word == 0xFFFF:
|
|
dword = struct.unpack('<I', entry[2:6])[0]
|
|
if dword == 0xFFFFFFFF:
|
|
break
|
|
|
|
dword = struct.unpack('<I', entry[2:6])[0]
|
|
offset = dword & 0x3FFFFFF # 26 bits
|
|
package = (dword >> 26) & 0x3F # 6 bits
|
|
|
|
resources.append({
|
|
'number': res_number,
|
|
'type': res_type,
|
|
'type_name': self.RESOURCE_TYPES.get(res_type, f"Unknown({res_type})"),
|
|
'offset': offset,
|
|
'package': package
|
|
})
|
|
|
|
return resources
|
|
|
|
def read_resource_header(self, package: int, offset: int) -> Optional[Dict]:
|
|
"""Read the header of a resource in a package file."""
|
|
resource_file = self.game_dir / f"RESOURCE.{package:03d}"
|
|
|
|
if not resource_file.exists():
|
|
return None
|
|
|
|
with open(resource_file, 'rb') as f:
|
|
f.seek(offset)
|
|
header_data = f.read(8)
|
|
|
|
if len(header_data) < 8:
|
|
return None
|
|
|
|
# SCI0 header format
|
|
word1 = struct.unpack('<H', header_data[0:2])[0]
|
|
res_number = word1 & 0x7FF
|
|
res_type = (word1 >> 11) & 0x1F
|
|
|
|
compressed_size = struct.unpack('<H', header_data[2:4])[0]
|
|
decompressed_size = struct.unpack('<H', header_data[4:6])[0]
|
|
method = struct.unpack('<H', header_data[6:8])[0]
|
|
|
|
return {
|
|
'number': res_number,
|
|
'type': res_type,
|
|
'type_name': self.RESOURCE_TYPES.get(res_type, f"Unknown({res_type})"),
|
|
'compressed_size': compressed_size,
|
|
'decompressed_size': decompressed_size,
|
|
'method': method,
|
|
'method_name': self.COMPRESSION_METHODS.get(method, f"Unknown({method})")
|
|
}
|
|
|
|
def extract_resource_data(self, package: int, offset: int, compressed_size: int,
|
|
decompressed_size: int, method: int) -> Optional[bytes]:
|
|
"""Extract and decompress resource data."""
|
|
resource_file = self.game_dir / f"RESOURCE.{package:03d}"
|
|
|
|
if not resource_file.exists():
|
|
return None
|
|
|
|
with open(resource_file, 'rb') as f:
|
|
f.seek(offset + 8) # Skip header
|
|
# For SCI0, compressed_size includes the 4 bytes for cbDecompressed and iMethod
|
|
data = f.read(compressed_size - 4)
|
|
|
|
if method == 0:
|
|
# No compression
|
|
return data[:decompressed_size]
|
|
elif method == 1:
|
|
# LZW compression
|
|
return decompress_lzw(data, decompressed_size)
|
|
elif method == 2:
|
|
# Huffman compression
|
|
return decompress_huffman(data, decompressed_size)
|
|
else:
|
|
print(f"Warning: Unsupported compression method {method}")
|
|
return None
|
|
|
|
def parse_text_resource(self, data: bytes) -> List[str]:
|
|
"""Parse a simple text resource (type 3)."""
|
|
strings = []
|
|
i = 0
|
|
|
|
while i < len(data):
|
|
# Find null-terminated string
|
|
start = i
|
|
while i < len(data) and data[i] != 0:
|
|
i += 1
|
|
|
|
if i > start:
|
|
try:
|
|
text = data[start:i].decode('latin-1')
|
|
strings.append(text)
|
|
except:
|
|
pass
|
|
|
|
i += 1 # Skip null terminator
|
|
|
|
return strings
|
|
|
|
def parse_message_resource(self, data: bytes) -> List[TextEntry]:
|
|
"""Parse a message resource (type 10 or embedded text in scripts)."""
|
|
entries = []
|
|
|
|
if len(data) < 2:
|
|
return entries
|
|
|
|
# Check for version indicator
|
|
msg_version = struct.unpack('<H', data[0:2])[0]
|
|
|
|
if msg_version <= 0x835: # 2101
|
|
# Version 2102 format
|
|
entries = self._parse_message_v2102(data)
|
|
elif msg_version <= 0xd53: # 3411
|
|
# Version 3411 format
|
|
entries = self._parse_message_v3411(data)
|
|
else:
|
|
# Version 4000+ format
|
|
entries = self._parse_message_v4000(data)
|
|
|
|
return entries
|
|
|
|
def _parse_message_v2102(self, data: bytes) -> List[TextEntry]:
|
|
"""Parse message resource version 2102 (SCI0/early SCI1)."""
|
|
entries = []
|
|
|
|
if len(data) < 4:
|
|
return entries
|
|
|
|
msg_version = struct.unpack('<H', data[0:2])[0]
|
|
|
|
# Skip version and unknown word
|
|
pos = 4
|
|
|
|
if len(data) < pos + 2:
|
|
return entries
|
|
|
|
message_count = struct.unpack('<H', data[pos:pos+2])[0]
|
|
pos += 2
|
|
|
|
for i in range(message_count):
|
|
if pos + 6 > len(data):
|
|
break
|
|
|
|
entry = TextEntry()
|
|
entry.noun = data[pos]
|
|
entry.verb = data[pos + 1]
|
|
pos += 2
|
|
|
|
text_offset = struct.unpack('<H', data[pos:pos+2])[0]
|
|
pos += 4 # Skip text offset and 2 unknown bytes
|
|
|
|
# Read text at offset
|
|
if text_offset < len(data):
|
|
text_end = text_offset
|
|
while text_end < len(data) and data[text_end] != 0:
|
|
text_end += 1
|
|
try:
|
|
entry.text = data[text_offset:text_end].decode('latin-1')
|
|
except:
|
|
entry.text = "<decode error>"
|
|
|
|
entries.append(entry)
|
|
|
|
return entries
|
|
|
|
def _parse_message_v3411(self, data: bytes) -> List[TextEntry]:
|
|
"""Parse message resource version 3411."""
|
|
entries = []
|
|
|
|
if len(data) < 6:
|
|
return entries
|
|
|
|
# Skip first 2 bytes (ptr to end of text data)
|
|
pos = 2
|
|
|
|
message_count = struct.unpack('<H', data[pos:pos+2])[0]
|
|
pos += 2
|
|
|
|
for i in range(message_count):
|
|
if pos + 11 > len(data):
|
|
break
|
|
|
|
entry = TextEntry()
|
|
entry.noun = data[pos]
|
|
entry.verb = data[pos + 1]
|
|
entry.condition = data[pos + 2]
|
|
entry.sequence = data[pos + 3]
|
|
entry.talker = data[pos + 4]
|
|
pos += 5
|
|
|
|
text_offset = struct.unpack('<H', data[pos:pos+2])[0]
|
|
pos += 5 # Skip text offset + 3 unknown bytes
|
|
|
|
# Read text at offset
|
|
if text_offset < len(data):
|
|
text_end = text_offset
|
|
while text_end < len(data) and data[text_end] != 0:
|
|
text_end += 1
|
|
try:
|
|
entry.text = data[text_offset:text_end].decode('latin-1')
|
|
except:
|
|
entry.text = "<decode error>"
|
|
|
|
entries.append(entry)
|
|
|
|
return entries
|
|
|
|
def _parse_message_v4000(self, data: bytes) -> List[TextEntry]:
|
|
"""Parse message resource version 4000+ (SCI1.1+)."""
|
|
entries = []
|
|
|
|
if len(data) < 6:
|
|
return entries
|
|
|
|
# Skip offset to end and mystery number
|
|
pos = 4
|
|
|
|
message_count = struct.unpack('<H', data[pos:pos+2])[0]
|
|
pos += 2
|
|
|
|
for i in range(message_count):
|
|
if pos + 14 > len(data):
|
|
break
|
|
|
|
entry = TextEntry()
|
|
entry.noun = data[pos]
|
|
entry.verb = data[pos + 1]
|
|
entry.condition = data[pos + 2]
|
|
entry.sequence = data[pos + 3]
|
|
entry.talker = data[pos + 4]
|
|
pos += 5
|
|
|
|
text_offset = struct.unpack('<H', data[pos:pos+2])[0]
|
|
pos += 2
|
|
|
|
entry.style = struct.unpack('<I', data[pos:pos+4])[0]
|
|
pos += 4
|
|
|
|
# Read text at offset
|
|
if text_offset < len(data):
|
|
text_end = text_offset
|
|
while text_end < len(data) and data[text_end] != 0:
|
|
text_end += 1
|
|
try:
|
|
entry.text = data[text_offset:text_end].decode('latin-1')
|
|
except:
|
|
entry.text = "<decode error>"
|
|
|
|
entries.append(entry)
|
|
|
|
return entries
|
|
|
|
    def extract_all_text(self, output_dir: str):
        """Extract all text resources and save to output directory.

        Reads RESOURCE.MAP, dumps every Text (type 3) resource to
        ``text_NNN.txt``, scans Script (type 2) resources for embedded ASCII
        strings into ``script_NNN_strings.txt``, and writes a summary index.

        :param output_dir: directory for output files (created if missing)
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Read resource map
        resources = self.read_resource_map()
        print(f"Found {len(resources)} resources in RESOURCE.MAP")

        # Group by type
        text_resources = [r for r in resources if r['type'] == 3]  # Text type
        # NOTE(review): message resources (type 10) are counted below but
        # never extracted — SCI0 games normally have none; confirm intent.
        message_resources = [r for r in resources if r['type'] == 10]  # Message type (if exists)
        script_resources = [r for r in resources if r['type'] == 2]  # Scripts

        print(f"Found {len(text_resources)} text resources")
        print(f"Found {len(message_resources)} message resources")
        print(f"Found {len(script_resources)} script resources")

        total_strings = 0

        # Process text resources: header first (for sizes/method), then data.
        for res in text_resources:
            header = self.read_resource_header(res['package'], res['offset'])
            if not header:
                continue

            print(f"Processing Text resource {res['number']} (package {res['package']}, offset {res['offset']}, method {header['method_name']})")

            data = self.extract_resource_data(
                res['package'],
                res['offset'],
                header['compressed_size'],
                header['decompressed_size'],
                header['method']
            )

            if data:
                strings = self.parse_text_resource(data)
                total_strings += len(strings)

                # Write to file, one numbered string per line.
                output_file = output_path / f"text_{res['number']:03d}.txt"
                with open(output_file, 'w', encoding='utf-8') as f:
                    for i, s in enumerate(strings):
                        f.write(f"[{i:04d}] {s}\n")

                print(f"  Extracted {len(strings)} strings to {output_file}")

        # Process script resources for embedded text
        print(f"\nScanning {len(script_resources)} script resources for embedded text...")

        for res in script_resources:
            header = self.read_resource_header(res['package'], res['offset'])
            if not header:
                continue

            data = self.extract_resource_data(
                res['package'],
                res['offset'],
                header['compressed_size'],
                header['decompressed_size'],
                header['method']
            )

            if data:
                # Try to extract readable strings from script bytecode;
                # only write a file when something was found.
                strings = self._extract_strings_from_binary(data, min_length=5)
                if strings:
                    total_strings += len(strings)
                    output_file = output_path / f"script_{res['number']:03d}_strings.txt"
                    with open(output_file, 'w', encoding='utf-8') as f:
                        for i, s in enumerate(strings):
                            f.write(f"[{i:04d}] {s}\n")

        print(f"\nTotal strings extracted: {total_strings}")

        # Create a master index summarizing the run and listing every resource.
        index_file = output_path / "_index.txt"
        with open(index_file, 'w', encoding='utf-8') as f:
            f.write("SCI Resource Text Extraction Index\n")
            f.write("=" * 50 + "\n\n")
            f.write(f"Game directory: {self.game_dir}\n")
            f.write(f"Total resources: {len(resources)}\n")
            f.write(f"Text resources: {len(text_resources)}\n")
            f.write(f"Script resources: {len(script_resources)}\n")
            f.write(f"Total strings extracted: {total_strings}\n\n")

            f.write("Resource List:\n")
            f.write("-" * 50 + "\n")
            for r in resources:
                f.write(f"{r['type_name']:12s} #{r['number']:03d} -> Package {r['package']}, Offset {r['offset']}\n")

        print(f"\nIndex written to {index_file}")
|
|
|
|
def _extract_strings_from_binary(self, data: bytes, min_length: int = 5) -> List[str]:
|
|
"""Extract readable strings from binary data."""
|
|
strings = []
|
|
i = 0
|
|
|
|
while i < len(data):
|
|
# Look for printable ASCII sequences
|
|
if 32 <= data[i] <= 126:
|
|
start = i
|
|
while i < len(data) and 32 <= data[i] <= 126:
|
|
i += 1
|
|
|
|
if i - start >= min_length:
|
|
try:
|
|
s = data[start:i].decode('ascii')
|
|
strings.append(s)
|
|
except:
|
|
pass
|
|
else:
|
|
i += 1
|
|
|
|
return strings
|
|
|
|
|
|
def main():
    """Main entry point: locate the game directory and run the extractor."""
    # Candidate locations for the game data, in order of preference.
    candidates = (
        "King's Quest IV - The Perils of Rosella (1988)/KQ4",
        "King's Quest IV - The Perils of Rosella (1988)",
    )

    game_dir = next((d for d in candidates if os.path.exists(d)), None)

    if game_dir is None:
        print("Error: Could not find game directory")
        sys.exit(1)

    print(f"Extracting text from: {game_dir}")

    extractor = SCIResourceExtractor(game_dir)
    extractor.extract_all_text("strings")

    print("\nExtraction complete!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|