ssiv/src/ssiv/image_loader_generator.py

371 lines
14 KiB
Python

# This file is part of ssiv.
#
# ssiv is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ssiv is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with ssiv. If not, see <https://www.gnu.org/licenses/>.
from io import BytesIO
from math import gcd
from struct import pack,unpack,calcsize
from zlib import crc32
from .image_loader_frame import *
from .tools import *
# Names exported by ``from <this module> import *``.
# 'generator_Signle' (sic) matches the spelling of the function defined below.
__all__=[
'generator_Signle','generator_Anime','generator_APNG',
'open_APNG','isAPNG','get_shrink_size',
]
# 8-byte signature expected at the very start of PNG data
PNG_HEAD=b'\x89PNG\r\n\x1a\n'
# struct layout (big-endian) of an IHDR chunk payload:
# width, height, depth, color type, compression method, filter method, interlace method
PNG_IHDR_FMT='>IIBBBBB'
# struct layout (big-endian) of an fcTL chunk payload *after* its leading
# 4-byte sequence number (callers unpack ``chunk[4:]`` with this format):
# width, height, offset x, offset y, delay_num, delay_den, dispose_op, blend_op
APNG_FCTL_FMT='>IIIIHHBB'
# Translates the dispose name read from an image object (``im.dispose``)
# into a DISPOSE_* constant; 'undefined' is handled like 'none'.
DISPOSE_MAP={
'undefined':DISPOSE_NONE,
'none':DISPOSE_NONE,
'background':DISPOSE_BACKGROUND,
'previous':DISPOSE_PREVIOUS,
}
def _makeblob(im, width, height, blen=None, fmt=None):
    """Serialize ``im`` into a single blob, optionally built from cropped tiles.

    When ``blen`` is a positive length smaller than the longer image side,
    the image is sliced at the rectangles produced by
    ``generate_crop_points(width, height, blen)`` and the blob of every tile
    is appended, in that order, to one ``bytearray``.  Otherwise the whole
    image is converted with one ``make_blob`` call.

    Returns ``(blob, croplen)``; ``croplen`` is ``blen`` when tiling was
    performed and ``0`` when it was not.
    """
    tiled = blen is not None and 0 < blen < max(width, height)
    if not tiled:
        return im.make_blob(format=fmt), 0

    blob = bytearray()
    for left, top, tile_w, tile_h in generate_crop_points(width, height, blen):
        # each slice is used as a context manager so it is released right
        # after its bytes have been copied
        with im[left:left + tile_w, top:top + tile_h] as tile:
            blob.extend(tile.make_blob(format=fmt))
    return blob, blen
def _loadimg(im, blen=None):
    """Normalise ``im`` to 8-bit RGB/RGBA and pull out everything needed later.

    Per the original note, this should be called after the ``Image.read()``
    operation, in the same process and thread.

    The image is modified in place: its depth is forced to 8 and its format
    to ``'RGBA'`` (when it has an alpha channel) or ``'RGB'``.

    Returns a tuple
    ``(geometry, bpp, depth, fmt, croplen, DataBuffer(pixels, palette))``:

    * ``geometry`` -- ``(width, height, page width, page height, *offset)``;
      a zero page dimension falls back to the image dimension
    * ``bpp`` -- bytes per pixel (4 for RGBA, 3 for RGB)
    * ``depth`` -- bits per colour channel as reported by the image
    * ``fmt`` -- colour format name read back from the image
    * ``croplen`` -- tile length reported by ``_makeblob`` (0 = not tiled)
    * the raw pixels together with an empty palette

    Raises ``AssertionError`` when the blob length or the pixel format does
    not match the expected values.
    """
    # force 8 bits per colour channel
    if im.depth != 8:
        im.depth = 8
    # switch to a raw pixel format, chosen by the presence of alpha
    if im.format not in ('RGBA', 'RGB'):
        im.format = 'RGBA' if im.alpha_channel else 'RGB'

    width, height = im.size
    page_width, page_height, *offset = im.page
    geometry = (
        width, height,
        page_width or width, page_height or height,
        *offset,
    )

    fmt = im.format
    bpp = 4 if fmt == 'RGBA' else 3
    depth = im.depth

    pixels, croplen = _makeblob(im, width, height, blen=blen)

    # sanity check: the blob must hold exactly width*height pixels
    actual_len = len(pixels)
    expected_len = width * height * bpp
    assert actual_len == expected_len, \
        f'pixel amount not match {actual_len} {expected_len}'

    # sanity check: colour format and bits per pixel must agree
    bits_per_pixel = depth * bpp
    assert (fmt, bits_per_pixel) in (('RGB', 24), ('RGBA', 32)), \
        f'unknown pixfmt {bits_per_pixel} {fmt}'

    return geometry, bpp, depth, fmt, croplen, DataBuffer(pixels, b'')
