## answers 04: _the adventure of the erratic nucleotide_

In [1]:
import numpy as np

rng = np.random.default_rng(2024)    # fixed RNG seed so I can talk about specific random simulation results in part 2

## 1. implement the global alignment model

In [2]:
def global_score(X, R, match_score, mismatch_score, gap_score):
    """Optimal global alignment of actual sequence X to erratic read R; return optimal score.

    X:              actual source DNA sequence X[0..L-1] as a string; A|C|G|T characters only.
    R:              observed read sequence R[0..M-1], string of A|C|G|T chars
    match_score:    score for an A|C|G identity      (example: +1.85 bits for 90% sequencing accuracy)
    mismatch_score: score for mismatch to A|C|G in X (example: -2.91 bits for 0.10/3 prob of each of 3 substitutions)
    gap_score:      score for a insertion/deletion   (example: -5 bits, for 1/32=0.03125 probs of insertion and deletion)

    Returns: optimal global alignment score

    (We don't calculate an alignment traceback - we weren't asked for one.)
    """
    L = len(X)   # X is the correct phage source sequence 
    M = len(R)   # R is the errant read

    X = '*'+X    # X and R are 0-based, but it's convenient to index them 1..L and 1..M in dynamic programming alignment,
    R = '*'+R    # so we can use row and column 0 for initialization. Prepending an unused char to each string does the trick.

    S = np.empty( (L+1,M+1) )   # Allocate for our DP matrix. 

    # Initialization
    # S(0,0) = 0; the global alignment starts here
    # S(0,j) top row is for deletions. We can only delete T's from the start of X.
    # S(i,0) left col is for insertions. We can only insert T's at the start of R.
    S[0][0] = 0
    for i in range(1,L+1): S[i][0] = S[i-1][0] + (gap_score if X[i] == 'T' else -np.inf)  #
    for j in range(1,M+1): S[0][j] = S[0][j-1] + (gap_score if R[j] == 'T' else -np.inf)  #

    # DP recursion
    for i in range(1,L+1):
        for j in range(1,M+1):
            delete =  S[i-1][j] + (gap_score if X[i] == 'T' else -np.inf)   # leading deletions (from X)
            insert =  S[i][j-1] + (gap_score if R[j] == 'T' else -np.inf)   # leading insertions (in R)
            if X[i] == 'T': match = S[i-1][j-1] + 0
            else:           match = S[i-1][j-1] + (match_score if X[i] == R[j] else mismatch_score)

            S[i][j] = np.max([match, insert, delete])
    return S[L][M]

The trickiest thing about that might be getting the correct logic for the delete and insert options. We can only delete if base X[i] is a T. We can only insert if base R[j] is a T. Among the various things we might do to check that this routine is working properly, the pset offered us the sequences in the pset example figure as a positive control that should score 33.34 bits:

In [3]:
X = "TCACCTACCGCGGTCGGCGCGTCTTCGGCCCG"
R = "ACACCACCGCGGGTGGCGCGCCCCCGGCTCCG"
global_score(X, R, 1.85, -2.91, -5)

np.float64(33.34000000000001)

## 2. check that the -5 gap score seems reasonable

First a function to sample reads from an input sequence, given the probability model.

In [4]:
def sample(rng, X, accuracy, gap_prob):
    """Given an actual sequence X, sample an errant read and return it.

    rng:      numpy random number generator
    X:        input DNA sequence X[0..L-1] as a string; A|C|G|T characters only.
    accuracy: accuracy of sequencing A|C|G  [0.90, for example]
    gap_prob: probability of inserting T or deleting T [1/32 = 0.03125, for example]

    Returns observed read R as a string, A|C|G|T characters. 
    """
    L = len(X)        
    i = 0           # position in X[0..i..L-1]
    R = [ ]         # build the errant read as a list, convert to string at the end

    # Define the asymmetric substitution process
    # A|C|G are read with <accuracy>
    # T is read uniformly randomly (including T itself), accuracy=0.25
    #
    accmx = { 'A': [       accuracy, (1-accuracy)/3, (1-accuracy)/3,  (1-accuracy)/3 ],
              'C': [ (1-accuracy)/3,       accuracy, (1-accuracy)/3,  (1-accuracy)/3 ],
              'G': [ (1-accuracy)/3, (1-accuracy)/3,       accuracy,  (1-accuracy)/3 ],
              'T': [            1/4,            1/4,            1/4,             1/4 ] }

    # Generate the read.
    # Continue until we've consumed all of the actual read X,
    # (while allowing for insertions at the very end).
    #
    while 1:
        if   rng.random() <= gap_prob: R.append('T')                # insert T on read. At the end i==L, this is all we may do
        else:
            if i >= L: break                                        # we're at i=L and done; nothing to delete or substitute
            if X[i] != 'T' or rng.random() > gap_prob:              # substitution
                R.append(rng.choice(list('ACGT'), p=accmx[X[i]]))   # else: deletion of a T, just by skipping ahead in i.
            i += 1
    # Convert the read from list to string, and return it.
    return ''.join(R)



