"""Functions to play and display phrases/events and produce scores"""
import os
import queue
import re
import socket
import subprocess
import threading
import time


# LilyPond note names for the twelve pitch classes, plus the rest symbol.
_PITCH_NAMES = {0: " c", 1: " cis", 2: " d", 3: " dis", 4: " e", 5: " f",
                6: " fis", 7: " g", 8: " gis", 9: " a", 10: " ais",
                11: " b", "rest": " r"}

# Durations (in sixteenths) that LilyPond can write as a single note value.
_SIMPLE_DURATIONS = {1: ' 16', 2: ' 8', 3: ' 8.', 4: ' 4', 6: ' 4.',
                     7: ' 4..', 8: ' 2', 12: ' 2.', 14: ' 2..'}

# Durations (in sixteenths) that must be written as two tied note values.
_SPLIT_DURATIONS = {5: (" 4", "16"), 9: (" 2", "16"), 10: (" 2", "8"),
                    11: (" 2", "8."), 13: (" 2.", "16"), 15: (" 2.", "8.")}


def trysend(sock, string):
    """Send `string` (encoded to bytes) on `sock`, ignoring socket errors.

    Errors are swallowed on purpose: the reading end may already have
    closed, and that must not interrupt playback.
    """
    try:
        sock.send(string.encode())
    except socket.error:
        pass


def screen(phrase, count, sock, t_zero):
    """Send a readable rendering of `phrase` to `sock`.

    Each event is shown as ``(pitches, duration)``, prefixed by the phrase
    number `count`.  Events go on separate lines when the phrase contains a
    chord (an event whose ``len`` is greater than 1).  The message is sent
    when the phrase is due to start (``phrase[0].start + t_zero``, in
    milliseconds), or immediately if that moment has already passed.
    """
    sep = '\n ' if any(len(event) > 1 for event in phrase) else ' '
    msg = [sep, str(count), ":", sep] + [
        str((event.pitches, event.duration)) + sep for event in phrase]
    text = ''.join(msg)
    delay = (phrase[0].start + t_zero) / 1000 - time.time()
    if delay > 0:
        threading.Timer(delay, trysend, (sock, text)).start()
    else:
        trysend(sock, text)