def py_padding_rgb24(pixels, width):
    """Pad every row of tightly packed RGB24 data to a 4-byte boundary.

    Parameters:
        pixels: bytes-like object holding rows of ``width`` 3-byte pixels
            stored back to back.
        width: number of pixels per row.

    Returns:
        A new ``bytes`` object in which each row is followed by zero bytes
        up to the next multiple of four.  The *last* row is padded too, so
        the result is always ``rows * aligned_pitch`` bytes long.  (The
        previous implementation created the padding by seeking past the end
        of a ``BytesIO``; the gap after the final row was never written, so
        the result came out ``padding`` bytes short.)  A trailing partial
        row is zero-filled to a full aligned row.  A non-positive ``width``
        yields ``b''``.
    """
    src_pitch = width * 3
    if src_pitch <= 0:
        return b''
    padding_len = -src_pitch % 4
    if not padding_len:
        # rows are already aligned; just hand back an independent copy
        return bytes(pixels)

    dst_pitch = src_pitch + padding_len
    padding = bytes(padding_len)
    # zero-copy row slicing; the only copy happens in the final join
    view = memoryview(pixels).cast('B')
    parts = []
    for start in range(0, len(view), src_pitch):
        row = view[start:start + src_pitch]
        parts.append(row)
        if len(row) == src_pitch:
            parts.append(padding)
        else:
            # incomplete trailing row: fill up to a whole aligned row
            parts.append(bytes(dst_pitch - len(row)))
    return b''.join(parts)
def get_shrink_size(size, page, max_size):
    """Decide whether an image has to be shrunk to fit ``max_size``.

    Parameters:
        size: ``(width, height)`` of the image.
        page: ``(page width, page height, offset x, offset y)``.
        max_size: ``(max width, max height)`` or ``None`` for "no limit".

    The occupied area is, per axis, the larger of ``size + offset`` and the
    page dimension.  A limit whose two components sum to zero disables
    shrinking, exactly like ``None``.

    Returns ``(max width, max height)`` when the occupied area exceeds the
    limit on at least one axis, otherwise ``None``.
    """
    frame_w, frame_h = size
    page_w, page_h, off_x, off_y = page
    if max_size is None:
        limit_w, limit_h = 0, 0
    else:
        limit_w, limit_h = max_size

    canvas_w = max(frame_w + off_x, page_w)
    canvas_h = max(frame_h + off_y, page_h)

    limit_enabled = limit_w + limit_h
    if limit_enabled and (canvas_w > limit_w or canvas_h > limit_h):
        return limit_w, limit_h
    return None
def shrink_image(im, shrink, first_size=None):
    """Resize ``im`` when needed and report the scale as a reduced fraction.

    Parameters:
        im: image object providing ``size`` and ``transform(resize=...)``.
        shrink: ``(max width, max height)`` or ``None`` for "do nothing".
        first_size: reference ``(width, height)`` used to compute the scale
            instead of ``im.size`` (used for animation frames so that all
            frames share one ratio).

    Returns ``(scale_num, scale_den)``.  ``(1, 1)`` means the image was left
    untouched.  The exact fraction is returned (rather than a float) so that
    callers can scale frame offsets with high accuracy.
    """
    if shrink is None:
        return 1, 1

    if first_size is None:
        ref_w, ref_h = im.size
    else:
        ref_w, ref_h = first_size
    limit_w, limit_h = shrink

    # nothing to do when the reference already fits on both axes
    if limit_w >= ref_w and limit_h >= ref_h:
        return 1, 1

    # the axis with the smaller ratio is the constraining one
    if limit_w / ref_w < limit_h / ref_h:
        numerator, denominator = limit_w, ref_w
    else:
        numerator, denominator = limit_h, ref_h
    common = gcd(numerator, denominator)
    numerator //= common
    denominator //= common

    if first_size is None:
        target = shrink
    else:
        # scale this frame's own size by the ratio of the reference frame
        target = tuple(
            round(side * numerator / denominator) for side in im.size
        )
    # trailing '>' in the geometry: only shrink images larger than target
    im.transform(resize='{}x{}>'.format(*target))
    return numerator, denominator
