0

8かける8の画像認識

0
0
$$$$

画像認識

手書き数字の認識をしています。
8かける8のマスで0から9のラベルを判別します。
使うのは10枚の訓練データのみです。
性能は正解率が60パーセントです。

      from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import random
from sklearn.datasets import load_digits
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
# =========================================================
# 1. 環境とデータの準備(手書き数字 64次元データ)
# =========================================================
np.random.seed(42)
digits = load_digits()
X_raw, y_raw = digits.data, digits.target
# 生態系の変化を見やすくするため、400サンプルを抽出
total_indices = np.random.choice(len(X_raw), 400, replace=False)
X_400, y_400 = X_raw[total_indices], y_raw[total_indices]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_400)
# 少数サンプルを訓練用に(残りをテスト環境に)
train_idx = []
for c in range(10):
    c_idxs = np.where(y_400 == c)[0]
    train_idx.extend(c_idxs[:1]) # 各クラス1枚ずつ
test_idx = [i for i in range(400) if i not in train_idx]
X_train, y_train = X_scaled[train_idx], y_400[train_idx]
X_test, y_test = X_scaled[test_idx], y_400[test_idx]
NUM_CLASSES = 10
INPUT_DIM = 64
# --- ハイパーパラメータ ---
INIT_CELLS = 680
MAX_CELLS = 10000
MIN_CELLS = 80
THOUGHT_STEPS = 3
lr_base = 0.30
lr_link_base = 0.10
DIVISION_THRESHOLD = 0.85     # 構造過学習を防ぐため、少し厳格化
DEATH_THRESHOLD = 0.08        # 淘汰圧を少し強める
GRAPH_DECAY = 0.90
EDGE_PRUNE_THRESHOLD = 0.02
epochs =6                   # 現象を観察するため、5サイクル回します
# =========================================================
# 2. 視覚記憶細胞(VisionMemoryCell)の定義
# =========================================================
class VisionMemoryCell:
    def __init__(self, cell_id):
        self.id = cell_id
        self.w_in = np.random.normal(0, 1.0, INPUT_DIM)  # 蜘蛛の巣初期化で再上書きされます
        self.A = np.random.normal(0, 0.1, NUM_CLASSES)
        self.links = {}
        self.visits = 0.0
        self.energy = 0.5
        self.recent_activity = 0.0
def cosine_similarity(w, x):
    return np.dot(w, x) / (np.linalg.norm(w) * np.linalg.norm(x) + 1e-9)
def softmax(x, temp=1.0):
    x = np.array(x) / temp
    x = x - np.max(x)
    e = np.exp(x)
    return e / (np.sum(e) + 1e-12)
# =========================================================
# 🧬 3. 代謝・新陳代謝コア(分裂コストの厳格化)
# =========================================================
def evolve_and_prune_space(memory_space, outer_graph):
    # 睡眠・自然減衰
    for cell in memory_space:
        cell.energy *= 0.92
        cell.recent_activity *= 0.50
        if cell.energy < 0.01: cell.energy = 0.01
        
    # --- 分裂フェーズ(ユーザーさんの提案によるコスト厳格化) ---
    new_cells = []
    current_max_id = max([c.id for c in memory_space]) if memory_space else 0
    for cell in memory_space:
        if len(memory_space) + len(new_cells) >= MAX_CELLS: break
        
        # 分裂条件をエネルギーかつ、 visits累積に設定
        if cell.energy > DIVISION_THRESHOLD and cell.visits > 15:
            child = VisionMemoryCell(current_max_id + len(new_cells) + 1)
            
            # 子は親の特性に少し変異(ノイズ)を乗せる
            child.w_in = np.clip(cell.w_in + np.random.normal(0, 0.05, INPUT_DIM), -3.0, 3.0)
            child.A = cell.A.copy()
            for k, v in cell.links.items(): child.links[k] = v * 0.5
            
            # 【ユーザーさんの設計思想】親の資源を大きく削り、子を未熟な状態からスタートさせる
            cell.energy *= 0.3
            cell.visits *= 0.5
            child.energy = 0.5  # 子の初期エネルギーは低い
            
            new_cells.append(child)
            
    memory_space.extend(new_cells)
    
    # --- 淘汰 & 外側マクログラフの再マッピング ---
    survived_space = []
    id_map = {}
    for cell in memory_space:
        if len(survived_space) > MIN_CELLS:
            if cell.energy < DEATH_THRESHOLD and cell.visits < 1.5:
                continue
        new_id = len(survived_space)
        id_map[cell.id] = new_id
        cell.id = new_id
        survived_space.append(cell)
        
    for cell in survived_space:
        cell.links = {id_map[old_id]: w * 0.85 for old_id, w in cell.links.items() if old_id in id_map and w > 0.05}
        
    new_graph = defaultdict(dict)
    for u, neighbors in outer_graph.items():
        new_u = ("cell", id_map[u[1]]) if u[0] == "cell" and u[1] in id_map else u
        if u[0] == "cell" and u[1] not in id_map: continue
        
        for v, weight in neighbors.items():
            decayed_weight = weight * GRAPH_DECAY
            if decayed_weight < EDGE_PRUNE_THRESHOLD: continue
            
            new_v = ("cell", id_map[v[1]]) if v[0] == "cell" and v[1] in id_map else v
            if v[0] == "cell" and v[1] not in id_map: continue
            
            new_graph[new_u][new_v] = decayed_weight
            
    return survived_space, new_graph