def fluid_args(event, bank, channel, font, sock):
    """Build the arguments for `fluid_send` from one event.

    Notes outside the range -60..48 (keys 0..108 once 60 is added) are
    shifted by whole octaves of ``event.octave`` steps so that the highest
    note is no greater than 48, or the lowest no less than -60.  The shifted
    pitches are written back to ``event.pitches``.

    Returns ``(sock, message, playtime)`` where `message` is a list of
    string fragments forming fluidsynth shell commands and `playtime` is the
    note length in seconds, or a false value when the event is pedalled (no
    note-off wanted) or has no length.
    """
    octave = event.octave
    highest = max(event.pitches)
    lowest = min(event.pitches)
    if highest > 48:
        # Smallest number of octaves down that brings `highest` to <= 48.
        adjust = ((48 - highest) // octave) * octave
    elif lowest < -60:
        adjust = -((lowest + 60) // octave) * octave
    else:
        adjust = 0
    event.pitches = [note + adjust for note in event.pitches]
    instrument = str(event.instrument)
    if event.volume <= 0:
        velocity = '0'
    else:
        # Chords are attenuated slightly; emit an integer string.
        velocity = str(int(min(event.volume, 10) * 10
                           // (len(event) ** 0.2)))
    message = ["select ", channel, " ", font, " ", bank, " ", instrument,
               "\n",
               "settuning ", channel, " ", bank, " ", instrument, "\n"]
    for note in event.pitches:
        key = str(note + 60)
        # Retune the key so that `octave` steps span 1200 cents.
        pitch = str((note * 12 / octave + 60) * 100)
        message.extend([
            "tune ", bank, " ", instrument, " ", key, " ", pitch, "\n",
            "noteon ", channel, " ", key, " ", velocity, "\n"])
    playtime = not event.pedal and event.tempo * event.playtime / 1000
    return sock, message, playtime


def _noteoff_message(message):
    """Return a ``noteoff`` line for every ``noteon`` line in `message`."""
    lines = ''.join(message).splitlines()
    return ''.join('noteoff' + line[len('noteon'):] + '\n'
                   for line in lines if line.startswith('noteon '))


def fluid_send(sock, message, playtime):
    """Send `message` to fluidsynth and schedule the matching note-offs.

    When `playtime` is true, a timer sends a ``noteoff`` for every note that
    was switched on (all notes of a chord, not only the last one) after
    `playtime` seconds.
    """
    sock.send(''.join(message).encode())
    if playtime:
        off = _noteoff_message(message)
        if off:
            threading.Timer(playtime, sock.send, [off.encode()]).start()


def _sox_volume(volume, default_vol):
    """Translate an event volume into a sox ``vol`` argument in dB.

    Volumes of 10 or more give `default_vol`, volumes of 0 or less give
    -100 dB, and values in between are attenuated progressively.
    """
    if volume >= 10:
        return str(default_vol) + "db"
    if volume <= 0:
        return "-100db"
    return str(default_vol - ((10 - volume) ** 1.5)) + "db"


def sample_args(event, samples, modify):
    """Build the argument list for sox ``play`` using audio samples.

    `samples` is a list of file names, or a directory name when `modify` is
    true (the directory is then re-scanned with `dir2list` on every call).
    Each pitch selects a sample by index, wrapping around the list.

    Returns a 1-tuple holding the command line as a list of strings.
    """
    if modify:
        samples = dir2list(samples)
    is_chord = len(event) > 1
    velocity = _sox_volume(event.volume, 3 if is_chord else -6)
    arglist = ["play", '-q', "-V1"]
    if is_chord:
        arglist.append('-m')
    arglist += [samples[i % len(samples)] for i in event.pitches]
    dur = event.tempo * event.playtime / 1000
    arglist += ['silence', '1', '5', '2%']
    if event.pedal:
        fade = event.tempo * 3 / 1000
        arglist += ['fade', '0', str(fade + dur), str(fade)]
    else:
        arglist += ['trim', '0', str(dur)]
    arglist += ["vol", velocity]
    return (arglist,)


def synth_args(event, sound):
    """Build the argument list for sox ``play -n synth``.

    `sound` is ``'pinknoise'`` or anything else for sine waves.  A pedalled
    event sounds three time units longer and fades out; the event itself is
    not modified.

    Returns a 1-tuple holding the command line as a list of strings.
    """
    velocity = _sox_volume(event.volume,
                           -3 if sound == 'pinknoise' else -15)
    playtime = event.playtime
    if event.pedal:
        # Use a local so that replaying the same event does not keep
        # lengthening it.
        playtime += 3
    fade = str(event.tempo * 3 / 1000)
    dur = str(event.tempo * playtime / 1000)
    synthargs = ["play", "-V1", "-q", "-n", "synth", dur]
    for i in event.pitches:
        if sound == 'pinknoise':
            synthargs += ["pinknoise", "pitch",
                          str(i * 1200 / event.octave)]
        else:
            synthargs += ["sin", "%" + str(12.0 * i / event.octave)]
    if event.pedal:
        synthargs += ['fade', '0', dur, fade]
    synthargs += ["vol", velocity]
    return (synthargs,)


def phlay(phrase, t_zero, playfunc, arg_func, *playargs):
    """Play a phrase, event by event.

    For every event with a true ``playtime``, ``arg_func(event, *playargs)``
    builds the arguments, the function sleeps until the event is due
    (``event.start + t_zero``, in milliseconds) and then calls
    ``playfunc(*args)``.  Events that are already late are not played and
    'Note dropped' is printed instead.
    """
    for event in phrase:
        if not event.playtime:
            continue
        args = arg_func(event, *playargs)
        sleeptime = (event.start + t_zero) / 1000 - time.time()
        if sleeptime >= 0:
            time.sleep(sleeptime)
            playfunc(*args)
        else:
            print('Note dropped')


def instrument_list(argfunc, *playargs):
    """Print the instruments available for the chosen playing method.

    `argfunc` and `playargs` are the argument builder and its extra
    arguments as returned by `play` (without the play function itself).
    """
    msg = "\n Available instruments:\n\n"
    if argfunc == fluid_args:
        # playargs is (bank, channel, font, sock).
        font, fluid = playargs[2], playargs[-1]
        fluid.send(("inst " + str(font) + " \n").encode())
        time.sleep(.1)
        inst = fluid.recv(10000)
        msg += '\n' + inst.decode() + '\n'
    elif argfunc == sample_args:
        # playargs is (samples, modify).  Use dir2list for a directory so
        # the numbers shown match the indices sample_args will play.
        samples = dir2list(playargs[0]) if playargs[1] else playargs[0]
        mesg = ['\n']
        for num, sample in enumerate(samples):
            mesg.extend([str(num), ") ", os.path.basename(sample), '\n'])
        msg += ''.join(mesg)
    elif argfunc == synth_args:
        # playargs is (sound,).
        msg += '\nSox synth: ' + str(playargs[0]) + '\n'
    print(msg)


def dir2list(directory, blacklist=('dat', 'sl', 'la', 'al', 'ul'),
             formats=[], verbose=False):
    """Return a list of audio files found recursively in `directory`.

    A file counts as audio when its lower-cased extension is in `formats`.
    If `formats` is empty it is filled in place from the output of
    ``sox -h`` minus the `blacklist`; the shared default list therefore acts
    as a cache so sox is only queried once.  Hidden sub-directories (names
    starting with a dot) are skipped.

    Raises OSError if `directory` does not exist or holds no audio files.
    """
    if not os.path.isdir(directory):
        raise OSError('\n ' + directory + ": no such directory")
    if verbose:
        print("Getting audio files")
    if not formats:
        # communicate() drains the pipe, so a long help text cannot block.
        output = subprocess.Popen(['sox', '-h'],
                                  stdout=subprocess.PIPE).communicate()[0]
        for line in output.decode(errors='replace').splitlines():
            if line.startswith('AUDIO FILE FORMATS:'):
                formats[:] = [ext for ext in line.split()[3:]
                              if ext not in blacklist]
                break
    samples = []
    for path, dirs, files in os.walk(directory):
        # Prune hidden directories in place so os.walk does not enter them.
        dirs[:] = [name for name in dirs if not name.startswith('.')]
        for name in files:
            if os.path.splitext(name)[1][1:].lower() in formats:
                samples.append(os.path.join(path, name))
    if not samples:
        raise OSError('\n ' + directory + ": no audio files found")
    if verbose:
        print(len(samples), " audio files found.")
    return samples


def _connect_fluidsynth(proc, address=('', 9800)):
    """Connect to the fluidsynth server shell, retrying until it is up.

    Raises OSError if `proc` (the fluidsynth process, when known) has
    exited, instead of retrying forever.
    """
    while True:
        sock = socket.socket()
        try:
            sock.connect(address)
            return sock
        except socket.error:
            sock.close()
            if proc is not None and proc.poll() is not None:
                raise OSError("fluidsynth exited before accepting "
                              "connections")
            time.sleep(.01)


def play(fluidsynth=None, channel=0, bank='0', driver=None, font='1',
         channels=[9], fluidproc=[], samples=None, directory=None,
         sox=False, orchestra=False):
    """Turn playback options into the arguments expected by `phlay`.

    Exactly one method is chosen, in this order: `fluidsynth` (an object
    whose ``name`` is the soundfont path), `samples` (a directory scanned
    once), `directory` (a directory re-scanned for every event) or `sox`
    (the synth sound name).  The default `channels` and `fluidproc` lists
    are shared between calls on purpose: they remember the channels already
    handed out and the single fluidsynth process.

    Returns ``(playargs, process)`` where `playargs` is
    ``(play function, argument builder, *extra)`` or None, and `process` is
    the fluidsynth process or None.
    """
    playargs = None
    if fluidsynth:
        if not fluidproc:
            command = ["fluidsynth", "-siln", "-g0.5", "-C1", "-R1"]
            if driver is not None:
                command += ["-a", driver]
            command += ["-j", fluidsynth.name]
            fluidproc.append(subprocess.Popen(command,
                                              stdout=subprocess.DEVNULL,
                                              stderr=subprocess.STDOUT))
        if channel != 9:
            # Pick the first channel that has not been handed out yet.
            while channel in channels:
                channel += 1
            channels.append(channel)
        fluidsock = _connect_fluidsynth(fluidproc[0])
        playargs = (fluid_send, fluid_args, bank, str(channel), font,
                    fluidsock)
    elif samples:
        playargs = (subprocess.Popen, sample_args,
                    dir2list(samples, verbose=True), False)
    elif directory:
        playargs = (subprocess.Popen, sample_args, directory, True)
    elif sox:
        playargs = (subprocess.Popen, synth_args, sox)
    if playargs and orchestra:
        instrument_list(*playargs[1:])
    return playargs, fluidproc[0] if fluidproc else None


def _lilypond_version():
    """Return the version number reported by ``lilypond -v``.

    The first dotted number on the first output line is used, so text after
    the version on that line does not matter.
    """
    output = subprocess.Popen(['lilypond', '-v'],
                              stdout=subprocess.PIPE).communicate()[0]
    lines = output.decode(errors='replace').splitlines()
    first_line = lines[0] if lines else ''
    match = re.search(r'\d+(?:\.\d+)+', first_line)
    if match:
        return match.group(0)
    tokens = first_line.split()
    if not tokens:
        raise OSError("could not determine the LilyPond version")
    return tokens[-1]


def _pitch_text(event):
    """Return the LilyPond pitch (or chord, or rest) text for `event`."""
    if event.volume <= 0 or event.playtime <= 0:
        return _PITCH_NAMES["rest"]
    if len(event) > 1:
        openchord, closechord = " < ", " > "
    else:
        openchord = closechord = ""
    parts = [openchord]
    for pitch in event.pitches:
        # Octave marks are counted from pitch 0, written as c'.
        mark = "'" if pitch >= 0 else ","
        parts.append(_PITCH_NAMES[pitch % 12] + mark * abs(pitch // 12 + 1))
    parts.append(closechord)
    return ''.join(parts)


def _duration_tokens(pitches, duration):
    """Return the tokens writing `pitches` for `duration` sixteenths.

    Long or irregular durations are split into tied note values.
    """
    tokens = []
    semibreves, remainder = divmod(duration, 16)
    if semibreves:
        tokens += [pitches, " 1"]
        for _ in range(1, semibreves):
            tokens += [" ~", pitches, " 1"]
        if remainder:
            tokens.append(" ~")
    if remainder:
        if remainder in _SPLIT_DURATIONS:
            first, second = _SPLIT_DURATIONS[remainder]
            tokens += [pitches, first, " ~", pitches, second]
        else:
            tokens += [pitches, _SIMPLE_DURATIONS[remainder]]
    return tokens


def _octavation(middle):
    """Return the octavation (-2..2) for a phrase centred on `middle`."""
    if middle >= 30:
        return 2
    if middle >= 18:
        return 1
    if middle >= -18:
        return 0
    if middle >= -30:
        return -1
    return -2


def _score_text(printlist, version, percussion):
    """Return the LilyPond source for the phrases in `printlist`.

    Tempo, time signature, octavation and clef are only written when they
    differ from those of the previous phrase.
    """
    last = {"tempo": "", "time": "", "octave": "", "clef": ""}

    def changed(key, text):
        """Return `text` if it differs from the last one written, else ''."""
        if text == last[key]:
            return ""
        last[key] = text
        return text

    scorelist = ["\n\\version \"" + version + "\" { "]
    for phrase in printlist:
        tempo = changed("tempo", (
            " \n \\tempo " + "4 = " + str(int(15000 / phrase[0].tempo))
            + " \\once \\override Score.MetronomeMark #'transparent = ##t \n"))
        highest = max(max(event.pitches) for event in phrase)
        lowest = min(min(event.pitches) for event in phrase)
        middle = (highest + lowest) // 2
        octave = changed(
            "octave", " #(set-octavation " + str(_octavation(middle)) + ")")
        if percussion:
            clef_name = "percussion"
        elif middle < 0:
            clef_name = "bass"
        else:
            clef_name = "treble"
        clef = changed("clef", "\n \\clef " + clef_name)
        # The bar length is the phrase length, reduced to lowest terms but
        # never below quarter-note units.
        numerator = sum(event.duration for event in phrase)
        denominator = 16
        while not numerator % 2 and denominator > 4:
            numerator //= 2
            denominator //= 2
        time_sig = changed(
            "time", " \n \\time " + str(numerator) + "/" + str(denominator))
        newbar = ["\n", time_sig, octave, clef, tempo, "\n"]
        for event in phrase:
            if event.duration == 0:
                continue
            newbar += _duration_tokens(_pitch_text(event), event.duration)
        scorelist += newbar
    return ''.join(scorelist + ["\n }"])


def _unique_score_path(filename, overwrite):
    """Return the output path (without extension) in the home directory.

    'O' is appended while something already exists at the bare path.
    Unless `overwrite` is true, a number is then appended so that no
    existing .pdf or .mid file is replaced.
    """
    score_path = os.path.expanduser("~") + "/" + filename
    while os.path.exists(score_path):
        score_path += 'O'
    if not overwrite:
        base, tag = score_path, 0
        while (os.path.exists(score_path + ".pdf")
               or os.path.exists(score_path + ".mid")):
            tag += 1
            score_path = base + str(tag)
    return score_path


def lilyprint(printlist, score=True, midi=False, filename='phraser',
              overwrite=False, verbose=True, percussion=False):
    """Translate phrases into LilyPond text and produce a score or midi file.

    `printlist` is a list of phrases.  A .pdf is produced when `score` is
    true and a .mid when `midi` is true, both named after `filename` in the
    user's home directory.  Existing files are kept unless `overwrite` is
    true.  Progress is printed when `verbose` is true; `percussion` selects
    the percussion clef.
    """
    scorestring = _score_text(printlist, _lilypond_version(), percussion)
    score_path = _unique_score_path(filename, overwrite)
    strings = []
    if score:
        if verbose:
            print(("\n Creating " + score_path + ".pdf..."))
        strings.append(scorestring)
    if midi:
        if verbose:
            print(("\nCreating " + score_path + ".mid..."))
        strings.append(" \\score {\n " + scorestring + "\n \\midi \n{ }\n} ")
    for string in strings:
        subprocess.Popen(["lilypond", "-ddelete-intermediate-files", "-o",
                          score_path, "-"],
                         stdin=subprocess.PIPE,
                         stderr=subprocess.DEVNULL
                         ).communicate(string.encode())
    if verbose:
        print('Done.\n')


def disp(phrase, num, displist):
    """Collect phrases and render a refreshed pdf every `num` phrases.

    `phrase` is appended to `displist`; once `num` phrases have been
    collected they are rendered by `lilyprint` in a separate thread (always
    overwriting the same file) and `displist` is emptied.
    """
    displist.append(phrase)
    if len(displist) >= num:
        name = threading.current_thread().name + 'display'
        threading.Thread(target=lilyprint, args=(displist[:],),
                         kwargs={'filename': name, 'overwrite': True,
                                 'verbose': False},
                         name=name).start()
        displist[:] = []


def start_clock(phrase):
    """Return the start time in milliseconds for playback of `phrase`.

    The result is the next multiple of the bar duration (first event's
    tempo times the total duration) after the current time.  If the bar
    duration is zero the current time is returned.
    """
    bardur = phrase[0].tempo * sum(event.duration for event in phrase)
    round_t = int(time.time() * 1000)
    if not bardur:
        return round_t
    return round_t + bardur - round_t % bardur


def fill_q(que, gen, flag):
    """Put items from `gen` on `que` until `gen` ends or `flag` is cleared."""
    for item in gen:
        if not flag.is_set():
            break
        que.put(item)


def work(flag, phrases, output_terminal, score_opts, playargs, qsize=1,
         display=None):
    """Run the whole pipeline: display, play and print phrases.

    `phrases` is consumed in a background thread through a queue of size
    `qsize`.  While `flag` is set each phrase is shown on `output_terminal`,
    played when `playargs` is given, rendered every `display` phrases when
    `display` is given, and collected for a final score when `score_opts`
    (keyword arguments for `lilyprint`) is given.  The loop ends when the
    phrases run out or `flag` is cleared.
    """
    phrase_q = queue.Queue(qsize)
    q_filler = threading.Thread(target=fill_q,
                                args=(phrase_q, phrases, flag),
                                name='q_filler')
    q_filler.start()
    printlist, displist = [], []
    counter = tries = t_zero = 0
    while True:
        try:
            phrase = phrase_q.get(True, .001)
        except queue.Empty:
            # Only stop once the filler is finished AND nothing is left:
            # it may have queued its last phrase just after the timeout.
            if not q_filler.is_alive() and phrase_q.empty():
                break
            tries += 1
            if tries == 300:
                trysend(output_terminal, 'Calculating phrases...')
            continue
        if not flag.is_set():
            break
        tries = 0
        counter += 1
        screen(phrase, counter, output_terminal, t_zero)
        if playargs:
            # The clock starts with the first phrase that is played.
            t_zero = t_zero or start_clock(phrase)
            phlay(phrase, t_zero, *playargs)
        if display:
            disp(phrase, display, displist)
        if score_opts:
            printlist.append(phrase)
    if printlist:
        lilyprint(printlist, **score_opts)