def isAPNG(data):
# return True for valid multi-frame APNG,
# return False for single-frame APNG, invalid APNG, standard PNG or any others
# document: https://wiki.mozilla.org/APNG_Specification
with BytesIO(data) as fp:
if fp.read(8)!=PNG_HEAD:
return False
is_apng=False
sequence=-1
width=height=0
loop=-1
n_fctl=n_fdat=0
while True:
try:
chunk_length,chunk_type=unpack('>L4s',fp.read(8))
chunk=fp.read(chunk_length)
if f'{crc32(chunk,crc32(chunk_type)):08x}'!=fp.read(4).hex():
return False
match (chunk_type,is_apng):
case (b'IEND',apng_flag):
return apng_flag and n_fctl==n_fdat==n_frames==sequence/2+1
case (b'acTL',True):
# duplicate 'acTL'
return False
case (b'acTL',False):
is_apng=True
n_frames,loop=unpack('>II',chunk)
if n_frames<2:
# APNG with only one frame is not APNG
return False
case (b'IHDR',True):
# 'acTL' before 'IHDR'
return False
case (b'IHDR',False):
if calcsize(PNG_IHDR_FMT)!=len(chunk):
return False
width,height,*_=unpack(PNG_IHDR_FMT,chunk)
if not width*height:
return False
case (b'IDAT',False):
# no 'acTL' before 'IDAT'
return False
case (b'IDAT',True):
if not width*height:
return False
n_fdat+=1
case (b'fdAT',False):
# no 'acTL' before 'fdAT'
return False
case (b'fdAT',True):
if not width*height:
return False
if unpack('>L',chunk[:4])[0]!=(sequence:=sequence+1):
return False
n_fdat+=1
case (b'fcTL',False):
# no 'acTL' before 'fcTL'
return False
case (b'fcTL',True):
if not width*height:
return False
if unpack('>L',chunk[:4])[0]!=(sequence:=sequence+1):
return False
if calcsize(APNG_FCTL_FMT)!=len(chunk[4:]):
return False
fw,fh,offx,offy,*_,dispose,blend=unpack(APNG_FCTL_FMT,chunk[4:])
if dispose not in DISPOSE_MAP.values():
return False
if not blend in (BLEND_NONE,BLEND_BLEND):
return False
if fw+offx>width or fh+offy>height:
return False
if not sequence:
if offx+offy:
return False
if (fw,fh)!=(width,height):
return False
n_fctl+=1
except:
return False
def load_single_frame(im, shrink=None, first_size=None, blen=None):
    """Turn one image/frame into ``(FrameProperty, DataBuffer)`` and close it.

    Parameters:
        im: image object; it is used as a context manager here, so it is
            released when this function returns.
        shrink: ``(max width, max height)`` or ``None``, forwarded to
            ``shrink_image``.
        first_size: size of the first animation frame, or ``None`` for a
            still image.  When given, the frame's page offset is rescaled
            with the exact shrink fraction.
        blen: optional tile length forwarded to ``_loadimg``.
    """
    in_animation = first_size is not None
    with im:
        if in_animation:
            # remember the unscaled page offset before any resize happens
            _, _, *offset = im.page
        scale_num, scale_den = shrink_image(im, shrink, first_size=first_size)
        geometry, bpp, depth, fmt, croplen, databuffer = _loadimg(im, blen=blen)
        # 10 milliseconds for each tick
        delay = int(im.delay * 10)
        dispose = DISPOSE_MAP[im.dispose]
        blend = BLEND_BLEND if im.alpha_channel else BLEND_NONE
        if in_animation and scale_den > 1:
            # high accuracy offset (for animation): replace the last two
            # geometry fields with the original offset scaled exactly
            scaled_offset = [
                round(pos * scale_num / scale_den) for pos in offset
            ]
            geometry = (*geometry[:-2], *scaled_offset)
        frameprop = FrameProperty(
            delay, dispose, blend, bpp, depth, fmt, croplen, *geometry)
    return frameprop, databuffer