Again the trickiest thing is the logic of the inserts and deletes.

That's for synthesizing positives, and negatives are easy to sample, so I'm ready to go.

I'll organize the results in a data table with rows for different gap scores, and columns as gap score, mean positive score, mean negative score, and difference of the means; then just dump the table out.

In [5]:
ntrials  = 2000
seqlen   = 32
gapcosts = [ 0, -2, -3, -4, -5, -6, -7, -8, -10 ]      # I didn't ask for -3 or -7, but I'm doing them here anyway

tbl = np.zeros( (len(gapcosts), 4))

for g, gap_score in enumerate(gapcosts):
    tot_pos = 0
    tot_neg = 0
    for i in range(ntrials):
        X = ''.join(rng.choice(list('ACGT'), seqlen))
        R = sample(rng, X, 0.90, 1/32)
        tot_pos += global_score(X,R, +1.85, -2.91, gap_score)

        R = ''.join(rng.choice(list('ACGT'), seqlen))
        tot_neg += global_score(X,R, +1.85, -2.91, gap_score)

    tbl[g][0] = gap_score
    tbl[g][1] = tot_pos / ntrials
    tbl[g][2] = tot_neg / ntrials
    tbl[g][3] = tbl[g][1] - tbl[g][2]

print(tbl)

[[  0.        33.767135 -17.083945  50.85108 ]
 [ -2.        30.417205 -29.083615  59.50082 ]
 [ -3.        28.9571   -32.03146   60.98856 ]
 [ -4.        28.204125 -34.779895  62.98402 ]
 [ -5.        27.013465 -37.111995  64.12546 ]
 [ -6.        25.529415 -37.868205  63.39762 ]
 [ -7.        24.66718  -39.24156   63.90874 ]
 [ -8.        23.623375 -39.695485  63.31886 ]
 [-10.        21.29678  -40.37112   61.6679  ]]


That takes a while! (There's a reason that sequence alignment tools aren't usually written in Python.)

Yes, -5 is optimal for the best separation of mean score of positives - mean score of negatives. Holmes must know something.

With 2000 samples, these differences are only accurate to about +/- 1 bit, so it's a little lucky that the optimum was right at -5. From -4 to -8 they're statistically about the same. 

## 3. implement the alignment model for genome scanning


Almost identical to the global alignment version. I've only changed a piece of the initialization, and added a few extra lines to keep the best score seen at any end point.

In [6]:
def scan(X, R, match_score, mismatch_score, gap_score):
    """Scan genome sequence X with erratic read R; return position and score of best alignment.

    Alignment is global with respect to the erratic read, local with
    respect to the genome sequence. Locality in the genome sequence is
    implemented simply by initializing column 0 to 0's, and taking the
    max in column M.

    The reported position is the end point of the alignment on the
    genome sequence.  We don't report the start; obtaining that would
    require a traceback, which we weren't asked to do.

    X:              actual source DNA sequence X[0..L-1] as a string; A|C|G|T characters only.
    R:              observed read sequence R[0..M-1], string of A|C|G|T chars
    match_score:    score for an A|C|G identity      (example: +1.85 bits for 90% sequencing accuracy)
    mismatch_score: score for mismatch to A|C|G in X (example: -2.91 bits for 0.10/3 prob of each of 3 substitutions)
    gap_score:      score for a insertion/deletion   (example: -5 bits, for 1/32=0.03125 probs of insertion and deletion)

    Returns: tuple of (best_endpos, best_score).
             best_endpos is in human 1..L coords on genome sequence X (not Python 0..L-1 coords)
    """
    L = len(X)   # X is the correct phage source sequence 
    M = len(R)   # R is the errant read

    X = '*'+X    # X and R are 0-based, but it's convenient to index them 1..L and 1..R in dynamic programming alignment,
    R = '*'+R    # so we can use row and column 0 for initialization. Prepending an unused char to each string does the trick.

    S = np.empty( (L+1,M+1) )   # Allocate for our DP matrix. 

    # Initialization
    # We only change two things relative to the global_score code, and
    # one of them is here: init col 0 to 0, making leading deletions of X residues free
    S[0][0] = 0
    for i in range(1,L+1): S[i][0] = 0                                                    # leading deletions (from X) 
    for j in range(1,M+1): S[0][j] = S[0][j-1] + (gap_score if R[j] == 'T' else -np.inf)  # leading insertions (in R) 

    # DP recursion
    best_pos = 0
    best_sc  = -np.inf
    for i in range(1,L+1):
        for j in range(1,M+1):
            delete =  S[i-1][j] + (gap_score if X[i] == 'T' else -np.inf)
            insert =  S[i][j-1] + (gap_score if R[j] == 'T' else -np.inf)
            if X[i] == 'T': match = S[i-1][j-1] + 0
            else:           match = S[i-1][j-1] + (match_score if X[i] == R[j] else mismatch_score)

            S[i][j] = np.max([match, insert, delete])
        # The second change is here: keep track of the maximum pos, score in column M
        if S[i][M] > best_sc:
            best_sc  = S[i][M]
            best_pos = i

    return (best_pos, best_sc)