# =========================================================
# 🔄 4. 融合生命ダイナミクス(思考の歩行・側頭抑制)
# =========================================================
def live_vision_cycle(memory_space, outer_graph, x_64d=None, target_class=None, target_recall_class=None,
                      sensory_drive=1.0, intrinsic_drive=0.0, top_down_drive=0.0, learning_gate=1.0):
    if len(memory_space) == 0: return np.zeros(NUM_CLASSES), np.zeros(INPUT_DIM)
    id_to_cell = {c.id: c for c in memory_space}
    
    start_probabilities = np.zeros(len(memory_space))
    
    if x_64d is not None and sensory_drive > 0.0:
        sims = np.array([cosine_similarity(cell.w_in, x_64d) for cell in memory_space])
        start_probabilities += softmax(sims, temp=0.1) * sensory_drive
        
    if intrinsic_drive > 0.0:
        activity_weights = np.array([cell.energy * (cell.recent_activity + 0.05) for cell in memory_space])
        start_probabilities += (activity_weights / (np.sum(activity_weights) + 1e-12)) * intrinsic_drive
        
    if target_recall_class is not None and top_down_drive > 0.0:
        c_node = ("class", target_recall_class)
        for next_node, weight in outer_graph.get(c_node, {}).items():
            if next_node[0] == "cell" and next_node[1] in id_to_cell:
                start_probabilities[next_node[1]] += weight * top_down_drive
                
    if np.sum(start_probabilities) == 0: 
        start_probabilities = np.ones(len(memory_space)) / len(memory_space)
    start_probabilities /= np.sum(start_probabilities)
    
    current_cell_id = np.random.choice(list(id_to_cell.keys()), p=start_probabilities)
    path = [current_cell_id]
    
    collected_signals = [id_to_cell[current_cell_id].A.copy()]
    imagined_pixels = [id_to_cell[current_cell_id].w_in.copy()]
    
    # --- 思考がグラフを歩く(Graph Walk) ---
    for step in range(THOUGHT_STEPS - 1):
        current_cell = id_to_cell[current_cell_id]
        current_cell.recent_activity += sensory_drive * 1.0
        
        all_ids = list(id_to_cell.keys())
        candidates = set(np.random.choice(all_ids, min(25, len(all_ids)), replace=False))
        candidates.update(current_cell.links.keys())
        
        scores = []
        c_ids = []
        for cid in candidates:
            if cid in path or cid not in id_to_cell: continue
            cell = id_to_cell[cid]
            
            score = current_cell.links.get(cid, 0.0) * 2.5
            if x_64d is not None:
                score += 2.0 * cosine_similarity(cell.w_in, x_64d) * sensory_drive
            score += 0.5 * cosine_similarity(current_cell.A, cell.A)
            
            scores.append(score)
            c_ids.append(cid)
            
        if not scores: break
        probs = softmax(scores, temp=0.3)
        current_cell_id = np.random.choice(c_ids, p=probs)
        path.append(current_cell_id)
        collected_signals.append(id_to_cell[current_cell_id].A.copy())
        imagined_pixels.append(id_to_cell[current_cell_id].w_in.copy())
        
    final_perception = np.mean(np.stack(collected_signals), axis=0) if collected_signals else np.zeros(NUM_CLASSES)
    final_image = np.mean(np.stack(imagined_pixels), axis=0) if imagined_pixels else np.zeros(INPUT_DIM)
    
    sensory_factor = sensory_drive * learning_gate
    intrinsic_factor = intrinsic_drive * learning_gate
    
    inferred_class = np.argmax(final_perception)
    target = target_class if target_class is not None else inferred_class
    
    target_vec = np.zeros(NUM_CLASSES)
    target_vec[target] = 1.0
    
    # 報酬の決定(厳格化:専門化を促す)
    reward = min(1.0 / (np.linalg.norm(target_vec - final_perception) + 0.15), 2.5)
    
    # --- ヘブ則的な学習と、専門化のための側頭抑制(Lateral Inhibition) ---
    if x_64d is not None and sensory_factor > 0.0:
        # 1. 経路上の細胞の学習
        for cid in path:
            if cid in id_to_cell:
                cell = id_to_cell[cid]
                cell.visits += sensory_factor
                cell.energy = min(1.0, cell.energy + reward * 0.15 * sensory_factor)
                cell.w_in += (lr_base * 0.5 * sensory_factor) * (x_64d - cell.w_in)
                cell.A = (1.0 - lr_base * sensory_factor) * cell.A + (lr_base * sensory_factor) * target_vec
                
        # 2. 側頭抑制(同じ情報への過剰適合・重複増殖を防ぐ)
        # 歩いた細胞の近隣リンクにありながら、今回選ばれなかった細胞のエネルギーを少し削る
        for cid in path:
            if cid in id_to_cell:
                for neighbor_id in id_to_cell[cid].links.keys():
                    if neighbor_id not in path and neighbor_id in id_to_cell:
                        id_to_cell[neighbor_id].energy *= 0.95 # 側頭抑制によるエネルギー減衰
                
    total_learning_power = (sensory_factor * reward + intrinsic_factor * 0.2)
    if total_learning_power > 0.0:
        for i in range(len(path) - 1):
            u, v = path[i], path[i+1]
            if u in id_to_cell and v in id_to_cell:
                id_to_cell[u].links[v] = id_to_cell[u].links.get(v, 0.1) + (lr_link_base * total_learning_power)
                
        c_node = ("class", int(target))
        first_cell_node = ("cell", path[0])
        last_cell_node = ("cell", path[-1])
        
        for i in range(len(path) - 1):
            f_node = ("cell", path[i])
            t_node = ("cell", path[i+1])
            outer_graph[f_node][t_node] = outer_graph[f_node].get(t_node, 0.0) + total_learning_power
            outer_graph[t_node][f_node] = outer_graph[t_node].get(f_node, 0.0) + total_learning_power
            
        # クラスから細胞、細胞からクラスへの「双方向エッジ」を正確にマッピング
        outer_graph[first_cell_node][c_node] = outer_graph[first_cell_node].get(c_node, 0.0) + total_learning_power
        outer_graph[c_node][first_cell_node] = outer_graph[c_node].get(first_cell_node, 0.0) + total_learning_power
        
        outer_graph[last_cell_node][c_node] = outer_graph[last_cell_node].get(c_node, 0.0) + total_learning_power
        outer_graph[c_node][last_cell_node] = outer_graph[c_node].get(last_cell_node, 0.0) + total_learning_power
        
    return final_perception, final_image