def open_APNG(data):
    """Split APNG ``data`` into its chunks.

    ``data`` is expected to have been verified as APNG already, so nothing
    is validated here and CRCs are skipped to speed up decoding.

    Returns ``(n_loop, n_frames, chunkdict)`` where ``chunkdict`` has the
    keys:

    * ``'before'`` -- ``(type, payload)`` of other chunks seen before IDAT
    * ``'after'``  -- ``(type, payload)`` of other chunks seen after IDAT
    * ``'chunk'``  -- image data per IDAT/fdAT (fdAT without its 4-byte
      sequence number)
    * ``'fctl'``   -- fcTL payloads without their 4-byte sequence number
    """
    assert (perf:=Pref.start(name=f'{__name__}: open_APNG'))
    assert trace(__name__,'open_APNG: start')
    extras_before = []
    extras_after = []
    frame_data = []
    frame_controls = []
    extras = extras_before  # switches to extras_after once IDAT is seen
    with BytesIO(data[len(PNG_HEAD):]) as stream:  # skip PNG head
        read = stream.read
        while True:
            size, kind = unpack('>L4s', read(8))
            payload = read(size)
            read(4)  # skip crc
            if kind == b'IEND':
                break
            if kind == b'acTL':
                n_frames, n_loop = unpack('>II', payload)
            elif kind == b'fcTL':
                frame_controls.append(payload[4:])
            elif kind == b'IDAT':
                frame_data.append(payload)
                extras = extras_after
            elif kind == b'fdAT':
                frame_data.append(payload[4:])
            else:
                extras.append((kind, payload))
    chunkdict = dict(
        before=extras_before,
        after=extras_after,
        chunk=frame_data,
        fctl=frame_controls,
    )
    assert perf.record(name='end')
    assert trace(__name__,'open_APNG: end')
    assert perf.log()
    return n_loop, n_frames, chunkdict
def PNG_repacker(width, height, idat, before=(), after=()):
    """Yield the byte pieces of a standalone PNG built around one data chunk.

    Parameters:
        width, height: size written into the IHDR chunk (replacing the
            first 8 bytes of whatever IHDR payload is found in the chunks).
        idat: payload emitted as the single IDAT chunk.
        before, after: iterables of ``(type, payload)`` emitted before and
            after the IDAT chunk respectively.

    Yields, in order: the PNG signature, then for every chunk its
    length/type header, its payload and its CRC.  An IEND chunk is appended
    automatically.
    """
    yield PNG_HEAD
    layout = [*before, (b'IDAT', idat), *after, (b'IEND', b'')]
    for kind, payload in layout:
        if kind == b'IHDR':
            # patch the frame size into the header, keep the rest as is
            payload = pack('>II', width, height) + payload[8:]
        header = pack('>L4s', len(payload), kind)
        checksum = pack('>L', crc32(payload, crc32(kind)))
        yield header
        yield payload
        yield checksum