## 4. demonstrate proof of principle

I'll need to be able to read the phage genome files:

In [7]:
def read_fasta(seqfile):
    """Read one sequence from FASTA <seqfile> and return it (as a string) in uppercase.

    A very basic and simple parser. Assumes there is only one sequence
    in the file.  Doesn't check the sequence for valid chars: takes
    any non-whitespace char and converts to upper case.
    """
    with open(seqfile) as f:
        line = f.readline()               # skip the first line that starts with > and has the name, description
        seq = ''
        for line in f:
            seq += line.strip().upper()   # .strip() removes whitespace; .upper() converts to upper case
    return seq

After downloading the three genomes and the example read file, I can read them in:

In [8]:
example_read = read_fasta("example_read.fa")
citius       = read_fasta("Citius")
tpa4         = read_fasta("TPA4")
vulture      = read_fasta("Vulture")

Now scan the read against each genome in turn:

In [9]:
(citius_best_endpos,  citius_best_score)  = scan(citius,  example_read, +1.85, -2.91, -5)
(tpa4_best_endpos,    tpa4_best_score)    = scan(tpa4,    example_read, +1.85, -2.91, -5)
(vulture_best_endpos, vulture_best_score) = scan(vulture, example_read, +1.85, -2.91, -5)

print("Citius:  {:.2f} bits".format(citius_best_score))
print("TPA4:    {:.2f} bits".format(tpa4_best_score))
print("Vulture: {:.2f} bits".format(vulture_best_score))
     

Citius:  33.10 bits
TPA4:    9.78 bits
Vulture: 9.23 bits


(That also takes a while, though not quite as long as part 2 did.)

The hit in Citius is the strongest by some ways, plus that score is in the range of what we saw for positive controls. 

The best hits in TPA4 and Vulture (around 9-10 bits) are much higher than what we saw for negative controls (around -37 bits) but those were from global alignments to a single fixed-length target sequence. Now we're taking the max over a large number of possible global alignments to different start points in the genome and to different lengths of target sequence (in effect - though we're doing it efficiently and recursively with dynamic programming). What's that expected score distribution? We would try to figure it out (extreme value distribution, blah blah blah, but we haven't built up any machinery for this in the course yet.  But we _do_ know how to empirically score randomized genomes as a negative control. Let's do all three just to demonstrate. The NumPy `rng.shuffle()` takes a list not a string, so I make a little string-shuffling function.


In [10]:
def shuffled(s):
    shuf = list(s)
    rng.shuffle(shuf)
    return ''.join(shuf)
    
(c_shuf_best_endpos, c_shuf_best_score) = scan(shuffled(citius),  example_read, +1.85, -2.91, -5)
(t_shuf_best_endpos, t_shuf_best_score) = scan(shuffled(tpa4),    example_read, +1.85, -2.91, -5)
(v_shuf_best_endpos, v_shuf_best_score) = scan(shuffled(vulture), example_read, +1.85, -2.91, -5)

print("Citius:  {:.2f} bits".format(c_shuf_best_score))
print("TPA4:    {:.2f} bits".format(t_shuf_best_score))
print("Vulture: {:.2f} bits".format(v_shuf_best_score))


Citius:  6.87 bits
TPA4:    7.66 bits
Vulture: 15.60 bits


So we can get scores as high as 7-15 bits easily by chance. That's evidence in favor of the TPA4 and Vulture best hits being noise, and the Citius hit being real.

We bet on Citius, and that's right. Moriarty owes another box of donuts.

In [11]:
%load_ext watermark
%watermark -v -m -p numpy,jupyter

Python implementation: CPython
Python version       : 3.12.5
IPython version      : 8.26.0

numpy  : 2.1.0
jupyter: 1.0.0

Compiler    : Clang 13.0.0 (clang-1300.0.29.30)
OS          : Darwin
Release     : 23.6.0
Machine     : x86_64
Processor   : i386
CPU cores   : 16
Architecture: 64bit