# =========================================================
# 👥 5. アンサンブル・蜘蛛の巣トポロジー管理(ミーム交配の実装)
# =========================================================
class EnsembleVisionEcosystem:
    def __init__(self, n_estimators=5, total_cells=1000):
        self.n_estimators = n_estimators
        self.models = []
        NUM_RINGS = 5  # 蜘蛛の巣の階層数
        
        for _ in range(n_estimators):
            space = []
            graph = defaultdict(dict)
            
            # 密度の調整:外側(感覚層)ほど細胞を多く、内側(概念層)ほど少なく
            cells_per_ring = []
            remaining = total_cells
            for r in range(NUM_RINGS):
                if r == NUM_RINGS - 1:
                    cells_per_ring.append(remaining)
                else:
                    count = int(total_cells * (0.45 / (1.6**r)))
                    cells_per_ring.append(count)
                    remaining -= count
                    
            cell_id_counter = 0
            ring_nodes = {}
            
            # --- 1. 蜘蛛の巣の細胞配置と受容野初期化 ---
            for ring_idx, n_cells in enumerate(cells_per_ring):
                ring_nodes[ring_idx] = []
                for i in range(n_cells):
                    cell = VisionMemoryCell(cell_id_counter)
                    
                    # 外周(感覚層)ほど実際の数字に近い初期受容野を持たせる
                    sensory_proximity = 1.0 - (ring_idx / NUM_RINGS)
                    if sensory_proximity > 0.2:
                        ref_idx = np.random.choice(len(X_train))
                        base_pattern = X_train[ref_idx]
                        cell.w_in = base_pattern * sensory_proximity + np.random.normal(0, 0.3, INPUT_DIM) * (1.0 - sensory_proximity)
                        cell.A[y_train[ref_idx]] = 0.5 * sensory_proximity
                    
                    space.append(cell)
                    ring_nodes[ring_idx].append(cell_id_counter)
                    cell_id_counter += 1
                    
            # --- 2. 蜘蛛の巣の糸(リンク)の構築 ---
            id_to_cell = {c.id: c for c in space}
            for ring_idx in range(NUM_RINGS):
                nodes = ring_nodes[ring_idx]
                num_nodes = len(nodes)
                if num_nodes == 0: continue
                
                for i in range(num_nodes):
                    curr_id = nodes[i]
                    
                    # (a) 同心円の横の糸(横方向リンク)
                    next_id = nodes[(i + 1) % num_nodes]
                    id_to_cell[curr_id].links[next_id] = 0.4
                    id_to_cell[next_id].links[curr_id] = 0.4
                    
                    # (b) 放射方向の縦の糸(抽象化への階層リンク)
                    if ring_idx < NUM_RINGS - 1:
                        inner_nodes = ring_nodes[ring_idx + 1]
                        if len(inner_nodes) > 0:
                            inner_idx = int(i * len(inner_nodes) / num_nodes)
                            inner_id = inner_nodes[inner_idx]
                            
                            id_to_cell[curr_id].links[inner_id] = 0.6
                            id_to_cell[inner_id].links[curr_id] = 0.6
            self.models.append({'space': space, 'graph': graph})
    def train_cycle(self, X, y, phase="day"):
        for model in self.models:
            # 脳の個性を引き出すブートストラップサンプリング
            indices = np.random.choice(len(X), len(X), replace=True)
            for idx in indices:
                if phase == "day":
                    live_vision_cycle(model['space'], model['graph'], x_64d=X[idx], 
                                      target_class=y[idx], sensory_drive=1.0, intrinsic_drive=0.0)
                elif phase == "evening":
                    live_vision_cycle(model['space'], model['graph'], x_64d=X[idx], 
                                      target_class=y[idx], sensory_drive=0.3, intrinsic_drive=0.7)
    def sleep_cycle(self, steps=60):
        for model in self.models:
            for _ in range(steps):
                live_vision_cycle(model['space'], model['graph'], x_64d=None, 
                                  target_class=None, sensory_drive=0.0, intrinsic_drive=1.0)
    def cultural_crossover(self, crossover_rate=0.2):
        """異なる脳の間で、最も洗練されたイデア(ミーム)を交換する"""
        elite_cells_per_model = []
        
        for m_idx, model in enumerate(self.models):
            space = model['space']
            if not space: continue
            
            # 最深部(Ring 4に相当する後半10%のコア層)からエリートを選別
            core_start_idx = int(len(space) * 0.9)
            core_cells = space[core_start_idx:]
            if not core_cells: continue
            
            # エネルギーが高く、キャラクター(特定のクラスへの確信)が立っている細胞をスカウト
            elite_cell = max(core_cells, key=lambda c: (c.energy * np.max(c.A)))
            elite_cells_per_model.append((m_idx, elite_cell))
            
        # 集団間での文化伝播(シナプス感染)
        for src_idx, src_cell in elite_cells_per_model:
            target_indices = [i for i in range(self.n_estimators) if i != src_idx]
            if not target_indices: continue
            dst_idx = random.choice(target_indices)
            dst_model = self.models[dst_idx]
            
            dst_core_cells = dst_model['space'][int(len(dst_model['space']) * 0.9):]
            if not dst_core_cells: continue
            
            # 伝播先の脳の中で、最も迷っている(エネルギーの低い)細胞にミームをブレンドする
            weak_cell = min(dst_core_cells, key=lambda c: c.energy)
            
            # 知識の混ざり合い
            weak_cell.w_in = (1.0 - crossover_rate) * weak_cell.w_in + crossover_rate * src_cell.w_in
            weak_cell.A = (1.0 - crossover_rate) * weak_cell.A + crossover_rate * src_cell.A
            weak_cell.energy = max(weak_cell.energy, src_cell.energy * 0.8) # 文化刺激による活性化
    def evolve(self):
        for model in self.models:
            model['space'], model['graph'] = evolve_and_prune_space(model['space'], model['graph'])
    def predict(self, X, run_evolution=True):
        """
        推論を実行しつつ、その入力データにリアルタイムに適応して
        細胞を更新・分裂・淘汰させる(リアルタイム・オンライン学習モード)
        """
        y_pred = []
        for idx in range(len(X)):
            ensemble_perceptions = []
            
            # 各ベースモデル(脳)がデータに反応し、同時に学習・更新を行う
            for model in self.models:
                # 【変更点】
                # 1. target_class=None にすることで、自分の推論結果をターゲット(疑似ラベル)にする
                # 2. learning_gate=1.0 にして、推論時もヘブ則や側頭抑制を完全に作動させる
                perception, _ = live_vision_cycle(
                    model['space'], model['graph'], x_64d=X[idx], 
                    target_class=None, # 自身の推論(inferred_class)で自己学習
                    sensory_drive=1.0, 
                    intrinsic_drive=0.1, 
                    learning_gate=1.0  # ゲートを開放!推論時も動的に更新
                )
                ensemble_perceptions.append(perception)
            
            # アンサンブルとしての最終決定
            avg_perception = np.mean(ensemble_perceptions, axis=0)
            final_class = np.argmax(avg_perception)
            y_pred.append(final_class)
            
            # 【変更点】細胞を増やす・間引く(新陳代謝の実行)
            # サンプルごと(または数サンプルごと)に代謝を回すことで、推論中にリアルタイムに細胞が増減します
            if run_evolution and (idx + 1) % 10 == 0: # ここでは10サンプルごとに新陳代謝を実行
                self.evolve()
                
        return y_pred
    def get_average_cell_count(self):
        return int(np.mean([len(m['space']) for m in self.models]))
# =========================================================
# 🚀 6. 巡礼ループの実行
# =========================================================
print("=== 蜘蛛の巣ウェブ型・側頭抑制&ミーム交配システム 起動 ===")
ensemble = EnsembleVisionEcosystem(n_estimators=5, total_cells=INIT_CELLS)
for cycle in range(epochs):
    # ☀️ 昼
    ensemble.train_cycle(X_train, y_train, phase="day")
    # 🌆 夕
    ensemble.train_cycle(X_train, y_train, phase="evening")
    # 🌙 夜(夢)
    ensemble.sleep_cycle(steps=60)
    
    # 🌌 夢の最深部で「ミーム文化的交配」が発生
    ensemble.cultural_crossover(crossover_rate=0.2)
    
    # 🧬 新陳代謝(分裂コスト制限 & 淘汰)
    ensemble.evolve()
    
    # 📊 テスト
    y_pred = ensemble.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    
    avg_cells = ensemble.get_average_cell_count()
    print(f"Cycle {cycle+1:02d}/{epochs} | 平均現存細胞数: {avg_cells:>3} | アンサンブル正解率: {acc:.4f}")
# =========================================================
# 7. ダッシュボード:【 空想状態(Daydream) 】の可視化
# =========================================================
print("\n[Generating Daydream Images from the First Evolved Ecosystem...]")
fig, axes = plt.subplots(2, 5, figsize=(12, 5))
fig.suptitle("Daydreaming Mode (Top-Down Drive via Double Edges) - Model #1\nVisualizing the 'Ideals' of digits polished by Structural Control", fontsize=14)
first_model = ensemble.models[0]
for cls in range(10):
    _, daydream_image = live_vision_cycle(first_model['space'], first_model['graph'], 
                                          sensory_drive=0.0, intrinsic_drive=0.2, top_down_drive=1.0, 
                                          target_recall_class=cls, learning_gate=0.0)
    ax = axes[cls // 5, cls % 5]
    ax.imshow(daydream_image.reshape(8, 8), cmap="bone")
    ax.set_title(f"Concept: {cls}")
    ax.axis("off")
plt.tight_layout()
plt.show()
    

第二版(このコードは精度を保証しません)

      from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import random
from sklearn.datasets import load_digits
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
# =========================================================
# 1. 環境とデータの準備(手書き数字 64次元データ)
# =========================================================
np.random.seed(42)
digits = load_digits()
X_raw, y_raw = digits.data, digits.target
# 生態系の変化を見やすくするため、400サンプルを抽出
total_indices = np.random.choice(len(X_raw), 400, replace=False)
X_400, y_400 = X_raw[total_indices], y_raw[total_indices]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_400)
# 少数サンプルを訓練用に(残りをテスト環境に)
train_idx = []
for c in range(10):
    c_idxs = np.where(y_400 == c)[0]
    train_idx.extend(c_idxs[:1]) # 各クラス1枚ずつ
test_idx = [i for i in range(400) if i not in train_idx]
X_train, y_train = X_scaled[train_idx], y_400[train_idx]
X_test, y_test = X_scaled[test_idx], y_400[test_idx]
NUM_CLASSES = 10
INPUT_DIM = 64
# --- ハイパーパラメータ ---
INIT_CELLS = 690
MAX_CELLS = 10000
MIN_CELLS = 80
THOUGHT_STEPS = 8
lr_base = 0.30
lr_link_base = 0.10
DIVISION_THRESHOLD = 0.85     # 構造過学習を防ぐため、少し厳格化
DEATH_THRESHOLD = 0.08        # 淘汰圧を少し強める
GRAPH_DECAY = 0.90
EDGE_PRUNE_THRESHOLD = 0.02
epochs = 6                   # 現象を観察するため、5サイクル回します
# =========================================================
# 2. 視覚記憶細胞(VisionMemoryCell)の定義
# =========================================================
class VisionMemoryCell:
    def __init__(self, cell_id):
        self.id = cell_id
        self.w_in = np.random.normal(0, 1.0, INPUT_DIM)  # 蜘蛛の巣初期化で再上書きされます
        self.A = np.random.normal(0, 0.1, NUM_CLASSES)
        self.links = {}
        self.visits = 0.0
        self.energy = 0.5
        self.recent_activity = 0.0
def cosine_similarity(w, x):
    return np.dot(w, x) / (np.linalg.norm(w) * np.linalg.norm(x) + 1e-9)
def softmax(x, temp=1.0):
    x = np.array(x) / temp
    x = x - np.max(x)
    e = np.exp(x)
    return e / (np.sum(e) + 1e-12)
# =========================================================
# 🧬 3. 代謝・新陳代謝コア(分裂コストの厳格化)
# =========================================================
def evolve_and_prune_space(memory_space, outer_graph):
    # 睡眠・自然減衰
    for cell in memory_space:
        cell.energy *= 0.92
        cell.recent_activity *= 0.50
        if cell.energy < 0.01: cell.energy = 0.01
        
    # --- 分裂フェーズ(ユーザーさんの提案によるコスト厳格化) ---
    new_cells = []
    current_max_id = max([c.id for c in memory_space]) if memory_space else 0
    for cell in memory_space:
        if len(memory_space) + len(new_cells) >= MAX_CELLS: break
        
        # 分裂条件をエネルギーかつ、 visits累積に設定
        if cell.energy > DIVISION_THRESHOLD and cell.visits > 15:
            child = VisionMemoryCell(current_max_id + len(new_cells) + 1)
            
            # 子は親の特性に少し変異(ノイズ)を乗せる
            child.w_in = np.clip(cell.w_in + np.random.normal(0, 0.05, INPUT_DIM), -3.0, 3.0)
            child.A = cell.A.copy()
            for k, v in cell.links.items(): child.links[k] = v * 0.5
            
            # 【ユーザーさんの設計思想】親の資源を大きく削り、子を未熟な状態からスタートさせる
            cell.energy *= 0.3
            cell.visits *= 0.5
            child.energy = 0.4  # 子の初期エネルギーは低い
            
            new_cells.append(child)
            
    memory_space.extend(new_cells)
    
    # --- 淘汰 & 外側マクログラフの再マッピング ---
    survived_space = []
    id_map = {}
    for cell in memory_space:
        if len(survived_space) > MIN_CELLS:
            if cell.energy < DEATH_THRESHOLD and cell.visits < 1.5:
                continue
        new_id = len(survived_space)
        id_map[cell.id] = new_id
        cell.id = new_id
        survived_space.append(cell)
        
    for cell in survived_space:
        cell.links = {id_map[old_id]: w * 0.85 for old_id, w in cell.links.items() if old_id in id_map and w > 0.05}
        
    new_graph = defaultdict(dict)
    for u, neighbors in outer_graph.items():
        new_u = ("cell", id_map[u[1]]) if u[0] == "cell" and u[1] in id_map else u
        if u[0] == "cell" and u[1] not in id_map: continue
        
        for v, weight in neighbors.items():
            decayed_weight = weight * GRAPH_DECAY
            if decayed_weight < EDGE_PRUNE_THRESHOLD: continue
            
            new_v = ("cell", id_map[v[1]]) if v[0] == "cell" and v[1] in id_map else v
            if v[0] == "cell" and v[1] not in id_map: continue
            
            new_graph[new_u][new_v] = decayed_weight
            
    return survived_space, new_graph
# =========================================================
# 🔄 4. 融合生命ダイナミクス(思考の歩行・側頭抑制)
# =========================================================
def live_vision_cycle(memory_space, outer_graph, x_64d=None, target_class=None, target_recall_class=None,
                      sensory_drive=1.0, intrinsic_drive=0.0, top_down_drive=0.0, learning_gate=1.0):
    if len(memory_space) == 0: return np.zeros(NUM_CLASSES), np.zeros(INPUT_DIM)
    id_to_cell = {c.id: c for c in memory_space}
    
    start_probabilities = np.zeros(len(memory_space))
    
    if x_64d is not None and sensory_drive > 0.0:
        sims = np.array([cosine_similarity(cell.w_in, x_64d) for cell in memory_space])
        start_probabilities += softmax(sims, temp=0.1) * sensory_drive
        
    if intrinsic_drive > 0.0:
        activity_weights = np.array([cell.energy * (cell.recent_activity + 0.05) for cell in memory_space])
        start_probabilities += (activity_weights / (np.sum(activity_weights) + 1e-12)) * intrinsic_drive
        
    if target_recall_class is not None and top_down_drive > 0.0:
        c_node = ("class", target_recall_class)
        for next_node, weight in outer_graph.get(c_node, {}).items():
            if next_node[0] == "cell" and next_node[1] in id_to_cell:
                start_probabilities[next_node[1]] += weight * top_down_drive
                
    if np.sum(start_probabilities) == 0: 
        start_probabilities = np.ones(len(memory_space)) / len(memory_space)
    start_probabilities /= np.sum(start_probabilities)
    
    current_cell_id = np.random.choice(list(id_to_cell.keys()), p=start_probabilities)
    path = [current_cell_id]
    
    collected_signals = [id_to_cell[current_cell_id].A.copy()]
    imagined_pixels = [id_to_cell[current_cell_id].w_in.copy()]
    
    # --- 思考がグラフを歩く(Graph Walk) ---
    for step in range(THOUGHT_STEPS - 1):
        current_cell = id_to_cell[current_cell_id]
        current_cell.recent_activity += sensory_drive * 1.0
        
        all_ids = list(id_to_cell.keys())
        candidates = set(np.random.choice(all_ids, min(25, len(all_ids)), replace=False))
        candidates.update(current_cell.links.keys())
        
        scores = []
        c_ids = []
        for cid in candidates:
            if cid in path or cid not in id_to_cell: continue
            cell = id_to_cell[cid]
            
            score = current_cell.links.get(cid, 0.0) * 2.5
            if x_64d is not None:
                score += 2.0 * cosine_similarity(cell.w_in, x_64d) * sensory_drive
            score += 0.5 * cosine_similarity(current_cell.A, cell.A)
            
            scores.append(score)
            c_ids.append(cid)
            
        if not scores: break
        probs = softmax(scores, temp=0.3)
        current_cell_id = np.random.choice(c_ids, p=probs)
        path.append(current_cell_id)
        collected_signals.append(id_to_cell[current_cell_id].A.copy())
        imagined_pixels.append(id_to_cell[current_cell_id].w_in.copy())
        
    final_perception = np.mean(np.stack(collected_signals), axis=0) if collected_signals else np.zeros(NUM_CLASSES)
    final_image = np.mean(np.stack(imagined_pixels), axis=0) if imagined_pixels else np.zeros(INPUT_DIM)
    
    sensory_factor = sensory_drive * learning_gate
    intrinsic_factor = intrinsic_drive * learning_gate
    
    inferred_class = np.argmax(final_perception)
    target = target_class if target_class is not None else inferred_class
    
    target_vec = np.zeros(NUM_CLASSES)
    target_vec[target] = 1.0
    
    # 報酬の決定(厳格化:専門化を促す)
    reward = min(1.0 / (np.linalg.norm(target_vec - final_perception) + 0.15), 2.5)
    
    # --- ヘブ則的な学習と、専門化のための側頭抑制(Lateral Inhibition) ---
    if x_64d is not None and sensory_factor > 0.0:
        # 1. 経路上の細胞の学習
        for cid in path:
            if cid in id_to_cell:
                cell = id_to_cell[cid]
                cell.visits += sensory_factor
                cell.energy = min(1.0, cell.energy + reward * 0.15 * sensory_factor)
                cell.w_in += (lr_base * 0.5 * sensory_factor) * (x_64d - cell.w_in)
                cell.A = (1.0 - lr_base * sensory_factor) * cell.A + (lr_base * sensory_factor) * target_vec
                
        # 2. 側頭抑制(同じ情報への過剰適合・重複増殖を防ぐ)
        # 歩いた細胞の近隣リンクにありながら、今回選ばれなかった細胞のエネルギーを少し削る
        for cid in path:
            if cid in id_to_cell:
                for neighbor_id in id_to_cell[cid].links.keys():
                    if neighbor_id not in path and neighbor_id in id_to_cell:
                        id_to_cell[neighbor_id].energy *= 0.95 # 側頭抑制によるエネルギー減衰
                
    total_learning_power = (sensory_factor * reward + intrinsic_factor * 0.2)
    if total_learning_power > 0.0:
        for i in range(len(path) - 1):
            u, v = path[i], path[i+1]
            if u in id_to_cell and v in id_to_cell:
                id_to_cell[u].links[v] = id_to_cell[u].links.get(v, 0.1) + (lr_link_base * total_learning_power)
                
        c_node = ("class", int(target))
        first_cell_node = ("cell", path[0])
        last_cell_node = ("cell", path[-1])
        
        for i in range(len(path) - 1):
            f_node = ("cell", path[i])
            t_node = ("cell", path[i+1])
            outer_graph[f_node][t_node] = outer_graph[f_node].get(t_node, 0.0) + total_learning_power
            outer_graph[t_node][f_node] = outer_graph[t_node].get(f_node, 0.0) + total_learning_power
            
        # クラスから細胞、細胞からクラスへの「双方向エッジ」を正確にマッピング
        outer_graph[first_cell_node][c_node] = outer_graph[first_cell_node].get(c_node, 0.0) + total_learning_power
        outer_graph[c_node][first_cell_node] = outer_graph[c_node].get(first_cell_node, 0.0) + total_learning_power
        
        outer_graph[last_cell_node][c_node] = outer_graph[last_cell_node].get(c_node, 0.0) + total_learning_power
        outer_graph[c_node][last_cell_node] = outer_graph[c_node].get(last_cell_node, 0.0) + total_learning_power
        
    return final_perception, final_image
# =========================================================
# 👥 5. アンサンブル・蜘蛛の巣トポロジー管理(ミーム交配の実装)
# =========================================================
class EnsembleVisionEcosystem:
    def __init__(self, n_estimators=5, total_cells=1000):
        self.n_estimators = n_estimators
        self.models = []
        NUM_RINGS = 5  # 蜘蛛の巣の階層数
        
        for _ in range(n_estimators):
            space = []
            graph = defaultdict(dict)
            
            # 密度の調整:外側(感覚層)ほど細胞を多く、内側(概念層)ほど少なく
            cells_per_ring = []
            remaining = total_cells
            for r in range(NUM_RINGS):
                if r == NUM_RINGS - 1:
                    cells_per_ring.append(remaining)
                else:
                    count = int(total_cells * (0.45 / (1.6**r)))
                    cells_per_ring.append(count)
                    remaining -= count
                    
            cell_id_counter = 0
            ring_nodes = {}
            
            # --- 1. 蜘蛛の巣の細胞配置と受容野初期化 ---
            for ring_idx, n_cells in enumerate(cells_per_ring):
                ring_nodes[ring_idx] = []
                for i in range(n_cells):
                    cell = VisionMemoryCell(cell_id_counter)
                    
                    # 外周(感覚層)ほど実際の数字に近い初期受容野を持たせる
                    sensory_proximity = 1.0 - (ring_idx / NUM_RINGS)
                    if sensory_proximity > 0.2:
                        ref_idx = np.random.choice(len(X_train))
                        base_pattern = X_train[ref_idx]
                        cell.w_in = base_pattern * sensory_proximity + np.random.normal(0, 0.3, INPUT_DIM) * (1.0 - sensory_proximity)
                        cell.A[y_train[ref_idx]] = 0.5 * sensory_proximity
                    
                    space.append(cell)
                    ring_nodes[ring_idx].append(cell_id_counter)
                    cell_id_counter += 1
                    
            # --- 2. 蜘蛛の巣の糸(リンク)の構築 ---
            id_to_cell = {c.id: c for c in space}
            for ring_idx in range(NUM_RINGS):
                nodes = ring_nodes[ring_idx]
                num_nodes = len(nodes)
                if num_nodes == 0: continue
                
                for i in range(num_nodes):
                    curr_id = nodes[i]
                    
                    # (a) 同心円の横の糸(横方向リンク)
                    next_id = nodes[(i + 1) % num_nodes]
                    id_to_cell[curr_id].links[next_id] = 0.4
                    id_to_cell[next_id].links[curr_id] = 0.4
                    
                    # (b) 放射方向の縦の糸(抽象化への階層リンク)
                    if ring_idx < NUM_RINGS - 1:
                        inner_nodes = ring_nodes[ring_idx + 1]
                        if len(inner_nodes) > 0:
                            inner_idx = int(i * len(inner_nodes) / num_nodes)
                            inner_id = inner_nodes[inner_idx]
                            
                            id_to_cell[curr_id].links[inner_id] = 0.6
                            id_to_cell[inner_id].links[curr_id] = 0.6
            self.models.append({'space': space, 'graph': graph})
    def train_cycle(self, X, y, phase="day"):
        for model in self.models:
            # 脳の個性を引き出すブートストラップサンプリング
            indices = np.random.choice(len(X), len(X), replace=True)
            for idx in indices:
                if phase == "day":
                    live_vision_cycle(model['space'], model['graph'], x_64d=X[idx], 
                                      target_class=y[idx], sensory_drive=1.0, intrinsic_drive=0.0)
                elif phase == "evening":
                    live_vision_cycle(model['space'], model['graph'], x_64d=X[idx], 
                                      target_class=y[idx], sensory_drive=0.3, intrinsic_drive=0.7)
    def sleep_cycle(self, steps=60):
        for model in self.models:
            for _ in range(steps):
                live_vision_cycle(model['space'], model['graph'], x_64d=None, 
                                  target_class=None, sensory_drive=0.0, intrinsic_drive=1.0)
    def cultural_crossover(self, crossover_rate=0.2):
        """異なる脳の間で、最も洗練されたイデア(ミーム)を交換する"""
        elite_cells_per_model = []
        
        for m_idx, model in enumerate(self.models):
            space = model['space']
            if not space: continue
            
            # 最深部(Ring 4に相当する後半10%のコア層)からエリートを選別
            core_start_idx = int(len(space) * 0.9)
            core_cells = space[core_start_idx:]
            if not core_cells: continue
            
            # エネルギーが高く、キャラクター(特定のクラスへの確信)が立っている細胞をスカウト
            elite_cell = max(core_cells, key=lambda c: (c.energy * np.max(c.A)))
            elite_cells_per_model.append((m_idx, elite_cell))
            
        # 集団間での文化伝播(シナプス感染)
        for src_idx, src_cell in elite_cells_per_model:
            target_indices = [i for i in range(self.n_estimators) if i != src_idx]
            if not target_indices: continue
            dst_idx = random.choice(target_indices)
            dst_model = self.models[dst_idx]
            
            dst_core_cells = dst_model['space'][int(len(dst_model['space']) * 0.9):]
            if not dst_core_cells: continue
            
            # 伝播先の脳の中で、最も迷っている(エネルギーの低い)細胞にミームをブレンドする
            weak_cell = min(dst_core_cells, key=lambda c: c.energy)
            
            # 知識の混ざり合い
            weak_cell.w_in = (1.0 - crossover_rate) * weak_cell.w_in + crossover_rate * src_cell.w_in
            weak_cell.A = (1.0 - crossover_rate) * weak_cell.A + crossover_rate * src_cell.A
            weak_cell.energy = max(weak_cell.energy, src_cell.energy * 0.8) # 文化刺激による活性化
    def evolve(self):
        for model in self.models:
            model['space'], model['graph'] = evolve_and_prune_space(model['space'], model['graph'])
    def predict(self, X, run_evolution=True):
        """
        推論を実行しつつ、その入力データにリアルタイムに適応して
        細胞を更新・分裂・淘汰させる(リアルタイム・オンライン学習モード)
        """
        y_pred = []
        for idx in range(len(X)):
            ensemble_perceptions = []
            
            # 各ベースモデル(脳)がデータに反応し、同時に学習・更新を行う
            for model in self.models:
                # 【変更点】
                # 1. target_class=None にすることで、自分の推論結果をターゲット(疑似ラベル)にする
                # 2. learning_gate=1.0 にして、推論時もヘブ則や側頭抑制を完全に作動させる
                perception, _ = live_vision_cycle(
                    model['space'], model['graph'], x_64d=X[idx], 
                    target_class=None, # 自身の推論(inferred_class)で自己学習
                    sensory_drive=1.0, 
                    intrinsic_drive=0.1, 
                    learning_gate=1.0  # ゲートを開放!推論時も動的に更新
                )
                ensemble_perceptions.append(perception)
            
            # アンサンブルとしての最終決定
            avg_perception = np.mean(ensemble_perceptions, axis=0)
            final_class = np.argmax(avg_perception)
            y_pred.append(final_class)
            
            # 【変更点】細胞を増やす・間引く(新陳代謝の実行)
            # サンプルごと(または数サンプルごと)に代謝を回すことで、推論中にリアルタイムに細胞が増減します
            if run_evolution and (idx + 1) % 10 == 0: # ここでは10サンプルごとに新陳代謝を実行
                self.evolve()
                
        return y_pred
    def get_average_cell_count(self):
        return int(np.mean([len(m['space']) for m in self.models]))
# =========================================================
# 🚀 6. 交互巡礼・適応ループの実行(訓練 ⇄ 推論の交互サイクル)
# =========================================================
print("=== 蜘蛛の巣ウェブ型・交互生態系システム 起動 ===")
ensemble = EnsembleVisionEcosystem(n_estimators=5, total_cells=INIT_CELLS)
# 全体のマクロサイクル数
MACRO_CYCLES = 1 
for m_cycle in range(MACRO_CYCLES):
    print(f"\n==================================================")
    print(f"🌀 マクロサイクル {m_cycle+1}/{MACRO_CYCLES} 開始")
    print(f"==================================================")
    
    # -----------------------------------------------------
    # 【フェーズA】訓練サイクル(基底知識のアップデート)
    # -----------------------------------------------------
    print("\n--- [Phase A: Training] 基礎訓練・睡眠・ミーム交配 ---")
    # ☀️ 昼
    ensemble.train_cycle(X_train, y_train, phase="day")
    # 🌆 夕
    ensemble.train_cycle(X_train, y_train, phase="evening")
    # 🌙 夜(夢)
    ensemble.sleep_cycle(steps=600)
    # 🌌 ミーム文化的交配
    ensemble.cultural_crossover(crossover_rate=0.2)
    # 🧬 新陳代謝
    ensemble.evolve()
    
    avg_cells = ensemble.get_average_cell_count()
    print(f"訓練完了 | 平均現存細胞数: {avg_cells}")
    # -----------------------------------------------------
    # 【フェーズB】推論&リアルタイム適応(5回連続パス)
    # -----------------------------------------------------
    print("\n--- [Phase B: Inference & Adaptation] 5回連続推論実験 ---")
    
    # このマクロサイクル内での推論ログ
    pass_logs = []
    
    for pass_idx in range(3):
        # 推論を実行しつつ、リアルタイムに自己更新(learning_gate=1.0)と代謝を実行
        y_pred = ensemble.predict(X_test, run_evolution=True)
        acc = accuracy_score(y_test, y_pred)
        
        avg_cells_infer = ensemble.get_average_cell_count()
        
        pass_logs.append({
            'Pass': pass_idx + 1,
            'Accuracy': acc,
            'Avg Cells': avg_cells_infer,
        })
        print(f"  Pass {pass_idx+1}/5 | 正解率: {acc:.4f} | 平均細胞数: {avg_cells_infer}")
        
    # --- 現在のサイクルのサマリー表示 ---
    print(f"\n【マクロサイクル {m_cycle+1} の実験結果】")
    print(f"  Pass  | Accuracy   | Avg Cells ")
    print("  -------------------------------")
    for log in pass_logs:
        print(f"   {log['Pass']:<4} | {log['Accuracy']:<10.4f} | {log['Avg Cells']:<10}")
print("\n=== すべての交互サイクルが完了しました ===")
# 🔄 6.5. 実験:同じテストデータを5回連続で流す(生態追跡)
# =========================================================
print("\n=== 実験: 同一テストセットを5回連続で推論(動的適応の観察) ===")
# 進化ログの格納用
pass_logs = []
# テストデータをシャffleせず、完全に同じ順序で2回流す
for pass_idx in range(2):
    # predict内部で学習(learning_gate=1.0)と、10サンプルごとのevolve()が走る
    y_pred = ensemble.predict(X_test, run_evolution=True)
    acc = accuracy_score(y_test, y_pred)
    
    # 現在のアンサンブルの平均細胞数を取得
    avg_cells = ensemble.get_average_cell_count()
    
    # グラフの総エッジ数(リンク数)を簡易計算
    total_edges = sum(len(m['graph']) for m in ensemble.models)
    
    pass_logs.append({
        'Pass': pass_idx + 1,
        'Accuracy': acc,
        'Avg Cells': avg_cells,
    })
    
    print(f"Pass {pass_idx+1}/5 | 正解率: {acc:.4f} | 平均細胞数: {avg_cells}")
# --- 結果のテーブル表示 ---
print("\n【実験結果サマリー】")
print(f"{'Pass':<6} | {'Accuracy':<10} | {'Avg Cells':<10}")
print("-" * 34)
for log in pass_logs:
    print(f"{log['Pass']:<6} | {log['Accuracy']:<10.4f} | {log['Avg Cells']:<10}")
# =========================================================
# 7. ダッシュボード:【 空想状態(Daydream) 】の可視化
# =========================================================
print("\n[Generating Daydream Images from the First Evolved Ecosystem...]")
fig, axes = plt.subplots(2, 5, figsize=(12, 5))
fig.suptitle("Daydreaming Mode (Top-Down Drive via Double Edges) - Model #1\nVisualizing the 'Ideals' of digits polished by Structural Control", fontsize=14)
first_model = ensemble.models[0]
for cls in range(10):
    _, daydream_image = live_vision_cycle(first_model['space'], first_model['graph'], 
                                          sensory_drive=0.0, intrinsic_drive=0.2, top_down_drive=1.0, 
                                          target_recall_class=cls, learning_gate=0.0)
    ax = axes[cls // 5, cls % 5]
    ax.imshow(daydream_image.reshape(8, 8), cmap="bone")
    ax.set_title(f"Concept: {cls}")
    ax.axis("off")
plt.tight_layout()
plt.show()

    
投稿日:20時間前
更新日:3時間前
数学の力で現場を変える アルゴリズムエンジニア募集 - Mathlog served by OptHub

この記事を高評価した人

高評価したユーザはいません

この記事に送られたバッジ

バッジはありません。

投稿者

Owl
1
210

コメント

他の人のコメント

コメントはありません。
読み込み中...
読み込み中