def generator_APNG(chunkdict, options=None, shrink=None, cls=None):
    """Yield ``(frameprop, databuffer)`` for every frame of an APNG.

    Parameters:
        chunkdict: dict with the keys ``'before'``, ``'after'``, ``'fctl'``
            and ``'chunk'`` (the shape returned by ``open_APNG``).  It is
            emptied here, and the frame lists are consumed while iterating,
            so decoded data is released progressively.
        options: passed through to ``cls``.
        shrink: ``(max width, max height)`` or ``None``.
        cls: callable invoked as ``cls(blob=..., options=...)`` to build the
            image object of each repacked frame.

    Every frame is repacked into a standalone PNG, decoded through
    ``load_single_frame``, and then its offset, delay, dispose and blend are
    overwritten with the values from the frame's fcTL chunk.
    """
    assert (perf:=Pref.start(name=f'{__name__}: generator_APNG'))
    assert trace(__name__,'generator_APNG: start')
    shared_before = chunkdict['before']
    shared_after = chunkdict['after']
    controls = chunkdict['fctl']
    payloads = chunkdict['chunk']
    assert len(controls) == len(payloads)
    chunkdict.clear()
    total = len(controls)
    first_size = None
    index = 0
    while controls and payloads:
        index += 1
        control = controls.pop(0)
        payload = payloads.pop(0)
        (frame_w, frame_h, off_x, off_y,
         delay_num, delay_den, dispose, blend) = unpack(APNG_FCTL_FMT, control)
        # a zero denominator stands for 1/100 s units
        delay = delay_num * 1000 // (delay_den or 100)
        if index == 1 and shrink is not None:
            # use size and page from first frame
            first_size = frame_w, frame_h
            shrink = get_shrink_size(
                first_size, (frame_w, frame_h, off_x, off_y), shrink)
        assert trace(__name__,f'generator_APNG: loading: {index}/{total}')
        png_bytes = b''.join(PNG_repacker(
            frame_w, frame_h, payload,
            before=shared_before, after=shared_after))
        image = cls(blob=png_bytes, options=options)
        frameprop, databuffer = load_single_frame(
            image, shrink=shrink, first_size=first_size)
        # scale the fcTL offset by the same factor the frame was resized with
        frameprop.offset_x += int(off_x * frameprop.width / frame_w)
        frameprop.offset_y += int(off_y * frameprop.height / frame_h)
        frameprop.delay = delay
        frameprop.dispose = dispose
        frameprop.blend = blend
        assert perf.record(name=f'APNG frame {index}/{total}')
        yield frameprop, databuffer
    shared_before.clear()
    shared_after.clear()
    assert perf.record(name='APNG end')
    assert trace(__name__,'generator_APNG: end')
    assert perf.log()
def generator_Anime(im, options=None, shrink=None):
    """Yield ``(frameprop, databuffer)`` for every frame of a multi-image ``im``.

    Parameters:
        im: image container providing ``iterator_length``,
            ``iterator_reset``, ``image_get``, ``iterator_next`` and
            ``close``.  It is closed as soon as the iterator reports that
            there is no further frame.
        options: accepted for signature parity with the other generators;
            not used here.
        shrink: ``(max width, max height)`` or ``None``.

    The shrink decision and the reference size are taken from the first
    frame and then applied to all frames.
    """
    assert (p:=Pref.start(name=f'{__name__}: generator_Anime'))
    assert trace(__name__,'generator_Anime: start')
    n_frames = im.iterator_length()
    first_size = None
    im.iterator_reset()
    # NOTE(review): whether the first iterator_next() after iterator_reset()
    # really advances depends on the imaging library's iterator semantics --
    # confirm the first frame is not emitted twice.
    n = 0
    has_next = True
    while has_next:
        n += 1
        frame = im.image_get()
        if not im.iterator_next():
            im.close()
            has_next = False
        if n == 1 and shrink is not None:
            # Read the reference geometry from `frame`, the image fetched
            # for this iteration.  `im` must not be used for this: its
            # iterator has already been moved on above, and it may even
            # have been closed.
            width, height = frame.size
            first_size = width, height
            shrink = get_shrink_size(first_size, (*frame.page,), shrink)
        # frame will be closed by load_single_frame
        assert trace(__name__,f'generator_Anime: loading: {n}/{n_frames}')
        frameprop, databuffer = load_single_frame(
            frame, shrink=shrink, first_size=first_size)
        assert p.record(name=f'Anime frame {n}/{n_frames}')
        yield frameprop, databuffer
    assert p.record(name='Anime end')
    assert trace(__name__,'generator_Anime: end')
    assert p.log()
def generator_Signle(im, options=None, shrink=None, blen=None):
    """Yield exactly one ``(frameprop, databuffer)`` pair for ``im``.

    For a container holding more than one image only the first one is
    loaded; the container itself is closed right after the first image has
    been fetched.  ``options`` is accepted but not used here.
    """
    assert (perf:=Pref.start(name=f'{__name__}: generator'))
    assert trace(__name__,'generator: start')
    if im.iterator_length() > 1:
        # only load first frame
        im.iterator_reset()
        source = im.image_get()
        im.close()
        assert perf.record(name='get first frame')
    else:
        source = im
    # whichever object was picked is closed by load_single_frame
    yield load_single_frame(source, shrink=shrink, blen=blen)
    assert perf.record(name='end')
    assert trace(__name__,'generator: end')
    assert perf.log()
# Local Variables:
# coding: utf-8
# mode: python
# python-indent-offset: 4
# indent-tabs-mode: nil
# End: